--- a/mercurial/wireproto.py Wed Apr 04 10:35:09 2018 -0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1111 +0,0 @@
-# wireproto.py - generic wire protocol support functions
-#
-# Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
-#
-# This software may be used and distributed according to the terms of the
-# GNU General Public License version 2 or any later version.
-
-from __future__ import absolute_import
-
-import hashlib
-import os
-import tempfile
-
-from .i18n import _
-from .node import (
- bin,
- hex,
- nullid,
-)
-
-from . import (
- bundle2,
- changegroup as changegroupmod,
- discovery,
- encoding,
- error,
- exchange,
- peer,
- pushkey as pushkeymod,
- pycompat,
- repository,
- streamclone,
- util,
-)
-
-urlerr = util.urlerr
-urlreq = util.urlreq
-
-bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
-bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
- 'IncompatibleClient')
-bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
-
-class abstractserverproto(object):
- """abstract class that summarizes the protocol API
-
- Used as reference and documentation.
- """
-
- def getargs(self, args):
- """return the value for arguments in <args>
-
- returns a list of values (same order as <args>)"""
- raise NotImplementedError()
-
- def getfile(self, fp):
- """write the whole content of a file into a file like object
-
- The file is in the form::
-
- (<chunk-size>\n<chunk>)+0\n
-
- chunk size is the ascii version of the int.
- """
- raise NotImplementedError()
-
- def redirect(self):
- """may setup interception for stdout and stderr
-
- See also the `restore` method."""
- raise NotImplementedError()
-
- # If the `redirect` function does install interception, the `restore`
- # function MUST be defined. If interception is not used, this function
- # MUST NOT be defined.
- #
- # left commented here on purpose
- #
- #def restore(self):
- # """reinstall previous stdout and stderr and return intercepted stdout
- # """
- # raise NotImplementedError()
-
-class remoteiterbatcher(peer.iterbatcher):
- def __init__(self, remote):
- super(remoteiterbatcher, self).__init__()
- self._remote = remote
-
- def __getattr__(self, name):
- # Validate this method is batchable, since submit() only supports
- # batchable methods.
- fn = getattr(self._remote, name)
- if not getattr(fn, 'batchable', None):
- raise error.ProgrammingError('Attempted to batch a non-batchable '
- 'call to %r' % name)
-
- return super(remoteiterbatcher, self).__getattr__(name)
-
- def submit(self):
- """Break the batch request into many patch calls and pipeline them.
-
- This is mostly valuable over http where request sizes can be
- limited, but can be used in other places as well.
- """
- # 2-tuple of (command, arguments) that represents what will be
- # sent over the wire.
- requests = []
-
- # 4-tuple of (command, final future, @batchable generator, remote
- # future).
- results = []
-
- for command, args, opts, finalfuture in self.calls:
- mtd = getattr(self._remote, command)
- batchable = mtd.batchable(mtd.__self__, *args, **opts)
-
- commandargs, fremote = next(batchable)
- assert fremote
- requests.append((command, commandargs))
- results.append((command, finalfuture, batchable, fremote))
-
- if requests:
- self._resultiter = self._remote._submitbatch(requests)
-
- self._results = results
-
- def results(self):
- for command, finalfuture, batchable, remotefuture in self._results:
- # Get the raw result, set it in the remote future, feed it
- # back into the @batchable generator so it can be decoded, and
- # set the result on the final future to this value.
- remoteresult = next(self._resultiter)
- remotefuture.set(remoteresult)
- finalfuture.set(next(batchable))
-
- # Verify our @batchable generators only emit 2 values.
- try:
- next(batchable)
- except StopIteration:
- pass
- else:
- raise error.ProgrammingError('%s @batchable generator emitted '
- 'unexpected value count' % command)
-
- yield finalfuture.value
-
-# Forward a couple of names from peer to make wireproto interactions
-# slightly more sensible.
-batchable = peer.batchable
-future = peer.future
-
-# list of nodes encoding / decoding
-
-def decodelist(l, sep=' '):
- if l:
- return [bin(v) for v in l.split(sep)]
- return []
-
-def encodelist(l, sep=' '):
- try:
- return sep.join(map(hex, l))
- except TypeError:
- raise
-
-# batched call argument encoding
-
-def escapearg(plain):
- return (plain
- .replace(':', ':c')
- .replace(',', ':o')
- .replace(';', ':s')
- .replace('=', ':e'))
-
-def unescapearg(escaped):
- return (escaped
- .replace(':e', '=')
- .replace(':s', ';')
- .replace(':o', ',')
- .replace(':c', ':'))
-
-def encodebatchcmds(req):
- """Return a ``cmds`` argument value for the ``batch`` command."""
- cmds = []
- for op, argsdict in req:
- # Old servers didn't properly unescape argument names. So prevent
- # the sending of argument names that may not be decoded properly by
- # servers.
- assert all(escapearg(k) == k for k in argsdict)
-
- args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
- for k, v in argsdict.iteritems())
- cmds.append('%s %s' % (op, args))
-
- return ';'.join(cmds)
-
-# mapping of options accepted by getbundle and their types
-#
-# Meant to be extended by extensions. It is extensions responsibility to ensure
-# such options are properly processed in exchange.getbundle.
-#
-# supported types are:
-#
-# :nodes: list of binary nodes
-# :csv: list of comma-separated values
-# :scsv: list of comma-separated values return as set
-# :plain: string with no transformation needed.
-gboptsmap = {'heads': 'nodes',
- 'bookmarks': 'boolean',
- 'common': 'nodes',
- 'obsmarkers': 'boolean',
- 'phases': 'boolean',
- 'bundlecaps': 'scsv',
- 'listkeys': 'csv',
- 'cg': 'boolean',
- 'cbattempted': 'boolean',
- 'stream': 'boolean',
-}
-
-# client side
-
-class wirepeer(repository.legacypeer):
- """Client-side interface for communicating with a peer repository.
-
- Methods commonly call wire protocol commands of the same name.
-
- See also httppeer.py and sshpeer.py for protocol-specific
- implementations of this interface.
- """
- # Begin of basewirepeer interface.
-
- def iterbatch(self):
- return remoteiterbatcher(self)
-
- @batchable
- def lookup(self, key):
- self.requirecap('lookup', _('look up remote revision'))
- f = future()
- yield {'key': encoding.fromlocal(key)}, f
- d = f.value
- success, data = d[:-1].split(" ", 1)
- if int(success):
- yield bin(data)
- else:
- self._abort(error.RepoError(data))
-
- @batchable
- def heads(self):
- f = future()
- yield {}, f
- d = f.value
- try:
- yield decodelist(d[:-1])
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- @batchable
- def known(self, nodes):
- f = future()
- yield {'nodes': encodelist(nodes)}, f
- d = f.value
- try:
- yield [bool(int(b)) for b in d]
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- @batchable
- def branchmap(self):
- f = future()
- yield {}, f
- d = f.value
- try:
- branchmap = {}
- for branchpart in d.splitlines():
- branchname, branchheads = branchpart.split(' ', 1)
- branchname = encoding.tolocal(urlreq.unquote(branchname))
- branchheads = decodelist(branchheads)
- branchmap[branchname] = branchheads
- yield branchmap
- except TypeError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- @batchable
- def listkeys(self, namespace):
- if not self.capable('pushkey'):
- yield {}, None
- f = future()
- self.ui.debug('preparing listkeys for "%s"\n' % namespace)
- yield {'namespace': encoding.fromlocal(namespace)}, f
- d = f.value
- self.ui.debug('received listkey for "%s": %i bytes\n'
- % (namespace, len(d)))
- yield pushkeymod.decodekeys(d)
-
- @batchable
- def pushkey(self, namespace, key, old, new):
- if not self.capable('pushkey'):
- yield False, None
- f = future()
- self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
- yield {'namespace': encoding.fromlocal(namespace),
- 'key': encoding.fromlocal(key),
- 'old': encoding.fromlocal(old),
- 'new': encoding.fromlocal(new)}, f
- d = f.value
- d, output = d.split('\n', 1)
- try:
- d = bool(int(d))
- except ValueError:
- raise error.ResponseError(
- _('push failed (unexpected response):'), d)
- for l in output.splitlines(True):
- self.ui.status(_('remote: '), l)
- yield d
-
- def stream_out(self):
- return self._callstream('stream_out')
-
- def getbundle(self, source, **kwargs):
- kwargs = pycompat.byteskwargs(kwargs)
- self.requirecap('getbundle', _('look up remote changes'))
- opts = {}
- bundlecaps = kwargs.get('bundlecaps')
- if bundlecaps is not None:
- kwargs['bundlecaps'] = sorted(bundlecaps)
- else:
- bundlecaps = () # kwargs could have it to None
- for key, value in kwargs.iteritems():
- if value is None:
- continue
- keytype = gboptsmap.get(key)
- if keytype is None:
- raise error.ProgrammingError(
- 'Unexpectedly None keytype for key %s' % key)
- elif keytype == 'nodes':
- value = encodelist(value)
- elif keytype in ('csv', 'scsv'):
- value = ','.join(value)
- elif keytype == 'boolean':
- value = '%i' % bool(value)
- elif keytype != 'plain':
- raise KeyError('unknown getbundle option type %s'
- % keytype)
- opts[key] = value
- f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
- if any((cap.startswith('HG2') for cap in bundlecaps)):
- return bundle2.getunbundler(self.ui, f)
- else:
- return changegroupmod.cg1unpacker(f, 'UN')
-
- def unbundle(self, cg, heads, url):
- '''Send cg (a readable file-like object representing the
- changegroup to push, typically a chunkbuffer object) to the
- remote server as a bundle.
-
- When pushing a bundle10 stream, return an integer indicating the
- result of the push (see changegroup.apply()).
-
- When pushing a bundle20 stream, return a bundle20 stream.
-
- `url` is the url the client thinks it's pushing to, which is
- visible to hooks.
- '''
-
- if heads != ['force'] and self.capable('unbundlehash'):
- heads = encodelist(['hashed',
- hashlib.sha1(''.join(sorted(heads))).digest()])
- else:
- heads = encodelist(heads)
-
- if util.safehasattr(cg, 'deltaheader'):
- # this a bundle10, do the old style call sequence
- ret, output = self._callpush("unbundle", cg, heads=heads)
- if ret == "":
- raise error.ResponseError(
- _('push failed:'), output)
- try:
- ret = int(ret)
- except ValueError:
- raise error.ResponseError(
- _('push failed (unexpected response):'), ret)
-
- for l in output.splitlines(True):
- self.ui.status(_('remote: '), l)
- else:
- # bundle2 push. Send a stream, fetch a stream.
- stream = self._calltwowaystream('unbundle', cg, heads=heads)
- ret = bundle2.getunbundler(self.ui, stream)
- return ret
-
- # End of basewirepeer interface.
-
- # Begin of baselegacywirepeer interface.
-
- def branches(self, nodes):
- n = encodelist(nodes)
- d = self._call("branches", nodes=n)
- try:
- br = [tuple(decodelist(b)) for b in d.splitlines()]
- return br
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
-
- def between(self, pairs):
- batch = 8 # avoid giant requests
- r = []
- for i in xrange(0, len(pairs), batch):
- n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
- d = self._call("between", pairs=n)
- try:
- r.extend(l and decodelist(l) or [] for l in d.splitlines())
- except ValueError:
- self._abort(error.ResponseError(_("unexpected response:"), d))
- return r
-
- def changegroup(self, nodes, kind):
- n = encodelist(nodes)
- f = self._callcompressable("changegroup", roots=n)
- return changegroupmod.cg1unpacker(f, 'UN')
-
- def changegroupsubset(self, bases, heads, kind):
- self.requirecap('changegroupsubset', _('look up remote changes'))
- bases = encodelist(bases)
- heads = encodelist(heads)
- f = self._callcompressable("changegroupsubset",
- bases=bases, heads=heads)
- return changegroupmod.cg1unpacker(f, 'UN')
-
- # End of baselegacywirepeer interface.
-
- def _submitbatch(self, req):
- """run batch request <req> on the server
-
- Returns an iterator of the raw responses from the server.
- """
- rsp = self._callstream("batch", cmds=encodebatchcmds(req))
- chunk = rsp.read(1024)
- work = [chunk]
- while chunk:
- while ';' not in chunk and chunk:
- chunk = rsp.read(1024)
- work.append(chunk)
- merged = ''.join(work)
- while ';' in merged:
- one, merged = merged.split(';', 1)
- yield unescapearg(one)
- chunk = rsp.read(1024)
- work = [merged, chunk]
- yield unescapearg(''.join(work))
-
- def _submitone(self, op, args):
- return self._call(op, **pycompat.strkwargs(args))
-
- def debugwireargs(self, one, two, three=None, four=None, five=None):
- # don't pass optional arguments left at their default value
- opts = {}
- if three is not None:
- opts[r'three'] = three
- if four is not None:
- opts[r'four'] = four
- return self._call('debugwireargs', one=one, two=two, **opts)
-
- def _call(self, cmd, **args):
- """execute <cmd> on the server
-
- The command is expected to return a simple string.
-
- returns the server reply as a string."""
- raise NotImplementedError()
-
- def _callstream(self, cmd, **args):
- """execute <cmd> on the server
-
- The command is expected to return a stream. Note that if the
- command doesn't return a stream, _callstream behaves
- differently for ssh and http peers.
-
- returns the server reply as a file like object.
- """
- raise NotImplementedError()
-
- def _callcompressable(self, cmd, **args):
- """execute <cmd> on the server
-
- The command is expected to return a stream.
-
- The stream may have been compressed in some implementations. This
- function takes care of the decompression. This is the only difference
- with _callstream.
-
- returns the server reply as a file like object.
- """
- raise NotImplementedError()
-
- def _callpush(self, cmd, fp, **args):
- """execute a <cmd> on server
-
- The command is expected to be related to a push. Push has a special
- return method.
-
- returns the server reply as a (ret, output) tuple. ret is either
- empty (error) or a stringified int.
- """
- raise NotImplementedError()
-
- def _calltwowaystream(self, cmd, fp, **args):
- """execute <cmd> on server
-
- The command will send a stream to the server and get a stream in reply.
- """
- raise NotImplementedError()
-
- def _abort(self, exception):
- """clearly abort the wire protocol connection and raise the exception
- """
- raise NotImplementedError()
-
-# server side
-
-# wire protocol command can either return a string or one of these classes.
-class streamres(object):
- """wireproto reply: binary stream
-
- The call was successful and the result is a stream.
-
- Accepts a generator containing chunks of data to be sent to the client.
-
- ``prefer_uncompressed`` indicates that the data is expected to be
- uncompressable and that the stream should therefore use the ``none``
- engine.
- """
- def __init__(self, gen=None, prefer_uncompressed=False):
- self.gen = gen
- self.prefer_uncompressed = prefer_uncompressed
-
-class streamres_legacy(object):
- """wireproto reply: uncompressed binary stream
-
- The call was successful and the result is a stream.
-
- Accepts a generator containing chunks of data to be sent to the client.
-
- Like ``streamres``, but sends an uncompressed data for "version 1" clients
- using the application/mercurial-0.1 media type.
- """
- def __init__(self, gen=None):
- self.gen = gen
-
-class pushres(object):
- """wireproto reply: success with simple integer return
-
- The call was successful and returned an integer contained in `self.res`.
- """
- def __init__(self, res):
- self.res = res
-
-class pusherr(object):
- """wireproto reply: failure
-
- The call failed. The `self.res` attribute contains the error message.
- """
- def __init__(self, res):
- self.res = res
-
-class ooberror(object):
- """wireproto reply: failure of a batch of operation
-
- Something failed during a batch call. The error message is stored in
- `self.message`.
- """
- def __init__(self, message):
- self.message = message
-
-def getdispatchrepo(repo, proto, command):
- """Obtain the repo used for processing wire protocol commands.
-
- The intent of this function is to serve as a monkeypatch point for
- extensions that need commands to operate on different repo views under
- specialized circumstances.
- """
- return repo.filtered('served')
-
-def dispatch(repo, proto, command):
- repo = getdispatchrepo(repo, proto, command)
- func, spec = commands[command]
- args = proto.getargs(spec)
- return func(repo, proto, *args)
-
-def options(cmd, keys, others):
- opts = {}
- for k in keys:
- if k in others:
- opts[k] = others[k]
- del others[k]
- if others:
- util.stderr.write("warning: %s ignored unexpected arguments %s\n"
- % (cmd, ",".join(others)))
- return opts
-
-def bundle1allowed(repo, action):
- """Whether a bundle1 operation is allowed from the server.
-
- Priority is:
-
- 1. server.bundle1gd.<action> (if generaldelta active)
- 2. server.bundle1.<action>
- 3. server.bundle1gd (if generaldelta active)
- 4. server.bundle1
- """
- ui = repo.ui
- gd = 'generaldelta' in repo.requirements
-
- if gd:
- v = ui.configbool('server', 'bundle1gd.%s' % action)
- if v is not None:
- return v
-
- v = ui.configbool('server', 'bundle1.%s' % action)
- if v is not None:
- return v
-
- if gd:
- v = ui.configbool('server', 'bundle1gd')
- if v is not None:
- return v
-
- return ui.configbool('server', 'bundle1')
-
-def supportedcompengines(ui, proto, role):
- """Obtain the list of supported compression engines for a request."""
- assert role in (util.CLIENTROLE, util.SERVERROLE)
-
- compengines = util.compengines.supportedwireengines(role)
-
- # Allow config to override default list and ordering.
- if role == util.SERVERROLE:
- configengines = ui.configlist('server', 'compressionengines')
- config = 'server.compressionengines'
- else:
- # This is currently implemented mainly to facilitate testing. In most
- # cases, the server should be in charge of choosing a compression engine
- # because a server has the most to lose from a sub-optimal choice. (e.g.
- # CPU DoS due to an expensive engine or a network DoS due to poor
- # compression ratio).
- configengines = ui.configlist('experimental',
- 'clientcompressionengines')
- config = 'experimental.clientcompressionengines'
-
- # No explicit config. Filter out the ones that aren't supposed to be
- # advertised and return default ordering.
- if not configengines:
- attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
- return [e for e in compengines
- if getattr(e.wireprotosupport(), attr) > 0]
-
- # If compression engines are listed in the config, assume there is a good
- # reason for it (like server operators wanting to achieve specific
- # performance characteristics). So fail fast if the config references
- # unusable compression engines.
- validnames = set(e.name() for e in compengines)
- invalidnames = set(e for e in configengines if e not in validnames)
- if invalidnames:
- raise error.Abort(_('invalid compression engine defined in %s: %s') %
- (config, ', '.join(sorted(invalidnames))))
-
- compengines = [e for e in compengines if e.name() in configengines]
- compengines = sorted(compengines,
- key=lambda e: configengines.index(e.name()))
-
- if not compengines:
- raise error.Abort(_('%s config option does not specify any known '
- 'compression engines') % config,
- hint=_('usable compression engines: %s') %
- ', '.sorted(validnames))
-
- return compengines
-
-# list of commands
-commands = {}
-
-# Maps wire protocol name to operation type. This is used for permissions
-# checking. All defined @wireiprotocommand should have an entry in this
-# dict.
-permissions = {}
-
-def wireprotocommand(name, args=''):
- """decorator for wire protocol command"""
- def register(func):
- commands[name] = (func, args)
- return func
- return register
-
-# TODO define a more appropriate permissions type to use for this.
-permissions['batch'] = 'pull'
-@wireprotocommand('batch', 'cmds *')
-def batch(repo, proto, cmds, others):
- repo = repo.filtered("served")
- res = []
- for pair in cmds.split(';'):
- op, args = pair.split(' ', 1)
- vals = {}
- for a in args.split(','):
- if a:
- n, v = a.split('=')
- vals[unescapearg(n)] = unescapearg(v)
- func, spec = commands[op]
-
- # If the protocol supports permissions checking, perform that
- # checking on each batched command.
- # TODO formalize permission checking as part of protocol interface.
- if util.safehasattr(proto, 'checkperm'):
- # Assume commands with no defined permissions are writes / for
- # pushes. This is the safest from a security perspective because
- # it doesn't allow commands with undefined semantics from
- # bypassing permissions checks.
- proto.checkperm(permissions.get(op, 'push'))
-
- if spec:
- keys = spec.split()
- data = {}
- for k in keys:
- if k == '*':
- star = {}
- for key in vals.keys():
- if key not in keys:
- star[key] = vals[key]
- data['*'] = star
- else:
- data[k] = vals[k]
- result = func(repo, proto, *[data[k] for k in keys])
- else:
- result = func(repo, proto)
- if isinstance(result, ooberror):
- return result
- res.append(escapearg(result))
- return ';'.join(res)
-
-permissions['between'] = 'pull'
-@wireprotocommand('between', 'pairs')
-def between(repo, proto, pairs):
- pairs = [decodelist(p, '-') for p in pairs.split(" ")]
- r = []
- for b in repo.between(pairs):
- r.append(encodelist(b) + "\n")
- return "".join(r)
-
-permissions['branchmap'] = 'pull'
-@wireprotocommand('branchmap')
-def branchmap(repo, proto):
- branchmap = repo.branchmap()
- heads = []
- for branch, nodes in branchmap.iteritems():
- branchname = urlreq.quote(encoding.fromlocal(branch))
- branchnodes = encodelist(nodes)
- heads.append('%s %s' % (branchname, branchnodes))
- return '\n'.join(heads)
-
-permissions['branches'] = 'pull'
-@wireprotocommand('branches', 'nodes')
-def branches(repo, proto, nodes):
- nodes = decodelist(nodes)
- r = []
- for b in repo.branches(nodes):
- r.append(encodelist(b) + "\n")
- return "".join(r)
-
-permissions['clonebundles'] = 'pull'
-@wireprotocommand('clonebundles', '')
-def clonebundles(repo, proto):
- """Server command for returning info for available bundles to seed clones.
-
- Clients will parse this response and determine what bundle to fetch.
-
- Extensions may wrap this command to filter or dynamically emit data
- depending on the request. e.g. you could advertise URLs for the closest
- data center given the client's IP address.
- """
- return repo.vfs.tryread('clonebundles.manifest')
-
-wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
- 'known', 'getbundle', 'unbundlehash', 'batch']
-
-def _capabilities(repo, proto):
- """return a list of capabilities for a repo
-
- This function exists to allow extensions to easily wrap capabilities
- computation
-
- - returns a lists: easy to alter
- - change done here will be propagated to both `capabilities` and `hello`
- command without any other action needed.
- """
- # copy to prevent modification of the global list
- caps = list(wireprotocaps)
- if streamclone.allowservergeneration(repo):
- if repo.ui.configbool('server', 'preferuncompressed'):
- caps.append('stream-preferred')
- requiredformats = repo.requirements & repo.supportedformats
- # if our local revlogs are just revlogv1, add 'stream' cap
- if not requiredformats - {'revlogv1'}:
- caps.append('stream')
- # otherwise, add 'streamreqs' detailing our local revlog format
- else:
- caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
- if repo.ui.configbool('experimental', 'bundle2-advertise'):
- capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
- caps.append('bundle2=' + urlreq.quote(capsblob))
- caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
-
- if proto.name == 'http':
- caps.append('httpheader=%d' %
- repo.ui.configint('server', 'maxhttpheaderlen'))
- if repo.ui.configbool('experimental', 'httppostargs'):
- caps.append('httppostargs')
-
- # FUTURE advertise 0.2rx once support is implemented
- # FUTURE advertise minrx and mintx after consulting config option
- caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
-
- compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
- if compengines:
- comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
- for e in compengines)
- caps.append('compression=%s' % comptypes)
-
- return caps
-
-# If you are writing an extension and consider wrapping this function. Wrap
-# `_capabilities` instead.
-permissions['capabilities'] = 'pull'
-@wireprotocommand('capabilities')
-def capabilities(repo, proto):
- return ' '.join(_capabilities(repo, proto))
-
-permissions['changegroup'] = 'pull'
-@wireprotocommand('changegroup', 'roots')
-def changegroup(repo, proto, roots):
- nodes = decodelist(roots)
- outgoing = discovery.outgoing(repo, missingroots=nodes,
- missingheads=repo.heads())
- cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
- gen = iter(lambda: cg.read(32768), '')
- return streamres(gen=gen)
-
-permissions['changegroupsubset'] = 'pull'
-@wireprotocommand('changegroupsubset', 'bases heads')
-def changegroupsubset(repo, proto, bases, heads):
- bases = decodelist(bases)
- heads = decodelist(heads)
- outgoing = discovery.outgoing(repo, missingroots=bases,
- missingheads=heads)
- cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
- gen = iter(lambda: cg.read(32768), '')
- return streamres(gen=gen)
-
-permissions['debugwireargs'] = 'pull'
-@wireprotocommand('debugwireargs', 'one two *')
-def debugwireargs(repo, proto, one, two, others):
- # only accept optional args from the known set
- opts = options('debugwireargs', ['three', 'four'], others)
- return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
-
-permissions['getbundle'] = 'pull'
-@wireprotocommand('getbundle', '*')
-def getbundle(repo, proto, others):
- opts = options('getbundle', gboptsmap.keys(), others)
- for k, v in opts.iteritems():
- keytype = gboptsmap[k]
- if keytype == 'nodes':
- opts[k] = decodelist(v)
- elif keytype == 'csv':
- opts[k] = list(v.split(','))
- elif keytype == 'scsv':
- opts[k] = set(v.split(','))
- elif keytype == 'boolean':
- # Client should serialize False as '0', which is a non-empty string
- # so it evaluates as a True bool.
- if v == '0':
- opts[k] = False
- else:
- opts[k] = bool(v)
- elif keytype != 'plain':
- raise KeyError('unknown getbundle option type %s'
- % keytype)
-
- if not bundle1allowed(repo, 'pull'):
- if not exchange.bundle2requested(opts.get('bundlecaps')):
- if proto.name == 'http':
- return ooberror(bundle2required)
- raise error.Abort(bundle2requiredmain,
- hint=bundle2requiredhint)
-
- prefercompressed = True
-
- try:
- if repo.ui.configbool('server', 'disablefullbundle'):
- # Check to see if this is a full clone.
- clheads = set(repo.changelog.heads())
- changegroup = opts.get('cg', True)
- heads = set(opts.get('heads', set()))
- common = set(opts.get('common', set()))
- common.discard(nullid)
- if changegroup and not common and clheads == heads:
- raise error.Abort(
- _('server has pull-based clones disabled'),
- hint=_('remove --pull if specified or upgrade Mercurial'))
-
- info, chunks = exchange.getbundlechunks(repo, 'serve',
- **pycompat.strkwargs(opts))
- prefercompressed = info.get('prefercompressed', True)
- except error.Abort as exc:
- # cleanly forward Abort error to the client
- if not exchange.bundle2requested(opts.get('bundlecaps')):
- if proto.name == 'http':
- return ooberror(str(exc) + '\n')
- raise # cannot do better for bundle1 + ssh
- # bundle2 request expect a bundle2 reply
- bundler = bundle2.bundle20(repo.ui)
- manargs = [('message', str(exc))]
- advargs = []
- if exc.hint is not None:
- advargs.append(('hint', exc.hint))
- bundler.addpart(bundle2.bundlepart('error:abort',
- manargs, advargs))
- chunks = bundler.getchunks()
- prefercompressed = False
-
- return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
-
-permissions['heads'] = 'pull'
-@wireprotocommand('heads')
-def heads(repo, proto):
- h = repo.heads()
- return encodelist(h) + "\n"
-
-permissions['hello'] = 'pull'
-@wireprotocommand('hello')
-def hello(repo, proto):
- '''the hello command returns a set of lines describing various
- interesting things about the server, in an RFC822-like format.
- Currently the only one defined is "capabilities", which
- consists of a line in the form:
-
- capabilities: space separated list of tokens
- '''
- return "capabilities: %s\n" % (capabilities(repo, proto))
-
-permissions['listkeys'] = 'pull'
-@wireprotocommand('listkeys', 'namespace')
-def listkeys(repo, proto, namespace):
- d = repo.listkeys(encoding.tolocal(namespace)).items()
- return pushkeymod.encodekeys(d)
-
-permissions['lookup'] = 'pull'
-@wireprotocommand('lookup', 'key')
-def lookup(repo, proto, key):
- try:
- k = encoding.tolocal(key)
- c = repo[k]
- r = c.hex()
- success = 1
- except Exception as inst:
- r = str(inst)
- success = 0
- return "%d %s\n" % (success, r)
-
-permissions['known'] = 'pull'
-@wireprotocommand('known', 'nodes *')
-def known(repo, proto, nodes, others):
- return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
-
-permissions['pushkey'] = 'push'
-@wireprotocommand('pushkey', 'namespace key old new')
-def pushkey(repo, proto, namespace, key, old, new):
- # compatibility with pre-1.8 clients which were accidentally
- # sending raw binary nodes rather than utf-8-encoded hex
- if len(new) == 20 and util.escapestr(new) != new:
- # looks like it could be a binary node
- try:
- new.decode('utf-8')
- new = encoding.tolocal(new) # but cleanly decodes as UTF-8
- except UnicodeDecodeError:
- pass # binary, leave unmodified
- else:
- new = encoding.tolocal(new) # normal path
-
- if util.safehasattr(proto, 'restore'):
-
- proto.redirect()
-
- try:
- r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
- encoding.tolocal(old), new) or False
- except error.Abort:
- r = False
-
- output = proto.restore()
-
- return '%s\n%s' % (int(r), output)
-
- r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
- encoding.tolocal(old), new)
- return '%s\n' % int(r)
-
-permissions['stream_out'] = 'pull'
-@wireprotocommand('stream_out')
-def stream(repo, proto):
- '''If the server supports streaming clone, it advertises the "stream"
- capability with a value representing the version and flags of the repo
- it is serving. Client checks to see if it understands the format.
- '''
- return streamres_legacy(streamclone.generatev1wireproto(repo))
-
-permissions['unbundle'] = 'push'
-@wireprotocommand('unbundle', 'heads')
-def unbundle(repo, proto, heads):
- their_heads = decodelist(heads)
-
- try:
- proto.redirect()
-
- exchange.check_heads(repo, their_heads, 'preparing changes')
-
- # write bundle data to temporary file because it can be big
- fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
- fp = os.fdopen(fd, pycompat.sysstr('wb+'))
- r = 0
- try:
- proto.getfile(fp)
- fp.seek(0)
- gen = exchange.readbundle(repo.ui, fp, None)
- if (isinstance(gen, changegroupmod.cg1unpacker)
- and not bundle1allowed(repo, 'push')):
- if proto.name == 'http':
- # need to special case http because stderr do not get to
- # the http client on failed push so we need to abuse some
- # other error type to make sure the message get to the
- # user.
- return ooberror(bundle2required)
- raise error.Abort(bundle2requiredmain,
- hint=bundle2requiredhint)
-
- r = exchange.unbundle(repo, gen, their_heads, 'serve',
- proto._client())
- if util.safehasattr(r, 'addpart'):
- # The return looks streamable, we are in the bundle2 case and
- # should return a stream.
- return streamres_legacy(gen=r.getchunks())
- return pushres(r)
-
- finally:
- fp.close()
- os.unlink(tempname)
-
- except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
- # handle non-bundle2 case first
- if not getattr(exc, 'duringunbundle2', False):
- try:
- raise
- except error.Abort:
- # The old code we moved used util.stderr directly.
- # We did not change it to minimise code change.
- # This need to be moved to something proper.
- # Feel free to do it.
- util.stderr.write("abort: %s\n" % exc)
- if exc.hint is not None:
- util.stderr.write("(%s)\n" % exc.hint)
- return pushres(0)
- except error.PushRaced:
- return pusherr(str(exc))
-
- bundler = bundle2.bundle20(repo.ui)
- for out in getattr(exc, '_bundle2salvagedoutput', ()):
- bundler.addpart(out)
- try:
- try:
- raise
- except error.PushkeyFailed as exc:
- # check client caps
- remotecaps = getattr(exc, '_replycaps', None)
- if (remotecaps is not None
- and 'pushkey' not in remotecaps.get('error', ())):
- # no support remote side, fallback to Abort handler.
- raise
- part = bundler.newpart('error:pushkey')
- part.addparam('in-reply-to', exc.partid)
- if exc.namespace is not None:
- part.addparam('namespace', exc.namespace, mandatory=False)
- if exc.key is not None:
- part.addparam('key', exc.key, mandatory=False)
- if exc.new is not None:
- part.addparam('new', exc.new, mandatory=False)
- if exc.old is not None:
- part.addparam('old', exc.old, mandatory=False)
- if exc.ret is not None:
- part.addparam('ret', exc.ret, mandatory=False)
- except error.BundleValueError as exc:
- errpart = bundler.newpart('error:unsupportedcontent')
- if exc.parttype is not None:
- errpart.addparam('parttype', exc.parttype)
- if exc.params:
- errpart.addparam('params', '\0'.join(exc.params))
- except error.Abort as exc:
- manargs = [('message', str(exc))]
- advargs = []
- if exc.hint is not None:
- advargs.append(('hint', exc.hint))
- bundler.addpart(bundle2.bundlepart('error:abort',
- manargs, advargs))
- except error.PushRaced as exc:
- bundler.newpart('error:pushraced', [('message', str(exc))])
- return streamres_legacy(gen=bundler.getchunks())