mercurial/wireprotov1peer.py
changeset 43077 687b865b95ad
parent 43076 2372284d9457
child 43087 66f2cc210a29
equal deleted inserted replaced
43076:2372284d9457 43077:687b865b95ad
    71 
    71 
    72 class future(object):
    72 class future(object):
    73     '''placeholder for a value to be set later'''
    73     '''placeholder for a value to be set later'''
    74 
    74 
    75     def set(self, value):
    75     def set(self, value):
    76         if util.safehasattr(self, 'value'):
    76         if util.safehasattr(self, b'value'):
    77             raise error.RepoError("future is already set")
    77             raise error.RepoError(b"future is already set")
    78         self.value = value
    78         self.value = value
    79 
    79 
    80 
    80 
    81 def encodebatchcmds(req):
    81 def encodebatchcmds(req):
    82     """Return a ``cmds`` argument value for the ``batch`` command."""
    82     """Return a ``cmds`` argument value for the ``batch`` command."""
    87         # Old servers didn't properly unescape argument names. So prevent
    87         # Old servers didn't properly unescape argument names. So prevent
    88         # the sending of argument names that may not be decoded properly by
    88         # the sending of argument names that may not be decoded properly by
    89         # servers.
    89         # servers.
    90         assert all(escapearg(k) == k for k in argsdict)
    90         assert all(escapearg(k) == k for k in argsdict)
    91 
    91 
    92         args = ','.join(
    92         args = b','.join(
    93             '%s=%s' % (escapearg(k), escapearg(v))
    93             b'%s=%s' % (escapearg(k), escapearg(v))
    94             for k, v in argsdict.iteritems()
    94             for k, v in argsdict.iteritems()
    95         )
    95         )
    96         cmds.append('%s %s' % (op, args))
    96         cmds.append(b'%s %s' % (op, args))
    97 
    97 
    98     return ';'.join(cmds)
    98     return b';'.join(cmds)
    99 
    99 
   100 
   100 
   101 class unsentfuture(pycompat.futures.Future):
   101 class unsentfuture(pycompat.futures.Future):
   102     """A Future variation to represent an unsent command.
   102     """A Future variation to represent an unsent command.
   103 
   103 
   137         self.close()
   137         self.close()
   138 
   138 
   139     def callcommand(self, command, args):
   139     def callcommand(self, command, args):
   140         if self._sent:
   140         if self._sent:
   141             raise error.ProgrammingError(
   141             raise error.ProgrammingError(
   142                 'callcommand() cannot be used ' 'after commands are sent'
   142                 b'callcommand() cannot be used ' b'after commands are sent'
   143             )
   143             )
   144 
   144 
   145         if self._closed:
   145         if self._closed:
   146             raise error.ProgrammingError(
   146             raise error.ProgrammingError(
   147                 'callcommand() cannot be used ' 'after close()'
   147                 b'callcommand() cannot be used ' b'after close()'
   148             )
   148             )
   149 
   149 
   150         # Commands are dispatched through methods on the peer.
   150         # Commands are dispatched through methods on the peer.
   151         fn = getattr(self._peer, pycompat.sysstr(command), None)
   151         fn = getattr(self._peer, pycompat.sysstr(command), None)
   152 
   152 
   153         if not fn:
   153         if not fn:
   154             raise error.ProgrammingError(
   154             raise error.ProgrammingError(
   155                 'cannot call command %s: method of same name not available '
   155                 b'cannot call command %s: method of same name not available '
   156                 'on peer' % command
   156                 b'on peer' % command
   157             )
   157             )
   158 
   158 
   159         # Commands are either batchable or they aren't. If a command
   159         # Commands are either batchable or they aren't. If a command
   160         # isn't batchable, we send it immediately because the executor
   160         # isn't batchable, we send it immediately because the executor
   161         # can no longer accept new commands after a non-batchable command.
   161         # can no longer accept new commands after a non-batchable command.
   177             f.__class__ = unsentfuture
   177             f.__class__ = unsentfuture
   178             f._peerexecutor = self
   178             f._peerexecutor = self
   179         else:
   179         else:
   180             if self._calls:
   180             if self._calls:
   181                 raise error.ProgrammingError(
   181                 raise error.ProgrammingError(
   182                     '%s is not batchable and cannot be called on a command '
   182                     b'%s is not batchable and cannot be called on a command '
   183                     'executor along with other commands' % command
   183                     b'executor along with other commands' % command
   184                 )
   184                 )
   185 
   185 
   186             f = addcall()
   186             f = addcall()
   187 
   187 
   188             # Non-batchable commands can never coexist with another command
   188             # Non-batchable commands can never coexist with another command
   304             # errored. Otherwise a result() could wait indefinitely.
   304             # errored. Otherwise a result() could wait indefinitely.
   305             for f in self._futures:
   305             for f in self._futures:
   306                 if not f.done():
   306                 if not f.done():
   307                     f.set_exception(
   307                     f.set_exception(
   308                         error.ResponseError(
   308                         error.ResponseError(
   309                             _('unfulfilled batch command response')
   309                             _(b'unfulfilled batch command response')
   310                         )
   310                         )
   311                     )
   311                     )
   312 
   312 
   313             self._futures = None
   313             self._futures = None
   314 
   314 
   346         return peerexecutor(self)
   346         return peerexecutor(self)
   347 
   347 
   348     # Begin of ipeercommands interface.
   348     # Begin of ipeercommands interface.
   349 
   349 
   350     def clonebundles(self):
   350     def clonebundles(self):
   351         self.requirecap('clonebundles', _('clone bundles'))
   351         self.requirecap(b'clonebundles', _(b'clone bundles'))
   352         return self._call('clonebundles')
   352         return self._call(b'clonebundles')
   353 
   353 
   354     @batchable
   354     @batchable
   355     def lookup(self, key):
   355     def lookup(self, key):
   356         self.requirecap('lookup', _('look up remote revision'))
   356         self.requirecap(b'lookup', _(b'look up remote revision'))
   357         f = future()
   357         f = future()
   358         yield {'key': encoding.fromlocal(key)}, f
   358         yield {b'key': encoding.fromlocal(key)}, f
   359         d = f.value
   359         d = f.value
   360         success, data = d[:-1].split(" ", 1)
   360         success, data = d[:-1].split(b" ", 1)
   361         if int(success):
   361         if int(success):
   362             yield bin(data)
   362             yield bin(data)
   363         else:
   363         else:
   364             self._abort(error.RepoError(data))
   364             self._abort(error.RepoError(data))
   365 
   365 
   369         yield {}, f
   369         yield {}, f
   370         d = f.value
   370         d = f.value
   371         try:
   371         try:
   372             yield wireprototypes.decodelist(d[:-1])
   372             yield wireprototypes.decodelist(d[:-1])
   373         except ValueError:
   373         except ValueError:
   374             self._abort(error.ResponseError(_("unexpected response:"), d))
   374             self._abort(error.ResponseError(_(b"unexpected response:"), d))
   375 
   375 
   376     @batchable
   376     @batchable
   377     def known(self, nodes):
   377     def known(self, nodes):
   378         f = future()
   378         f = future()
   379         yield {'nodes': wireprototypes.encodelist(nodes)}, f
   379         yield {b'nodes': wireprototypes.encodelist(nodes)}, f
   380         d = f.value
   380         d = f.value
   381         try:
   381         try:
   382             yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
   382             yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
   383         except ValueError:
   383         except ValueError:
   384             self._abort(error.ResponseError(_("unexpected response:"), d))
   384             self._abort(error.ResponseError(_(b"unexpected response:"), d))
   385 
   385 
   386     @batchable
   386     @batchable
   387     def branchmap(self):
   387     def branchmap(self):
   388         f = future()
   388         f = future()
   389         yield {}, f
   389         yield {}, f
   390         d = f.value
   390         d = f.value
   391         try:
   391         try:
   392             branchmap = {}
   392             branchmap = {}
   393             for branchpart in d.splitlines():
   393             for branchpart in d.splitlines():
   394                 branchname, branchheads = branchpart.split(' ', 1)
   394                 branchname, branchheads = branchpart.split(b' ', 1)
   395                 branchname = encoding.tolocal(urlreq.unquote(branchname))
   395                 branchname = encoding.tolocal(urlreq.unquote(branchname))
   396                 branchheads = wireprototypes.decodelist(branchheads)
   396                 branchheads = wireprototypes.decodelist(branchheads)
   397                 branchmap[branchname] = branchheads
   397                 branchmap[branchname] = branchheads
   398             yield branchmap
   398             yield branchmap
   399         except TypeError:
   399         except TypeError:
   400             self._abort(error.ResponseError(_("unexpected response:"), d))
   400             self._abort(error.ResponseError(_(b"unexpected response:"), d))
   401 
   401 
   402     @batchable
   402     @batchable
   403     def listkeys(self, namespace):
   403     def listkeys(self, namespace):
   404         if not self.capable('pushkey'):
   404         if not self.capable(b'pushkey'):
   405             yield {}, None
   405             yield {}, None
   406         f = future()
   406         f = future()
   407         self.ui.debug('preparing listkeys for "%s"\n' % namespace)
   407         self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
   408         yield {'namespace': encoding.fromlocal(namespace)}, f
   408         yield {b'namespace': encoding.fromlocal(namespace)}, f
   409         d = f.value
   409         d = f.value
   410         self.ui.debug(
   410         self.ui.debug(
   411             'received listkey for "%s": %i bytes\n' % (namespace, len(d))
   411             b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
   412         )
   412         )
   413         yield pushkeymod.decodekeys(d)
   413         yield pushkeymod.decodekeys(d)
   414 
   414 
   415     @batchable
   415     @batchable
   416     def pushkey(self, namespace, key, old, new):
   416     def pushkey(self, namespace, key, old, new):
   417         if not self.capable('pushkey'):
   417         if not self.capable(b'pushkey'):
   418             yield False, None
   418             yield False, None
   419         f = future()
   419         f = future()
   420         self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
   420         self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
   421         yield {
   421         yield {
   422             'namespace': encoding.fromlocal(namespace),
   422             b'namespace': encoding.fromlocal(namespace),
   423             'key': encoding.fromlocal(key),
   423             b'key': encoding.fromlocal(key),
   424             'old': encoding.fromlocal(old),
   424             b'old': encoding.fromlocal(old),
   425             'new': encoding.fromlocal(new),
   425             b'new': encoding.fromlocal(new),
   426         }, f
   426         }, f
   427         d = f.value
   427         d = f.value
   428         d, output = d.split('\n', 1)
   428         d, output = d.split(b'\n', 1)
   429         try:
   429         try:
   430             d = bool(int(d))
   430             d = bool(int(d))
   431         except ValueError:
   431         except ValueError:
   432             raise error.ResponseError(
   432             raise error.ResponseError(
   433                 _('push failed (unexpected response):'), d
   433                 _(b'push failed (unexpected response):'), d
   434             )
   434             )
   435         for l in output.splitlines(True):
   435         for l in output.splitlines(True):
   436             self.ui.status(_('remote: '), l)
   436             self.ui.status(_(b'remote: '), l)
   437         yield d
   437         yield d
   438 
   438 
   439     def stream_out(self):
   439     def stream_out(self):
   440         return self._callstream('stream_out')
   440         return self._callstream(b'stream_out')
   441 
   441 
   442     def getbundle(self, source, **kwargs):
   442     def getbundle(self, source, **kwargs):
   443         kwargs = pycompat.byteskwargs(kwargs)
   443         kwargs = pycompat.byteskwargs(kwargs)
   444         self.requirecap('getbundle', _('look up remote changes'))
   444         self.requirecap(b'getbundle', _(b'look up remote changes'))
   445         opts = {}
   445         opts = {}
   446         bundlecaps = kwargs.get('bundlecaps') or set()
   446         bundlecaps = kwargs.get(b'bundlecaps') or set()
   447         for key, value in kwargs.iteritems():
   447         for key, value in kwargs.iteritems():
   448             if value is None:
   448             if value is None:
   449                 continue
   449                 continue
   450             keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
   450             keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
   451             if keytype is None:
   451             if keytype is None:
   452                 raise error.ProgrammingError(
   452                 raise error.ProgrammingError(
   453                     'Unexpectedly None keytype for key %s' % key
   453                     b'Unexpectedly None keytype for key %s' % key
   454                 )
   454                 )
   455             elif keytype == 'nodes':
   455             elif keytype == b'nodes':
   456                 value = wireprototypes.encodelist(value)
   456                 value = wireprototypes.encodelist(value)
   457             elif keytype == 'csv':
   457             elif keytype == b'csv':
   458                 value = ','.join(value)
   458                 value = b','.join(value)
   459             elif keytype == 'scsv':
   459             elif keytype == b'scsv':
   460                 value = ','.join(sorted(value))
   460                 value = b','.join(sorted(value))
   461             elif keytype == 'boolean':
   461             elif keytype == b'boolean':
   462                 value = '%i' % bool(value)
   462                 value = b'%i' % bool(value)
   463             elif keytype != 'plain':
   463             elif keytype != b'plain':
   464                 raise KeyError('unknown getbundle option type %s' % keytype)
   464                 raise KeyError(b'unknown getbundle option type %s' % keytype)
   465             opts[key] = value
   465             opts[key] = value
   466         f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
   466         f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
   467         if any((cap.startswith('HG2') for cap in bundlecaps)):
   467         if any((cap.startswith(b'HG2') for cap in bundlecaps)):
   468             return bundle2.getunbundler(self.ui, f)
   468             return bundle2.getunbundler(self.ui, f)
   469         else:
   469         else:
   470             return changegroupmod.cg1unpacker(f, 'UN')
   470             return changegroupmod.cg1unpacker(f, b'UN')
   471 
   471 
   472     def unbundle(self, bundle, heads, url):
   472     def unbundle(self, bundle, heads, url):
   473         '''Send cg (a readable file-like object representing the
   473         '''Send cg (a readable file-like object representing the
   474         changegroup to push, typically a chunkbuffer object) to the
   474         changegroup to push, typically a chunkbuffer object) to the
   475         remote server as a bundle.
   475         remote server as a bundle.
   481 
   481 
   482         `url` is the url the client thinks it's pushing to, which is
   482         `url` is the url the client thinks it's pushing to, which is
   483         visible to hooks.
   483         visible to hooks.
   484         '''
   484         '''
   485 
   485 
   486         if heads != ['force'] and self.capable('unbundlehash'):
   486         if heads != [b'force'] and self.capable(b'unbundlehash'):
   487             heads = wireprototypes.encodelist(
   487             heads = wireprototypes.encodelist(
   488                 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]
   488                 [b'hashed', hashlib.sha1(b''.join(sorted(heads))).digest()]
   489             )
   489             )
   490         else:
   490         else:
   491             heads = wireprototypes.encodelist(heads)
   491             heads = wireprototypes.encodelist(heads)
   492 
   492 
   493         if util.safehasattr(bundle, 'deltaheader'):
   493         if util.safehasattr(bundle, b'deltaheader'):
   494             # this a bundle10, do the old style call sequence
   494             # this a bundle10, do the old style call sequence
   495             ret, output = self._callpush("unbundle", bundle, heads=heads)
   495             ret, output = self._callpush(b"unbundle", bundle, heads=heads)
   496             if ret == "":
   496             if ret == b"":
   497                 raise error.ResponseError(_('push failed:'), output)
   497                 raise error.ResponseError(_(b'push failed:'), output)
   498             try:
   498             try:
   499                 ret = int(ret)
   499                 ret = int(ret)
   500             except ValueError:
   500             except ValueError:
   501                 raise error.ResponseError(
   501                 raise error.ResponseError(
   502                     _('push failed (unexpected response):'), ret
   502                     _(b'push failed (unexpected response):'), ret
   503                 )
   503                 )
   504 
   504 
   505             for l in output.splitlines(True):
   505             for l in output.splitlines(True):
   506                 self.ui.status(_('remote: '), l)
   506                 self.ui.status(_(b'remote: '), l)
   507         else:
   507         else:
   508             # bundle2 push. Send a stream, fetch a stream.
   508             # bundle2 push. Send a stream, fetch a stream.
   509             stream = self._calltwowaystream('unbundle', bundle, heads=heads)
   509             stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
   510             ret = bundle2.getunbundler(self.ui, stream)
   510             ret = bundle2.getunbundler(self.ui, stream)
   511         return ret
   511         return ret
   512 
   512 
   513     # End of ipeercommands interface.
   513     # End of ipeercommands interface.
   514 
   514 
   515     # Begin of ipeerlegacycommands interface.
   515     # Begin of ipeerlegacycommands interface.
   516 
   516 
   517     def branches(self, nodes):
   517     def branches(self, nodes):
   518         n = wireprototypes.encodelist(nodes)
   518         n = wireprototypes.encodelist(nodes)
   519         d = self._call("branches", nodes=n)
   519         d = self._call(b"branches", nodes=n)
   520         try:
   520         try:
   521             br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
   521             br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
   522             return br
   522             return br
   523         except ValueError:
   523         except ValueError:
   524             self._abort(error.ResponseError(_("unexpected response:"), d))
   524             self._abort(error.ResponseError(_(b"unexpected response:"), d))
   525 
   525 
   526     def between(self, pairs):
   526     def between(self, pairs):
   527         batch = 8  # avoid giant requests
   527         batch = 8  # avoid giant requests
   528         r = []
   528         r = []
   529         for i in pycompat.xrange(0, len(pairs), batch):
   529         for i in pycompat.xrange(0, len(pairs), batch):
   530             n = " ".join(
   530             n = b" ".join(
   531                 [
   531                 [
   532                     wireprototypes.encodelist(p, '-')
   532                     wireprototypes.encodelist(p, b'-')
   533                     for p in pairs[i : i + batch]
   533                     for p in pairs[i : i + batch]
   534                 ]
   534                 ]
   535             )
   535             )
   536             d = self._call("between", pairs=n)
   536             d = self._call(b"between", pairs=n)
   537             try:
   537             try:
   538                 r.extend(
   538                 r.extend(
   539                     l and wireprototypes.decodelist(l) or []
   539                     l and wireprototypes.decodelist(l) or []
   540                     for l in d.splitlines()
   540                     for l in d.splitlines()
   541                 )
   541                 )
   542             except ValueError:
   542             except ValueError:
   543                 self._abort(error.ResponseError(_("unexpected response:"), d))
   543                 self._abort(error.ResponseError(_(b"unexpected response:"), d))
   544         return r
   544         return r
   545 
   545 
   546     def changegroup(self, nodes, source):
   546     def changegroup(self, nodes, source):
   547         n = wireprototypes.encodelist(nodes)
   547         n = wireprototypes.encodelist(nodes)
   548         f = self._callcompressable("changegroup", roots=n)
   548         f = self._callcompressable(b"changegroup", roots=n)
   549         return changegroupmod.cg1unpacker(f, 'UN')
   549         return changegroupmod.cg1unpacker(f, b'UN')
   550 
   550 
   551     def changegroupsubset(self, bases, heads, source):
   551     def changegroupsubset(self, bases, heads, source):
   552         self.requirecap('changegroupsubset', _('look up remote changes'))
   552         self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
   553         bases = wireprototypes.encodelist(bases)
   553         bases = wireprototypes.encodelist(bases)
   554         heads = wireprototypes.encodelist(heads)
   554         heads = wireprototypes.encodelist(heads)
   555         f = self._callcompressable(
   555         f = self._callcompressable(
   556             "changegroupsubset", bases=bases, heads=heads
   556             b"changegroupsubset", bases=bases, heads=heads
   557         )
   557         )
   558         return changegroupmod.cg1unpacker(f, 'UN')
   558         return changegroupmod.cg1unpacker(f, b'UN')
   559 
   559 
   560     # End of ipeerlegacycommands interface.
   560     # End of ipeerlegacycommands interface.
   561 
   561 
   562     def _submitbatch(self, req):
   562     def _submitbatch(self, req):
   563         """run batch request <req> on the server
   563         """run batch request <req> on the server
   564 
   564 
   565         Returns an iterator of the raw responses from the server.
   565         Returns an iterator of the raw responses from the server.
   566         """
   566         """
   567         ui = self.ui
   567         ui = self.ui
   568         if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
   568         if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
   569             ui.debug('devel-peer-request: batched-content\n')
   569             ui.debug(b'devel-peer-request: batched-content\n')
   570             for op, args in req:
   570             for op, args in req:
   571                 msg = 'devel-peer-request:    - %s (%d arguments)\n'
   571                 msg = b'devel-peer-request:    - %s (%d arguments)\n'
   572                 ui.debug(msg % (op, len(args)))
   572                 ui.debug(msg % (op, len(args)))
   573 
   573 
   574         unescapearg = wireprototypes.unescapebatcharg
   574         unescapearg = wireprototypes.unescapebatcharg
   575 
   575 
   576         rsp = self._callstream("batch", cmds=encodebatchcmds(req))
   576         rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
   577         chunk = rsp.read(1024)
   577         chunk = rsp.read(1024)
   578         work = [chunk]
   578         work = [chunk]
   579         while chunk:
   579         while chunk:
   580             while ';' not in chunk and chunk:
   580             while b';' not in chunk and chunk:
   581                 chunk = rsp.read(1024)
   581                 chunk = rsp.read(1024)
   582                 work.append(chunk)
   582                 work.append(chunk)
   583             merged = ''.join(work)
   583             merged = b''.join(work)
   584             while ';' in merged:
   584             while b';' in merged:
   585                 one, merged = merged.split(';', 1)
   585                 one, merged = merged.split(b';', 1)
   586                 yield unescapearg(one)
   586                 yield unescapearg(one)
   587             chunk = rsp.read(1024)
   587             chunk = rsp.read(1024)
   588             work = [merged, chunk]
   588             work = [merged, chunk]
   589         yield unescapearg(''.join(work))
   589         yield unescapearg(b''.join(work))
   590 
   590 
   591     def _submitone(self, op, args):
   591     def _submitone(self, op, args):
   592         return self._call(op, **pycompat.strkwargs(args))
   592         return self._call(op, **pycompat.strkwargs(args))
   593 
   593 
   594     def debugwireargs(self, one, two, three=None, four=None, five=None):
   594     def debugwireargs(self, one, two, three=None, four=None, five=None):
   596         opts = {}
   596         opts = {}
   597         if three is not None:
   597         if three is not None:
   598             opts[r'three'] = three
   598             opts[r'three'] = three
   599         if four is not None:
   599         if four is not None:
   600             opts[r'four'] = four
   600             opts[r'four'] = four
   601         return self._call('debugwireargs', one=one, two=two, **opts)
   601         return self._call(b'debugwireargs', one=one, two=two, **opts)
   602 
   602 
   603     def _call(self, cmd, **args):
   603     def _call(self, cmd, **args):
   604         """execute <cmd> on the server
   604         """execute <cmd> on the server
   605 
   605 
   606         The command is expected to return a simple string.
   606         The command is expected to return a simple string.