tests/test-wireproto-serverreactor.py
changeset 37052 8c3c47362934
child 37055 61393f888dfe
equal deleted inserted replaced
37051:40206e227412 37052:8c3c47362934
       
     1 from __future__ import absolute_import, print_function
       
     2 
       
     3 import unittest
       
     4 
       
     5 from mercurial import (
       
     6     util,
       
     7     wireprotoframing as framing,
       
     8 )
       
     9 
       
    10 ffs = framing.makeframefromhumanstring
       
    11 
       
    12 def makereactor():
       
    13     return framing.serverreactor()
       
    14 
       
    15 def sendframes(reactor, gen):
       
    16     """Send a generator of frame bytearray to a reactor.
       
    17 
       
    18     Emits a generator of results from ``onframerecv()`` calls.
       
    19     """
       
    20     for frame in gen:
       
    21         frametype, frameflags, framelength = framing.parseheader(frame)
       
    22         payload = frame[framing.FRAME_HEADER_SIZE:]
       
    23         assert len(payload) == framelength
       
    24 
       
    25         yield reactor.onframerecv(frametype, frameflags, payload)
       
    26 
       
    27 def sendcommandframes(reactor, cmd, args, datafh=None):
       
    28     """Generate frames to run a command and send them to a reactor."""
       
    29     return sendframes(reactor, framing.createcommandframes(cmd, args, datafh))
       
    30 
       
    31 class FrameTests(unittest.TestCase):
       
    32     def testdataexactframesize(self):
       
    33         data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
       
    34 
       
    35         frames = list(framing.createcommandframes(b'command', {}, data))
       
    36         self.assertEqual(frames, [
       
    37             ffs(b'command-name have-data command'),
       
    38             ffs(b'command-data continuation %s' % data.getvalue()),
       
    39             ffs(b'command-data eos ')
       
    40         ])
       
    41 
       
    42     def testdatamultipleframes(self):
       
    43         data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
       
    44         frames = list(framing.createcommandframes(b'command', {}, data))
       
    45         self.assertEqual(frames, [
       
    46             ffs(b'command-name have-data command'),
       
    47             ffs(b'command-data continuation %s' % (
       
    48                 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
       
    49             ffs(b'command-data eos x'),
       
    50         ])
       
    51 
       
    52     def testargsanddata(self):
       
    53         data = util.bytesio(b'x' * 100)
       
    54 
       
    55         frames = list(framing.createcommandframes(b'command', {
       
    56             b'key1': b'key1value',
       
    57             b'key2': b'key2value',
       
    58             b'key3': b'key3value',
       
    59         }, data))
       
    60 
       
    61         self.assertEqual(frames, [
       
    62             ffs(b'command-name have-args|have-data command'),
       
    63             ffs(br'command-argument 0 \x04\x00\x09\x00key1key1value'),
       
    64             ffs(br'command-argument 0 \x04\x00\x09\x00key2key2value'),
       
    65             ffs(br'command-argument eoa \x04\x00\x09\x00key3key3value'),
       
    66             ffs(b'command-data eos %s' % data.getvalue()),
       
    67         ])
       
    68 
       
    69 class ServerReactorTests(unittest.TestCase):
       
    70     def _sendsingleframe(self, reactor, s):
       
    71         results = list(sendframes(reactor, [ffs(s)]))
       
    72         self.assertEqual(len(results), 1)
       
    73 
       
    74         return results[0]
       
    75 
       
    76     def assertaction(self, res, expected):
       
    77         self.assertIsInstance(res, tuple)
       
    78         self.assertEqual(len(res), 2)
       
    79         self.assertIsInstance(res[1], dict)
       
    80         self.assertEqual(res[0], expected)
       
    81 
       
    82     def test1framecommand(self):
       
    83         """Receiving a command in a single frame yields request to run it."""
       
    84         reactor = makereactor()
       
    85         results = list(sendcommandframes(reactor, b'mycommand', {}))
       
    86         self.assertEqual(len(results), 1)
       
    87         self.assertaction(results[0], 'runcommand')
       
    88         self.assertEqual(results[0][1], {
       
    89             'command': b'mycommand',
       
    90             'args': {},
       
    91             'data': None,
       
    92         })
       
    93 
       
    94     def test1argument(self):
       
    95         reactor = makereactor()
       
    96         results = list(sendcommandframes(reactor, b'mycommand',
       
    97                                          {b'foo': b'bar'}))
       
    98         self.assertEqual(len(results), 2)
       
    99         self.assertaction(results[0], 'wantframe')
       
   100         self.assertaction(results[1], 'runcommand')
       
   101         self.assertEqual(results[1][1], {
       
   102             'command': b'mycommand',
       
   103             'args': {b'foo': b'bar'},
       
   104             'data': None,
       
   105         })
       
   106 
       
   107     def testmultiarguments(self):
       
   108         reactor = makereactor()
       
   109         results = list(sendcommandframes(reactor, b'mycommand',
       
   110                                          {b'foo': b'bar', b'biz': b'baz'}))
       
   111         self.assertEqual(len(results), 3)
       
   112         self.assertaction(results[0], 'wantframe')
       
   113         self.assertaction(results[1], 'wantframe')
       
   114         self.assertaction(results[2], 'runcommand')
       
   115         self.assertEqual(results[2][1], {
       
   116             'command': b'mycommand',
       
   117             'args': {b'foo': b'bar', b'biz': b'baz'},
       
   118             'data': None,
       
   119         })
       
   120 
       
   121     def testsimplecommanddata(self):
       
   122         reactor = makereactor()
       
   123         results = list(sendcommandframes(reactor, b'mycommand', {},
       
   124                                          util.bytesio(b'data!')))
       
   125         self.assertEqual(len(results), 2)
       
   126         self.assertaction(results[0], 'wantframe')
       
   127         self.assertaction(results[1], 'runcommand')
       
   128         self.assertEqual(results[1][1], {
       
   129             'command': b'mycommand',
       
   130             'args': {},
       
   131             'data': b'data!',
       
   132         })
       
   133 
       
   134     def testmultipledataframes(self):
       
   135         frames = [
       
   136             ffs(b'command-name have-data mycommand'),
       
   137             ffs(b'command-data continuation data1'),
       
   138             ffs(b'command-data continuation data2'),
       
   139             ffs(b'command-data eos data3'),
       
   140         ]
       
   141 
       
   142         reactor = makereactor()
       
   143         results = list(sendframes(reactor, frames))
       
   144         self.assertEqual(len(results), 4)
       
   145         for i in range(3):
       
   146             self.assertaction(results[i], 'wantframe')
       
   147         self.assertaction(results[3], 'runcommand')
       
   148         self.assertEqual(results[3][1], {
       
   149             'command': b'mycommand',
       
   150             'args': {},
       
   151             'data': b'data1data2data3',
       
   152         })
       
   153 
       
   154     def testargumentanddata(self):
       
   155         frames = [
       
   156             ffs(b'command-name have-args|have-data command'),
       
   157             ffs(br'command-argument 0 \x03\x00\x03\x00keyval'),
       
   158             ffs(br'command-argument eoa \x03\x00\x03\x00foobar'),
       
   159             ffs(b'command-data continuation value1'),
       
   160             ffs(b'command-data eos value2'),
       
   161         ]
       
   162 
       
   163         reactor = makereactor()
       
   164         results = list(sendframes(reactor, frames))
       
   165 
       
   166         self.assertaction(results[-1], 'runcommand')
       
   167         self.assertEqual(results[-1][1], {
       
   168             'command': b'command',
       
   169             'args': {
       
   170                 b'key': b'val',
       
   171                 b'foo': b'bar',
       
   172             },
       
   173             'data': b'value1value2',
       
   174         })
       
   175 
       
   176     def testunexpectedcommandargument(self):
       
   177         """Command argument frame when not running a command is an error."""
       
   178         result = self._sendsingleframe(makereactor(),
       
   179                                        b'command-argument 0 ignored')
       
   180         self.assertaction(result, 'error')
       
   181         self.assertEqual(result[1], {
       
   182             'message': b'expected command frame; got 2',
       
   183         })
       
   184 
       
   185     def testunexpectedcommanddata(self):
       
   186         """Command argument frame when not running a command is an error."""
       
   187         result = self._sendsingleframe(makereactor(),
       
   188                                        b'command-data 0 ignored')
       
   189         self.assertaction(result, 'error')
       
   190         self.assertEqual(result[1], {
       
   191             'message': b'expected command frame; got 3',
       
   192         })
       
   193 
       
   194     def testmissingcommandframeflags(self):
       
   195         """Command name frame must have flags set."""
       
   196         result = self._sendsingleframe(makereactor(),
       
   197                                        b'command-name 0 command')
       
   198         self.assertaction(result, 'error')
       
   199         self.assertEqual(result[1], {
       
   200             'message': b'missing frame flags on command frame',
       
   201         })
       
   202 
       
   203     def testmissingargumentframe(self):
       
   204         frames = [
       
   205             ffs(b'command-name have-args command'),
       
   206             ffs(b'command-name 0 ignored'),
       
   207         ]
       
   208 
       
   209         results = list(sendframes(makereactor(), frames))
       
   210         self.assertEqual(len(results), 2)
       
   211         self.assertaction(results[0], 'wantframe')
       
   212         self.assertaction(results[1], 'error')
       
   213         self.assertEqual(results[1][1], {
       
   214             'message': b'expected command argument frame; got 1',
       
   215         })
       
   216 
       
   217     def testincompleteargumentname(self):
       
   218         """Argument frame with incomplete name."""
       
   219         frames = [
       
   220             ffs(b'command-name have-args command1'),
       
   221             ffs(br'command-argument eoa \x04\x00\xde\xadfoo'),
       
   222         ]
       
   223 
       
   224         results = list(sendframes(makereactor(), frames))
       
   225         self.assertEqual(len(results), 2)
       
   226         self.assertaction(results[0], 'wantframe')
       
   227         self.assertaction(results[1], 'error')
       
   228         self.assertEqual(results[1][1], {
       
   229             'message': b'malformed argument frame: partial argument name',
       
   230         })
       
   231 
       
   232     def testincompleteargumentvalue(self):
       
   233         """Argument frame with incomplete value."""
       
   234         frames = [
       
   235             ffs(b'command-name have-args command'),
       
   236             ffs(br'command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
       
   237         ]
       
   238 
       
   239         results = list(sendframes(makereactor(), frames))
       
   240         self.assertEqual(len(results), 2)
       
   241         self.assertaction(results[0], 'wantframe')
       
   242         self.assertaction(results[1], 'error')
       
   243         self.assertEqual(results[1][1], {
       
   244             'message': b'malformed argument frame: partial argument value',
       
   245         })
       
   246 
       
   247     def testmissingcommanddataframe(self):
       
   248         frames = [
       
   249             ffs(b'command-name have-data command1'),
       
   250             ffs(b'command-name eos command2'),
       
   251         ]
       
   252         results = list(sendframes(makereactor(), frames))
       
   253         self.assertEqual(len(results), 2)
       
   254         self.assertaction(results[0], 'wantframe')
       
   255         self.assertaction(results[1], 'error')
       
   256         self.assertEqual(results[1][1], {
       
   257             'message': b'expected command data frame; got 1',
       
   258         })
       
   259 
       
   260     def testmissingcommanddataframeflags(self):
       
   261         frames = [
       
   262             ffs(b'command-name have-data command1'),
       
   263             ffs(b'command-data 0 data'),
       
   264         ]
       
   265         results = list(sendframes(makereactor(), frames))
       
   266         self.assertEqual(len(results), 2)
       
   267         self.assertaction(results[0], 'wantframe')
       
   268         self.assertaction(results[1], 'error')
       
   269         self.assertEqual(results[1][1], {
       
   270             'message': b'command data frame without flags',
       
   271         })
       
   272 
       
   273 if __name__ == '__main__':
       
   274     import silenttestrunner
       
   275     silenttestrunner.main(__name__)