# HG changeset patch # User Gregory Szorc # Date 1523641883 25200 # Node ID e1b32dc4646cd6c2f001d5d20029c8c22ba07f22 # Parent fa0382088993ca3a0b5092be311084bf7565da56 wireproto: implement command executor interface for version 1 peers Now that we've defined our new interface for issuing commands, let's implement it. We add the interface to the base peer interface. This means all peer types must implement it. The only peer types that we have are the local peer in localrepo and a shared wire peer for version 1 of the wire protocol. The local peer implementation is pretty straightforward. We don't do anything fancy and just return a resolved future with the result of a method call. This is similar to what localiterbatcher does. The wire protocol version 1 implementation is a bit more complicated and is a more robust implementation. The wire executor queues commands by default. And because the new executor interface always allows multiple commands but not all version 1 commands are @batchable, it has to check that the requested commands are batchable if multiple commands are being requested. The wire executor currently only supports executing a single command. This is for simplicity reasons. Support for multiple commands will be added in a separate commit. To prove the new interface works, a call to the "known" command during discovery has been updated to use the new API. It's worth noting that both implementations require a method having the command name to exist on the peer. There is at least one caller in core that don't have a method calls peer._call() directly. We may need to shore up the requirements later... Differential Revision: https://phab.mercurial-scm.org/D3268 diff -r fa0382088993 -r e1b32dc4646c mercurial/localrepo.py --- a/mercurial/localrepo.py Fri Apr 13 10:23:05 2018 -0700 +++ b/mercurial/localrepo.py Fri Apr 13 10:51:23 2018 -0700 @@ -11,6 +11,7 @@ import hashlib import os import random +import sys import time import weakref @@ -167,6 +168,49 @@ resref.set(getattr(self.local, name)(*args, **opts)) yield resref.value +@zi.implementer(repository.ipeercommandexecutor) +class localcommandexecutor(object): + def __init__(self, peer): + self._peer = peer + self._sent = False + self._closed = False + + def __enter__(self): + return self + + def __exit__(self, exctype, excvalue, exctb): + self.close() + + def callcommand(self, command, args): + if self._sent: + raise error.ProgrammingError('callcommand() cannot be used after ' + 'sendcommands()') + + if self._closed: + raise error.ProgrammingError('callcommand() cannot be used after ' + 'close()') + + # We don't need to support anything fancy. Just call the named + # method on the peer and return a resolved future. + fn = getattr(self._peer, pycompat.sysstr(command)) + + f = pycompat.futures.Future() + + try: + result = fn(**args) + except Exception: + f.set_exception_info(*sys.exc_info()[1:]) + else: + f.set_result(result) + + return f + + def sendcommands(self): + self._sent = True + + def close(self): + self._closed = True + class localpeer(repository.peer): '''peer for a local repo; reflects only the most recent API''' @@ -286,6 +330,9 @@ # Begin of peer interface. + def commandexecutor(self): + return localcommandexecutor(self) + def iterbatch(self): return localiterbatcher(self) diff -r fa0382088993 -r e1b32dc4646c mercurial/repository.py --- a/mercurial/repository.py Fri Apr 13 10:23:05 2018 -0700 +++ b/mercurial/repository.py Fri Apr 13 10:51:23 2018 -0700 @@ -278,7 +278,8 @@ being issued. """ -class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands): +class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands, + ipeerrequests): """Unified interface for peer repositories. All peer instances must conform to this interface. diff -r fa0382088993 -r e1b32dc4646c mercurial/setdiscovery.py --- a/mercurial/setdiscovery.py Fri Apr 13 10:23:05 2018 -0700 +++ b/mercurial/setdiscovery.py Fri Apr 13 10:51:23 2018 -0700 @@ -228,7 +228,12 @@ % (roundtrips, len(undecided), len(sample))) # indices between sample and externalized version must match sample = list(sample) - yesno = remote.known(dag.externalizeall(sample)) + + with remote.commandexecutor() as e: + yesno = e.callcommand('known', { + 'nodes': dag.externalizeall(sample), + }).result() + full = True if sample: diff -r fa0382088993 -r e1b32dc4646c mercurial/wireprotov1peer.py --- a/mercurial/wireprotov1peer.py Fri Apr 13 10:23:05 2018 -0700 +++ b/mercurial/wireprotov1peer.py Fri Apr 13 10:51:23 2018 -0700 @@ -8,12 +8,15 @@ from __future__ import absolute_import import hashlib +import sys from .i18n import _ from .node import ( bin, ) - +from .thirdparty.zope import ( + interface as zi, +) from . import ( bundle2, changegroup as changegroupmod, @@ -177,6 +180,93 @@ return ';'.join(cmds) +@zi.implementer(repository.ipeercommandexecutor) +class peerexecutor(object): + def __init__(self, peer): + self._peer = peer + self._sent = False + self._closed = False + self._calls = [] + + def __enter__(self): + return self + + def __exit__(self, exctype, excvalee, exctb): + self.close() + + def callcommand(self, command, args): + if self._sent: + raise error.ProgrammingError('callcommand() cannot be used ' + 'after commands are sent') + + if self._closed: + raise error.ProgrammingError('callcommand() cannot be used ' + 'after close()') + + # Commands are dispatched through methods on the peer. + fn = getattr(self._peer, pycompat.sysstr(command), None) + + if not fn: + raise error.ProgrammingError( + 'cannot call command %s: method of same name not available ' + 'on peer' % command) + + # Commands are either batchable or they aren't. If a command + # isn't batchable, we send it immediately because the executor + # can no longer accept new commands after a non-batchable command. + # If a command is batchable, we queue it for later. + + if getattr(fn, 'batchable', False): + pass + else: + if self._calls: + raise error.ProgrammingError( + '%s is not batchable and cannot be called on a command ' + 'executor along with other commands' % command) + + # We don't support batching yet. So resolve it immediately. + f = pycompat.futures.Future() + self._calls.append((command, args, fn, f)) + self.sendcommands() + return f + + def sendcommands(self): + if self._sent: + return + + if not self._calls: + return + + self._sent = True + + calls = self._calls + # Mainly to destroy references to futures. + self._calls = None + + if len(calls) == 1: + command, args, fn, f = calls[0] + + # Future was cancelled. Ignore it. + if not f.set_running_or_notify_cancel(): + return + + try: + result = fn(**pycompat.strkwargs(args)) + except Exception: + f.set_exception_info(*sys.exc_info()[1:]) + else: + f.set_result(result) + + return + + raise error.ProgrammingError('support for multiple commands not ' + 'yet implemented') + + def close(self): + self.sendcommands() + + self._closed = True + class wirepeer(repository.legacypeer): """Client-side interface for communicating with a peer repository. @@ -185,6 +275,9 @@ See also httppeer.py and sshpeer.py for protocol-specific implementations of this interface. """ + def commandexecutor(self): + return peerexecutor(self) + # Begin of ipeercommands interface. def iterbatch(self): diff -r fa0382088993 -r e1b32dc4646c tests/test-check-interfaces.py --- a/tests/test-check-interfaces.py Fri Apr 13 10:23:05 2018 -0700 +++ b/tests/test-check-interfaces.py Fri Apr 13 10:51:23 2018 -0700 @@ -23,6 +23,7 @@ vfs as vfsmod, wireprotoserver, wireprototypes, + wireprotov1peer, wireprotov2server, ) @@ -102,6 +103,14 @@ localrepo.localpeer) checkzobject(localrepo.localpeer(dummyrepo())) + ziverify.verifyClass(repository.ipeercommandexecutor, + localrepo.localcommandexecutor) + checkzobject(localrepo.localcommandexecutor(None)) + + ziverify.verifyClass(repository.ipeercommandexecutor, + wireprotov1peer.peerexecutor) + checkzobject(wireprotov1peer.peerexecutor(None)) + ziverify.verifyClass(repository.ipeerbaselegacycommands, sshpeer.sshv1peer) checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),