diff -r 01361be9e2dc -r 55b5ba8d4e68 mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py Mon Apr 09 15:32:01 2018 -0700 +++ b/mercurial/wireprotoframing.py Mon Apr 09 16:54:20 2018 -0700 @@ -922,6 +922,7 @@ self._outgoingstream = stream(1) self._pendingrequests = collections.deque() self._activerequests = {} + self._incomingstreams = {} def callcommand(self, name, args, datafh=None): """Request that a command be executed. @@ -1007,3 +1008,63 @@ yield frame request.state = 'sent' + + def onframerecv(self, frame): + """Process a frame that has been received off the wire. + + Returns a 2-tuple of (action, meta) describing further action the + caller needs to take as a result of receiving this frame. + """ + if frame.streamid % 2: + return 'error', { + 'message': ( + _('received frame with odd numbered stream ID: %d') % + frame.streamid), + } + + if frame.streamid not in self._incomingstreams: + if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: + return 'error', { + 'message': _('received frame on unknown stream ' + 'without beginning of stream flag set'), + } + + if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: + raise error.ProgrammingError('support for decoding stream ' + 'payloads not yet implemneted') + + if frame.streamflags & STREAM_FLAG_END_STREAM: + del self._incomingstreams[frame.streamid] + + if frame.requestid not in self._activerequests: + return 'error', { + 'message': (_('received frame for inactive request ID: %d') % + frame.requestid), + } + + request = self._activerequests[frame.requestid] + request.state = 'receiving' + + handlers = { + FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe, + } + + meth = handlers.get(frame.typeid) + if not meth: + raise error.ProgrammingError('unhandled frame type: %d' % + frame.typeid) + + return meth(request, frame) + + def _onbytesresponseframe(self, request, frame): + if frame.flags & FLAG_BYTES_RESPONSE_EOS: + request.state = 'received' + del self._activerequests[request.requestid] + + return 'responsedata', { + 'request': request, + 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION, + 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS, + 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR, + 'data': frame.payload, + }