mercurial/wireprotoframing.py
changeset 37285 3ed344546d9e
parent 37084 f0b6fbea00cf
child 37288 9bfcbe4f4745
--- a/mercurial/wireprotoframing.py	Mon Mar 26 13:51:22 2018 -0700
+++ b/mercurial/wireprotoframing.py	Mon Mar 26 13:57:22 2018 -0700
@@ -218,7 +218,7 @@
 
     return frame(h.requestid, h.typeid, h.flags, payload)
 
-def createcommandframes(requestid, cmd, args, datafh=None):
+def createcommandframes(stream, requestid, cmd, args, datafh=None):
     """Create frames necessary to transmit a request to run a command.
 
     This is a generator of bytearrays. Each item represents a frame
@@ -233,8 +233,8 @@
     if not flags:
         flags |= FLAG_COMMAND_NAME_EOS
 
-    yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
-                    flags=flags, payload=cmd)
+    yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
+                           flags=flags, payload=cmd)
 
     for i, k in enumerate(sorted(args)):
         v = args[k]
@@ -250,10 +250,10 @@
         payload[offset:offset + len(v)] = v
 
         flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
-        yield makeframe(requestid=requestid,
-                        typeid=FRAME_TYPE_COMMAND_ARGUMENT,
-                        flags=flags,
-                        payload=payload)
+        yield stream.makeframe(requestid=requestid,
+                               typeid=FRAME_TYPE_COMMAND_ARGUMENT,
+                               flags=flags,
+                               payload=payload)
 
     if datafh:
         while True:
@@ -267,15 +267,15 @@
                 assert datafh.read(1) == b''
                 done = True
 
-            yield makeframe(requestid=requestid,
-                            typeid=FRAME_TYPE_COMMAND_DATA,
-                            flags=flags,
-                            payload=data)
+            yield stream.makeframe(requestid=requestid,
+                                   typeid=FRAME_TYPE_COMMAND_DATA,
+                                   flags=flags,
+                                   payload=data)
 
             if done:
                 break
 
-def createbytesresponseframesfrombytes(requestid, data,
+def createbytesresponseframesfrombytes(stream, requestid, data,
                                        maxframesize=DEFAULT_MAX_FRAME_SIZE):
     """Create a raw frame to send a bytes response from static bytes input.
 
@@ -284,10 +284,10 @@
 
     # Simple case of a single frame.
     if len(data) <= maxframesize:
-        yield makeframe(requestid=requestid,
-                        typeid=FRAME_TYPE_BYTES_RESPONSE,
-                        flags=FLAG_BYTES_RESPONSE_EOS,
-                        payload=data)
+        yield stream.makeframe(requestid=requestid,
+                               typeid=FRAME_TYPE_BYTES_RESPONSE,
+                               flags=FLAG_BYTES_RESPONSE_EOS,
+                               payload=data)
         return
 
     offset = 0
@@ -301,15 +301,15 @@
         else:
             flags = FLAG_BYTES_RESPONSE_CONTINUATION
 
-        yield makeframe(requestid=requestid,
-                        typeid=FRAME_TYPE_BYTES_RESPONSE,
-                        flags=flags,
-                        payload=chunk)
+        yield stream.makeframe(requestid=requestid,
+                               typeid=FRAME_TYPE_BYTES_RESPONSE,
+                               flags=flags,
+                               payload=chunk)
 
         if done:
             break
 
-def createerrorframe(requestid, msg, protocol=False, application=False):
+def createerrorframe(stream, requestid, msg, protocol=False, application=False):
     # TODO properly handle frame size limits.
     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
 
@@ -319,12 +319,12 @@
     if application:
         flags |= FLAG_ERROR_RESPONSE_APPLICATION
 
-    yield makeframe(requestid=requestid,
-                    typeid=FRAME_TYPE_ERROR_RESPONSE,
-                    flags=flags,
-                    payload=msg)
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_ERROR_RESPONSE,
+                           flags=flags,
+                           payload=msg)
 
-def createtextoutputframe(requestid, atoms):
+def createtextoutputframe(stream, requestid, atoms):
     """Create a text output frame to render text to people.
 
     ``atoms`` is a 3-tuple of (formatting string, args, labels).
@@ -390,10 +390,20 @@
     if bytesleft < 0:
         raise ValueError('cannot encode data in a single frame')
 
-    yield makeframe(requestid=requestid,
-                    typeid=FRAME_TYPE_TEXT_OUTPUT,
-                    flags=0,
-                    payload=b''.join(atomchunks))
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_TEXT_OUTPUT,
+                           flags=0,
+                           payload=b''.join(atomchunks))
+
+class stream(object):
+    """Represents a logical unidirectional series of frames."""
+
+    def makeframe(self, requestid, typeid, flags, payload):
+        """Create a frame to be sent out over this stream.
+
+        Only returns the frame instance. Does not actually send it.
+        """
+        return makeframe(requestid, typeid, flags, payload)
 
 class serverreactor(object):
     """Holds state of a server handling frame-based protocol requests.
@@ -498,13 +508,14 @@
 
         return meth(frame)
 
-    def onbytesresponseready(self, requestid, data):
+    def onbytesresponseready(self, stream, requestid, data):
         """Signal that a bytes response is ready to be sent to the client.
 
         The raw bytes response is passed as an argument.
         """
         def sendframes():
-            for frame in createbytesresponseframesfrombytes(requestid, data):
+            for frame in createbytesresponseframesfrombytes(stream, requestid,
+                                                            data):
                 yield frame
 
             self._activecommands.remove(requestid)
@@ -540,9 +551,10 @@
             'framegen': makegen(),
         }
 
-    def onapplicationerror(self, requestid, msg):
+    def onapplicationerror(self, stream, requestid, msg):
         return 'sendframes', {
-            'framegen': createerrorframe(requestid, msg, application=True),
+            'framegen': createerrorframe(stream, requestid, msg,
+                                         application=True),
         }
 
     def _makeerrorresult(self, msg):