--- a/mercurial/wireprotov2server.py Thu Dec 30 13:25:44 2021 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1617 +0,0 @@
-# Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
-# Copyright 2005-2007 Olivia Mackall <olivia@selenic.com>
-#
-# This software may be used and distributed according to the terms of the
-# GNU General Public License version 2 or any later version.
-
-from __future__ import absolute_import
-
-import collections
-import contextlib
-
-from .i18n import _
-from .node import hex
-from . import (
- discovery,
- encoding,
- error,
- match as matchmod,
- narrowspec,
- pycompat,
- streamclone,
- templatefilters,
- util,
- wireprotoframing,
- wireprototypes,
-)
-from .interfaces import util as interfaceutil
-from .utils import (
- cborutil,
- hashutil,
- stringutil,
-)
-
-FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
-
-HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
-
-COMMANDS = wireprototypes.commanddict()
-
-# Value inserted into cache key computation function. Change the value to
-# force new cache keys for every command request. This should be done when
-# there is a change to how caching works, etc.
-GLOBAL_CACHE_VERSION = 1
-
-
-def handlehttpv2request(rctx, req, res, checkperm, urlparts):
- from .hgweb import common as hgwebcommon
-
- # URL space looks like: <permissions>/<command>, where <permission> can
- # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
-
- # Root URL does nothing meaningful... yet.
- if not urlparts:
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(_(b'HTTP version 2 API handler'))
- return
-
- if len(urlparts) == 1:
- res.status = b'404 Not Found'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(
- _(b'do not know how to process %s\n') % req.dispatchpath
- )
- return
-
- permission, command = urlparts[0:2]
-
- if permission not in (b'ro', b'rw'):
- res.status = b'404 Not Found'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(_(b'unknown permission: %s') % permission)
- return
-
- if req.method != b'POST':
- res.status = b'405 Method Not Allowed'
- res.headers[b'Allow'] = b'POST'
- res.setbodybytes(_(b'commands require POST requests'))
- return
-
- # At some point we'll want to use our own API instead of recycling the
- # behavior of version 1 of the wire protocol...
- # TODO return reasonable responses - not responses that overload the
- # HTTP status line message for error reporting.
- try:
- checkperm(rctx, req, b'pull' if permission == b'ro' else b'push')
- except hgwebcommon.ErrorResponse as e:
- res.status = hgwebcommon.statusmessage(
- e.code, stringutil.forcebytestr(e)
- )
- for k, v in e.headers:
- res.headers[k] = v
- res.setbodybytes(b'permission denied')
- return
-
- # We have a special endpoint to reflect the request back at the client.
- if command == b'debugreflect':
- _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
- return
-
- # Extra commands that we handle that aren't really wire protocol
- # commands. Think extra hard before making this hackery available to
- # extension.
- extracommands = {b'multirequest'}
-
- if command not in COMMANDS and command not in extracommands:
- res.status = b'404 Not Found'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(_(b'unknown wire protocol command: %s\n') % command)
- return
-
- repo = rctx.repo
- ui = repo.ui
-
- proto = httpv2protocolhandler(req, ui)
-
- if (
- not COMMANDS.commandavailable(command, proto)
- and command not in extracommands
- ):
- res.status = b'404 Not Found'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(_(b'invalid wire protocol command: %s') % command)
- return
-
- # TODO consider cases where proxies may add additional Accept headers.
- if req.headers.get(b'Accept') != FRAMINGTYPE:
- res.status = b'406 Not Acceptable'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(
- _(b'client MUST specify Accept header with value: %s\n')
- % FRAMINGTYPE
- )
- return
-
- if req.headers.get(b'Content-Type') != FRAMINGTYPE:
- res.status = b'415 Unsupported Media Type'
- # TODO we should send a response with appropriate media type,
- # since client does Accept it.
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(
- _(b'client MUST send Content-Type header with value: %s\n')
- % FRAMINGTYPE
- )
- return
-
- _processhttpv2request(ui, repo, req, res, permission, command, proto)
-
-
-def _processhttpv2reflectrequest(ui, repo, req, res):
- """Reads unified frame protocol request and dumps out state to client.
-
- This special endpoint can be used to help debug the wire protocol.
-
- Instead of routing the request through the normal dispatch mechanism,
- we instead read all frames, decode them, and feed them into our state
- tracker. We then dump the log of all that activity back out to the
- client.
- """
- # Reflection APIs have a history of being abused, accidentally disclosing
- # sensitive data, etc. So we have a config knob.
- if not ui.configbool(b'experimental', b'web.api.debugreflect'):
- res.status = b'404 Not Found'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(_(b'debugreflect service not available'))
- return
-
- # We assume we have a unified framing protocol request body.
-
- reactor = wireprotoframing.serverreactor(ui)
- states = []
-
- while True:
- frame = wireprotoframing.readframe(req.bodyfh)
-
- if not frame:
- states.append(b'received: <no frame>')
- break
-
- states.append(
- b'received: %d %d %d %s'
- % (frame.typeid, frame.flags, frame.requestid, frame.payload)
- )
-
- action, meta = reactor.onframerecv(frame)
- states.append(templatefilters.json((action, meta)))
-
- action, meta = reactor.oninputeof()
- meta[b'action'] = action
- states.append(templatefilters.json(meta))
-
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(b'\n'.join(states))
-
-
-def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
- """Post-validation handler for HTTPv2 requests.
-
- Called when the HTTP request contains unified frame-based protocol
- frames for evaluation.
- """
- # TODO Some HTTP clients are full duplex and can receive data before
- # the entire request is transmitted. Figure out a way to indicate support
- # for that so we can opt into full duplex mode.
- reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
- seencommand = False
-
- outstream = None
-
- while True:
- frame = wireprotoframing.readframe(req.bodyfh)
- if not frame:
- break
-
- action, meta = reactor.onframerecv(frame)
-
- if action == b'wantframe':
- # Need more data before we can do anything.
- continue
- elif action == b'runcommand':
- # Defer creating output stream because we need to wait for
- # protocol settings frames so proper encoding can be applied.
- if not outstream:
- outstream = reactor.makeoutputstream()
-
- sentoutput = _httpv2runcommand(
- ui,
- repo,
- req,
- res,
- authedperm,
- reqcommand,
- reactor,
- outstream,
- meta,
- issubsequent=seencommand,
- )
-
- if sentoutput:
- return
-
- seencommand = True
-
- elif action == b'error':
- # TODO define proper error mechanism.
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(meta[b'message'] + b'\n')
- return
- else:
- raise error.ProgrammingError(
- b'unhandled action from frame processor: %s' % action
- )
-
- action, meta = reactor.oninputeof()
- if action == b'sendframes':
- # We assume we haven't started sending the response yet. If we're
- # wrong, the response type will raise an exception.
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = FRAMINGTYPE
- res.setbodygen(meta[b'framegen'])
- elif action == b'noop':
- pass
- else:
- raise error.ProgrammingError(
- b'unhandled action from frame processor: %s' % action
- )
-
-
-def _httpv2runcommand(
- ui,
- repo,
- req,
- res,
- authedperm,
- reqcommand,
- reactor,
- outstream,
- command,
- issubsequent,
-):
- """Dispatch a wire protocol command made from HTTPv2 requests.
-
- The authenticated permission (``authedperm``) along with the original
- command from the URL (``reqcommand``) are passed in.
- """
- # We already validated that the session has permissions to perform the
- # actions in ``authedperm``. In the unified frame protocol, the canonical
- # command to run is expressed in a frame. However, the URL also requested
- # to run a specific command. We need to be careful that the command we
- # run doesn't have permissions requirements greater than what was granted
- # by ``authedperm``.
- #
- # Our rule for this is we only allow one command per HTTP request and
- # that command must match the command in the URL. However, we make
- # an exception for the ``multirequest`` URL. This URL is allowed to
- # execute multiple commands. We double check permissions of each command
- # as it is invoked to ensure there is no privilege escalation.
- # TODO consider allowing multiple commands to regular command URLs
- # iff each command is the same.
-
- proto = httpv2protocolhandler(req, ui, args=command[b'args'])
-
- if reqcommand == b'multirequest':
- if not COMMANDS.commandavailable(command[b'command'], proto):
- # TODO proper error mechanism
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(
- _(b'wire protocol command not available: %s')
- % command[b'command']
- )
- return True
-
- # TODO don't use assert here, since it may be elided by -O.
- assert authedperm in (b'ro', b'rw')
- wirecommand = COMMANDS[command[b'command']]
- assert wirecommand.permission in (b'push', b'pull')
-
- if authedperm == b'ro' and wirecommand.permission != b'pull':
- # TODO proper error mechanism
- res.status = b'403 Forbidden'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(
- _(b'insufficient permissions to execute command: %s')
- % command[b'command']
- )
- return True
-
- # TODO should we also call checkperm() here? Maybe not if we're going
- # to overhaul that API. The granted scope from the URL check should
- # be good enough.
-
- else:
- # Don't allow multiple commands outside of ``multirequest`` URL.
- if issubsequent:
- # TODO proper error mechanism
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(
- _(b'multiple commands cannot be issued to this URL')
- )
- return True
-
- if reqcommand != command[b'command']:
- # TODO define proper error mechanism
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = b'text/plain'
- res.setbodybytes(_(b'command in frame must match command in URL'))
- return True
-
- res.status = b'200 OK'
- res.headers[b'Content-Type'] = FRAMINGTYPE
-
- try:
- objs = dispatch(repo, proto, command[b'command'], command[b'redirect'])
-
- action, meta = reactor.oncommandresponsereadyobjects(
- outstream, command[b'requestid'], objs
- )
-
- except error.WireprotoCommandError as e:
- action, meta = reactor.oncommanderror(
- outstream, command[b'requestid'], e.message, e.messageargs
- )
-
- except Exception as e:
- action, meta = reactor.onservererror(
- outstream,
- command[b'requestid'],
- _(b'exception when invoking command: %s')
- % stringutil.forcebytestr(e),
- )
-
- if action == b'sendframes':
- res.setbodygen(meta[b'framegen'])
- return True
- elif action == b'noop':
- return False
- else:
- raise error.ProgrammingError(
- b'unhandled event from reactor: %s' % action
- )
-
-
-def getdispatchrepo(repo, proto, command):
- viewconfig = repo.ui.config(b'server', b'view')
- return repo.filtered(viewconfig)
-
-
-def dispatch(repo, proto, command, redirect):
- """Run a wire protocol command.
-
- Returns an iterable of objects that will be sent to the client.
- """
- repo = getdispatchrepo(repo, proto, command)
-
- entry = COMMANDS[command]
- func = entry.func
- spec = entry.args
-
- args = proto.getargs(spec)
-
- # There is some duplicate boilerplate code here for calling the command and
- # emitting objects. It is either that or a lot of indented code that looks
- # like a pyramid (since there are a lot of code paths that result in not
- # using the cacher).
- callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
-
- # Request is not cacheable. Don't bother instantiating a cacher.
- if not entry.cachekeyfn:
- for o in callcommand():
- yield o
- return
-
- if redirect:
- redirecttargets = redirect[b'targets']
- redirecthashes = redirect[b'hashes']
- else:
- redirecttargets = []
- redirecthashes = []
-
- cacher = makeresponsecacher(
- repo,
- proto,
- command,
- args,
- cborutil.streamencode,
- redirecttargets=redirecttargets,
- redirecthashes=redirecthashes,
- )
-
- # But we have no cacher. Do default handling.
- if not cacher:
- for o in callcommand():
- yield o
- return
-
- with cacher:
- cachekey = entry.cachekeyfn(
- repo, proto, cacher, **pycompat.strkwargs(args)
- )
-
- # No cache key or the cacher doesn't like it. Do default handling.
- if cachekey is None or not cacher.setcachekey(cachekey):
- for o in callcommand():
- yield o
- return
-
- # Serve it from the cache, if possible.
- cached = cacher.lookup()
-
- if cached:
- for o in cached[b'objs']:
- yield o
- return
-
- # Else call the command and feed its output into the cacher, allowing
- # the cacher to buffer/mutate objects as it desires.
- for o in callcommand():
- for o in cacher.onobject(o):
- yield o
-
- for o in cacher.onfinished():
- yield o
-
-
-@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
-class httpv2protocolhandler(object):
- def __init__(self, req, ui, args=None):
- self._req = req
- self._ui = ui
- self._args = args
-
- @property
- def name(self):
- return HTTP_WIREPROTO_V2
-
- def getargs(self, args):
- # First look for args that were passed but aren't registered on this
- # command.
- extra = set(self._args) - set(args)
- if extra:
- raise error.WireprotoCommandError(
- b'unsupported argument to command: %s'
- % b', '.join(sorted(extra))
- )
-
- # And look for required arguments that are missing.
- missing = {a for a in args if args[a][b'required']} - set(self._args)
-
- if missing:
- raise error.WireprotoCommandError(
- b'missing required arguments: %s' % b', '.join(sorted(missing))
- )
-
- # Now derive the arguments to pass to the command, taking into
- # account the arguments specified by the client.
- data = {}
- for k, meta in sorted(args.items()):
- # This argument wasn't passed by the client.
- if k not in self._args:
- data[k] = meta[b'default']()
- continue
-
- v = self._args[k]
-
- # Sets may be expressed as lists. Silently normalize.
- if meta[b'type'] == b'set' and isinstance(v, list):
- v = set(v)
-
- # TODO consider more/stronger type validation.
-
- data[k] = v
-
- return data
-
- def getprotocaps(self):
- # Protocol capabilities are currently not implemented for HTTP V2.
- return set()
-
- def getpayload(self):
- raise NotImplementedError
-
- @contextlib.contextmanager
- def mayberedirectstdio(self):
- raise NotImplementedError
-
- def client(self):
- raise NotImplementedError
-
- def addcapabilities(self, repo, caps):
- return caps
-
- def checkperm(self, perm):
- raise NotImplementedError
-
-
-def httpv2apidescriptor(req, repo):
- proto = httpv2protocolhandler(req, repo.ui)
-
- return _capabilitiesv2(repo, proto)
-
-
-def _capabilitiesv2(repo, proto):
- """Obtain the set of capabilities for version 2 transports.
-
- These capabilities are distinct from the capabilities for version 1
- transports.
- """
- caps = {
- b'commands': {},
- b'framingmediatypes': [FRAMINGTYPE],
- b'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
- }
-
- for command, entry in COMMANDS.items():
- args = {}
-
- for arg, meta in entry.args.items():
- args[arg] = {
- # TODO should this be a normalized type using CBOR's
- # terminology?
- b'type': meta[b'type'],
- b'required': meta[b'required'],
- }
-
- if not meta[b'required']:
- args[arg][b'default'] = meta[b'default']()
-
- if meta[b'validvalues']:
- args[arg][b'validvalues'] = meta[b'validvalues']
-
- # TODO this type of check should be defined in a per-command callback.
- if (
- command == b'rawstorefiledata'
- and not streamclone.allowservergeneration(repo)
- ):
- continue
-
- # pytype: disable=unsupported-operands
- caps[b'commands'][command] = {
- b'args': args,
- b'permissions': [entry.permission],
- }
- # pytype: enable=unsupported-operands
-
- if entry.extracapabilitiesfn:
- extracaps = entry.extracapabilitiesfn(repo, proto)
- caps[b'commands'][command].update(extracaps)
-
- caps[b'rawrepoformats'] = sorted(repo.requirements & repo.supportedformats)
-
- targets = getadvertisedredirecttargets(repo, proto)
- if targets:
- caps[b'redirect'] = {
- b'targets': [],
- b'hashes': [b'sha256', b'sha1'],
- }
-
- for target in targets:
- entry = {
- b'name': target[b'name'],
- b'protocol': target[b'protocol'],
- b'uris': target[b'uris'],
- }
-
- for key in (b'snirequired', b'tlsversions'):
- if key in target:
- entry[key] = target[key]
-
- # pytype: disable=attribute-error
- caps[b'redirect'][b'targets'].append(entry)
- # pytype: enable=attribute-error
-
- return proto.addcapabilities(repo, caps)
-
-
-def getadvertisedredirecttargets(repo, proto):
- """Obtain a list of content redirect targets.
-
- Returns a list containing potential redirect targets that will be
- advertised in capabilities data. Each dict MUST have the following
- keys:
-
- name
- The name of this redirect target. This is the identifier clients use
- to refer to a target. It is transferred as part of every command
- request.
-
- protocol
- Network protocol used by this target. Typically this is the string
- in front of the ``://`` in a URL. e.g. ``https``.
-
- uris
- List of representative URIs for this target. Clients can use the
- URIs to test parsing for compatibility or for ordering preference
- for which target to use.
-
- The following optional keys are recognized:
-
- snirequired
- Bool indicating if Server Name Indication (SNI) is required to
- connect to this target.
-
- tlsversions
- List of bytes indicating which TLS versions are supported by this
- target.
-
- By default, clients reflect the target order advertised by servers
- and servers will use the first client-advertised target when picking
- a redirect target. So targets should be advertised in the order the
- server prefers they be used.
- """
- return []
-
-
-def wireprotocommand(
- name,
- args=None,
- permission=b'push',
- cachekeyfn=None,
- extracapabilitiesfn=None,
-):
- """Decorator to declare a wire protocol command.
-
- ``name`` is the name of the wire protocol command being provided.
-
- ``args`` is a dict defining arguments accepted by the command. Keys are
- the argument name. Values are dicts with the following keys:
-
- ``type``
- The argument data type. Must be one of the following string
- literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
- or ``bool``.
-
- ``default``
- A callable returning the default value for this argument. If not
- specified, ``None`` will be the default value.
-
- ``example``
- An example value for this argument.
-
- ``validvalues``
- Set of recognized values for this argument.
-
- ``permission`` defines the permission type needed to run this command.
- Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
- respectively. Default is to assume command requires ``push`` permissions
- because otherwise commands not declaring their permissions could modify
- a repository that is supposed to be read-only.
-
- ``cachekeyfn`` defines an optional callable that can derive the
- cache key for this request.
-
- ``extracapabilitiesfn`` defines an optional callable that defines extra
- command capabilities/parameters that are advertised next to the command
- in the capabilities data structure describing the server. The callable
- receives as arguments the repository and protocol objects. It returns
- a dict of extra fields to add to the command descriptor.
-
- Wire protocol commands are generators of objects to be serialized and
- sent to the client.
-
- If a command raises an uncaught exception, this will be translated into
- a command error.
-
- All commands can opt in to being cacheable by defining a function
- (``cachekeyfn``) that is called to derive a cache key. This function
- receives the same arguments as the command itself plus a ``cacher``
- argument containing the active cacher for the request and returns a bytes
- containing the key in a cache the response to this command may be cached
- under.
- """
- transports = {
- k for k, v in wireprototypes.TRANSPORTS.items() if v[b'version'] == 2
- }
-
- if permission not in (b'push', b'pull'):
- raise error.ProgrammingError(
- b'invalid wire protocol permission; '
- b'got %s; expected "push" or "pull"' % permission
- )
-
- if args is None:
- args = {}
-
- if not isinstance(args, dict):
- raise error.ProgrammingError(
- b'arguments for version 2 commands must be declared as dicts'
- )
-
- for arg, meta in args.items():
- if arg == b'*':
- raise error.ProgrammingError(
- b'* argument name not allowed on version 2 commands'
- )
-
- if not isinstance(meta, dict):
- raise error.ProgrammingError(
- b'arguments for version 2 commands '
- b'must declare metadata as a dict'
- )
-
- if b'type' not in meta:
- raise error.ProgrammingError(
- b'%s argument for command %s does not '
- b'declare type field' % (arg, name)
- )
-
- if meta[b'type'] not in (
- b'bytes',
- b'int',
- b'list',
- b'dict',
- b'set',
- b'bool',
- ):
- raise error.ProgrammingError(
- b'%s argument for command %s has '
- b'illegal type: %s' % (arg, name, meta[b'type'])
- )
-
- if b'example' not in meta:
- raise error.ProgrammingError(
- b'%s argument for command %s does not '
- b'declare example field' % (arg, name)
- )
-
- meta[b'required'] = b'default' not in meta
-
- meta.setdefault(b'default', lambda: None)
- meta.setdefault(b'validvalues', None)
-
- def register(func):
- if name in COMMANDS:
- raise error.ProgrammingError(
- b'%s command already registered for version 2' % name
- )
-
- COMMANDS[name] = wireprototypes.commandentry(
- func,
- args=args,
- transports=transports,
- permission=permission,
- cachekeyfn=cachekeyfn,
- extracapabilitiesfn=extracapabilitiesfn,
- )
-
- return func
-
- return register
-
-
-def makecommandcachekeyfn(command, localversion=None, allargs=False):
- """Construct a cache key derivation function with common features.
-
- By default, the cache key is a hash of:
-
- * The command name.
- * A global cache version number.
- * A local cache version number (passed via ``localversion``).
- * All the arguments passed to the command.
- * The media type used.
- * Wire protocol version string.
- * The repository path.
- """
- if not allargs:
- raise error.ProgrammingError(
- b'only allargs=True is currently supported'
- )
-
- if localversion is None:
- raise error.ProgrammingError(b'must set localversion argument value')
-
- def cachekeyfn(repo, proto, cacher, **args):
- spec = COMMANDS[command]
-
- # Commands that mutate the repo can not be cached.
- if spec.permission == b'push':
- return None
-
- # TODO config option to disable caching.
-
- # Our key derivation strategy is to construct a data structure
- # holding everything that could influence cacheability and to hash
- # the CBOR representation of that. Using CBOR seems like it might
- # be overkill. However, simpler hashing mechanisms are prone to
- # duplicate input issues. e.g. if you just concatenate two values,
- # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
- # "padding" between values and prevents these problems.
-
- # Seed the hash with various data.
- state = {
- # To invalidate all cache keys.
- b'globalversion': GLOBAL_CACHE_VERSION,
- # More granular cache key invalidation.
- b'localversion': localversion,
- # Cache keys are segmented by command.
- b'command': command,
- # Throw in the media type and API version strings so changes
- # to exchange semantics invalid cache.
- b'mediatype': FRAMINGTYPE,
- b'version': HTTP_WIREPROTO_V2,
- # So same requests for different repos don't share cache keys.
- b'repo': repo.root,
- }
-
- # The arguments passed to us will have already been normalized.
- # Default values will be set, etc. This is important because it
- # means that it doesn't matter if clients send an explicit argument
- # or rely on the default value: it will all normalize to the same
- # set of arguments on the server and therefore the same cache key.
- #
- # Arguments by their very nature must support being encoded to CBOR.
- # And the CBOR encoder is deterministic. So we hash the arguments
- # by feeding the CBOR of their representation into the hasher.
- if allargs:
- state[b'args'] = pycompat.byteskwargs(args)
-
- cacher.adjustcachekeystate(state)
-
- hasher = hashutil.sha1()
- for chunk in cborutil.streamencode(state):
- hasher.update(chunk)
-
- return pycompat.sysbytes(hasher.hexdigest())
-
- return cachekeyfn
-
-
-def makeresponsecacher(
- repo, proto, command, args, objencoderfn, redirecttargets, redirecthashes
-):
- """Construct a cacher for a cacheable command.
-
- Returns an ``iwireprotocolcommandcacher`` instance.
-
- Extensions can monkeypatch this function to provide custom caching
- backends.
- """
- return None
-
-
-def resolvenodes(repo, revisions):
- """Resolve nodes from a revisions specifier data structure."""
- cl = repo.changelog
- clhasnode = cl.hasnode
-
- seen = set()
- nodes = []
-
- if not isinstance(revisions, list):
- raise error.WireprotoCommandError(
- b'revisions must be defined as an array'
- )
-
- for spec in revisions:
- if b'type' not in spec:
- raise error.WireprotoCommandError(
- b'type key not present in revision specifier'
- )
-
- typ = spec[b'type']
-
- if typ == b'changesetexplicit':
- if b'nodes' not in spec:
- raise error.WireprotoCommandError(
- b'nodes key not present in changesetexplicit revision '
- b'specifier'
- )
-
- for node in spec[b'nodes']:
- if node not in seen:
- nodes.append(node)
- seen.add(node)
-
- elif typ == b'changesetexplicitdepth':
- for key in (b'nodes', b'depth'):
- if key not in spec:
- raise error.WireprotoCommandError(
- b'%s key not present in changesetexplicitdepth revision '
- b'specifier',
- (key,),
- )
-
- for rev in repo.revs(
- b'ancestors(%ln, %s)', spec[b'nodes'], spec[b'depth'] - 1
- ):
- node = cl.node(rev)
-
- if node not in seen:
- nodes.append(node)
- seen.add(node)
-
- elif typ == b'changesetdagrange':
- for key in (b'roots', b'heads'):
- if key not in spec:
- raise error.WireprotoCommandError(
- b'%s key not present in changesetdagrange revision '
- b'specifier',
- (key,),
- )
-
- if not spec[b'heads']:
- raise error.WireprotoCommandError(
- b'heads key in changesetdagrange cannot be empty'
- )
-
- if spec[b'roots']:
- common = [n for n in spec[b'roots'] if clhasnode(n)]
- else:
- common = [repo.nullid]
-
- for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
- if n not in seen:
- nodes.append(n)
- seen.add(n)
-
- else:
- raise error.WireprotoCommandError(
- b'unknown revision specifier type: %s', (typ,)
- )
-
- return nodes
-
-
-@wireprotocommand(b'branchmap', permission=b'pull')
-def branchmapv2(repo, proto):
- yield {
- encoding.fromlocal(k): v
- for k, v in pycompat.iteritems(repo.branchmap())
- }
-
-
-@wireprotocommand(b'capabilities', permission=b'pull')
-def capabilitiesv2(repo, proto):
- yield _capabilitiesv2(repo, proto)
-
-
-@wireprotocommand(
- b'changesetdata',
- args={
- b'revisions': {
- b'type': b'list',
- b'example': [
- {
- b'type': b'changesetexplicit',
- b'nodes': [b'abcdef...'],
- }
- ],
- },
- b'fields': {
- b'type': b'set',
- b'default': set,
- b'example': {b'parents', b'revision'},
- b'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
- },
- },
- permission=b'pull',
-)
-def changesetdata(repo, proto, revisions, fields):
- # TODO look for unknown fields and abort when they can't be serviced.
- # This could probably be validated by dispatcher using validvalues.
-
- cl = repo.changelog
- outgoing = resolvenodes(repo, revisions)
- publishing = repo.publishing()
-
- if outgoing:
- repo.hook(b'preoutgoing', throw=True, source=b'serve')
-
- yield {
- b'totalitems': len(outgoing),
- }
-
- # The phases of nodes already transferred to the client may have changed
- # since the client last requested data. We send phase-only records
- # for these revisions, if requested.
- # TODO actually do this. We'll probably want to emit phase heads
- # in the ancestry set of the outgoing revisions. This will ensure
- # that phase updates within that set are seen.
- if b'phase' in fields:
- pass
-
- nodebookmarks = {}
- for mark, node in repo._bookmarks.items():
- nodebookmarks.setdefault(node, set()).add(mark)
-
- # It is already topologically sorted by revision number.
- for node in outgoing:
- d = {
- b'node': node,
- }
-
- if b'parents' in fields:
- d[b'parents'] = cl.parents(node)
-
- if b'phase' in fields:
- if publishing:
- d[b'phase'] = b'public'
- else:
- ctx = repo[node]
- d[b'phase'] = ctx.phasestr()
-
- if b'bookmarks' in fields and node in nodebookmarks:
- d[b'bookmarks'] = sorted(nodebookmarks[node])
- del nodebookmarks[node]
-
- followingmeta = []
- followingdata = []
-
- if b'revision' in fields:
- revisiondata = cl.revision(node)
- followingmeta.append((b'revision', len(revisiondata)))
- followingdata.append(revisiondata)
-
- # TODO make it possible for extensions to wrap a function or register
- # a handler to service custom fields.
-
- if followingmeta:
- d[b'fieldsfollowing'] = followingmeta
-
- yield d
-
- for extra in followingdata:
- yield extra
-
- # If requested, send bookmarks from nodes that didn't have revision
- # data sent so receiver is aware of any bookmark updates.
- if b'bookmarks' in fields:
- for node, marks in sorted(pycompat.iteritems(nodebookmarks)):
- yield {
- b'node': node,
- b'bookmarks': sorted(marks),
- }
-
-
-class FileAccessError(Exception):
- """Represents an error accessing a specific file."""
-
- def __init__(self, path, msg, args):
- self.path = path
- self.msg = msg
- self.args = args
-
-
-def getfilestore(repo, proto, path):
- """Obtain a file storage object for use with wire protocol.
-
- Exists as a standalone function so extensions can monkeypatch to add
- access control.
- """
- # This seems to work even if the file doesn't exist. So catch
- # "empty" files and return an error.
- fl = repo.file(path)
-
- if not len(fl):
- raise FileAccessError(path, b'unknown file: %s', (path,))
-
- return fl
-
-
-def emitfilerevisions(repo, path, revisions, linknodes, fields):
- for revision in revisions:
- d = {
- b'node': revision.node,
- }
-
- if b'parents' in fields:
- d[b'parents'] = [revision.p1node, revision.p2node]
-
- if b'linknode' in fields:
- d[b'linknode'] = linknodes[revision.node]
-
- followingmeta = []
- followingdata = []
-
- if b'revision' in fields:
- if revision.revision is not None:
- followingmeta.append((b'revision', len(revision.revision)))
- followingdata.append(revision.revision)
- else:
- d[b'deltabasenode'] = revision.basenode
- followingmeta.append((b'delta', len(revision.delta)))
- followingdata.append(revision.delta)
-
- if followingmeta:
- d[b'fieldsfollowing'] = followingmeta
-
- yield d
-
- for extra in followingdata:
- yield extra
-
-
-def makefilematcher(repo, pathfilter):
- """Construct a matcher from a path filter dict."""
-
- # Validate values.
- if pathfilter:
- for key in (b'include', b'exclude'):
- for pattern in pathfilter.get(key, []):
- if not pattern.startswith((b'path:', b'rootfilesin:')):
- raise error.WireprotoCommandError(
- b'%s pattern must begin with `path:` or `rootfilesin:`; '
- b'got %s',
- (key, pattern),
- )
-
- if pathfilter:
- matcher = matchmod.match(
- repo.root,
- b'',
- include=pathfilter.get(b'include', []),
- exclude=pathfilter.get(b'exclude', []),
- )
- else:
- matcher = matchmod.match(repo.root, b'')
-
- # Requested patterns could include files not in the local store. So
- # filter those out.
- return repo.narrowmatch(matcher)
-
-
-@wireprotocommand(
- b'filedata',
- args={
- b'haveparents': {
- b'type': b'bool',
- b'default': lambda: False,
- b'example': True,
- },
- b'nodes': {
- b'type': b'list',
- b'example': [b'0123456...'],
- },
- b'fields': {
- b'type': b'set',
- b'default': set,
- b'example': {b'parents', b'revision'},
- b'validvalues': {b'parents', b'revision', b'linknode'},
- },
- b'path': {
- b'type': b'bytes',
- b'example': b'foo.txt',
- },
- },
- permission=b'pull',
- # TODO censoring a file revision won't invalidate the cache.
- # Figure out a way to take censoring into account when deriving
- # the cache key.
- cachekeyfn=makecommandcachekeyfn(b'filedata', 1, allargs=True),
-)
-def filedata(repo, proto, haveparents, nodes, fields, path):
- # TODO this API allows access to file revisions that are attached to
- # secret changesets. filesdata does not have this problem. Maybe this
- # API should be deleted?
-
- try:
- # Extensions may wish to access the protocol handler.
- store = getfilestore(repo, proto, path)
- except FileAccessError as e:
- raise error.WireprotoCommandError(e.msg, e.args)
-
- clnode = repo.changelog.node
- linknodes = {}
-
- # Validate requested nodes.
- for node in nodes:
- try:
- store.rev(node)
- except error.LookupError:
- raise error.WireprotoCommandError(
- b'unknown file node: %s', (hex(node),)
- )
-
- # TODO by creating the filectx against a specific file revision
- # instead of changeset, linkrev() is always used. This is wrong for
- # cases where linkrev() may refer to a hidden changeset. But since this
- # API doesn't know anything about changesets, we're not sure how to
- # disambiguate the linknode. Perhaps we should delete this API?
- fctx = repo.filectx(path, fileid=node)
- linknodes[node] = clnode(fctx.introrev())
-
- revisions = store.emitrevisions(
- nodes,
- revisiondata=b'revision' in fields,
- assumehaveparentrevisions=haveparents,
- )
-
- yield {
- b'totalitems': len(nodes),
- }
-
- for o in emitfilerevisions(repo, path, revisions, linknodes, fields):
- yield o
-
-
-def filesdatacapabilities(repo, proto):
- batchsize = repo.ui.configint(
- b'experimental', b'server.filesdata.recommended-batch-size'
- )
- return {
- b'recommendedbatchsize': batchsize,
- }
-
-
-@wireprotocommand(
- b'filesdata',
- args={
- b'haveparents': {
- b'type': b'bool',
- b'default': lambda: False,
- b'example': True,
- },
- b'fields': {
- b'type': b'set',
- b'default': set,
- b'example': {b'parents', b'revision'},
- b'validvalues': {
- b'firstchangeset',
- b'linknode',
- b'parents',
- b'revision',
- },
- },
- b'pathfilter': {
- b'type': b'dict',
- b'default': lambda: None,
- b'example': {b'include': [b'path:tests']},
- },
- b'revisions': {
- b'type': b'list',
- b'example': [
- {
- b'type': b'changesetexplicit',
- b'nodes': [b'abcdef...'],
- }
- ],
- },
- },
- permission=b'pull',
- # TODO censoring a file revision won't invalidate the cache.
- # Figure out a way to take censoring into account when deriving
- # the cache key.
- cachekeyfn=makecommandcachekeyfn(b'filesdata', 1, allargs=True),
- extracapabilitiesfn=filesdatacapabilities,
-)
-def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
- # TODO This should operate on a repo that exposes obsolete changesets. There
- # is a race between a client making a push that obsoletes a changeset and
- # another client fetching files data for that changeset. If a client has a
- # changeset, it should probably be allowed to access files data for that
- # changeset.
-
- outgoing = resolvenodes(repo, revisions)
- filematcher = makefilematcher(repo, pathfilter)
-
- # path -> {fnode: linknode}
- fnodes = collections.defaultdict(dict)
-
- # We collect the set of relevant file revisions by iterating the changeset
- # revisions and either walking the set of files recorded in the changeset
- # or by walking the manifest at that revision. There is probably room for a
- # storage-level API to request this data, as it can be expensive to compute
- # and would benefit from caching or alternate storage from what revlogs
- # provide.
- for node in outgoing:
- ctx = repo[node]
- mctx = ctx.manifestctx()
- md = mctx.read()
-
- if haveparents:
- checkpaths = ctx.files()
- else:
- checkpaths = md.keys()
-
- for path in checkpaths:
- fnode = md[path]
-
- if path in fnodes and fnode in fnodes[path]:
- continue
-
- if not filematcher(path):
- continue
-
- fnodes[path].setdefault(fnode, node)
-
- yield {
- b'totalpaths': len(fnodes),
- b'totalitems': sum(len(v) for v in fnodes.values()),
- }
-
- for path, filenodes in sorted(fnodes.items()):
- try:
- store = getfilestore(repo, proto, path)
- except FileAccessError as e:
- raise error.WireprotoCommandError(e.msg, e.args)
-
- yield {
- b'path': path,
- b'totalitems': len(filenodes),
- }
-
- revisions = store.emitrevisions(
- filenodes.keys(),
- revisiondata=b'revision' in fields,
- assumehaveparentrevisions=haveparents,
- )
-
- for o in emitfilerevisions(repo, path, revisions, filenodes, fields):
- yield o
-
-
-@wireprotocommand(
- b'heads',
- args={
- b'publiconly': {
- b'type': b'bool',
- b'default': lambda: False,
- b'example': False,
- },
- },
- permission=b'pull',
-)
-def headsv2(repo, proto, publiconly):
- if publiconly:
- repo = repo.filtered(b'immutable')
-
- yield repo.heads()
-
-
-@wireprotocommand(
- b'known',
- args={
- b'nodes': {
- b'type': b'list',
- b'default': list,
- b'example': [b'deadbeef'],
- },
- },
- permission=b'pull',
-)
-def knownv2(repo, proto, nodes):
- result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
- yield result
-
-
-@wireprotocommand(
- b'listkeys',
- args={
- b'namespace': {
- b'type': b'bytes',
- b'example': b'ns',
- },
- },
- permission=b'pull',
-)
-def listkeysv2(repo, proto, namespace):
- keys = repo.listkeys(encoding.tolocal(namespace))
- keys = {
- encoding.fromlocal(k): encoding.fromlocal(v)
- for k, v in pycompat.iteritems(keys)
- }
-
- yield keys
-
-
-@wireprotocommand(
- b'lookup',
- args={
- b'key': {
- b'type': b'bytes',
- b'example': b'foo',
- },
- },
- permission=b'pull',
-)
-def lookupv2(repo, proto, key):
- key = encoding.tolocal(key)
-
- # TODO handle exception.
- node = repo.lookup(key)
-
- yield node
-
-
-def manifestdatacapabilities(repo, proto):
- batchsize = repo.ui.configint(
- b'experimental', b'server.manifestdata.recommended-batch-size'
- )
-
- return {
- b'recommendedbatchsize': batchsize,
- }
-
-
-@wireprotocommand(
- b'manifestdata',
- args={
- b'nodes': {
- b'type': b'list',
- b'example': [b'0123456...'],
- },
- b'haveparents': {
- b'type': b'bool',
- b'default': lambda: False,
- b'example': True,
- },
- b'fields': {
- b'type': b'set',
- b'default': set,
- b'example': {b'parents', b'revision'},
- b'validvalues': {b'parents', b'revision'},
- },
- b'tree': {
- b'type': b'bytes',
- b'example': b'',
- },
- },
- permission=b'pull',
- cachekeyfn=makecommandcachekeyfn(b'manifestdata', 1, allargs=True),
- extracapabilitiesfn=manifestdatacapabilities,
-)
-def manifestdata(repo, proto, haveparents, nodes, fields, tree):
- store = repo.manifestlog.getstorage(tree)
-
- # Validate the node is known and abort on unknown revisions.
- for node in nodes:
- try:
- store.rev(node)
- except error.LookupError:
- raise error.WireprotoCommandError(b'unknown node: %s', (node,))
-
- revisions = store.emitrevisions(
- nodes,
- revisiondata=b'revision' in fields,
- assumehaveparentrevisions=haveparents,
- )
-
- yield {
- b'totalitems': len(nodes),
- }
-
- for revision in revisions:
- d = {
- b'node': revision.node,
- }
-
- if b'parents' in fields:
- d[b'parents'] = [revision.p1node, revision.p2node]
-
- followingmeta = []
- followingdata = []
-
- if b'revision' in fields:
- if revision.revision is not None:
- followingmeta.append((b'revision', len(revision.revision)))
- followingdata.append(revision.revision)
- else:
- d[b'deltabasenode'] = revision.basenode
- followingmeta.append((b'delta', len(revision.delta)))
- followingdata.append(revision.delta)
-
- if followingmeta:
- d[b'fieldsfollowing'] = followingmeta
-
- yield d
-
- for extra in followingdata:
- yield extra
-
-
-@wireprotocommand(
- b'pushkey',
- args={
- b'namespace': {
- b'type': b'bytes',
- b'example': b'ns',
- },
- b'key': {
- b'type': b'bytes',
- b'example': b'key',
- },
- b'old': {
- b'type': b'bytes',
- b'example': b'old',
- },
- b'new': {
- b'type': b'bytes',
- b'example': b'new',
- },
- },
- permission=b'push',
-)
-def pushkeyv2(repo, proto, namespace, key, old, new):
- # TODO handle ui output redirection
- yield repo.pushkey(
- encoding.tolocal(namespace),
- encoding.tolocal(key),
- encoding.tolocal(old),
- encoding.tolocal(new),
- )
-
-
-@wireprotocommand(
- b'rawstorefiledata',
- args={
- b'files': {
- b'type': b'list',
- b'example': [b'changelog', b'manifestlog'],
- },
- b'pathfilter': {
- b'type': b'list',
- b'default': lambda: None,
- b'example': {b'include': [b'path:tests']},
- },
- },
- permission=b'pull',
-)
-def rawstorefiledata(repo, proto, files, pathfilter):
- if not streamclone.allowservergeneration(repo):
- raise error.WireprotoCommandError(b'stream clone is disabled')
-
- # TODO support dynamically advertising what store files "sets" are
- # available. For now, we support changelog, manifestlog, and files.
- files = set(files)
- allowedfiles = {b'changelog', b'manifestlog'}
-
- unsupported = files - allowedfiles
- if unsupported:
- raise error.WireprotoCommandError(
- b'unknown file type: %s', (b', '.join(sorted(unsupported)),)
- )
-
- with repo.lock():
- topfiles = list(repo.store.topfiles())
-
- sendfiles = []
- totalsize = 0
-
- # TODO this is a bunch of storage layer interface abstractions because
- # it assumes revlogs.
- for rl_type, name, size in topfiles:
- # XXX use the `rl_type` for that
- if b'changelog' in files and name.startswith(b'00changelog'):
- pass
- elif b'manifestlog' in files and name.startswith(b'00manifest'):
- pass
- else:
- continue
-
- sendfiles.append((b'store', name, size))
- totalsize += size
-
- yield {
- b'filecount': len(sendfiles),
- b'totalsize': totalsize,
- }
-
- for location, name, size in sendfiles:
- yield {
- b'location': location,
- b'path': name,
- b'size': size,
- }
-
- # We have to use a closure for this to ensure the context manager is
- # closed only after sending the final chunk.
- def getfiledata():
- with repo.svfs(name, b'rb', auditpath=False) as fh:
- for chunk in util.filechunkiter(fh, limit=size):
- yield chunk
-
- yield wireprototypes.indefinitebytestringresponse(getfiledata())