wireprotov2: server support for sending content redirects
authorGregory Szorc <gregory.szorc@gmail.com>
Wed, 26 Sep 2018 18:07:55 -0700
changeset 40025 b099e6032f38
parent 40024 86b22a4cfab1
child 40026 7e807b8a9e56
wireprotov2: server support for sending content redirects A "content redirect" can be sent in place of inline response content. In terms of code, we model a content redirect as a special type of response object holding the attributes describing that redirect. Sending a content redirect thus becomes as simple as the object emission layer sending an instance of that type. A cacher using externally-addressable content storage could replace the outgoing object stream with an object advertising its location. The bulk of the code in this commit is teaching the output layer which handles the object stream to recognize alternate location objects. The rules are that if an alternate location object is present, it must be the first and only object in the object stream. Otherwise the server emits an error. Differential Revision: https://phab.mercurial-scm.org/D4777
mercurial/wireprotoframing.py
mercurial/wireprototypes.py
mercurial/wireprotov2server.py
tests/test-http-api-httpv2.t
tests/test-wireproto-content-redirects.t
tests/test-wireproto-serverreactor.py
tests/wireprotosimplecache.py
--- a/mercurial/wireprotoframing.py	Wed Sep 26 15:02:19 2018 -0700
+++ b/mercurial/wireprotoframing.py	Wed Sep 26 18:07:55 2018 -0700
@@ -21,6 +21,7 @@
 from . import (
     encoding,
     error,
+    pycompat,
     util,
     wireprototypes,
 )
@@ -429,6 +430,26 @@
                             flags=FLAG_COMMAND_RESPONSE_EOS,
                             payload=b'')
 
+def createalternatelocationresponseframe(stream, requestid, location):
+    data = {
+        b'status': b'redirect',
+        b'location': {
+            b'url': location.url,
+            b'mediatype': location.mediatype,
+        }
+    }
+
+    for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
+              r'servercadercerts'):
+        value = getattr(location, a)
+        if value is not None:
+            data[b'location'][pycompat.bytestr(a)] = value
+
+    return stream.makeframe(requestid=requestid,
+                            typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                            payload=b''.join(cborutil.streamencode(data)))
+
 def createcommanderrorresponse(stream, requestid, message, args=None):
     # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
     # formatting works consistently?
@@ -813,6 +834,7 @@
 
         def sendframes():
             emitted = False
+            alternatelocationsent = False
             emitter = bufferingcommandresponseemitter(stream, requestid)
             while True:
                 try:
@@ -841,6 +863,25 @@
                     break
 
                 try:
+                    # Alternate location responses can only be the first and
+                    # only object in the output stream.
+                    if isinstance(o, wireprototypes.alternatelocationresponse):
+                        if emitted:
+                            raise error.ProgrammingError(
+                                'alternatelocationresponse seen after initial '
+                                'output object')
+
+                        yield createalternatelocationresponseframe(
+                            stream, requestid, o)
+
+                        alternatelocationsent = True
+                        emitted = True
+                        continue
+
+                    if alternatelocationsent:
+                        raise error.ProgrammingError(
+                            'object follows alternatelocationresponse')
+
                     if not emitted:
                         yield createcommandresponseokframe(stream, requestid)
                         emitted = True
@@ -977,6 +1018,7 @@
             'requestid': requestid,
             'command': request[b'name'],
             'args': request[b'args'],
+            'redirect': request.get(b'redirect'),
             'data': entry['data'].getvalue() if entry['data'] else None,
         }
 
--- a/mercurial/wireprototypes.py	Wed Sep 26 15:02:19 2018 -0700
+++ b/mercurial/wireprototypes.py	Wed Sep 26 18:07:55 2018 -0700
@@ -368,3 +368,20 @@
     and the content from this object is used instead.
     """
     data = attr.ib()
+
+@attr.s
+class alternatelocationresponse(object):
+    """Represents a response available at an alternate location.
+
+    Instances are sent in place of actual response objects when the server
+    is sending a "content redirect" response.
+
+    Only compatible with wire protocol version 2.
+    """
+    url = attr.ib()
+    mediatype = attr.ib()
+    size = attr.ib(default=None)
+    fullhashes = attr.ib(default=None)
+    fullhashseed = attr.ib(default=None)
+    serverdercerts = attr.ib(default=None)
+    servercadercerts = attr.ib(default=None)
--- a/mercurial/wireprotov2server.py	Wed Sep 26 15:02:19 2018 -0700
+++ b/mercurial/wireprotov2server.py	Wed Sep 26 18:07:55 2018 -0700
@@ -312,7 +312,7 @@
     res.headers[b'Content-Type'] = FRAMINGTYPE
 
     try:
-        objs = dispatch(repo, proto, command['command'])
+        objs = dispatch(repo, proto, command['command'], command['redirect'])
 
         action, meta = reactor.oncommandresponsereadyobjects(
             outstream, command['requestid'], objs)
@@ -339,7 +339,7 @@
 def getdispatchrepo(repo, proto, command):
     return repo.filtered('served')
 
-def dispatch(repo, proto, command):
+def dispatch(repo, proto, command, redirect):
     """Run a wire protocol command.
 
     Returns an iterable of objects that will be sent to the client.
@@ -364,8 +364,17 @@
             yield o
         return
 
+    if redirect:
+        redirecttargets = redirect[b'targets']
+        redirecthashes = redirect[b'hashes']
+    else:
+        redirecttargets = []
+        redirecthashes = []
+
     cacher = makeresponsecacher(repo, proto, command, args,
-                                cborutil.streamencode)
+                                cborutil.streamencode,
+                                redirecttargets=redirecttargets,
+                                redirecthashes=redirecthashes)
 
     # But we have no cacher. Do default handling.
     if not cacher:
@@ -751,7 +760,8 @@
 
     return cachekeyfn
 
-def makeresponsecacher(repo, proto, command, args, objencoderfn):
+def makeresponsecacher(repo, proto, command, args, objencoderfn,
+                       redirecttargets, redirecthashes):
     """Construct a cacher for a cacheable command.
 
     Returns an ``iwireprotocolcommandcacher`` instance.
--- a/tests/test-http-api-httpv2.t	Wed Sep 26 15:02:19 2018 -0700
+++ b/tests/test-http-api-httpv2.t	Wed Sep 26 18:07:55 2018 -0700
@@ -430,10 +430,10 @@
   s>     Server: testing stub value\r\n
   s>     Date: $HTTP_DATE$\r\n
   s>     Content-Type: text/plain\r\n
-  s>     Content-Length: 205\r\n
+  s>     Content-Length: 223\r\n
   s>     \r\n
   s>     received: 1 1 1 \xa2Dargs\xa2Dbar1CvalCfooDval1DnameHcommand1\n
-  s>     ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null, "requestid": 1}]\n
+  s>     ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null, "redirect": null, "requestid": 1}]\n
   s>     received: <no frame>\n
   s>     {"action": "noop"}
 
--- a/tests/test-wireproto-content-redirects.t	Wed Sep 26 15:02:19 2018 -0700
+++ b/tests/test-wireproto-content-redirects.t	Wed Sep 26 18:07:55 2018 -0700
@@ -1,11 +1,20 @@
   $ . $TESTDIR/wireprotohelpers.sh
 
+  $ cat >> $HGRCPATH << EOF
+  > [extensions]
+  > blackbox =
+  > [blackbox]
+  > track = simplecache
+  > EOF
+
   $ hg init server
   $ enablehttpv2 server
   $ cd server
   $ cat >> .hg/hgrc << EOF
   > [extensions]
   > simplecache = $TESTDIR/wireprotosimplecache.py
+  > [simplecache]
+  > cacheapi = true
   > EOF
 
   $ echo a0 > a
@@ -1183,5 +1192,178 @@
     }
   ]
 
+Set up the server to issue content redirects to its built-in API server.
+
+  $ cat > redirects.py << EOF
+  > [
+  >   {
+  >     b'name': b'local',
+  >     b'protocol': b'http',
+  >     b'uris': [b'http://example.com/'],
+  >   },
+  > ]
+  > EOF
+
+Request to eventual cache URL should return 404 (validating the cache server works)
+
+  $ sendhttpraw << EOF
+  > httprequest GET api/simplecache/missingkey
+  >     user-agent: test
+  > EOF
+  using raw connection to peer
+  s>     GET /api/simplecache/missingkey HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     user-agent: test\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 404 Not Found\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: text/plain\r\n
+  s>     Content-Length: 22\r\n
+  s>     \r\n
+  s>     key not found in cache
+
+Send a cacheable request
+
+  $ sendhttpv2peer << EOF
+  > command manifestdata
+  >     nodes eval:[b'\x99\x2f\x47\x79\x02\x9a\x3d\xf8\xd0\x66\x6d\x00\xbb\x92\x4f\x69\x63\x4e\x26\x41']
+  >     tree eval:b''
+  >     fields eval:[b'parents']
+  > EOF
+  creating http peer for wire protocol version 2
+  sending manifestdata command
+  s>     POST /api/exp-http-v2-0002/ro/manifestdata HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     accept: application/mercurial-exp-framing-0005\r\n
+  s>     content-type: application/mercurial-exp-framing-0005\r\n
+  s>     content-length: 128\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s>     x\x00\x00\x01\x00\x01\x01\x11\xa3Dargs\xa3Ffields\x81GparentsEnodes\x81T\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&ADtree@DnameLmanifestdataHredirect\xa2Fhashes\x82Fsha256Dsha1Gtargets\x81Elocal
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-exp-framing-0005\r\n
+  s>     Transfer-Encoding: chunked\r\n
+  s>     \r\n
+  s>     13\r\n
+  s>     \x0b\x00\x00\x01\x00\x02\x011
+  s>     \xa1FstatusBok
+  s>     \r\n
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  s>     63\r\n
+  s>     [\x00\x00\x01\x00\x02\x001
+  s>     \xa1Jtotalitems\x01\xa2DnodeT\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&AGparents\x82T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
+  s>     \r\n
+  received frame(size=91; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  s>     8\r\n
+  s>     \x00\x00\x00\x01\x00\x02\x002
+  s>     \r\n
+  s>     0\r\n
+  s>     \r\n
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  response: gen[
+    {
+      b'totalitems': 1
+    },
+    {
+      b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+      b'parents': [
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
+
+Cached entry should be available on server
+
+  $ sendhttpraw << EOF
+  > httprequest GET api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0
+  >     user-agent: test
+  > EOF
+  using raw connection to peer
+  s>     GET /api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0 HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     user-agent: test\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-cbor\r\n
+  s>     Content-Length: 91\r\n
+  s>     \r\n
+  s>     \xa1Jtotalitems\x01\xa2DnodeT\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&AGparents\x82T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
+  cbor> [
+    {
+      b'totalitems': 1
+    },
+    {
+      b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+      b'parents': [
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
+
+2nd request should result in content redirect response
+
+  $ sendhttpv2peer << EOF
+  > command manifestdata
+  >     nodes eval:[b'\x99\x2f\x47\x79\x02\x9a\x3d\xf8\xd0\x66\x6d\x00\xbb\x92\x4f\x69\x63\x4e\x26\x41']
+  >     tree eval:b''
+  >     fields eval:[b'parents']
+  > EOF
+  creating http peer for wire protocol version 2
+  sending manifestdata command
+  s>     POST /api/exp-http-v2-0002/ro/manifestdata HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     accept: application/mercurial-exp-framing-0005\r\n
+  s>     content-type: application/mercurial-exp-framing-0005\r\n
+  s>     content-length: 128\r\n
+  s>     host: $LOCALIP:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s>     x\x00\x00\x01\x00\x01\x01\x11\xa3Dargs\xa3Ffields\x81GparentsEnodes\x81T\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&ADtree@DnameLmanifestdataHredirect\xa2Fhashes\x82Fsha256Dsha1Gtargets\x81Elocal
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-exp-framing-0005\r\n
+  s>     Transfer-Encoding: chunked\r\n
+  s>     \r\n
+  s>     *\r\n (glob)
+  s>     \x*\x00\x00\x01\x00\x02\x011 (glob)
+  s>     \xa2Hlocation\xa2ImediatypeX\x1aapplication/mercurial-cborCurl*http://*:$HGPORT/api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0FstatusHredirect (glob)
+  s>     \r\n
+  received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation) (glob)
+  s>     8\r\n
+  s>     \x00\x00\x00\x01\x00\x02\x001
+  s>     \r\n
+  s>     8\r\n
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  s>     \x00\x00\x00\x01\x00\x02\x002
+  s>     \r\n
+  s>     0\r\n
+  s>     \r\n
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  abort: redirect responses not yet supported
+  [255]
+
   $ cat error.log
   $ killdaemons.py
+
+  $ cat .hg/blackbox.log
+  *> cacher constructed for manifestdata (glob)
+  *> cache miss for c045a581599d58608efd3d93d8129841f2af04a0 (glob)
+  *> storing cache entry for c045a581599d58608efd3d93d8129841f2af04a0 (glob)
+  *> cacher constructed for manifestdata (glob)
+  *> cache hit for c045a581599d58608efd3d93d8129841f2af04a0 (glob)
+  *> sending content redirect for c045a581599d58608efd3d93d8129841f2af04a0 to http://*:$HGPORT/api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0 (glob)
--- a/tests/test-wireproto-serverreactor.py	Wed Sep 26 15:02:19 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py	Wed Sep 26 18:07:55 2018 -0700
@@ -69,6 +69,7 @@
             b'requestid': 1,
             b'command': b'mycommand',
             b'args': {},
+            b'redirect': None,
             b'data': None,
         })
 
@@ -86,6 +87,7 @@
             b'requestid': 41,
             b'command': b'mycommand',
             b'args': {b'foo': b'bar'},
+            b'redirect': None,
             b'data': None,
         })
 
@@ -100,6 +102,7 @@
             b'requestid': 1,
             b'command': b'mycommand',
             b'args': {b'foo': b'bar', b'biz': b'baz'},
+            b'redirect': None,
             b'data': None,
         })
 
@@ -115,6 +118,7 @@
             b'requestid': 1,
             b'command': b'mycommand',
             b'args': {},
+            b'redirect': None,
             b'data': b'data!',
         })
 
@@ -137,6 +141,7 @@
             b'requestid': 1,
             b'command': b'mycommand',
             b'args': {},
+            b'redirect': None,
             b'data': b'data1data2data3',
         })
 
@@ -160,6 +165,7 @@
                 b'key': b'val',
                 b'foo': b'bar',
             },
+            b'redirect': None,
             b'data': b'value1value2',
         })
 
@@ -235,6 +241,7 @@
                 b'requestid': 1,
                 b'command': b'command',
                 b'args': {},
+                b'redirect': None,
                 b'data': None,
             })
 
@@ -291,12 +298,14 @@
             b'requestid': 3,
             b'command': b'command3',
             b'args': {b'biz': b'baz', b'key': b'val'},
+            b'redirect': None,
             b'data': None,
         })
         self.assertEqual(results[5][1], {
             b'requestid': 1,
             b'command': b'command1',
             b'args': {b'foo': b'bar', b'key1': b'val'},
+            b'redirect': None,
             b'data': None,
         })
 
--- a/tests/wireprotosimplecache.py	Wed Sep 26 15:02:19 2018 -0700
+++ b/tests/wireprotosimplecache.py	Wed Sep 26 18:07:55 2018 -0700
@@ -12,6 +12,7 @@
     registrar,
     repository,
     util,
+    wireprotoserver,
     wireprototypes,
     wireprotov2server,
 )
@@ -25,18 +26,59 @@
 configtable = {}
 configitem = registrar.configitem(configtable)
 
+configitem('simplecache', 'cacheapi',
+           default=False)
 configitem('simplecache', 'cacheobjects',
            default=False)
 configitem('simplecache', 'redirectsfile',
            default=None)
 
+# API handler that makes cached keys available.
+def handlecacherequest(rctx, req, res, checkperm, urlparts):
+    if rctx.repo.ui.configbool('simplecache', 'cacheobjects'):
+        res.status = b'500 Internal Server Error'
+        res.setbodybytes(b'cacheobjects not supported for api server')
+        return
+
+    if not urlparts:
+        res.status = b'200 OK'
+        res.headers[b'Content-Type'] = b'text/plain'
+        res.setbodybytes(b'simple cache server')
+        return
+
+    key = b'/'.join(urlparts)
+
+    if key not in CACHE:
+        res.status = b'404 Not Found'
+        res.headers[b'Content-Type'] = b'text/plain'
+        res.setbodybytes(b'key not found in cache')
+        return
+
+    res.status = b'200 OK'
+    res.headers[b'Content-Type'] = b'application/mercurial-cbor'
+    res.setbodybytes(CACHE[key])
+
+def cachedescriptor(req, repo):
+    return {}
+
+wireprotoserver.API_HANDLERS[b'simplecache'] = {
+    'config': (b'simplecache', b'cacheapi'),
+    'handler': handlecacherequest,
+    'apidescriptor': cachedescriptor,
+}
+
 @interfaceutil.implementer(repository.iwireprotocolcommandcacher)
 class memorycacher(object):
-    def __init__(self, ui, command, encodefn):
+    def __init__(self, ui, command, encodefn, redirecttargets, redirecthashes,
+                 req):
         self.ui = ui
         self.encodefn = encodefn
+        self.redirecttargets = redirecttargets
+        self.redirecthashes = redirecthashes
+        self.req = req
         self.key = None
         self.cacheobjects = ui.configbool('simplecache', 'cacheobjects')
+        self.cacheapi = ui.configbool('simplecache', 'cacheapi')
         self.buffered = []
 
         ui.log('simplecache', 'cacher constructed for %s\n', command)
@@ -65,6 +107,37 @@
         entry = CACHE[self.key]
         self.ui.log('simplecache', 'cache hit for %s\n', self.key)
 
+        redirectable = True
+
+        if not self.cacheapi:
+            redirectable = False
+        elif not self.redirecttargets:
+            redirectable = False
+        else:
+            clienttargets = set(self.redirecttargets)
+            ourtargets = set(t[b'name'] for t in loadredirecttargets(self.ui))
+
+            # We only ever redirect to a single target (for now). So we don't
+            # need to store which target matched.
+            if not clienttargets & ourtargets:
+                redirectable = False
+
+        if redirectable:
+            paths = self.req.dispatchparts[:-3]
+            paths.append(b'simplecache')
+            paths.append(self.key)
+
+            url = b'%s/%s' % (self.req.advertisedbaseurl, b'/'.join(paths))
+
+            #url = b'http://example.com/%s' % self.key
+            self.ui.log('simplecache', 'sending content redirect for %s to '
+                                       '%s\n', self.key, url)
+            response = wireprototypes.alternatelocationresponse(
+                url=url,
+                mediatype=b'application/mercurial-cbor')
+
+            return {'objs': [response]}
+
         if self.cacheobjects:
             return {
                 'objs': entry,
@@ -91,8 +164,10 @@
 
         return []
 
-def makeresponsecacher(orig, repo, proto, command, args, objencoderfn):
-    return memorycacher(repo.ui, command, objencoderfn)
+def makeresponsecacher(orig, repo, proto, command, args, objencoderfn,
+                       redirecttargets, redirecthashes):
+    return memorycacher(repo.ui, command, objencoderfn, redirecttargets,
+                        redirecthashes, proto._req)
 
 def loadredirecttargets(ui):
     path = ui.config('simplecache', 'redirectsfile')