tests/test-wireproto-serverreactor.py
changeset 37285 3ed344546d9e
parent 37284 12bfc724217d
child 37288 9bfcbe4f4745
--- a/tests/test-wireproto-serverreactor.py	Mon Mar 26 13:51:22 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py	Mon Mar 26 13:57:22 2018 -0700
@@ -27,16 +27,19 @@
                                                 header.flags,
                                                 payload))
 
-def sendcommandframes(reactor, rid, cmd, args, datafh=None):
+def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
     """Generate frames to run a command and send them to a reactor."""
     return sendframes(reactor,
-                      framing.createcommandframes(rid, cmd, args, datafh))
+                      framing.createcommandframes(stream, rid, cmd, args,
+                                                  datafh))
 
 class FrameTests(unittest.TestCase):
     def testdataexactframesize(self):
         data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
 
-        frames = list(framing.createcommandframes(1, b'command', {}, data))
+        stream = framing.stream()
+        frames = list(framing.createcommandframes(stream, 1, b'command',
+                                                  {}, data))
         self.assertEqual(frames, [
             ffs(b'1 command-name have-data command'),
             ffs(b'1 command-data continuation %s' % data.getvalue()),
@@ -45,7 +48,10 @@
 
     def testdatamultipleframes(self):
         data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
-        frames = list(framing.createcommandframes(1, b'command', {}, data))
+
+        stream = framing.stream()
+        frames = list(framing.createcommandframes(stream, 1, b'command', {},
+                                                  data))
         self.assertEqual(frames, [
             ffs(b'1 command-name have-data command'),
             ffs(b'1 command-data continuation %s' % (
@@ -56,7 +62,8 @@
     def testargsanddata(self):
         data = util.bytesio(b'x' * 100)
 
-        frames = list(framing.createcommandframes(1, b'command', {
+        stream = framing.stream()
+        frames = list(framing.createcommandframes(stream, 1, b'command', {
             b'key1': b'key1value',
             b'key2': b'key2value',
             b'key3': b'key3value',
@@ -75,51 +82,54 @@
         with self.assertRaisesRegexp(ValueError,
                                      'cannot use more than 255 formatting'):
             args = [b'x' for i in range(256)]
-            list(framing.createtextoutputframe(1, [(b'bleh', args, [])]))
+            list(framing.createtextoutputframe(None, 1,
+                                               [(b'bleh', args, [])]))
 
     def testtextoutputexcessivelabels(self):
         """At most 255 labels are allowed."""
         with self.assertRaisesRegexp(ValueError,
                                      'cannot use more than 255 labels'):
             labels = [b'l' for i in range(256)]
-            list(framing.createtextoutputframe(1, [(b'bleh', [], labels)]))
+            list(framing.createtextoutputframe(None, 1,
+                                               [(b'bleh', [], labels)]))
 
     def testtextoutputformattingstringtype(self):
         """Formatting string must be bytes."""
         with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
-            list(framing.createtextoutputframe(1, [
+            list(framing.createtextoutputframe(None, 1, [
                 (b'foo'.decode('ascii'), [], [])]))
 
     def testtextoutputargumentbytes(self):
         with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
-            list(framing.createtextoutputframe(1, [
+            list(framing.createtextoutputframe(None, 1, [
                 (b'foo', [b'foo'.decode('ascii')], [])]))
 
     def testtextoutputlabelbytes(self):
         with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
-            list(framing.createtextoutputframe(1, [
+            list(framing.createtextoutputframe(None, 1, [
                 (b'foo', [], [b'foo'.decode('ascii')])]))
 
     def testtextoutputtoolongformatstring(self):
         with self.assertRaisesRegexp(ValueError,
                                      'formatting string cannot be longer than'):
-            list(framing.createtextoutputframe(1, [
+            list(framing.createtextoutputframe(None, 1, [
                 (b'x' * 65536, [], [])]))
 
     def testtextoutputtoolongargumentstring(self):
         with self.assertRaisesRegexp(ValueError,
                                      'argument string cannot be longer than'):
-            list(framing.createtextoutputframe(1, [
+            list(framing.createtextoutputframe(None, 1, [
                 (b'bleh', [b'x' * 65536], [])]))
 
     def testtextoutputtoolonglabelstring(self):
         with self.assertRaisesRegexp(ValueError,
                                      'label string cannot be longer than'):
-            list(framing.createtextoutputframe(1, [
+            list(framing.createtextoutputframe(None, 1, [
                 (b'bleh', [], [b'x' * 65536])]))
 
     def testtextoutput1simpleatom(self):
-        val = list(framing.createtextoutputframe(1, [
+        stream = framing.stream()
+        val = list(framing.createtextoutputframe(stream, 1, [
             (b'foo', [], [])]))
 
         self.assertEqual(val, [
@@ -127,7 +137,8 @@
         ])
 
     def testtextoutput2simpleatoms(self):
-        val = list(framing.createtextoutputframe(1, [
+        stream = framing.stream()
+        val = list(framing.createtextoutputframe(stream, 1, [
             (b'foo', [], []),
             (b'bar', [], []),
         ]))
@@ -137,7 +148,8 @@
         ])
 
     def testtextoutput1arg(self):
-        val = list(framing.createtextoutputframe(1, [
+        stream = framing.stream()
+        val = list(framing.createtextoutputframe(stream, 1, [
             (b'foo %s', [b'val1'], []),
         ]))
 
@@ -146,7 +158,8 @@
         ])
 
     def testtextoutput2arg(self):
-        val = list(framing.createtextoutputframe(1, [
+        stream = framing.stream()
+        val = list(framing.createtextoutputframe(stream, 1, [
             (b'foo %s %s', [b'val', b'value'], []),
         ]))
 
@@ -156,7 +169,8 @@
         ])
 
     def testtextoutput1label(self):
-        val = list(framing.createtextoutputframe(1, [
+        stream = framing.stream()
+        val = list(framing.createtextoutputframe(stream, 1, [
             (b'foo', [], [b'label']),
         ]))
 
@@ -165,7 +179,8 @@
         ])
 
     def testargandlabel(self):
-        val = list(framing.createtextoutputframe(1, [
+        stream = framing.stream()
+        val = list(framing.createtextoutputframe(stream, 1, [
             (b'foo %s', [b'arg'], [b'label']),
         ]))
 
@@ -193,7 +208,8 @@
     def test1framecommand(self):
         """Receiving a command in a single frame yields request to run it."""
         reactor = makereactor()
-        results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
+        stream = framing.stream()
+        results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
         self.assertEqual(len(results), 1)
         self.assertaction(results[0], 'runcommand')
         self.assertEqual(results[0][1], {
@@ -208,7 +224,8 @@
 
     def test1argument(self):
         reactor = makereactor()
-        results = list(sendcommandframes(reactor, 41, b'mycommand',
+        stream = framing.stream()
+        results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
                                          {b'foo': b'bar'}))
         self.assertEqual(len(results), 2)
         self.assertaction(results[0], 'wantframe')
@@ -222,7 +239,8 @@
 
     def testmultiarguments(self):
         reactor = makereactor()
-        results = list(sendcommandframes(reactor, 1, b'mycommand',
+        stream = framing.stream()
+        results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
                                          {b'foo': b'bar', b'biz': b'baz'}))
         self.assertEqual(len(results), 3)
         self.assertaction(results[0], 'wantframe')
@@ -237,7 +255,8 @@
 
     def testsimplecommanddata(self):
         reactor = makereactor()
-        results = list(sendcommandframes(reactor, 1, b'mycommand', {},
+        stream = framing.stream()
+        results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
                                          util.bytesio(b'data!')))
         self.assertEqual(len(results), 2)
         self.assertaction(results[0], 'wantframe')
@@ -350,19 +369,20 @@
         """Multiple fully serviced commands with same request ID is allowed."""
         reactor = makereactor()
         results = []
+        outstream = framing.stream()
         results.append(self._sendsingleframe(
             reactor, ffs(b'1 command-name eos command')))
-        result = reactor.onbytesresponseready(1, b'response1')
+        result = reactor.onbytesresponseready(outstream, 1, b'response1')
         self.assertaction(result, 'sendframes')
         list(result[1]['framegen'])
         results.append(self._sendsingleframe(
             reactor, ffs(b'1 command-name eos command')))
-        result = reactor.onbytesresponseready(1, b'response2')
+        result = reactor.onbytesresponseready(outstream, 1, b'response2')
         self.assertaction(result, 'sendframes')
         list(result[1]['framegen'])
         results.append(self._sendsingleframe(
             reactor, ffs(b'1 command-name eos command')))
-        result = reactor.onbytesresponseready(1, b'response3')
+        result = reactor.onbytesresponseready(outstream, 1, b'response3')
         self.assertaction(result, 'sendframes')
         list(result[1]['framegen'])
 
@@ -501,9 +521,11 @@
     def testsimpleresponse(self):
         """Bytes response to command sends result frames."""
         reactor = makereactor()
-        list(sendcommandframes(reactor, 1, b'mycommand', {}))
+        instream = framing.stream()
+        list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
-        result = reactor.onbytesresponseready(1, b'response')
+        outstream = framing.stream()
+        result = reactor.onbytesresponseready(outstream, 1, b'response')
         self.assertaction(result, 'sendframes')
         self.assertframesequal(result[1]['framegen'], [
             b'1 bytes-response eos response',
@@ -515,9 +537,11 @@
         second = b'y' * 100
 
         reactor = makereactor()
-        list(sendcommandframes(reactor, 1, b'mycommand', {}))
+        instream = framing.stream()
+        list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
-        result = reactor.onbytesresponseready(1, first + second)
+        outstream = framing.stream()
+        result = reactor.onbytesresponseready(outstream, 1, first + second)
         self.assertaction(result, 'sendframes')
         self.assertframesequal(result[1]['framegen'], [
             b'1 bytes-response continuation %s' % first,
@@ -526,9 +550,11 @@
 
     def testapplicationerror(self):
         reactor = makereactor()
-        list(sendcommandframes(reactor, 1, b'mycommand', {}))
+        instream = framing.stream()
+        list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
-        result = reactor.onapplicationerror(1, b'some message')
+        outstream = framing.stream()
+        result = reactor.onapplicationerror(outstream, 1, b'some message')
         self.assertaction(result, 'sendframes')
         self.assertframesequal(result[1]['framegen'], [
             b'1 error-response application some message',
@@ -537,11 +563,14 @@
     def test1commanddeferresponse(self):
         """Responses when in deferred output mode are delayed until EOF."""
         reactor = makereactor(deferoutput=True)
-        results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
+        instream = framing.stream()
+        results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
+                                         {}))
         self.assertEqual(len(results), 1)
         self.assertaction(results[0], 'runcommand')
 
-        result = reactor.onbytesresponseready(1, b'response')
+        outstream = framing.stream()
+        result = reactor.onbytesresponseready(outstream, 1, b'response')
         self.assertaction(result, 'noop')
         result = reactor.oninputeof()
         self.assertaction(result, 'sendframes')
@@ -551,12 +580,14 @@
 
     def testmultiplecommanddeferresponse(self):
         reactor = makereactor(deferoutput=True)
-        list(sendcommandframes(reactor, 1, b'command1', {}))
-        list(sendcommandframes(reactor, 3, b'command2', {}))
+        instream = framing.stream()
+        list(sendcommandframes(reactor, instream, 1, b'command1', {}))
+        list(sendcommandframes(reactor, instream, 3, b'command2', {}))
 
-        result = reactor.onbytesresponseready(1, b'response1')
+        outstream = framing.stream()
+        result = reactor.onbytesresponseready(outstream, 1, b'response1')
         self.assertaction(result, 'noop')
-        result = reactor.onbytesresponseready(3, b'response2')
+        result = reactor.onbytesresponseready(outstream, 3, b'response2')
         self.assertaction(result, 'noop')
         result = reactor.oninputeof()
         self.assertaction(result, 'sendframes')
@@ -567,14 +598,16 @@
 
     def testrequestidtracking(self):
         reactor = makereactor(deferoutput=True)
-        list(sendcommandframes(reactor, 1, b'command1', {}))
-        list(sendcommandframes(reactor, 3, b'command2', {}))
-        list(sendcommandframes(reactor, 5, b'command3', {}))
+        instream = framing.stream()
+        list(sendcommandframes(reactor, instream, 1, b'command1', {}))
+        list(sendcommandframes(reactor, instream, 3, b'command2', {}))
+        list(sendcommandframes(reactor, instream, 5, b'command3', {}))
 
         # Register results for commands out of order.
-        reactor.onbytesresponseready(3, b'response3')
-        reactor.onbytesresponseready(1, b'response1')
-        reactor.onbytesresponseready(5, b'response5')
+        outstream = framing.stream()
+        reactor.onbytesresponseready(outstream, 3, b'response3')
+        reactor.onbytesresponseready(outstream, 1, b'response1')
+        reactor.onbytesresponseready(outstream, 5, b'response5')
 
         result = reactor.oninputeof()
         self.assertaction(result, 'sendframes')
@@ -587,8 +620,9 @@
     def testduplicaterequestonactivecommand(self):
         """Receiving a request ID that matches a request that isn't finished."""
         reactor = makereactor()
-        list(sendcommandframes(reactor, 1, b'command1', {}))
-        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        stream = framing.stream()
+        list(sendcommandframes(reactor, stream, 1, b'command1', {}))
+        results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
 
         self.assertaction(results[0], 'error')
         self.assertEqual(results[0][1], {
@@ -598,13 +632,15 @@
     def testduplicaterequestonactivecommandnosend(self):
         """Same as above but we've registered a response but haven't sent it."""
         reactor = makereactor()
-        list(sendcommandframes(reactor, 1, b'command1', {}))
-        reactor.onbytesresponseready(1, b'response')
+        instream = framing.stream()
+        list(sendcommandframes(reactor, instream, 1, b'command1', {}))
+        outstream = framing.stream()
+        reactor.onbytesresponseready(outstream, 1, b'response')
 
         # We've registered the response but haven't sent it. From the
         # perspective of the reactor, the command is still active.
 
-        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
         self.assertaction(results[0], 'error')
         self.assertEqual(results[0][1], {
             'message': b'request with ID 1 is already active',
@@ -613,7 +649,8 @@
     def testduplicaterequestargumentframe(self):
         """Variant on above except we sent an argument frame instead of name."""
         reactor = makereactor()
-        list(sendcommandframes(reactor, 1, b'command', {}))
+        stream = framing.stream()
+        list(sendcommandframes(reactor, stream, 1, b'command', {}))
         results = list(sendframes(reactor, [
             ffs(b'3 command-name have-args command'),
             ffs(b'1 command-argument 0 ignored'),
@@ -627,11 +664,13 @@
     def testduplicaterequestaftersend(self):
         """We can use a duplicate request ID after we've sent the response."""
         reactor = makereactor()
-        list(sendcommandframes(reactor, 1, b'command1', {}))
-        res = reactor.onbytesresponseready(1, b'response')
+        instream = framing.stream()
+        list(sendcommandframes(reactor, instream, 1, b'command1', {}))
+        outstream = framing.stream()
+        res = reactor.onbytesresponseready(outstream, 1, b'response')
         list(res[1]['framegen'])
 
-        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
         self.assertaction(results[0], 'runcommand')
 
 if __name__ == '__main__':