--- a/mercurial/wireprotoframing.py Mon Apr 09 15:32:01 2018 -0700
+++ b/mercurial/wireprotoframing.py Mon Apr 09 16:54:20 2018 -0700
@@ -922,6 +922,7 @@
self._outgoingstream = stream(1)
self._pendingrequests = collections.deque()
self._activerequests = {}
+ self._incomingstreams = {}
def callcommand(self, name, args, datafh=None):
"""Request that a command be executed.
@@ -1007,3 +1008,63 @@
yield frame
request.state = 'sent'
+
+ def onframerecv(self, frame):
+ """Process a frame that has been received off the wire.
+
+ Returns a 2-tuple of (action, meta) describing further action the
+ caller needs to take as a result of receiving this frame.
+ """
+ if frame.streamid % 2:
+ return 'error', {
+ 'message': (
+ _('received frame with odd numbered stream ID: %d') %
+ frame.streamid),
+ }
+
+ if frame.streamid not in self._incomingstreams:
+ if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
+ return 'error', {
+ 'message': _('received frame on unknown stream '
+ 'without beginning of stream flag set'),
+ }
+
+ if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
+ raise error.ProgrammingError('support for decoding stream '
+ 'payloads not yet implemneted')
+
+ if frame.streamflags & STREAM_FLAG_END_STREAM:
+ del self._incomingstreams[frame.streamid]
+
+ if frame.requestid not in self._activerequests:
+ return 'error', {
+ 'message': (_('received frame for inactive request ID: %d') %
+ frame.requestid),
+ }
+
+ request = self._activerequests[frame.requestid]
+ request.state = 'receiving'
+
+ handlers = {
+ FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
+ }
+
+ meth = handlers.get(frame.typeid)
+ if not meth:
+ raise error.ProgrammingError('unhandled frame type: %d' %
+ frame.typeid)
+
+ return meth(request, frame)
+
+ def _onbytesresponseframe(self, request, frame):
+ if frame.flags & FLAG_BYTES_RESPONSE_EOS:
+ request.state = 'received'
+ del self._activerequests[request.requestid]
+
+ return 'responsedata', {
+ 'request': request,
+ 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
+ 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
+ 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
+ 'data': frame.payload,
+ }