wireprotoframing: buffer emitted data to reduce frame count
authorGregory Szorc <gregory.szorc@gmail.com>
Wed, 29 Aug 2018 16:43:17 -0700
changeset 39560 84bf6ded9317
parent 39559 07b58266bce3
child 39561 d06834e0f48e
wireprotoframing: buffer emitted data to reduce frame count An upcoming commit introduces a wire protocol command that can emit hundreds of thousands of small objects. Without a buffering layer, we would emit a single, small frame for every object. Performance profiling revealed this to be a source of significant overhead for both client and server. This commit introduces a very crude buffering layer so that we emit fewer, bigger frames in such a scenario. This code will likely get rewritten in the future to be part of the streams API, as we'll need a similar strategy for compressing data. I don't want to think about it too much at the moment though. server before: user 32.500+0.000 sys 1.160+0.000 after: user 20.230+0.010 sys 0.180+0.000 client before: user 133.400+0.000 sys 93.120+0.000 after: user 68.370+0.000 sys 32.950+0.000 This appears to indicate we have significant overhead in the frame processing code on both client and server. It might be worth profiling that at some point... Differential Revision: https://phab.mercurial-scm.org/D4473
mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py	Wed Sep 05 09:06:40 2018 -0700
+++ b/mercurial/wireprotoframing.py	Wed Aug 29 16:43:17 2018 -0700
@@ -511,6 +511,98 @@
                            flags=0,
                            payload=payload)
 
+class bufferingcommandresponseemitter(object):
+    """Helper object to emit command response frames intelligently.
+
+    Raw command response data is likely emitted in chunks much smaller
+    than what can fit in a single frame. This class exists to buffer
+    chunks until enough data is available to fit in a single frame.
+
+    TODO we'll need something like this when compression is supported.
+    So it might make sense to implement this functionality at the stream
+    level.
+    """
+    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
+        self._stream = stream
+        self._requestid = requestid
+        self._maxsize = maxframesize
+        self._chunks = []
+        self._chunkssize = 0
+
+    def send(self, data):
+        """Send new data for emission.
+
+        Is a generator of new frames that were derived from the new input.
+
+        If the special input ``None`` is received, flushes all buffered
+        data to frames.
+        """
+
+        if data is None:
+            for frame in self._flush():
+                yield frame
+            return
+
+        # There is a ton of potential to do more complicated things here.
+        # Our immediate goal is to coalesce small chunks into big frames,
+        # not achieve the fewest number of frames possible. So we go with
+        # a simple implementation:
+        #
+        # * If a chunk is too large for a frame, we flush and emit frames
+        #   for the new chunk.
+        # * If a chunk can be buffered without total buffered size limits
+        #   being exceeded, we do that.
+        # * If a chunk causes us to go over our buffering limit, we flush
+        #   and then buffer the new chunk.
+
+        if len(data) > self._maxsize:
+            for frame in self._flush():
+                yield frame
+
+            # Now emit frames for the big chunk.
+            offset = 0
+            while True:
+                chunk = data[offset:offset + self._maxsize]
+                offset += len(chunk)
+
+                yield self._stream.makeframe(
+                    self._requestid,
+                    typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                    flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                    payload=chunk)
+
+                if offset == len(data):
+                    return
+
+        # If we don't have enough to constitute a full frame, buffer and
+        # return.
+        if len(data) + self._chunkssize < self._maxsize:
+            self._chunks.append(data)
+            self._chunkssize += len(data)
+            return
+
+        # Else flush what we have and buffer the new chunk. We could do
+        # something more intelligent here, like break the chunk. Let's
+        # keep things simple for now.
+        for frame in self._flush():
+            yield frame
+
+        self._chunks.append(data)
+        self._chunkssize = len(data)
+
+    def _flush(self):
+        payload = b''.join(self._chunks)
+        assert len(payload) <= self._maxsize
+
+        self._chunks[:] = []
+        self._chunkssize = 0
+
+        yield self._stream.makeframe(
+            self._requestid,
+            typeid=FRAME_TYPE_COMMAND_RESPONSE,
+            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+            payload=payload)
+
 class stream(object):
     """Represents a logical unidirectional series of frames."""
 
@@ -716,10 +808,14 @@
 
         def sendframes():
             emitted = False
+            emitter = bufferingcommandresponseemitter(stream, requestid)
             while True:
                 try:
                     o = next(objs)
                 except StopIteration:
+                    for frame in emitter.send(None):
+                        yield frame
+
                     if emitted:
                         yield createcommandresponseeosframe(stream, requestid)
                     break
@@ -743,11 +839,10 @@
                         yield createcommandresponseokframe(stream, requestid)
                         emitted = True
 
-                    # TODO buffer chunks so emitted frame payloads can be
-                    # larger.
-                    for frame in createbytesresponseframesfromgen(
-                        stream, requestid, cborutil.streamencode(o)):
-                        yield frame
+                    for chunk in cborutil.streamencode(o):
+                        for frame in emitter.send(chunk):
+                            yield frame
+
                 except Exception as e:
                     for frame in createerrorframe(stream, requestid,
                                                   '%s' % e,