tests/test-wireproto.py
author Gregory Szorc <gregory.szorc@gmail.com>
Fri, 04 Dec 2015 00:24:48 -0800
changeset 27268 ed1660ce99d9
parent 25912 cbbdd085c991
child 27301 5defcb7d6e5f
permissions -rw-r--r--
setup.py: attempt to build and install hg.exe on Windows Currently, packaging Mercurial on Windows will produce a Scripts\hg Python script and a Scripts\hg.bat batch script. The py2exe distribution contains a hg.exe which loads a Python interpretter and invokes the "hg" Python script. Running a exe directly has benefits over batch scripts because batch scripts do things like muck around with command arguments. This patch implements a custom "build_scripts" command which attempts to build hg.exe on Windows. If hg.exe is built, it is marked as a "script" file and installed into the Scripts\ directory on Windows. Since hg.exe is redundant and better than hg.bat, if hg.exe is built, hg.bat is not installed. Since some environments don't support compiling C programs, we treat hg.exe as optional and catch failures building it. This is not ideal. However, I reckon most Windows users will not be installing Mercurial from source: they will get it from the MSI installer or via `pip install Mercurial`, which will download a wheel that has hg.exe in it. So, I don't think this is a big deal.

from mercurial import wireproto

class proto(object):
    def __init__(self, args):
        self.args = args
    def getargs(self, spec):
        args = self.args
        args.setdefault('*', {})
        names = spec.split()
        return [args[n] for n in names]

class clientpeer(wireproto.wirepeer):
    def __init__(self, serverrepo):
        self.serverrepo = serverrepo

    def _capabilities(self):
        return ['batch']

    def _call(self, cmd, **args):
        return wireproto.dispatch(self.serverrepo, proto(args), cmd)

    @wireproto.batchable
    def greet(self, name):
        f = wireproto.future()
        yield {'name': mangle(name)}, f
        yield unmangle(f.value)

class serverrepo(object):
    def greet(self, name):
        return "Hello, " + name

    def filtered(self, name):
        return self

def mangle(s):
    return ''.join(chr(ord(c) + 1) for c in s)
def unmangle(s):
    return ''.join(chr(ord(c) - 1) for c in s)

def greet(repo, proto, name):
    return mangle(repo.greet(unmangle(name)))

wireproto.commands['greet'] = (greet, 'name',)

srv = serverrepo()
clt = clientpeer(srv)

print clt.greet("Foobar")
b = clt.batch()
fs = [b.greet(s) for s in ["Fo, =;:<o", "Bar"]]
b.submit()
print [f.value for f in fs]