mercurial/httppeer.py
branchstable
changeset 48796 c00d3ce4e94b
parent 48526 04688c51f81f
child 48835 a0da5075bca3
equal deleted inserted replaced
48776:b84ff512b645 48796:c00d3ce4e94b
    11 import errno
    11 import errno
    12 import io
    12 import io
    13 import os
    13 import os
    14 import socket
    14 import socket
    15 import struct
    15 import struct
    16 import weakref
       
    17 
    16 
    18 from .i18n import _
    17 from .i18n import _
    19 from .pycompat import getattr
    18 from .pycompat import getattr
    20 from . import (
    19 from . import (
    21     bundle2,
    20     bundle2,
    23     httpconnection,
    22     httpconnection,
    24     pycompat,
    23     pycompat,
    25     statichttprepo,
    24     statichttprepo,
    26     url as urlmod,
    25     url as urlmod,
    27     util,
    26     util,
    28     wireprotoframing,
       
    29     wireprototypes,
       
    30     wireprotov1peer,
    27     wireprotov1peer,
    31     wireprotov2peer,
       
    32     wireprotov2server,
       
    33 )
    28 )
    34 from .interfaces import (
    29 from .utils import urlutil
    35     repository,
       
    36     util as interfaceutil,
       
    37 )
       
    38 from .utils import (
       
    39     cborutil,
       
    40     stringutil,
       
    41     urlutil,
       
    42 )
       
    43 
    30 
    44 httplib = util.httplib
    31 httplib = util.httplib
    45 urlerr = util.urlerr
    32 urlerr = util.urlerr
    46 urlreq = util.urlreq
    33 urlreq = util.urlreq
    47 
    34 
   329     def __init__(self, msg, respurl):
   316     def __init__(self, msg, respurl):
   330         super(RedirectedRepoError, self).__init__(msg)
   317         super(RedirectedRepoError, self).__init__(msg)
   331         self.respurl = respurl
   318         self.respurl = respurl
   332 
   319 
   333 
   320 
   334 def parsev1commandresponse(
   321 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible):
   335     ui, baseurl, requrl, qs, resp, compressible, allowcbor=False
       
   336 ):
       
   337     # record the url we got redirected to
   322     # record the url we got redirected to
   338     redirected = False
   323     redirected = False
   339     respurl = pycompat.bytesurl(resp.geturl())
   324     respurl = pycompat.bytesurl(resp.geturl())
   340     if respurl.endswith(qs):
   325     if respurl.endswith(qs):
   341         respurl = respurl[: -len(qs)]
   326         respurl = respurl[: -len(qs)]
   373         else:
   358         else:
   374             raise error.RepoError(msg)
   359             raise error.RepoError(msg)
   375 
   360 
   376     try:
   361     try:
   377         subtype = proto.split(b'-', 1)[1]
   362         subtype = proto.split(b'-', 1)[1]
   378 
       
   379         # Unless we end up supporting CBOR in the legacy wire protocol,
       
   380         # this should ONLY be encountered for the initial capabilities
       
   381         # request during handshake.
       
   382         if subtype == b'cbor':
       
   383             if allowcbor:
       
   384                 return respurl, proto, resp
       
   385             else:
       
   386                 raise error.RepoError(
       
   387                     _(b'unexpected CBOR response from server')
       
   388                 )
       
   389 
   363 
   390         version_info = tuple([int(n) for n in subtype.split(b'.')])
   364         version_info = tuple([int(n) for n in subtype.split(b'.')])
   391     except ValueError:
   365     except ValueError:
   392         raise error.RepoError(
   366         raise error.RepoError(
   393             _(b"'%s' sent a broken Content-Type header (%s)") % (safeurl, proto)
   367             _(b"'%s' sent a broken Content-Type header (%s)") % (safeurl, proto)
   562 
   536 
   563     def _abort(self, exception):
   537     def _abort(self, exception):
   564         raise exception
   538         raise exception
   565 
   539 
   566 
   540 
   567 def sendv2request(
       
   568     ui, opener, requestbuilder, apiurl, permission, requests, redirect
       
   569 ):
       
   570     wireprotoframing.populatestreamencoders()
       
   571 
       
   572     uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')
       
   573 
       
   574     if uiencoders:
       
   575         encoders = []
       
   576 
       
   577         for encoder in uiencoders:
       
   578             if encoder not in wireprotoframing.STREAM_ENCODERS:
       
   579                 ui.warn(
       
   580                     _(
       
   581                         b'wire protocol version 2 encoder referenced in '
       
   582                         b'config (%s) is not known; ignoring\n'
       
   583                     )
       
   584                     % encoder
       
   585                 )
       
   586             else:
       
   587                 encoders.append(encoder)
       
   588 
       
   589     else:
       
   590         encoders = wireprotoframing.STREAM_ENCODERS_ORDER
       
   591 
       
   592     reactor = wireprotoframing.clientreactor(
       
   593         ui,
       
   594         hasmultiplesend=False,
       
   595         buffersends=True,
       
   596         clientcontentencoders=encoders,
       
   597     )
       
   598 
       
   599     handler = wireprotov2peer.clienthandler(
       
   600         ui, reactor, opener=opener, requestbuilder=requestbuilder
       
   601     )
       
   602 
       
   603     url = b'%s/%s' % (apiurl, permission)
       
   604 
       
   605     if len(requests) > 1:
       
   606         url += b'/multirequest'
       
   607     else:
       
   608         url += b'/%s' % requests[0][0]
       
   609 
       
   610     ui.debug(b'sending %d commands\n' % len(requests))
       
   611     for command, args, f in requests:
       
   612         ui.debug(
       
   613             b'sending command %s: %s\n'
       
   614             % (command, stringutil.pprint(args, indent=2))
       
   615         )
       
   616         assert not list(
       
   617             handler.callcommand(command, args, f, redirect=redirect)
       
   618         )
       
   619 
       
   620     # TODO stream this.
       
   621     body = b''.join(map(bytes, handler.flushcommands()))
       
   622 
       
   623     # TODO modify user-agent to reflect v2
       
   624     headers = {
       
   625         'Accept': wireprotov2server.FRAMINGTYPE,
       
   626         'Content-Type': wireprotov2server.FRAMINGTYPE,
       
   627     }
       
   628 
       
   629     req = requestbuilder(pycompat.strurl(url), body, headers)
       
   630     req.add_unredirected_header('Content-Length', '%d' % len(body))
       
   631 
       
   632     try:
       
   633         res = opener.open(req)
       
   634     except urlerr.httperror as e:
       
   635         if e.code == 401:
       
   636             raise error.Abort(_(b'authorization failed'))
       
   637 
       
   638         raise
       
   639     except httplib.HTTPException as e:
       
   640         ui.traceback()
       
   641         raise IOError(None, e)
       
   642 
       
   643     return handler, res
       
   644 
       
   645 
       
   646 class queuedcommandfuture(pycompat.futures.Future):
   541 class queuedcommandfuture(pycompat.futures.Future):
   647     """Wraps result() on command futures to trigger submission on call."""
   542     """Wraps result() on command futures to trigger submission on call."""
   648 
   543 
   649     def result(self, timeout=None):
   544     def result(self, timeout=None):
   650         if self.done():
   545         if self.done():
   655         # sendcommands() will restore the original __class__ and self.result
   550         # sendcommands() will restore the original __class__ and self.result
   656         # will resolve to Future.result.
   551         # will resolve to Future.result.
   657         return self.result(timeout)
   552         return self.result(timeout)
   658 
   553 
   659 
   554 
   660 @interfaceutil.implementer(repository.ipeercommandexecutor)
       
   661 class httpv2executor(object):
       
   662     def __init__(
       
   663         self, ui, opener, requestbuilder, apiurl, descriptor, redirect
       
   664     ):
       
   665         self._ui = ui
       
   666         self._opener = opener
       
   667         self._requestbuilder = requestbuilder
       
   668         self._apiurl = apiurl
       
   669         self._descriptor = descriptor
       
   670         self._redirect = redirect
       
   671         self._sent = False
       
   672         self._closed = False
       
   673         self._neededpermissions = set()
       
   674         self._calls = []
       
   675         self._futures = weakref.WeakSet()
       
   676         self._responseexecutor = None
       
   677         self._responsef = None
       
   678 
       
   679     def __enter__(self):
       
   680         return self
       
   681 
       
   682     def __exit__(self, exctype, excvalue, exctb):
       
   683         self.close()
       
   684 
       
   685     def callcommand(self, command, args):
       
   686         if self._sent:
       
   687             raise error.ProgrammingError(
       
   688                 b'callcommand() cannot be used after commands are sent'
       
   689             )
       
   690 
       
   691         if self._closed:
       
   692             raise error.ProgrammingError(
       
   693                 b'callcommand() cannot be used after close()'
       
   694             )
       
   695 
       
   696         # The service advertises which commands are available. So if we attempt
       
   697         # to call an unknown command or pass an unknown argument, we can screen
       
   698         # for this.
       
   699         if command not in self._descriptor[b'commands']:
       
   700             raise error.ProgrammingError(
       
   701                 b'wire protocol command %s is not available' % command
       
   702             )
       
   703 
       
   704         cmdinfo = self._descriptor[b'commands'][command]
       
   705         unknownargs = set(args.keys()) - set(cmdinfo.get(b'args', {}))
       
   706 
       
   707         if unknownargs:
       
   708             raise error.ProgrammingError(
       
   709                 b'wire protocol command %s does not accept argument: %s'
       
   710                 % (command, b', '.join(sorted(unknownargs)))
       
   711             )
       
   712 
       
   713         self._neededpermissions |= set(cmdinfo[b'permissions'])
       
   714 
       
   715         # TODO we /could/ also validate types here, since the API descriptor
       
   716         # includes types...
       
   717 
       
   718         f = pycompat.futures.Future()
       
   719 
       
   720         # Monkeypatch it so result() triggers sendcommands(), otherwise result()
       
   721         # could deadlock.
       
   722         f.__class__ = queuedcommandfuture
       
   723         f._peerexecutor = self
       
   724 
       
   725         self._futures.add(f)
       
   726         self._calls.append((command, args, f))
       
   727 
       
   728         return f
       
   729 
       
   730     def sendcommands(self):
       
   731         if self._sent:
       
   732             return
       
   733 
       
   734         if not self._calls:
       
   735             return
       
   736 
       
   737         self._sent = True
       
   738 
       
   739         # Unhack any future types so caller sees a clean type and so we
       
   740         # break reference cycle.
       
   741         for f in self._futures:
       
   742             if isinstance(f, queuedcommandfuture):
       
   743                 f.__class__ = pycompat.futures.Future
       
   744                 f._peerexecutor = None
       
   745 
       
   746         # Mark the future as running and filter out cancelled futures.
       
   747         calls = [
       
   748             (command, args, f)
       
   749             for command, args, f in self._calls
       
   750             if f.set_running_or_notify_cancel()
       
   751         ]
       
   752 
       
   753         # Clear out references, prevent improper object usage.
       
   754         self._calls = None
       
   755 
       
   756         if not calls:
       
   757             return
       
   758 
       
   759         permissions = set(self._neededpermissions)
       
   760 
       
   761         if b'push' in permissions and b'pull' in permissions:
       
   762             permissions.remove(b'pull')
       
   763 
       
   764         if len(permissions) > 1:
       
   765             raise error.RepoError(
       
   766                 _(b'cannot make request requiring multiple permissions: %s')
       
   767                 % _(b', ').join(sorted(permissions))
       
   768             )
       
   769 
       
   770         permission = {
       
   771             b'push': b'rw',
       
   772             b'pull': b'ro',
       
   773         }[permissions.pop()]
       
   774 
       
   775         handler, resp = sendv2request(
       
   776             self._ui,
       
   777             self._opener,
       
   778             self._requestbuilder,
       
   779             self._apiurl,
       
   780             permission,
       
   781             calls,
       
   782             self._redirect,
       
   783         )
       
   784 
       
   785         # TODO we probably want to validate the HTTP code, media type, etc.
       
   786 
       
   787         self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
       
   788         self._responsef = self._responseexecutor.submit(
       
   789             self._handleresponse, handler, resp
       
   790         )
       
   791 
       
   792     def close(self):
       
   793         if self._closed:
       
   794             return
       
   795 
       
   796         self.sendcommands()
       
   797 
       
   798         self._closed = True
       
   799 
       
   800         if not self._responsef:
       
   801             return
       
   802 
       
   803         # TODO ^C here may not result in immediate program termination.
       
   804 
       
   805         try:
       
   806             self._responsef.result()
       
   807         finally:
       
   808             self._responseexecutor.shutdown(wait=True)
       
   809             self._responsef = None
       
   810             self._responseexecutor = None
       
   811 
       
   812             # If any of our futures are still in progress, mark them as
       
   813             # errored, otherwise a result() could wait indefinitely.
       
   814             for f in self._futures:
       
   815                 if not f.done():
       
   816                     f.set_exception(
       
   817                         error.ResponseError(_(b'unfulfilled command response'))
       
   818                     )
       
   819 
       
   820             self._futures = None
       
   821 
       
   822     def _handleresponse(self, handler, resp):
       
   823         # Called in a thread to read the response.
       
   824 
       
   825         while handler.readdata(resp):
       
   826             pass
       
   827 
       
   828 
       
   829 @interfaceutil.implementer(repository.ipeerv2)
       
   830 class httpv2peer(object):
       
   831 
       
   832     limitedarguments = False
       
   833 
       
   834     def __init__(
       
   835         self, ui, repourl, apipath, opener, requestbuilder, apidescriptor
       
   836     ):
       
   837         self.ui = ui
       
   838         self.apidescriptor = apidescriptor
       
   839 
       
   840         if repourl.endswith(b'/'):
       
   841             repourl = repourl[:-1]
       
   842 
       
   843         self._url = repourl
       
   844         self._apipath = apipath
       
   845         self._apiurl = b'%s/%s' % (repourl, apipath)
       
   846         self._opener = opener
       
   847         self._requestbuilder = requestbuilder
       
   848 
       
   849         self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
       
   850 
       
   851     # Start of ipeerconnection.
       
   852 
       
   853     def url(self):
       
   854         return self._url
       
   855 
       
   856     def local(self):
       
   857         return None
       
   858 
       
   859     def peer(self):
       
   860         return self
       
   861 
       
   862     def canpush(self):
       
   863         # TODO change once implemented.
       
   864         return False
       
   865 
       
   866     def close(self):
       
   867         self.ui.note(
       
   868             _(
       
   869                 b'(sent %d HTTP requests and %d bytes; '
       
   870                 b'received %d bytes in responses)\n'
       
   871             )
       
   872             % (
       
   873                 self._opener.requestscount,
       
   874                 self._opener.sentbytescount,
       
   875                 self._opener.receivedbytescount,
       
   876             )
       
   877         )
       
   878 
       
   879     # End of ipeerconnection.
       
   880 
       
   881     # Start of ipeercapabilities.
       
   882 
       
   883     def capable(self, name):
       
   884         # The capabilities used internally historically map to capabilities
       
   885         # advertised from the "capabilities" wire protocol command. However,
       
   886         # version 2 of that command works differently.
       
   887 
       
   888         # Maps to commands that are available.
       
   889         if name in (
       
   890             b'branchmap',
       
   891             b'getbundle',
       
   892             b'known',
       
   893             b'lookup',
       
   894             b'pushkey',
       
   895         ):
       
   896             return True
       
   897 
       
   898         # Other concepts.
       
   899         if name in (b'bundle2',):
       
   900             return True
       
   901 
       
   902         # Alias command-* to presence of command of that name.
       
   903         if name.startswith(b'command-'):
       
   904             return name[len(b'command-') :] in self.apidescriptor[b'commands']
       
   905 
       
   906         return False
       
   907 
       
   908     def requirecap(self, name, purpose):
       
   909         if self.capable(name):
       
   910             return
       
   911 
       
   912         raise error.CapabilityError(
       
   913             _(
       
   914                 b'cannot %s; client or remote repository does not support the '
       
   915                 b'\'%s\' capability'
       
   916             )
       
   917             % (purpose, name)
       
   918         )
       
   919 
       
   920     # End of ipeercapabilities.
       
   921 
       
   922     def _call(self, name, **args):
       
   923         with self.commandexecutor() as e:
       
   924             return e.callcommand(name, args).result()
       
   925 
       
   926     def commandexecutor(self):
       
   927         return httpv2executor(
       
   928             self.ui,
       
   929             self._opener,
       
   930             self._requestbuilder,
       
   931             self._apiurl,
       
   932             self.apidescriptor,
       
   933             self._redirect,
       
   934         )
       
   935 
       
   936 
       
   937 # Registry of API service names to metadata about peers that handle it.
       
   938 #
       
   939 # The following keys are meaningful:
       
   940 #
       
   941 # init
       
   942 #    Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
       
   943 #                        apidescriptor) to create a peer.
       
   944 #
       
   945 # priority
       
   946 #    Integer priority for the service. If we could choose from multiple
       
   947 #    services, we choose the one with the highest priority.
       
   948 API_PEERS = {
       
   949     wireprototypes.HTTP_WIREPROTO_V2: {
       
   950         b'init': httpv2peer,
       
   951         b'priority': 50,
       
   952     },
       
   953 }
       
   954 
       
   955 
       
   956 def performhandshake(ui, url, opener, requestbuilder):
   555 def performhandshake(ui, url, opener, requestbuilder):
   957     # The handshake is a request to the capabilities command.
   556     # The handshake is a request to the capabilities command.
   958 
   557 
   959     caps = None
   558     caps = None
   960 
   559 
   961     def capable(x):
   560     def capable(x):
   962         raise error.ProgrammingError(b'should not be called')
   561         raise error.ProgrammingError(b'should not be called')
   963 
   562 
   964     args = {}
   563     args = {}
   965 
       
   966     # The client advertises support for newer protocols by adding an
       
   967     # X-HgUpgrade-* header with a list of supported APIs and an
       
   968     # X-HgProto-* header advertising which serializing formats it supports.
       
   969     # We only support the HTTP version 2 transport and CBOR responses for
       
   970     # now.
       
   971     advertisev2 = ui.configbool(b'experimental', b'httppeer.advertise-v2')
       
   972 
       
   973     if advertisev2:
       
   974         args[b'headers'] = {
       
   975             'X-HgProto-1': 'cbor',
       
   976         }
       
   977 
       
   978         args[b'headers'].update(
       
   979             encodevalueinheaders(
       
   980                 b' '.join(sorted(API_PEERS)),
       
   981                 b'X-HgUpgrade',
       
   982                 # We don't know the header limit this early.
       
   983                 # So make it small.
       
   984                 1024,
       
   985             )
       
   986         )
       
   987 
   564 
   988     req, requrl, qs = makev1commandrequest(
   565     req, requrl, qs = makev1commandrequest(
   989         ui, requestbuilder, caps, capable, url, b'capabilities', args
   566         ui, requestbuilder, caps, capable, url, b'capabilities', args
   990     )
   567     )
   991     resp = sendrequest(ui, opener, req)
   568     resp = sendrequest(ui, opener, req)
  1002     # issue without behavior degradation. And according to issue 5860, it may
   579     # issue without behavior degradation. And according to issue 5860, it may
  1003     # be a longstanding bug in some server implementations. So we allow a
   580     # be a longstanding bug in some server implementations. So we allow a
  1004     # redirect that drops the query string to "just work."
   581     # redirect that drops the query string to "just work."
  1005     try:
   582     try:
  1006         respurl, ct, resp = parsev1commandresponse(
   583         respurl, ct, resp = parsev1commandresponse(
  1007             ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2
   584             ui, url, requrl, qs, resp, compressible=False
  1008         )
   585         )
  1009     except RedirectedRepoError as e:
   586     except RedirectedRepoError as e:
  1010         req, requrl, qs = makev1commandrequest(
   587         req, requrl, qs = makev1commandrequest(
  1011             ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args
   588             ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args
  1012         )
   589         )
  1013         resp = sendrequest(ui, opener, req)
   590         resp = sendrequest(ui, opener, req)
  1014         respurl, ct, resp = parsev1commandresponse(
   591         respurl, ct, resp = parsev1commandresponse(
  1015             ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2
   592             ui, url, requrl, qs, resp, compressible=False
  1016         )
   593         )
  1017 
   594 
  1018     try:
   595     try:
  1019         rawdata = resp.read()
   596         rawdata = resp.read()
  1020     finally:
   597     finally:
  1021         resp.close()
   598         resp.close()
  1022 
   599 
  1023     if not ct.startswith(b'application/mercurial-'):
   600     if not ct.startswith(b'application/mercurial-'):
  1024         raise error.ProgrammingError(b'unexpected content-type: %s' % ct)
   601         raise error.ProgrammingError(b'unexpected content-type: %s' % ct)
  1025 
   602 
  1026     if advertisev2:
   603     info = {b'v1capabilities': set(rawdata.split())}
  1027         if ct == b'application/mercurial-cbor':
       
  1028             try:
       
  1029                 info = cborutil.decodeall(rawdata)[0]
       
  1030             except cborutil.CBORDecodeError:
       
  1031                 raise error.Abort(
       
  1032                     _(b'error decoding CBOR from remote server'),
       
  1033                     hint=_(
       
  1034                         b'try again and consider contacting '
       
  1035                         b'the server operator'
       
  1036                     ),
       
  1037                 )
       
  1038 
       
  1039         # We got a legacy response. That's fine.
       
  1040         elif ct in (b'application/mercurial-0.1', b'application/mercurial-0.2'):
       
  1041             info = {b'v1capabilities': set(rawdata.split())}
       
  1042 
       
  1043         else:
       
  1044             raise error.RepoError(
       
  1045                 _(b'unexpected response type from server: %s') % ct
       
  1046             )
       
  1047     else:
       
  1048         info = {b'v1capabilities': set(rawdata.split())}
       
  1049 
   604 
  1050     return respurl, info
   605     return respurl, info
  1051 
   606 
  1052 
   607 
  1053 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
   608 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
  1071 
   626 
  1072     opener = opener or urlmod.opener(ui, authinfo)
   627     opener = opener or urlmod.opener(ui, authinfo)
  1073 
   628 
  1074     respurl, info = performhandshake(ui, url, opener, requestbuilder)
   629     respurl, info = performhandshake(ui, url, opener, requestbuilder)
  1075 
   630 
  1076     # Given the intersection of APIs that both we and the server support,
       
  1077     # sort by their advertised priority and pick the first one.
       
  1078     #
       
  1079     # TODO consider making this request-based and interface driven. For
       
  1080     # example, the caller could say "I want a peer that does X." It's quite
       
  1081     # possible that not all peers would do that. Since we know the service
       
  1082     # capabilities, we could filter out services not meeting the
       
  1083     # requirements. Possibly by consulting the interfaces defined by the
       
  1084     # peer type.
       
  1085     apipeerchoices = set(info.get(b'apis', {}).keys()) & set(API_PEERS.keys())
       
  1086 
       
  1087     preferredchoices = sorted(
       
  1088         apipeerchoices, key=lambda x: API_PEERS[x][b'priority'], reverse=True
       
  1089     )
       
  1090 
       
  1091     for service in preferredchoices:
       
  1092         apipath = b'%s/%s' % (info[b'apibase'].rstrip(b'/'), service)
       
  1093 
       
  1094         return API_PEERS[service][b'init'](
       
  1095             ui, respurl, apipath, opener, requestbuilder, info[b'apis'][service]
       
  1096         )
       
  1097 
       
  1098     # Failed to construct an API peer. Fall back to legacy.
       
  1099     return httppeer(
   631     return httppeer(
  1100         ui, path, respurl, opener, requestbuilder, info[b'v1capabilities']
   632         ui, path, respurl, opener, requestbuilder, info[b'v1capabilities']
  1101     )
   633     )
  1102 
   634 
  1103 
   635