diff -r ed4ebbb98ca0 -r b5bf3dd6ec5b mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py Mon Oct 08 15:19:32 2018 -0700 +++ b/mercurial/wireprotoframing.py Mon Oct 08 17:24:28 2018 -0700 @@ -368,17 +368,46 @@ def createcommandresponseokframe(stream, requestid): overall = b''.join(cborutil.streamencode({b'status': b'ok'})) + if stream.streamsettingssent: + overall = stream.encode(overall) + encoded = True + + if not overall: + return None + else: + encoded = False + return stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_RESPONSE, flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=overall) + payload=overall, + encoded=encoded) -def createcommandresponseeosframe(stream, requestid): +def createcommandresponseeosframes(stream, requestid, + maxframesize=DEFAULT_MAX_FRAME_SIZE): """Create an empty payload frame representing command end-of-stream.""" - return stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_EOS, - payload=b'') + payload = stream.flush() + + offset = 0 + while True: + chunk = payload[offset:offset + maxframesize] + offset += len(chunk) + + done = offset == len(payload) + + if done: + flags = FLAG_COMMAND_RESPONSE_EOS + else: + flags = FLAG_COMMAND_RESPONSE_CONTINUATION + + yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=chunk, + encoded=payload != b'') + + if done: + break def createalternatelocationresponseframe(stream, requestid, location): data = { @@ -395,10 +424,19 @@ if value is not None: data[b'location'][pycompat.bytestr(a)] = value + payload = b''.join(cborutil.streamencode(data)) + + if stream.streamsettingssent: + payload = stream.encode(payload) + encoded = True + else: + encoded = False + return stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_RESPONSE, flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=b''.join(cborutil.streamencode(data))) + payload=payload, + encoded=encoded) def createcommanderrorresponse(stream, requestid, message, args=None): # TODO should this be using a list of {'msg': ..., 'args': {}} so atom @@ -519,6 +557,8 @@ yield frame return + data = self._stream.encode(data) + # There is a ton of potential to do more complicated things here. # Our immediate goal is to coalesce small chunks into big frames, # not achieve the fewest number of frames possible. So we go with @@ -548,7 +588,8 @@ self._requestid, typeid=FRAME_TYPE_COMMAND_RESPONSE, flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=chunk) + payload=chunk, + encoded=True) if offset == len(data): return @@ -583,7 +624,8 @@ self._requestid, typeid=FRAME_TYPE_COMMAND_RESPONSE, flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=payload) + payload=payload, + encoded=True) # TODO consider defining encoders/decoders using the util.compressionengine # mechanism. @@ -776,7 +818,9 @@ def __init__(self, streamid, active=False): super(outputstream, self).__init__(streamid, active=active) + self.streamsettingssent = False self._encoder = None + self._encodername = None def setencoder(self, ui, name): """Set the encoder for this stream. @@ -787,6 +831,7 @@ raise error.Abort(_('unknown stream encoder: %s') % name) self._encoder = STREAM_ENCODERS[name][0](ui) + self._encodername = name def encode(self, data): if not self._encoder: @@ -806,6 +851,45 @@ self._encoder.finish() + def makeframe(self, requestid, typeid, flags, payload, + encoded=False): + """Create a frame to be sent out over this stream. + + Only returns the frame instance. Does not actually send it. + """ + streamflags = 0 + if not self._active: + streamflags |= STREAM_FLAG_BEGIN_STREAM + self._active = True + + if encoded: + if not self.streamsettingssent: + raise error.ProgrammingError( + b'attempting to send encoded frame without sending stream ' + b'settings') + + streamflags |= STREAM_FLAG_ENCODING_APPLIED + + if (typeid == FRAME_TYPE_STREAM_SETTINGS + and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS): + self.streamsettingssent = True + + return makeframe(requestid, self.streamid, streamflags, typeid, flags, + payload) + + def makestreamsettingsframe(self, requestid): + """Create a stream settings frame for this stream. + + Returns frame data or None if no stream settings frame is needed or has + already been sent. + """ + if not self._encoder or self.streamsettingssent: + return None + + payload = b''.join(cborutil.streamencode(self._encodername)) + return self.makeframe(requestid, FRAME_TYPE_STREAM_SETTINGS, + FLAG_STREAM_ENCODING_SETTINGS_EOS, payload) + def ensureserverstream(stream): if stream.streamid % 2: raise error.ProgrammingError('server should only write to even ' @@ -995,7 +1079,9 @@ yield frame if emitted: - yield createcommandresponseeosframe(stream, requestid) + for frame in createcommandresponseeosframes( + stream, requestid): + yield frame break except error.WireprotoCommandError as e: @@ -1022,6 +1108,10 @@ 'alternatelocationresponse seen after initial ' 'output object') + frame = stream.makestreamsettingsframe(requestid) + if frame: + yield frame + yield createalternatelocationresponseframe( stream, requestid, o) @@ -1034,7 +1124,16 @@ 'object follows alternatelocationresponse') if not emitted: - yield createcommandresponseokframe(stream, requestid) + # Frame is optional. + frame = stream.makestreamsettingsframe(requestid) + if frame: + yield frame + + # May be None if empty frame (due to encoding). + frame = createcommandresponseokframe(stream, requestid) + if frame: + yield frame + emitted = True # Objects emitted by command functions can be serializable @@ -1121,13 +1220,25 @@ return self._handlesendframes(sendframes()) def makeoutputstream(self): - """Create a stream to be used for sending data to the client.""" + """Create a stream to be used for sending data to the client. + + If this is called before protocol settings frames are received, we + don't know what stream encodings are supported by the client and + we will default to identity. + """ streamid = self._nextoutgoingstreamid self._nextoutgoingstreamid += 2 s = outputstream(streamid) self._outgoingstreams[streamid] = s + # Always use the *server's* preferred encoder over the client's, + # as servers have more to lose from sub-optimal encoders being used. + for name in STREAM_ENCODERS_ORDER: + if name in self._sendersettings['contentencodings']: + s.setencoder(self._ui, name) + break + return s def _makeerrorresult(self, msg):