wireprotov2peer: stream decoded responses
authorGregory Szorc <gregory.szorc@gmail.com>
Wed, 29 Aug 2018 15:17:11 -0700
changeset 39561 d06834e0f48e
parent 39560 84bf6ded9317
child 39562 067f7d2c7d60
wireprotov2peer: stream decoded responses Previously, wire protocol version 2 would buffer all response data. Only once all data was received did we CBOR decode it and resolve the future associated with the command. This was obviously not desirable. In future commits that introduce large response payloads, this caused significant memory bloat and slowed down client operations due to waiting on the server. This commit refactors the response handling code so that response data can be streamed. Command response objects now contain a buffered CBOR decoder. As new data arrives, it is fed into the decoder. Decoded objects are made available to the generator as they are decoded. Because there is a separate thread processing incoming frames and feeding data into the response object, there is the potential for race conditions when mutating response objects. So a lock has been added to guard access to critical state variables. Because the generator emitting decoded objects needs to wait on those objects to become available, we've added an Event for the generator to wait on so it doesn't busy loop. This does mean there is the potential for deadlocks. And I'm pretty sure they can occur in some scenarios. We already have a handful of TODOs around this. But I've added some more. Fixing this will likely require moving the background thread receiving frames into clienthandler. We likely would have done this anyway when implementing the client bits for the SSH transport. Test output changes because the initial CBOR map holding the overall response state is now always handled internally by the response object. Differential Revision: https://phab.mercurial-scm.org/D4474
mercurial/debugcommands.py
mercurial/wireprotov2peer.py
tests/test-http-api-httpv2.t
tests/test-wireproto-command-capabilities.t
--- a/mercurial/debugcommands.py	Wed Aug 29 16:43:17 2018 -0700
+++ b/mercurial/debugcommands.py	Wed Aug 29 15:17:11 2018 -0700
@@ -3240,7 +3240,7 @@
                     res = e.callcommand(command, args).result()
 
                 if isinstance(res, wireprotov2peer.commandresponse):
-                    val = list(res.cborobjects())
+                    val = res.objects()
                     ui.status(_('response: %s\n') %
                               stringutil.pprint(val, bprefix=True, indent=2))
                 else:
--- a/mercurial/wireprotov2peer.py	Wed Aug 29 16:43:17 2018 -0700
+++ b/mercurial/wireprotov2peer.py	Wed Aug 29 15:17:11 2018 -0700
@@ -7,11 +7,12 @@
 
 from __future__ import absolute_import
 
+import threading
+
 from .i18n import _
 from . import (
     encoding,
     error,
-    util,
     wireprotoframing,
 )
 from .utils import (
@@ -34,20 +35,101 @@
     return b''.join(chunks)
 
 class commandresponse(object):
-    """Represents the response to a command request."""
+    """Represents the response to a command request.
+
+    Instances track the state of the command and hold its results.
+
+    An external entity is required to update the state of the object when
+    events occur.
+    """
 
     def __init__(self, requestid, command):
         self.requestid = requestid
         self.command = command
 
-        self.b = util.bytesio()
+        # Whether all remote input related to this command has been
+        # received.
+        self._inputcomplete = False
+
+        # We have a lock that is acquired when important object state is
+        # mutated. This is to prevent race conditions between 1 thread
+        # sending us new data and another consuming it.
+        self._lock = threading.RLock()
+
+        # An event is set when state of the object changes. This event
+        # is waited on by the generator emitting objects.
+        self._serviceable = threading.Event()
+
+        self._pendingevents = []
+        self._decoder = cborutil.bufferingdecoder()
+        self._seeninitial = False
+
+    def _oninputcomplete(self):
+        with self._lock:
+            self._inputcomplete = True
+            self._serviceable.set()
+
+    def _onresponsedata(self, data):
+        available, readcount, wanted = self._decoder.decode(data)
+
+        if not available:
+            return
+
+        with self._lock:
+            for o in self._decoder.getavailable():
+                if not self._seeninitial:
+                    self._handleinitial(o)
+                    continue
+
+                self._pendingevents.append(o)
+
+            self._serviceable.set()
 
-    def cborobjects(self):
-        """Obtain decoded CBOR objects from this response."""
-        self.b.seek(0)
+    def _handleinitial(self, o):
+        self._seeninitial = True
+        if o[b'status'] == 'ok':
+            return
+
+        atoms = [{'msg': o[b'error'][b'message']}]
+        if b'args' in o[b'error']:
+            atoms[0]['args'] = o[b'error'][b'args']
+
+        raise error.RepoError(formatrichmessage(atoms))
+
+    def objects(self):
+        """Obtained decoded objects from this response.
+
+        This is a generator of data structures that were decoded from the
+        command response.
+
+        Obtaining the next member of the generator may block due to waiting
+        on external data to become available.
 
-        for v in cborutil.decodeall(self.b.getvalue()):
-            yield v
+        If the server encountered an error in the middle of serving the data
+        or if another error occurred, an exception may be raised when
+        advancing the generator.
+        """
+        while True:
+            # TODO this can infinite loop if self._inputcomplete is never
+            # set. We likely want to tie the lifetime of this object/state
+            # to that of the background thread receiving frames and updating
+            # our state.
+            self._serviceable.wait(1.0)
+
+            with self._lock:
+                self._serviceable.clear()
+
+                # Make copies because objects could be mutated during
+                # iteration.
+                stop = self._inputcomplete
+                pending = list(self._pendingevents)
+                self._pendingevents[:] = []
+
+            for o in pending:
+                yield o
+
+            if stop:
+                break
 
 class clienthandler(object):
     """Object to handle higher-level client activities.
@@ -80,6 +162,8 @@
         rid = request.requestid
         self._requests[rid] = request
         self._futures[rid] = f
+        # TODO we need some kind of lifetime on response instances otherwise
+        # objects() may deadlock.
         self._responses[rid] = commandresponse(rid, command)
 
         return iter(())
@@ -119,8 +203,12 @@
         if action == 'error':
             e = error.RepoError(meta['message'])
 
+            if frame.requestid in self._responses:
+                self._responses[frame.requestid]._oninputcomplete()
+
             if frame.requestid in self._futures:
                 self._futures[frame.requestid].set_exception(e)
+                del self._futures[frame.requestid]
             else:
                 raise e
 
@@ -141,39 +229,32 @@
                 self._processresponsedata(frame, meta, response)
             except BaseException as e:
                 self._futures[frame.requestid].set_exception(e)
+                del self._futures[frame.requestid]
+                response._oninputcomplete()
         else:
             raise error.ProgrammingError(
                 'unhandled action from clientreactor: %s' % action)
 
     def _processresponsedata(self, frame, meta, response):
-        # This buffers all data until end of stream is received. This
-        # is bad for performance.
-        # TODO make response data streamable
-        response.b.write(meta['data'])
+        # This can raise. The caller can handle it.
+        response._onresponsedata(meta['data'])
 
         if meta['eos']:
-            # If the command has a decoder, resolve the future to the
-            # decoded value. Otherwise resolve to the rich response object.
-            decoder = COMMAND_DECODERS.get(response.command)
-
-            # TODO consider always resolving the overall status map.
-            if decoder:
-                objs = response.cborobjects()
-
-                overall = next(objs)
+            response._oninputcomplete()
+            del self._requests[frame.requestid]
 
-                if overall['status'] == 'ok':
-                    self._futures[frame.requestid].set_result(decoder(objs))
-                else:
-                    atoms = [{'msg': overall['error']['message']}]
-                    if 'args' in overall['error']:
-                        atoms[0]['args'] = overall['error']['args']
-                    e = error.RepoError(formatrichmessage(atoms))
-                    self._futures[frame.requestid].set_exception(e)
-            else:
-                self._futures[frame.requestid].set_result(response)
+        # If the command has a decoder, we wait until all input has been
+        # received before resolving the future. Otherwise we resolve the
+        # future immediately.
+        if frame.requestid not in self._futures:
+            return
 
-            del self._requests[frame.requestid]
+        if response.command not in COMMAND_DECODERS:
+            self._futures[frame.requestid].set_result(response.objects())
+            del self._futures[frame.requestid]
+        elif response._inputcomplete:
+            decoded = COMMAND_DECODERS[response.command](response.objects())
+            self._futures[frame.requestid].set_result(decoded)
             del self._futures[frame.requestid]
 
 def decodebranchmap(objs):
--- a/tests/test-http-api-httpv2.t	Wed Aug 29 16:43:17 2018 -0700
+++ b/tests/test-http-api-httpv2.t	Wed Aug 29 15:17:11 2018 -0700
@@ -225,10 +225,7 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  response: [
-    {
-      b'status': b'ok'
-    },
+  response: gen[
     b'customreadonly bytes response'
   ]
 
--- a/tests/test-wireproto-command-capabilities.t	Wed Aug 29 16:43:17 2018 -0700
+++ b/tests/test-wireproto-command-capabilities.t	Wed Aug 29 15:17:11 2018 -0700
@@ -349,10 +349,7 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  response: [
-    {
-      b'status': b'ok'
-    },
+  response: gen[
     {
       b'commands': {
         b'branchmap': {