mercurial/httppeer.py
changeset 37558 8a73132214a3
parent 37556 b77aa48ba690
child 37609 01bfe5ad0c53
equal deleted inserted replaced
37557:734515aca84d 37558:8a73132214a3
    27     statichttprepo,
    27     statichttprepo,
    28     url as urlmod,
    28     url as urlmod,
    29     util,
    29     util,
    30     wireproto,
    30     wireproto,
    31     wireprotoframing,
    31     wireprotoframing,
       
    32     wireprototypes,
    32     wireprotov2server,
    33     wireprotov2server,
    33 )
    34 )
    34 
    35 
    35 httplib = util.httplib
    36 httplib = util.httplib
    36 urlerr = util.urlerr
    37 urlerr = util.urlerr
   309     # Insert error handlers for common I/O failures.
   310     # Insert error handlers for common I/O failures.
   310     _wraphttpresponse(res)
   311     _wraphttpresponse(res)
   311 
   312 
   312     return res
   313     return res
   313 
   314 
   314 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible):
   315 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
       
   316                            allowcbor=False):
   315     # record the url we got redirected to
   317     # record the url we got redirected to
   316     respurl = pycompat.bytesurl(resp.geturl())
   318     respurl = pycompat.bytesurl(resp.geturl())
   317     if respurl.endswith(qs):
   319     if respurl.endswith(qs):
   318         respurl = respurl[:-len(qs)]
   320         respurl = respurl[:-len(qs)]
   319     if baseurl.rstrip('/') != respurl.rstrip('/'):
   321     if baseurl.rstrip('/') != respurl.rstrip('/'):
   337             _("'%s' does not appear to be an hg repository:\n"
   339             _("'%s' does not appear to be an hg repository:\n"
   338               "---%%<--- (%s)\n%s\n---%%<---\n")
   340               "---%%<--- (%s)\n%s\n---%%<---\n")
   339             % (safeurl, proto or 'no content-type', resp.read(1024)))
   341             % (safeurl, proto or 'no content-type', resp.read(1024)))
   340 
   342 
   341     try:
   343     try:
   342         version = proto.split('-', 1)[1]
   344         subtype = proto.split('-', 1)[1]
   343         version_info = tuple([int(n) for n in version.split('.')])
   345 
       
   346         # Unless we end up supporting CBOR in the legacy wire protocol,
       
   347         # this should ONLY be encountered for the initial capabilities
       
   348         # request during handshake.
       
   349         if subtype == 'cbor':
       
   350             if allowcbor:
       
   351                 return respurl, proto, resp
       
   352             else:
       
   353                 raise error.RepoError(_('unexpected CBOR response from '
       
   354                                         'server'))
       
   355 
       
   356         version_info = tuple([int(n) for n in subtype.split('.')])
   344     except ValueError:
   357     except ValueError:
   345         raise error.RepoError(_("'%s' sent a broken Content-Type "
   358         raise error.RepoError(_("'%s' sent a broken Content-Type "
   346                                 "header (%s)") % (safeurl, proto))
   359                                 "header (%s)") % (safeurl, proto))
   347 
   360 
   348     # TODO consider switching to a decompression reader that uses
   361     # TODO consider switching to a decompression reader that uses
   359         engine = util.compengines.forwiretype(ename)
   372         engine = util.compengines.forwiretype(ename)
   360 
   373 
   361         resp = engine.decompressorreader(resp)
   374         resp = engine.decompressorreader(resp)
   362     else:
   375     else:
   363         raise error.RepoError(_("'%s' uses newer protocol %s") %
   376         raise error.RepoError(_("'%s' uses newer protocol %s") %
   364                               (safeurl, version))
   377                               (safeurl, subtype))
   365 
   378 
   366     return respurl, resp
   379     return respurl, proto, resp
   367 
   380 
   368 class httppeer(wireproto.wirepeer):
   381 class httppeer(wireproto.wirepeer):
   369     def __init__(self, ui, path, url, opener, requestbuilder, caps):
   382     def __init__(self, ui, path, url, opener, requestbuilder, caps):
   370         self.ui = ui
   383         self.ui = ui
   371         self._path = path
   384         self._path = path
   414                                            self._caps, self.capable,
   427                                            self._caps, self.capable,
   415                                            self._url, cmd, args)
   428                                            self._url, cmd, args)
   416 
   429 
   417         resp = sendrequest(self.ui, self._urlopener, req)
   430         resp = sendrequest(self.ui, self._urlopener, req)
   418 
   431 
   419         self._url, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
   432         self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
   420                                                  resp, _compressible)
   433                                                      resp, _compressible)
   421 
   434 
   422         return resp
   435         return resp
   423 
   436 
   424     def _call(self, cmd, **args):
   437     def _call(self, cmd, **args):
   425         fp = self._callstream(cmd, **args)
   438         fp = self._callstream(cmd, **args)
   499     def _abort(self, exception):
   512     def _abort(self, exception):
   500         raise exception
   513         raise exception
   501 
   514 
   502 # TODO implement interface for version 2 peers
   515 # TODO implement interface for version 2 peers
   503 class httpv2peer(object):
   516 class httpv2peer(object):
   504     def __init__(self, ui, repourl, opener):
   517     def __init__(self, ui, repourl, apipath, opener, requestbuilder,
       
   518                  apidescriptor):
   505         self.ui = ui
   519         self.ui = ui
   506 
   520 
   507         if repourl.endswith('/'):
   521         if repourl.endswith('/'):
   508             repourl = repourl[:-1]
   522             repourl = repourl[:-1]
   509 
   523 
   510         self.url = repourl
   524         self.url = repourl
       
   525         self._apipath = apipath
   511         self._opener = opener
   526         self._opener = opener
   512         # This is an its own attribute to facilitate extensions overriding
   527         self._requestbuilder = requestbuilder
   513         # the default type.
   528         self._descriptor = apidescriptor
   514         self._requestbuilder = urlreq.request
       
   515 
   529 
   516     def close(self):
   530     def close(self):
   517         pass
   531         pass
   518 
   532 
   519     # TODO require to be part of a batched primitive, use futures.
   533     # TODO require to be part of a batched primitive, use futures.
   538         permission = {
   552         permission = {
   539             'push': 'rw',
   553             'push': 'rw',
   540             'pull': 'ro',
   554             'pull': 'ro',
   541         }[permission]
   555         }[permission]
   542 
   556 
   543         url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2,
   557         url = '%s/%s/%s/%s' % (self.url, self._apipath, permission, name)
   544                                    permission, name)
       
   545 
   558 
   546         # TODO this should be part of a generic peer for the frame-based
   559         # TODO this should be part of a generic peer for the frame-based
   547         # protocol.
   560         # protocol.
   548         reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
   561         reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
   549                                                  buffersends=True)
   562                                                  buffersends=True)
   595             else:
   608             else:
   596                 error.ProgrammingError('unhandled action: %s' % action)
   609                 error.ProgrammingError('unhandled action: %s' % action)
   597 
   610 
   598         return results
   611         return results
   599 
   612 
       
   613 # Registry of API service names to metadata about peers that handle it.
       
   614 #
       
   615 # The following keys are meaningful:
       
   616 #
       
   617 # init
       
   618 #    Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
       
   619 #                        apidescriptor) to create a peer.
       
   620 #
       
   621 # priority
       
   622 #    Integer priority for the service. If we could choose from multiple
       
   623 #    services, we choose the one with the highest priority.
       
   624 API_PEERS = {
       
   625     wireprototypes.HTTPV2: {
       
   626         'init': httpv2peer,
       
   627         'priority': 50,
       
   628     },
       
   629 }
       
   630 
   600 def performhandshake(ui, url, opener, requestbuilder):
   631 def performhandshake(ui, url, opener, requestbuilder):
   601     # The handshake is a request to the capabilities command.
   632     # The handshake is a request to the capabilities command.
   602 
   633 
   603     caps = None
   634     caps = None
   604     def capable(x):
   635     def capable(x):
   605         raise error.ProgrammingError('should not be called')
   636         raise error.ProgrammingError('should not be called')
   606 
   637 
       
   638     args = {}
       
   639 
       
   640     # The client advertises support for newer protocols by adding an
       
   641     # X-HgUpgrade-* header with a list of supported APIs and an
       
   642     # X-HgProto-* header advertising which serializing formats it supports.
       
   643     # We only support the HTTP version 2 transport and CBOR responses for
       
   644     # now.
       
   645     advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
       
   646 
       
   647     if advertisev2:
       
   648         args['headers'] = {
       
   649             r'X-HgProto-1': r'cbor',
       
   650         }
       
   651 
       
   652         args['headers'].update(
       
   653             encodevalueinheaders(' '.join(sorted(API_PEERS)),
       
   654                                  'X-HgUpgrade',
       
   655                                  # We don't know the header limit this early.
       
   656                                  # So make it small.
       
   657                                  1024))
       
   658 
   607     req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
   659     req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
   608                                            capable, url, 'capabilities',
   660                                            capable, url, 'capabilities',
   609                                            {})
   661                                            args)
   610 
   662 
   611     resp = sendrequest(ui, opener, req)
   663     resp = sendrequest(ui, opener, req)
   612 
   664 
   613     respurl, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
   665     respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
   614                                            compressible=False)
   666                                                compressible=False,
       
   667                                                allowcbor=advertisev2)
   615 
   668 
   616     try:
   669     try:
   617         rawcaps = resp.read()
   670         rawdata = resp.read()
   618     finally:
   671     finally:
   619         resp.close()
   672         resp.close()
   620 
   673 
   621     return respurl, set(rawcaps.split())
   674     if not ct.startswith('application/mercurial-'):
       
   675         raise error.ProgrammingError('unexpected content-type: %s' % ct)
       
   676 
       
   677     if advertisev2:
       
   678         if ct == 'application/mercurial-cbor':
       
   679             try:
       
   680                 info = cbor.loads(rawdata)
       
   681             except cbor.CBORDecodeError:
       
   682                 raise error.Abort(_('error decoding CBOR from remote server'),
       
   683                                   hint=_('try again and consider contacting '
       
   684                                          'the server operator'))
       
   685 
       
   686         # We got a legacy response. That's fine.
       
   687         elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
       
   688             info = {
       
   689                 'v1capabilities': set(rawdata.split())
       
   690             }
       
   691 
       
   692         else:
       
   693             raise error.RepoError(
       
   694                 _('unexpected response type from server: %s') % ct)
       
   695     else:
       
   696         info = {
       
   697             'v1capabilities': set(rawdata.split())
       
   698         }
       
   699 
       
   700     return respurl, info
   622 
   701 
   623 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
   702 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
   624     """Construct an appropriate HTTP peer instance.
   703     """Construct an appropriate HTTP peer instance.
   625 
   704 
   626     ``opener`` is an ``url.opener`` that should be used to establish
   705     ``opener`` is an ``url.opener`` that should be used to establish
   638     url, authinfo = u.authinfo()
   717     url, authinfo = u.authinfo()
   639     ui.debug('using %s\n' % url)
   718     ui.debug('using %s\n' % url)
   640 
   719 
   641     opener = opener or urlmod.opener(ui, authinfo)
   720     opener = opener or urlmod.opener(ui, authinfo)
   642 
   721 
   643     respurl, caps = performhandshake(ui, url, opener, requestbuilder)
   722     respurl, info = performhandshake(ui, url, opener, requestbuilder)
   644 
   723 
   645     return httppeer(ui, path, respurl, opener, requestbuilder, caps)
   724     # Given the intersection of APIs that both we and the server support,
       
   725     # sort by their advertised priority and pick the first one.
       
   726     #
       
   727     # TODO consider making this request-based and interface driven. For
       
   728     # example, the caller could say "I want a peer that does X." It's quite
       
   729     # possible that not all peers would do that. Since we know the service
       
   730     # capabilities, we could filter out services not meeting the
       
   731     # requirements. Possibly by consulting the interfaces defined by the
       
   732     # peer type.
       
   733     apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
       
   734 
       
   735     preferredchoices = sorted(apipeerchoices,
       
   736                               key=lambda x: API_PEERS[x]['priority'],
       
   737                               reverse=True)
       
   738 
       
   739     for service in preferredchoices:
       
   740         apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
       
   741 
       
   742         return API_PEERS[service]['init'](ui, respurl, apipath, opener,
       
   743                                           requestbuilder,
       
   744                                           info['apis'][service])
       
   745 
       
   746     # Failed to construct an API peer. Fall back to legacy.
       
   747     return httppeer(ui, path, respurl, opener, requestbuilder,
       
   748                     info['v1capabilities'])
   646 
   749 
   647 def instance(ui, path, create):
   750 def instance(ui, path, create):
   648     if create:
   751     if create:
   649         raise error.Abort(_('cannot create new http repository'))
   752         raise error.Abort(_('cannot create new http repository'))
   650     try:
   753     try: