mercurial/wireprotoserver.py
changeset 37545 93397c4633f6
parent 37535 69e46c1834ac
child 37557 734515aca84d
--- a/mercurial/wireprotoserver.py	Mon Apr 09 16:54:20 2018 -0700
+++ b/mercurial/wireprotoserver.py	Mon Apr 09 19:35:04 2018 -0700
@@ -12,9 +12,6 @@
 import threading
 
 from .i18n import _
-from .thirdparty import (
-    cbor,
-)
 from .thirdparty.zope import (
     interface as zi,
 )
@@ -25,8 +22,8 @@
     pycompat,
     util,
     wireproto,
-    wireprotoframing,
     wireprototypes,
+    wireprotov2server,
 )
 from .utils import (
     procutil,
@@ -42,9 +39,7 @@
 HGTYPE = 'application/mercurial-0.1'
 HGTYPE2 = 'application/mercurial-0.2'
 HGERRTYPE = 'application/hg-error'
-FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
 
-HTTPV2 = wireprototypes.HTTPV2
 SSHV1 = wireprototypes.SSHV1
 SSHV2 = wireprototypes.SSHV2
 
@@ -291,350 +286,14 @@
     API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
                                    req.dispatchparts[2:])
 
-def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
-    from .hgweb import common as hgwebcommon
-
-    # URL space looks like: <permissions>/<command>, where <permission> can
-    # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
-
-    # Root URL does nothing meaningful... yet.
-    if not urlparts:
-        res.status = b'200 OK'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('HTTP version 2 API handler'))
-        return
-
-    if len(urlparts) == 1:
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('do not know how to process %s\n') %
-                         req.dispatchpath)
-        return
-
-    permission, command = urlparts[0:2]
-
-    if permission not in (b'ro', b'rw'):
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('unknown permission: %s') % permission)
-        return
-
-    if req.method != 'POST':
-        res.status = b'405 Method Not Allowed'
-        res.headers[b'Allow'] = b'POST'
-        res.setbodybytes(_('commands require POST requests'))
-        return
-
-    # At some point we'll want to use our own API instead of recycling the
-    # behavior of version 1 of the wire protocol...
-    # TODO return reasonable responses - not responses that overload the
-    # HTTP status line message for error reporting.
-    try:
-        checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
-    except hgwebcommon.ErrorResponse as e:
-        res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
-        for k, v in e.headers:
-            res.headers[k] = v
-        res.setbodybytes('permission denied')
-        return
-
-    # We have a special endpoint to reflect the request back at the client.
-    if command == b'debugreflect':
-        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
-        return
-
-    # Extra commands that we handle that aren't really wire protocol
-    # commands. Think extra hard before making this hackery available to
-    # extension.
-    extracommands = {'multirequest'}
-
-    if command not in wireproto.commandsv2 and command not in extracommands:
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
-        return
-
-    repo = rctx.repo
-    ui = repo.ui
-
-    proto = httpv2protocolhandler(req, ui)
-
-    if (not wireproto.commandsv2.commandavailable(command, proto)
-        and command not in extracommands):
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('invalid wire protocol command: %s') % command)
-        return
-
-    # TODO consider cases where proxies may add additional Accept headers.
-    if req.headers.get(b'Accept') != FRAMINGTYPE:
-        res.status = b'406 Not Acceptable'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
-                           % FRAMINGTYPE)
-        return
-
-    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
-        res.status = b'415 Unsupported Media Type'
-        # TODO we should send a response with appropriate media type,
-        # since client does Accept it.
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('client MUST send Content-Type header with '
-                           'value: %s\n') % FRAMINGTYPE)
-        return
-
-    _processhttpv2request(ui, repo, req, res, permission, command, proto)
-
-def _processhttpv2reflectrequest(ui, repo, req, res):
-    """Reads unified frame protocol request and dumps out state to client.
-
-    This special endpoint can be used to help debug the wire protocol.
-
-    Instead of routing the request through the normal dispatch mechanism,
-    we instead read all frames, decode them, and feed them into our state
-    tracker. We then dump the log of all that activity back out to the
-    client.
-    """
-    import json
-
-    # Reflection APIs have a history of being abused, accidentally disclosing
-    # sensitive data, etc. So we have a config knob.
-    if not ui.configbool('experimental', 'web.api.debugreflect'):
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('debugreflect service not available'))
-        return
-
-    # We assume we have a unified framing protocol request body.
-
-    reactor = wireprotoframing.serverreactor()
-    states = []
-
-    while True:
-        frame = wireprotoframing.readframe(req.bodyfh)
-
-        if not frame:
-            states.append(b'received: <no frame>')
-            break
-
-        states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
-                                                  frame.requestid,
-                                                  frame.payload))
-
-        action, meta = reactor.onframerecv(frame)
-        states.append(json.dumps((action, meta), sort_keys=True,
-                                 separators=(', ', ': ')))
-
-    action, meta = reactor.oninputeof()
-    meta['action'] = action
-    states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
-
-    res.status = b'200 OK'
-    res.headers[b'Content-Type'] = b'text/plain'
-    res.setbodybytes(b'\n'.join(states))
-
-def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
-    """Post-validation handler for HTTPv2 requests.
-
-    Called when the HTTP request contains unified frame-based protocol
-    frames for evaluation.
-    """
-    # TODO Some HTTP clients are full duplex and can receive data before
-    # the entire request is transmitted. Figure out a way to indicate support
-    # for that so we can opt into full duplex mode.
-    reactor = wireprotoframing.serverreactor(deferoutput=True)
-    seencommand = False
-
-    outstream = reactor.makeoutputstream()
-
-    while True:
-        frame = wireprotoframing.readframe(req.bodyfh)
-        if not frame:
-            break
-
-        action, meta = reactor.onframerecv(frame)
-
-        if action == 'wantframe':
-            # Need more data before we can do anything.
-            continue
-        elif action == 'runcommand':
-            sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
-                                           reqcommand, reactor, outstream,
-                                           meta, issubsequent=seencommand)
-
-            if sentoutput:
-                return
-
-            seencommand = True
-
-        elif action == 'error':
-            # TODO define proper error mechanism.
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(meta['message'] + b'\n')
-            return
-        else:
-            raise error.ProgrammingError(
-                'unhandled action from frame processor: %s' % action)
-
-    action, meta = reactor.oninputeof()
-    if action == 'sendframes':
-        # We assume we haven't started sending the response yet. If we're
-        # wrong, the response type will raise an exception.
-        res.status = b'200 OK'
-        res.headers[b'Content-Type'] = FRAMINGTYPE
-        res.setbodygen(meta['framegen'])
-    elif action == 'noop':
-        pass
-    else:
-        raise error.ProgrammingError('unhandled action from frame processor: %s'
-                                     % action)
-
-def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
-                      outstream, command, issubsequent):
-    """Dispatch a wire protocol command made from HTTPv2 requests.
-
-    The authenticated permission (``authedperm``) along with the original
-    command from the URL (``reqcommand``) are passed in.
-    """
-    # We already validated that the session has permissions to perform the
-    # actions in ``authedperm``. In the unified frame protocol, the canonical
-    # command to run is expressed in a frame. However, the URL also requested
-    # to run a specific command. We need to be careful that the command we
-    # run doesn't have permissions requirements greater than what was granted
-    # by ``authedperm``.
-    #
-    # Our rule for this is we only allow one command per HTTP request and
-    # that command must match the command in the URL. However, we make
-    # an exception for the ``multirequest`` URL. This URL is allowed to
-    # execute multiple commands. We double check permissions of each command
-    # as it is invoked to ensure there is no privilege escalation.
-    # TODO consider allowing multiple commands to regular command URLs
-    # iff each command is the same.
-
-    proto = httpv2protocolhandler(req, ui, args=command['args'])
-
-    if reqcommand == b'multirequest':
-        if not wireproto.commandsv2.commandavailable(command['command'], proto):
-            # TODO proper error mechanism
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('wire protocol command not available: %s') %
-                             command['command'])
-            return True
-
-        # TODO don't use assert here, since it may be elided by -O.
-        assert authedperm in (b'ro', b'rw')
-        wirecommand = wireproto.commandsv2[command['command']]
-        assert wirecommand.permission in ('push', 'pull')
-
-        if authedperm == b'ro' and wirecommand.permission != 'pull':
-            # TODO proper error mechanism
-            res.status = b'403 Forbidden'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('insufficient permissions to execute '
-                               'command: %s') % command['command'])
-            return True
-
-        # TODO should we also call checkperm() here? Maybe not if we're going
-        # to overhaul that API. The granted scope from the URL check should
-        # be good enough.
-
-    else:
-        # Don't allow multiple commands outside of ``multirequest`` URL.
-        if issubsequent:
-            # TODO proper error mechanism
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('multiple commands cannot be issued to this '
-                               'URL'))
-            return True
-
-        if reqcommand != command['command']:
-            # TODO define proper error mechanism
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('command in frame must match command in URL'))
-            return True
-
-    rsp = wireproto.dispatch(repo, proto, command['command'])
-
-    res.status = b'200 OK'
-    res.headers[b'Content-Type'] = FRAMINGTYPE
-
-    if isinstance(rsp, wireprototypes.bytesresponse):
-        action, meta = reactor.onbytesresponseready(outstream,
-                                                    command['requestid'],
-                                                    rsp.data)
-    elif isinstance(rsp, wireprototypes.cborresponse):
-        encoded = cbor.dumps(rsp.value, canonical=True)
-        action, meta = reactor.onbytesresponseready(outstream,
-                                                    command['requestid'],
-                                                    encoded,
-                                                    iscbor=True)
-    else:
-        action, meta = reactor.onapplicationerror(
-            _('unhandled response type from wire proto command'))
-
-    if action == 'sendframes':
-        res.setbodygen(meta['framegen'])
-        return True
-    elif action == 'noop':
-        return False
-    else:
-        raise error.ProgrammingError('unhandled event from reactor: %s' %
-                                     action)
-
 # Maps API name to metadata so custom API can be registered.
 API_HANDLERS = {
-    HTTPV2: {
+    wireprotov2server.HTTPV2: {
         'config': ('experimental', 'web.api.http-v2'),
-        'handler': _handlehttpv2request,
+        'handler': wireprotov2server.handlehttpv2request,
     },
 }
 
-@zi.implementer(wireprototypes.baseprotocolhandler)
-class httpv2protocolhandler(object):
-    def __init__(self, req, ui, args=None):
-        self._req = req
-        self._ui = ui
-        self._args = args
-
-    @property
-    def name(self):
-        return HTTPV2
-
-    def getargs(self, args):
-        data = {}
-        for k, typ in args.items():
-            if k == '*':
-                raise NotImplementedError('do not support * args')
-            elif k in self._args:
-                # TODO consider validating value types.
-                data[k] = self._args[k]
-
-        return data
-
-    def getprotocaps(self):
-        # Protocol capabilities are currently not implemented for HTTP V2.
-        return set()
-
-    def getpayload(self):
-        raise NotImplementedError
-
-    @contextlib.contextmanager
-    def mayberedirectstdio(self):
-        raise NotImplementedError
-
-    def client(self):
-        raise NotImplementedError
-
-    def addcapabilities(self, repo, caps):
-        return caps
-
-    def checkperm(self, perm):
-        raise NotImplementedError
-
 def _httpresponsetype(ui, proto, prefer_uncompressed):
     """Determine the appropriate response type and compression settings.