tests/test-cbor.py
changeset 37711 65a23cc8e75b
child 37899 2b3b6187c316
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-cbor.py	Sat Apr 14 16:36:15 2018 -0700
@@ -0,0 +1,210 @@
+from __future__ import absolute_import
+
+import io
+import unittest
+
+from mercurial.thirdparty import (
+    cbor,
+)
+from mercurial.utils import (
+    cborutil,
+)
+
+def loadit(it):
+    return cbor.loads(b''.join(it))
+
+class BytestringTests(unittest.TestCase):
+    def testsimple(self):
+        self.assertEqual(
+            list(cborutil.streamencode(b'foobar')),
+            [b'\x46', b'foobar'])
+
+        self.assertEqual(
+            loadit(cborutil.streamencode(b'foobar')),
+            b'foobar')
+
+    def testlong(self):
+        source = b'x' * 1048576
+
+        self.assertEqual(loadit(cborutil.streamencode(source)), source)
+
+    def testfromiter(self):
+        # This is the example from RFC 7049 Section 2.2.2.
+        source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
+
+        self.assertEqual(
+            list(cborutil.streamencodebytestringfromiter(source)),
+            [
+                b'\x5f',
+                b'\x44',
+                b'\xaa\xbb\xcc\xdd',
+                b'\x43',
+                b'\xee\xff\x99',
+                b'\xff',
+            ])
+
+        self.assertEqual(
+            loadit(cborutil.streamencodebytestringfromiter(source)),
+            b''.join(source))
+
+    def testfromiterlarge(self):
+        source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
+
+        self.assertEqual(
+            loadit(cborutil.streamencodebytestringfromiter(source)),
+            b''.join(source))
+
+    def testindefinite(self):
+        source = b'\x00\x01\x02\x03' + b'\xff' * 16384
+
+        it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)
+
+        self.assertEqual(next(it), b'\x5f')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\x00\x01')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\x02\x03')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\xff\xff')
+
+        dest = b''.join(cborutil.streamencodeindefinitebytestring(
+            source, chunksize=42))
+        self.assertEqual(cbor.loads(dest), b''.join(source))
+
+    def testreadtoiter(self):
+        source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff')
+
+        it = cborutil.readindefinitebytestringtoiter(source)
+        self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
+        self.assertEqual(next(it), b'\xee\xff\x99')
+
+        with self.assertRaises(StopIteration):
+            next(it)
+
+class IntTests(unittest.TestCase):
+    def testsmall(self):
+        self.assertEqual(list(cborutil.streamencode(0)), [b'\x00'])
+        self.assertEqual(list(cborutil.streamencode(1)), [b'\x01'])
+        self.assertEqual(list(cborutil.streamencode(2)), [b'\x02'])
+        self.assertEqual(list(cborutil.streamencode(3)), [b'\x03'])
+        self.assertEqual(list(cborutil.streamencode(4)), [b'\x04'])
+
+    def testnegativesmall(self):
+        self.assertEqual(list(cborutil.streamencode(-1)), [b'\x20'])
+        self.assertEqual(list(cborutil.streamencode(-2)), [b'\x21'])
+        self.assertEqual(list(cborutil.streamencode(-3)), [b'\x22'])
+        self.assertEqual(list(cborutil.streamencode(-4)), [b'\x23'])
+        self.assertEqual(list(cborutil.streamencode(-5)), [b'\x24'])
+
+    def testrange(self):
+        for i in range(-70000, 70000, 10):
+            self.assertEqual(
+                b''.join(cborutil.streamencode(i)),
+                cbor.dumps(i))
+
+class ArrayTests(unittest.TestCase):
+    def testempty(self):
+        self.assertEqual(list(cborutil.streamencode([])), [b'\x80'])
+        self.assertEqual(loadit(cborutil.streamencode([])), [])
+
+    def testbasic(self):
+        source = [b'foo', b'bar', 1, -10]
+
+        self.assertEqual(list(cborutil.streamencode(source)), [
+            b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29'])
+
+    def testemptyfromiter(self):
+        self.assertEqual(b''.join(cborutil.streamencodearrayfromiter([])),
+                         b'\x9f\xff')
+
+    def testfromiter1(self):
+        source = [b'foo']
+
+        self.assertEqual(list(cborutil.streamencodearrayfromiter(source)), [
+            b'\x9f',
+            b'\x43', b'foo',
+            b'\xff',
+        ])
+
+        dest = b''.join(cborutil.streamencodearrayfromiter(source))
+        self.assertEqual(cbor.loads(dest), source)
+
+    def testtuple(self):
+        source = (b'foo', None, 42)
+
+        self.assertEqual(cbor.loads(b''.join(cborutil.streamencode(source))),
+                         list(source))
+
+class SetTests(unittest.TestCase):
+    def testempty(self):
+        self.assertEqual(list(cborutil.streamencode(set())), [
+            b'\xd9\x01\x02',
+            b'\x80',
+        ])
+
+    def testset(self):
+        source = {b'foo', None, 42}
+
+        self.assertEqual(cbor.loads(b''.join(cborutil.streamencode(source))),
+                         source)
+
+class BoolTests(unittest.TestCase):
+    def testbasic(self):
+        self.assertEqual(list(cborutil.streamencode(True)),  [b'\xf5'])
+        self.assertEqual(list(cborutil.streamencode(False)), [b'\xf4'])
+
+        self.assertIs(loadit(cborutil.streamencode(True)), True)
+        self.assertIs(loadit(cborutil.streamencode(False)), False)
+
+class NoneTests(unittest.TestCase):
+    def testbasic(self):
+        self.assertEqual(list(cborutil.streamencode(None)), [b'\xf6'])
+
+        self.assertIs(loadit(cborutil.streamencode(None)), None)
+
+class MapTests(unittest.TestCase):
+    def testempty(self):
+        self.assertEqual(list(cborutil.streamencode({})), [b'\xa0'])
+        self.assertEqual(loadit(cborutil.streamencode({})), {})
+
+    def testemptyindefinite(self):
+        self.assertEqual(list(cborutil.streamencodemapfromiter([])), [
+            b'\xbf', b'\xff'])
+
+        self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})
+
+    def testone(self):
+        source = {b'foo': b'bar'}
+        self.assertEqual(list(cborutil.streamencode(source)), [
+            b'\xa1', b'\x43', b'foo', b'\x43', b'bar'])
+
+        self.assertEqual(loadit(cborutil.streamencode(source)), source)
+
+    def testmultiple(self):
+        source = {
+            b'foo': b'bar',
+            b'baz': b'value1',
+        }
+
+        self.assertEqual(loadit(cborutil.streamencode(source)), source)
+
+        self.assertEqual(
+            loadit(cborutil.streamencodemapfromiter(source.items())),
+            source)
+
+    def testcomplex(self):
+        source = {
+            b'key': 1,
+            2: -10,
+        }
+
+        self.assertEqual(loadit(cborutil.streamencode(source)),
+                         source)
+
+        self.assertEqual(
+            loadit(cborutil.streamencodemapfromiter(source.items())),
+            source)
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)