httppeer: support protocol upgrade
authorGregory Szorc <gregory.szorc@gmail.com>
Tue, 10 Apr 2018 18:16:47 -0700
changeset 37558 8a73132214a3
parent 37557 734515aca84d
child 37559 c4a0626f6b6e
httppeer: support protocol upgrade With the new handshake defined and in place on the server, we can now implement it on the client. The HTTP handshake mechanism has been taught to add headers advertising its support for the new capabilities response. Response handling has been adjusted to allow CBOR responses through. And makepeer() has been taught to instantiate a mutually supported peer. The HTTPv2 peer class doesn't implement the full peer interface. So HTTPv2 is not yet usable as a peer. Like the server side, we support registering handlers for different API services. This allows extensions to easily implement API services and peers. A practical use case for this is to provide a previous implementation of the experimental version 2 wire protocol to a future version of Mercurial. We know there will be BC breaks after 4.6 ships. But someone could take the peer and server code from 4.6, drop it in an extension, and allow its use indefinitely. Differential Revision: https://phab.mercurial-scm.org/D3243
mercurial/configitems.py
mercurial/debugcommands.py
mercurial/httppeer.py
tests/test-http-protocol.t
--- a/mercurial/configitems.py	Tue Apr 10 14:29:15 2018 -0700
+++ b/mercurial/configitems.py	Tue Apr 10 18:16:47 2018 -0700
@@ -538,6 +538,9 @@
 coreconfigitem('experimental', 'hook-track-tags',
     default=False,
 )
+coreconfigitem('experimental', 'httppeer.advertise-v2',
+    default=False,
+)
 coreconfigitem('experimental', 'httppostargs',
     default=False,
 )
--- a/mercurial/debugcommands.py	Tue Apr 10 14:29:15 2018 -0700
+++ b/mercurial/debugcommands.py	Tue Apr 10 18:16:47 2018 -0700
@@ -83,6 +83,7 @@
     vfs as vfsmod,
     wireprotoframing,
     wireprotoserver,
+    wireprototypes,
 )
 from .utils import (
     dateutil,
@@ -2910,7 +2911,9 @@
 
         if opts['peer'] == 'http2':
             ui.write(_('creating http peer for wire protocol version 2\n'))
-            peer = httppeer.httpv2peer(ui, path, opener)
+            peer = httppeer.httpv2peer(
+                ui, path, 'api/%s' % wireprototypes.HTTPV2,
+                opener, httppeer.urlreq.request, {})
         elif opts['peer'] == 'raw':
             ui.write(_('using raw connection to peer\n'))
             peer = None
--- a/mercurial/httppeer.py	Tue Apr 10 14:29:15 2018 -0700
+++ b/mercurial/httppeer.py	Tue Apr 10 18:16:47 2018 -0700
@@ -29,6 +29,7 @@
     util,
     wireproto,
     wireprotoframing,
+    wireprototypes,
     wireprotov2server,
 )
 
@@ -311,7 +312,8 @@
 
     return res
 
-def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible):
+def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
+                           allowcbor=False):
     # record the url we got redirected to
     respurl = pycompat.bytesurl(resp.geturl())
     if respurl.endswith(qs):
@@ -339,8 +341,19 @@
             % (safeurl, proto or 'no content-type', resp.read(1024)))
 
     try:
-        version = proto.split('-', 1)[1]
-        version_info = tuple([int(n) for n in version.split('.')])
+        subtype = proto.split('-', 1)[1]
+
+        # Unless we end up supporting CBOR in the legacy wire protocol,
+        # this should ONLY be encountered for the initial capabilities
+        # request during handshake.
+        if subtype == 'cbor':
+            if allowcbor:
+                return respurl, proto, resp
+            else:
+                raise error.RepoError(_('unexpected CBOR response from '
+                                        'server'))
+
+        version_info = tuple([int(n) for n in subtype.split('.')])
     except ValueError:
         raise error.RepoError(_("'%s' sent a broken Content-Type "
                                 "header (%s)") % (safeurl, proto))
@@ -361,9 +374,9 @@
         resp = engine.decompressorreader(resp)
     else:
         raise error.RepoError(_("'%s' uses newer protocol %s") %
-                              (safeurl, version))
+                              (safeurl, subtype))
 
-    return respurl, resp
+    return respurl, proto, resp
 
 class httppeer(wireproto.wirepeer):
     def __init__(self, ui, path, url, opener, requestbuilder, caps):
@@ -416,8 +429,8 @@
 
         resp = sendrequest(self.ui, self._urlopener, req)
 
-        self._url, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
-                                                 resp, _compressible)
+        self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
+                                                     resp, _compressible)
 
         return resp
 
@@ -501,17 +514,18 @@
 
 # TODO implement interface for version 2 peers
 class httpv2peer(object):
-    def __init__(self, ui, repourl, opener):
+    def __init__(self, ui, repourl, apipath, opener, requestbuilder,
+                 apidescriptor):
         self.ui = ui
 
         if repourl.endswith('/'):
             repourl = repourl[:-1]
 
         self.url = repourl
+        self._apipath = apipath
         self._opener = opener
-        # This is an its own attribute to facilitate extensions overriding
-        # the default type.
-        self._requestbuilder = urlreq.request
+        self._requestbuilder = requestbuilder
+        self._descriptor = apidescriptor
 
     def close(self):
         pass
@@ -540,8 +554,7 @@
             'pull': 'ro',
         }[permission]
 
-        url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2,
-                                   permission, name)
+        url = '%s/%s/%s/%s' % (self.url, self._apipath, permission, name)
 
         # TODO this should be part of a generic peer for the frame-based
         # protocol.
@@ -597,6 +610,24 @@
 
         return results
 
+# Registry of API service names to metadata about peers that handle it.
+#
+# The following keys are meaningful:
+#
+# init
+#    Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
+#                        apidescriptor) to create a peer.
+#
+# priority
+#    Integer priority for the service. If we could choose from multiple
+#    services, we choose the one with the highest priority.
+API_PEERS = {
+    wireprototypes.HTTPV2: {
+        'init': httpv2peer,
+        'priority': 50,
+    },
+}
+
 def performhandshake(ui, url, opener, requestbuilder):
     # The handshake is a request to the capabilities command.
 
@@ -604,21 +635,69 @@
     def capable(x):
         raise error.ProgrammingError('should not be called')
 
+    args = {}
+
+    # The client advertises support for newer protocols by adding an
+    # X-HgUpgrade-* header with a list of supported APIs and an
+    # X-HgProto-* header advertising which serializing formats it supports.
+    # We only support the HTTP version 2 transport and CBOR responses for
+    # now.
+    advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
+
+    if advertisev2:
+        args['headers'] = {
+            r'X-HgProto-1': r'cbor',
+        }
+
+        args['headers'].update(
+            encodevalueinheaders(' '.join(sorted(API_PEERS)),
+                                 'X-HgUpgrade',
+                                 # We don't know the header limit this early.
+                                 # So make it small.
+                                 1024))
+
     req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
                                            capable, url, 'capabilities',
-                                           {})
+                                           args)
 
     resp = sendrequest(ui, opener, req)
 
-    respurl, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
-                                           compressible=False)
+    respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
+                                               compressible=False,
+                                               allowcbor=advertisev2)
 
     try:
-        rawcaps = resp.read()
+        rawdata = resp.read()
     finally:
         resp.close()
 
-    return respurl, set(rawcaps.split())
+    if not ct.startswith('application/mercurial-'):
+        raise error.ProgrammingError('unexpected content-type: %s' % ct)
+
+    if advertisev2:
+        if ct == 'application/mercurial-cbor':
+            try:
+                info = cbor.loads(rawdata)
+            except cbor.CBORDecodeError:
+                raise error.Abort(_('error decoding CBOR from remote server'),
+                                  hint=_('try again and consider contacting '
+                                         'the server operator'))
+
+        # We got a legacy response. That's fine.
+        elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
+            info = {
+                'v1capabilities': set(rawdata.split())
+            }
+
+        else:
+            raise error.RepoError(
+                _('unexpected response type from server: %s') % ct)
+    else:
+        info = {
+            'v1capabilities': set(rawdata.split())
+        }
+
+    return respurl, info
 
 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
     """Construct an appropriate HTTP peer instance.
@@ -640,9 +719,33 @@
 
     opener = opener or urlmod.opener(ui, authinfo)
 
-    respurl, caps = performhandshake(ui, url, opener, requestbuilder)
+    respurl, info = performhandshake(ui, url, opener, requestbuilder)
+
+    # Given the intersection of APIs that both we and the server support,
+    # sort by their advertised priority and pick the first one.
+    #
+    # TODO consider making this request-based and interface driven. For
+    # example, the caller could say "I want a peer that does X." It's quite
+    # possible that not all peers would do that. Since we know the service
+    # capabilities, we could filter out services not meeting the
+    # requirements. Possibly by consulting the interfaces defined by the
+    # peer type.
+    apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
 
-    return httppeer(ui, path, respurl, opener, requestbuilder, caps)
+    preferredchoices = sorted(apipeerchoices,
+                              key=lambda x: API_PEERS[x]['priority'],
+                              reverse=True)
+
+    for service in preferredchoices:
+        apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
+
+        return API_PEERS[service]['init'](ui, respurl, apipath, opener,
+                                          requestbuilder,
+                                          info['apis'][service])
+
+    # Failed to construct an API peer. Fall back to legacy.
+    return httppeer(ui, path, respurl, opener, requestbuilder,
+                    info['v1capabilities'])
 
 def instance(ui, path, create):
     if create:
--- a/tests/test-http-protocol.t	Tue Apr 10 14:29:15 2018 -0700
+++ b/tests/test-http-protocol.t	Tue Apr 10 18:16:47 2018 -0700
@@ -1,3 +1,5 @@
+  $ . $TESTDIR/wireprotohelpers.sh
+
   $ cat >> $HGRCPATH << EOF
   > [web]
   > push_ssl = false
@@ -236,4 +238,98 @@
   s>     namespaces\t\n
   s>     phases\t
 
+Client with HTTPv2 enabled advertises that and gets old capabilities response from old server
+
+  $ hg --config experimental.httppeer.advertise-v2=true --verbose debugwireproto http://$LOCALIP:$HGPORT << EOF
+  > command heads
+  > EOF
+  s>     GET /?cmd=capabilities HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     vary: X-HgProto-1,X-HgUpgrade-1\r\n
+  s>     x-hgproto-1: cbor\r\n
+  s>     x-hgupgrade-1: exp-http-v2-0001\r\n
+  s>     accept: application/mercurial-0.1\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 Script output follows\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-0.1\r\n
+  s>     Content-Length: 458\r\n
+  s>     \r\n
+  s>     batch branchmap $USUAL_BUNDLE2_CAPS_SERVER$ changegroupsubset compression=$BUNDLE2_COMPRESSIONS$ getbundle httpheader=1024 httpmediatype=0.1rx,0.1tx,0.2tx known lookup pushkey streamreqs=generaldelta,revlogv1 unbundle=HG10GZ,HG10BZ,HG10UN unbundlehash
+  sending heads command
+  s>     GET /?cmd=heads HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     vary: X-HgProto-1\r\n
+  s>     x-hgproto-1: 0.1 0.2 comp=$USUAL_COMPRESSIONS$ partial-pull\r\n
+  s>     accept: application/mercurial-0.1\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 Script output follows\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-0.1\r\n
+  s>     Content-Length: 41\r\n
+  s>     \r\n
+  s>     0000000000000000000000000000000000000000\n
+  response: b'0000000000000000000000000000000000000000\n'
+
   $ killdaemons.py
+  $ enablehttpv2 empty
+  $ hg -R empty serve -p $HGPORT -d --pid-file hg.pid
+  $ cat hg.pid > $DAEMON_PIDS
+
+Client with HTTPv2 enabled automatically upgrades if the server supports it
+
+  $ hg --config experimental.httppeer.advertise-v2=true --verbose debugwireproto http://$LOCALIP:$HGPORT << EOF
+  > command heads
+  > EOF
+  s>     GET /?cmd=capabilities HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     vary: X-HgProto-1,X-HgUpgrade-1\r\n
+  s>     x-hgproto-1: cbor\r\n
+  s>     x-hgupgrade-1: exp-http-v2-0001\r\n
+  s>     accept: application/mercurial-0.1\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-cbor\r\n
+  s>     Content-Length: 879\r\n
+  s>     \r\n
+  s>     \xa3Dapis\xa1Pexp-http-v2-0001\xa2Hcommands\xa7Eheads\xa2Dargs\xa1Jpubliconly\xf4Kpermissions\x81DpullEknown\xa2Dargs\xa1Enodes\x81HdeadbeefKpermissions\x81DpullFlookup\xa2Dargs\xa1CkeyCfooKpermissions\x81DpullGpushkey\xa2Dargs\xa4CkeyCkeyCnewCnewColdColdInamespaceBnsKpermissions\x81DpushHlistkeys\xa2Dargs\xa1InamespaceBnsKpermissions\x81DpullIbranchmap\xa2Dargs\xa0Kpermissions\x81DpullLcapabilities\xa2Dargs\xa0Kpermissions\x81DpullKcompression\x82\xa1DnameDzstd\xa1DnameDzlibGapibaseDapi/Nv1capabilitiesY\x01\xcabatch branchmap $USUAL_BUNDLE2_CAPS_SERVER$ changegroupsubset compression=$BUNDLE2_COMPRESSIONS$ getbundle httpheader=1024 httpmediatype=0.1rx,0.1tx,0.2tx known lookup pushkey streamreqs=generaldelta,revlogv1 unbundle=HG10GZ,HG10BZ,HG10UN unbundlehash
+  sending heads command
+  s>     POST /api/exp-http-v2-0001/ro/heads HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     accept: application/mercurial-exp-framing-0003\r\n
+  s>     content-type: application/mercurial-exp-framing-0003\r\n
+  s>     content-length: 20\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s>     \x0c\x00\x00\x01\x00\x01\x01\x11\xa1DnameEheads
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-exp-framing-0003\r\n
+  s>     Transfer-Encoding: chunked\r\n
+  s>     \r\n
+  s>     1e\r\n
+  s>     \x16\x00\x00\x01\x00\x02\x01F
+  s>     \x81T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
+  s>     \r\n
+  received frame(size=22; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor)
+  s>     0\r\n
+  s>     \r\n
+  response: [[b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00']]
+
+  $ killdaemons.py