--- a/mercurial/wireprotoframing.py Mon Apr 09 14:17:57 2018 -0700
+++ b/mercurial/wireprotoframing.py Mon Apr 09 15:32:01 2018 -0700
@@ -11,6 +11,7 @@
from __future__ import absolute_import
+import collections
import struct
from .i18n import _
@@ -876,3 +877,133 @@
def _onframeerrored(self, frame):
return self._makeerrorresult(_('server already errored'))
+
+class commandrequest(object):
+ """Represents a request to run a command."""
+
+ def __init__(self, requestid, name, args, datafh=None):
+ self.requestid = requestid
+ self.name = name
+ self.args = args
+ self.datafh = datafh
+ self.state = 'pending'
+
+class clientreactor(object):
+ """Holds state of a client issuing frame-based protocol requests.
+
+ This is like ``serverreactor`` but for client-side state.
+
+ Each instance is bound to the lifetime of a connection. For persistent
+ connection transports using e.g. TCP sockets and speaking the raw
+ framing protocol, there will be a single instance for the lifetime of
+ the TCP socket. For transports where there are multiple discrete
+ interactions (say tunneled within in HTTP request), there will be a
+ separate instance for each distinct interaction.
+ """
+ def __init__(self, hasmultiplesend=False, buffersends=True):
+ """Create a new instance.
+
+ ``hasmultiplesend`` indicates whether multiple sends are supported
+ by the transport. When True, it is possible to send commands immediately
+ instead of buffering until the caller signals an intent to finish a
+ send operation.
+
+ ``buffercommands`` indicates whether sends should be buffered until the
+ last request has been issued.
+ """
+ self._hasmultiplesend = hasmultiplesend
+ self._buffersends = buffersends
+
+ self._canissuecommands = True
+ self._cansend = True
+
+ self._nextrequestid = 1
+ # We only support a single outgoing stream for now.
+ self._outgoingstream = stream(1)
+ self._pendingrequests = collections.deque()
+ self._activerequests = {}
+
+ def callcommand(self, name, args, datafh=None):
+ """Request that a command be executed.
+
+ Receives the command name, a dict of arguments to pass to the command,
+ and an optional file object containing the raw data for the command.
+
+ Returns a 3-tuple of (request, action, action data).
+ """
+ if not self._canissuecommands:
+ raise error.ProgrammingError('cannot issue new commands')
+
+ requestid = self._nextrequestid
+ self._nextrequestid += 2
+
+ request = commandrequest(requestid, name, args, datafh=datafh)
+
+ if self._buffersends:
+ self._pendingrequests.append(request)
+ return request, 'noop', {}
+ else:
+ if not self._cansend:
+ raise error.ProgrammingError('sends cannot be performed on '
+ 'this instance')
+
+ if not self._hasmultiplesend:
+ self._cansend = False
+ self._canissuecommands = False
+
+ return request, 'sendframes', {
+ 'framegen': self._makecommandframes(request),
+ }
+
+ def flushcommands(self):
+ """Request that all queued commands be sent.
+
+ If any commands are buffered, this will instruct the caller to send
+ them over the wire. If no commands are buffered it instructs the client
+ to no-op.
+
+ If instances aren't configured for multiple sends, no new command
+ requests are allowed after this is called.
+ """
+ if not self._pendingrequests:
+ return 'noop', {}
+
+ if not self._cansend:
+ raise error.ProgrammingError('sends cannot be performed on this '
+ 'instance')
+
+ # If the instance only allows sending once, mark that we have fired
+ # our one shot.
+ if not self._hasmultiplesend:
+ self._canissuecommands = False
+ self._cansend = False
+
+ def makeframes():
+ while self._pendingrequests:
+ request = self._pendingrequests.popleft()
+ for frame in self._makecommandframes(request):
+ yield frame
+
+ return 'sendframes', {
+ 'framegen': makeframes(),
+ }
+
+ def _makecommandframes(self, request):
+ """Emit frames to issue a command request.
+
+ As a side-effect, update request accounting to reflect its changed
+ state.
+ """
+ self._activerequests[request.requestid] = request
+ request.state = 'sending'
+
+ res = createcommandframes(self._outgoingstream,
+ request.requestid,
+ request.name,
+ request.args,
+ request.datafh)
+
+ for frame in res:
+ yield frame
+
+ request.state = 'sent'