mercurial/wireprotov2peer.py
changeset 37719 a656cba08a04
child 37720 d715a85003c8
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/wireprotov2peer.py	Sat Apr 14 11:50:19 2018 -0700
@@ -0,0 +1,135 @@
+# wireprotov2peer.py - client side code for wire protocol version 2
+#
+# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+from .i18n import _
+from .thirdparty import (
+    cbor,
+)
+from . import (
+    error,
+    util,
+    wireprotoframing,
+)
+
+class clienthandler(object):
+    """Object to handle higher-level client activities.
+
+    The ``clientreactor`` is used to hold low-level state about the frame-based
+    protocol, such as which requests and streams are active. This type is used
+    for higher-level operations, such as reading frames from a socket, exposing
+    and managing a higher-level primitive for representing command responses,
+    etc. This class is what peers should probably use to bridge wire activity
+    with the higher-level peer API.
+    """
+
+    def __init__(self, ui, clientreactor):
+        self._ui = ui
+        self._reactor = clientreactor
+        self._requests = {}
+        self._futures = {}
+        self._responses = {}
+
+    def callcommand(self, command, args, f):
+        """Register a request to call a command.
+
+        Returns an iterable of frames that should be sent over the wire.
+        """
+        request, action, meta = self._reactor.callcommand(command, args)
+
+        if action != 'noop':
+            raise error.ProgrammingError('%s not yet supported' % action)
+
+        rid = request.requestid
+        self._requests[rid] = request
+        self._futures[rid] = f
+        self._responses[rid] = {
+            'cbor': False,
+            'b': util.bytesio(),
+        }
+
+        return iter(())
+
+    def flushcommands(self):
+        """Flush all queued commands.
+
+        Returns an iterable of frames that should be sent over the wire.
+        """
+        action, meta = self._reactor.flushcommands()
+
+        if action != 'sendframes':
+            raise error.ProgrammingError('%s not yet supported' % action)
+
+        return meta['framegen']
+
+    def readframe(self, fh):
+        """Attempt to read and process a frame.
+
+        Returns None if no frame was read. Presumably this means EOF.
+        """
+        frame = wireprotoframing.readframe(fh)
+        if frame is None:
+            # TODO tell reactor?
+            return
+
+        self._ui.note(_('received %r\n') % frame)
+        self._processframe(frame)
+
+        return True
+
+    def _processframe(self, frame):
+        """Process a single read frame."""
+
+        action, meta = self._reactor.onframerecv(frame)
+
+        if action == 'error':
+            e = error.RepoError(meta['message'])
+
+            if frame.requestid in self._futures:
+                self._futures[frame.requestid].set_exception(e)
+            else:
+                raise e
+
+        if frame.requestid not in self._requests:
+            raise error.ProgrammingError(
+                'received frame for unknown request; this is either a bug in '
+                'the clientreactor not screening for this or this instance was '
+                'never told about this request: %r' % frame)
+
+        response = self._responses[frame.requestid]
+
+        if action == 'responsedata':
+            response['b'].write(meta['data'])
+
+            if meta['cbor']:
+                response['cbor'] = True
+
+            if meta['eos']:
+                if meta['cbor']:
+                    # If CBOR, decode every object.
+                    b = response['b']
+
+                    size = b.tell()
+                    b.seek(0)
+
+                    decoder = cbor.CBORDecoder(b)
+
+                    result = []
+                    while b.tell() < size:
+                        result.append(decoder.decode())
+                else:
+                    result = [response['b'].getvalue()]
+
+                self._futures[frame.requestid].set_result(result)
+
+                del self._requests[frame.requestid]
+                del self._futures[frame.requestid]
+
+        else:
+            raise error.ProgrammingError(
+                'unhandled action from clientreactor: %s' % action)