hgext/fsmonitor/pywatchman/__init__.py
changeset 28432 2377c4ac4eec
child 30656 16f4b341288d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext/fsmonitor/pywatchman/__init__.py	Wed Mar 02 16:25:12 2016 +0000
@@ -0,0 +1,779 @@
+# Copyright 2014-present Facebook, Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+#  * Redistributions of source code must retain the above copyright notice,
+#    this list of conditions and the following disclaimer.
+#
+#  * Redistributions in binary form must reproduce the above copyright notice,
+#    this list of conditions and the following disclaimer in the documentation
+#    and/or other materials provided with the distribution.
+#
+#  * Neither the name Facebook nor the names of its contributors may be used to
+#    endorse or promote products derived from this software without specific
+#    prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import os
+import errno
+import math
+import socket
+import subprocess
+import time
+
+# Sometimes it's really hard to get Python extensions to compile,
+# so fall back to a pure Python implementation.
+try:
+    import bser
+except ImportError:
+    import pybser as bser
+
+import capabilities
+
+if os.name == 'nt':
+    import ctypes
+    import ctypes.wintypes
+
+    wintypes = ctypes.wintypes
+    GENERIC_READ = 0x80000000
+    GENERIC_WRITE = 0x40000000
+    FILE_FLAG_OVERLAPPED = 0x40000000
+    OPEN_EXISTING = 3
+    INVALID_HANDLE_VALUE = -1
+    FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
+    FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
+    FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
+    WAIT_TIMEOUT = 0x00000102
+    WAIT_OBJECT_0 = 0x00000000
+    ERROR_IO_PENDING = 997
+
+    class OVERLAPPED(ctypes.Structure):
+        _fields_ = [
+            ("Internal", wintypes.ULONG), ("InternalHigh", wintypes.ULONG),
+            ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
+            ("hEvent", wintypes.HANDLE)
+        ]
+
+        def __init__(self):
+            self.Offset = 0
+            self.OffsetHigh = 0
+            self.hEvent = 0
+
+    LPDWORD = ctypes.POINTER(wintypes.DWORD)
+
+    CreateFile = ctypes.windll.kernel32.CreateFileA
+    CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
+                           wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
+                           wintypes.HANDLE]
+    CreateFile.restype = wintypes.HANDLE
+
+    CloseHandle = ctypes.windll.kernel32.CloseHandle
+    CloseHandle.argtypes = [wintypes.HANDLE]
+    CloseHandle.restype = wintypes.BOOL
+
+    ReadFile = ctypes.windll.kernel32.ReadFile
+    ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
+                         LPDWORD, ctypes.POINTER(OVERLAPPED)]
+    ReadFile.restype = wintypes.BOOL
+
+    WriteFile = ctypes.windll.kernel32.WriteFile
+    WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
+                          LPDWORD, ctypes.POINTER(OVERLAPPED)]
+    WriteFile.restype = wintypes.BOOL
+
+    GetLastError = ctypes.windll.kernel32.GetLastError
+    GetLastError.argtypes = []
+    GetLastError.restype = wintypes.DWORD
+
+    FormatMessage = ctypes.windll.kernel32.FormatMessageA
+    FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
+                              wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
+                              wintypes.DWORD, wintypes.LPVOID]
+    FormatMessage.restype = wintypes.DWORD
+
+    LocalFree = ctypes.windll.kernel32.LocalFree
+
+    GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx
+    GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
+                                      ctypes.POINTER(OVERLAPPED), LPDWORD,
+                                      wintypes.DWORD, wintypes.BOOL]
+    GetOverlappedResultEx.restype = wintypes.BOOL
+
+    CancelIoEx = ctypes.windll.kernel32.CancelIoEx
+    CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
+    CancelIoEx.restype = wintypes.BOOL
+
+# 2 bytes marker, 1 byte int size, 8 bytes int64 value
+sniff_len = 13
+
+# This is a helper for debugging the client.
+_debugging = False
+if _debugging:
+
+    def log(fmt, *args):
+        print('[%s] %s' %
+              (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
+               fmt % args[:]))
+else:
+
+    def log(fmt, *args):
+        pass
+
+
+class WatchmanError(Exception):
+    pass
+
+
+class SocketTimeout(WatchmanError):
+    """A specialized exception raised for socket timeouts during communication to/from watchman.
+       This makes it easier to implement non-blocking loops as callers can easily distinguish
+       between a routine timeout and an actual error condition.
+
+       Note that catching WatchmanError will also catch this as it is a super-class, so backwards
+       compatibility in exception handling is preserved.
+    """
+
+
+class CommandError(WatchmanError):
+    """error returned by watchman
+
+    self.msg is the message returned by watchman.
+    """
+
+    def __init__(self, msg, cmd=None):
+        self.msg = msg
+        self.cmd = cmd
+        super(CommandError, self).__init__('watchman command error: %s' % msg)
+
+    def setCommand(self, cmd):
+        self.cmd = cmd
+
+    def __str__(self):
+        if self.cmd:
+            return '%s, while executing %s' % (self.msg, self.cmd)
+        return self.msg
+
+
+class Transport(object):
+    """ communication transport to the watchman server """
+    buf = None
+
+    def close(self):
+        """ tear it down """
+        raise NotImplementedError()
+
+    def readBytes(self, size):
+        """ read size bytes """
+        raise NotImplementedError()
+
+    def write(self, buf):
+        """ write some data """
+        raise NotImplementedError()
+
+    def setTimeout(self, value):
+        pass
+
+    def readLine(self):
+        """ read a line
+        Maintains its own buffer, callers of the transport should not mix
+        calls to readBytes and readLine.
+        """
+        if self.buf is None:
+            self.buf = []
+
+        # Buffer may already have a line if we've received unilateral
+        # response(s) from the server
+        if len(self.buf) == 1 and "\n" in self.buf[0]:
+            (line, b) = self.buf[0].split("\n", 1)
+            self.buf = [b]
+            return line
+
+        while True:
+            b = self.readBytes(4096)
+            if "\n" in b:
+                result = ''.join(self.buf)
+                (line, b) = b.split("\n", 1)
+                self.buf = [b]
+                return result + line
+            self.buf.append(b)
+
+
+class Codec(object):
+    """ communication encoding for the watchman server """
+    transport = None
+
+    def __init__(self, transport):
+        self.transport = transport
+
+    def receive(self):
+        raise NotImplementedError()
+
+    def send(self, *args):
+        raise NotImplementedError()
+
+    def setTimeout(self, value):
+        self.transport.setTimeout(value)
+
+
+class UnixSocketTransport(Transport):
+    """ local unix domain socket transport """
+    sock = None
+
+    def __init__(self, sockpath, timeout):
+        self.sockpath = sockpath
+        self.timeout = timeout
+
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            sock.settimeout(self.timeout)
+            sock.connect(self.sockpath)
+            self.sock = sock
+        except socket.error as e:
+            raise WatchmanError('unable to connect to %s: %s' %
+                                (self.sockpath, e))
+
+    def close(self):
+        self.sock.close()
+        self.sock = None
+
+    def setTimeout(self, value):
+        self.timeout = value
+        self.sock.settimeout(self.timeout)
+
+    def readBytes(self, size):
+        try:
+            buf = [self.sock.recv(size)]
+            if not buf[0]:
+                raise WatchmanError('empty watchman response')
+            return buf[0]
+        except socket.timeout:
+            raise SocketTimeout('timed out waiting for response')
+
+    def write(self, data):
+        try:
+            self.sock.sendall(data)
+        except socket.timeout:
+            raise SocketTimeout('timed out sending query command')
+
+
+class WindowsNamedPipeTransport(Transport):
+    """ connect to a named pipe """
+
+    def __init__(self, sockpath, timeout):
+        self.sockpath = sockpath
+        self.timeout = int(math.ceil(timeout * 1000))
+        self._iobuf = None
+
+        self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
+                               OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
+
+        if self.pipe == INVALID_HANDLE_VALUE:
+            self.pipe = None
+            self._raise_win_err('failed to open pipe %s' % sockpath,
+                                GetLastError())
+
+    def _win32_strerror(self, err):
+        """ expand a win32 error code into a human readable message """
+
+        # FormatMessage will allocate memory and assign it here
+        buf = ctypes.c_char_p()
+        FormatMessage(
+            FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
+            | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
+        try:
+            return buf.value
+        finally:
+            LocalFree(buf)
+
+    def _raise_win_err(self, msg, err):
+        raise IOError('%s win32 error code: %d %s' %
+                      (msg, err, self._win32_strerror(err)))
+
+    def close(self):
+        if self.pipe:
+            CloseHandle(self.pipe)
+        self.pipe = None
+
+    def readBytes(self, size):
+        """ A read can block for an unbounded amount of time, even if the
+            kernel reports that the pipe handle is signalled, so we need to
+            always perform our reads asynchronously
+        """
+
+        # try to satisfy the read from any buffered data
+        if self._iobuf:
+            if size >= len(self._iobuf):
+                res = self._iobuf
+                self.buf = None
+                return res
+            res = self._iobuf[:size]
+            self._iobuf = self._iobuf[size:]
+            return res
+
+        # We need to initiate a read
+        buf = ctypes.create_string_buffer(size)
+        olap = OVERLAPPED()
+
+        log('made read buff of size %d', size)
+
+        # ReadFile docs warn against sending in the nread parameter for async
+        # operations, so we always collect it via GetOverlappedResultEx
+        immediate = ReadFile(self.pipe, buf, size, None, olap)
+
+        if not immediate:
+            err = GetLastError()
+            if err != ERROR_IO_PENDING:
+                self._raise_win_err('failed to read %d bytes' % size,
+                                    GetLastError())
+
+        nread = wintypes.DWORD()
+        if not GetOverlappedResultEx(self.pipe, olap, nread,
+                                     0 if immediate else self.timeout, True):
+            err = GetLastError()
+            CancelIoEx(self.pipe, olap)
+
+            if err == WAIT_TIMEOUT:
+                log('GetOverlappedResultEx timedout')
+                raise SocketTimeout('timed out after waiting %dms for read' %
+                                    self.timeout)
+
+            log('GetOverlappedResultEx reports error %d', err)
+            self._raise_win_err('error while waiting for read', err)
+
+        nread = nread.value
+        if nread == 0:
+            # Docs say that named pipes return 0 byte when the other end did
+            # a zero byte write.  Since we don't ever do that, the only
+            # other way this shows up is if the client has gotten in a weird
+            # state, so let's bail out
+            CancelIoEx(self.pipe, olap)
+            raise IOError('Async read yielded 0 bytes; unpossible!')
+
+        # Holds precisely the bytes that we read from the prior request
+        buf = buf[:nread]
+
+        returned_size = min(nread, size)
+        if returned_size == nread:
+            return buf
+
+        # keep any left-overs around for a later read to consume
+        self._iobuf = buf[returned_size:]
+        return buf[:returned_size]
+
+    def write(self, data):
+        olap = OVERLAPPED()
+        immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
+                              None, olap)
+
+        if not immediate:
+            err = GetLastError()
+            if err != ERROR_IO_PENDING:
+                self._raise_win_err('failed to write %d bytes' % len(data),
+                                    GetLastError())
+
+        # Obtain results, waiting if needed
+        nwrote = wintypes.DWORD()
+        if GetOverlappedResultEx(self.pipe, olap, nwrote, 0 if immediate else
+                                 self.timeout, True):
+            return nwrote.value
+
+        err = GetLastError()
+
+        # It's potentially unsafe to allow the write to continue after
+        # we unwind, so let's make a best effort to avoid that happening
+        CancelIoEx(self.pipe, olap)
+
+        if err == WAIT_TIMEOUT:
+            raise SocketTimeout('timed out after waiting %dms for write' %
+                                self.timeout)
+        self._raise_win_err('error while waiting for write of %d bytes' %
+                            len(data), err)
+
+
+class CLIProcessTransport(Transport):
+    """ open a pipe to the cli to talk to the service
+    This intended to be used only in the test harness!
+
+    The CLI is an oddball because we only support JSON input
+    and cannot send multiple commands through the same instance,
+    so we spawn a new process for each command.
+
+    We disable server spawning for this implementation, again, because
+    it is intended to be used only in our test harness.  You really
+    should not need to use the CLI transport for anything real.
+
+    While the CLI can output in BSER, our Transport interface doesn't
+    support telling this instance that it should do so.  That effectively
+    limits this implementation to JSON input and output only at this time.
+
+    It is the responsibility of the caller to set the send and
+    receive codecs appropriately.
+    """
+    proc = None
+    closed = True
+
+    def __init__(self, sockpath, timeout):
+        self.sockpath = sockpath
+        self.timeout = timeout
+
+    def close(self):
+        if self.proc:
+            self.proc.kill()
+            self.proc = None
+
+    def _connect(self):
+        if self.proc:
+            return self.proc
+        args = [
+            'watchman',
+            '--sockname={}'.format(self.sockpath),
+            '--logfile=/BOGUS',
+            '--statefile=/BOGUS',
+            '--no-spawn',
+            '--no-local',
+            '--no-pretty',
+            '-j',
+        ]
+        self.proc = subprocess.Popen(args,
+                                     stdin=subprocess.PIPE,
+                                     stdout=subprocess.PIPE)
+        return self.proc
+
+    def readBytes(self, size):
+        self._connect()
+        res = self.proc.stdout.read(size)
+        if res == '':
+            raise WatchmanError('EOF on CLI process transport')
+        return res
+
+    def write(self, data):
+        if self.closed:
+            self.closed = False
+            self.proc = None
+        self._connect()
+        res = self.proc.stdin.write(data)
+        self.proc.stdin.close()
+        self.closed = True
+        return res
+
+
+class BserCodec(Codec):
+    """ use the BSER encoding.  This is the default, preferred codec """
+
+    def _loads(self, response):
+        return bser.loads(response)
+
+    def receive(self):
+        buf = [self.transport.readBytes(sniff_len)]
+        if not buf[0]:
+            raise WatchmanError('empty watchman response')
+
+        elen = bser.pdu_len(buf[0])
+
+        rlen = len(buf[0])
+        while elen > rlen:
+            buf.append(self.transport.readBytes(elen - rlen))
+            rlen += len(buf[-1])
+
+        response = ''.join(buf)
+        try:
+            res = self._loads(response)
+            return res
+        except ValueError as e:
+            raise WatchmanError('watchman response decode error: %s' % e)
+
+    def send(self, *args):
+        cmd = bser.dumps(*args)
+        self.transport.write(cmd)
+
+
+class ImmutableBserCodec(BserCodec):
+    """ use the BSER encoding, decoding values using the newer
+        immutable object support """
+
+    def _loads(self, response):
+        return bser.loads(response, False)
+
+
+class JsonCodec(Codec):
+    """ Use json codec.  This is here primarily for testing purposes """
+    json = None
+
+    def __init__(self, transport):
+        super(JsonCodec, self).__init__(transport)
+        # optional dep on json, only if JsonCodec is used
+        import json
+        self.json = json
+
+    def receive(self):
+        line = self.transport.readLine()
+        try:
+            return self.json.loads(line)
+        except Exception as e:
+            print(e, line)
+            raise
+
+    def send(self, *args):
+        cmd = self.json.dumps(*args)
+        self.transport.write(cmd + "\n")
+
+
+class client(object):
+    """ Handles the communication with the watchman service """
+    sockpath = None
+    transport = None
+    sendCodec = None
+    recvCodec = None
+    sendConn = None
+    recvConn = None
+    subs = {}  # Keyed by subscription name
+    sub_by_root = {}  # Keyed by root, then by subscription name
+    logs = []  # When log level is raised
+    unilateral = ['log', 'subscription']
+    tport = None
+    useImmutableBser = None
+
+    def __init__(self,
+                 sockpath=None,
+                 timeout=1.0,
+                 transport=None,
+                 sendEncoding=None,
+                 recvEncoding=None,
+                 useImmutableBser=False):
+        self.sockpath = sockpath
+        self.timeout = timeout
+        self.useImmutableBser = useImmutableBser
+
+        transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
+        if transport == 'local' and os.name == 'nt':
+            self.transport = WindowsNamedPipeTransport
+        elif transport == 'local':
+            self.transport = UnixSocketTransport
+        elif transport == 'cli':
+            self.transport = CLIProcessTransport
+            if sendEncoding is None:
+                sendEncoding = 'json'
+            if recvEncoding is None:
+                recvEncoding = sendEncoding
+        else:
+            raise WatchmanError('invalid transport %s' % transport)
+
+        sendEncoding = sendEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
+        recvEncoding = recvEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
+
+        self.recvCodec = self._parseEncoding(recvEncoding)
+        self.sendCodec = self._parseEncoding(sendEncoding)
+
+    def _parseEncoding(self, enc):
+        if enc == 'bser':
+            if self.useImmutableBser:
+                return ImmutableBserCodec
+            return BserCodec
+        elif enc == 'json':
+            return JsonCodec
+        else:
+            raise WatchmanError('invalid encoding %s' % enc)
+
+    def _hasprop(self, result, name):
+        if self.useImmutableBser:
+            return hasattr(result, name)
+        return name in result
+
+    def _resolvesockname(self):
+        # if invoked via a trigger, watchman will set this env var; we
+        # should use it unless explicitly set otherwise
+        path = os.getenv('WATCHMAN_SOCK')
+        if path:
+            return path
+
+        cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
+        try:
+            p = subprocess.Popen(cmd,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE,
+                                 close_fds=os.name != 'nt')
+        except OSError as e:
+            raise WatchmanError('"watchman" executable not in PATH (%s)', e)
+
+        stdout, stderr = p.communicate()
+        exitcode = p.poll()
+
+        if exitcode:
+            raise WatchmanError("watchman exited with code %d" % exitcode)
+
+        result = bser.loads(stdout)
+        if 'error' in result:
+            raise WatchmanError('get-sockname error: %s' % result['error'])
+
+        return result['sockname']
+
+    def _connect(self):
+        """ establish transport connection """
+
+        if self.recvConn:
+            return
+
+        if self.sockpath is None:
+            self.sockpath = self._resolvesockname()
+
+        self.tport = self.transport(self.sockpath, self.timeout)
+        self.sendConn = self.sendCodec(self.tport)
+        self.recvConn = self.recvCodec(self.tport)
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        if self.tport:
+            self.tport.close()
+            self.tport = None
+            self.recvConn = None
+            self.sendConn = None
+
+    def receive(self):
+        """ receive the next PDU from the watchman service
+
+        If the client has activated subscriptions or logs then
+        this PDU may be a unilateral PDU sent by the service to
+        inform the client of a log event or subscription change.
+
+        It may also simply be the response portion of a request
+        initiated by query.
+
+        There are clients in production that subscribe and call
+        this in a loop to retrieve all subscription responses,
+        so care should be taken when making changes here.
+        """
+
+        self._connect()
+        result = self.recvConn.receive()
+        if self._hasprop(result, 'error'):
+            raise CommandError(result['error'])
+
+        if self._hasprop(result, 'log'):
+            self.logs.append(result['log'])
+
+        if self._hasprop(result, 'subscription'):
+            sub = result['subscription']
+            if not (sub in self.subs):
+                self.subs[sub] = []
+            self.subs[sub].append(result)
+
+            # also accumulate in {root,sub} keyed store
+            root = os.path.normcase(result['root'])
+            if not root in self.sub_by_root:
+                self.sub_by_root[root] = {}
+            if not sub in self.sub_by_root[root]:
+                self.sub_by_root[root][sub] = []
+            self.sub_by_root[root][sub].append(result)
+
+        return result
+
+    def isUnilateralResponse(self, res):
+        for k in self.unilateral:
+            if k in res:
+                return True
+        return False
+
+    def getLog(self, remove=True):
+        """ Retrieve buffered log data
+
+        If remove is true the data will be removed from the buffer.
+        Otherwise it will be left in the buffer
+        """
+        res = self.logs
+        if remove:
+            self.logs = []
+        return res
+
+    def getSubscription(self, name, remove=True, root=None):
+        """ Retrieve the data associated with a named subscription
+
+        If remove is True (the default), the subscription data is removed
+        from the buffer.  Otherwise the data is returned but left in
+        the buffer.
+
+        Returns None if there is no data associated with `name`
+
+        If root is not None, then only return the subscription
+        data that matches both root and name.  When used in this way,
+        remove processing impacts both the unscoped and scoped stores
+        for the subscription data.
+        """
+
+        if root is not None:
+            if not root in self.sub_by_root:
+                return None
+            if not name in self.sub_by_root[root]:
+                return None
+            sub = self.sub_by_root[root][name]
+            if remove:
+                del self.sub_by_root[root][name]
+                # don't let this grow unbounded
+                if name in self.subs:
+                    del self.subs[name]
+            return sub
+
+        if not (name in self.subs):
+            return None
+        sub = self.subs[name]
+        if remove:
+            del self.subs[name]
+        return sub
+
+    def query(self, *args):
+        """ Send a query to the watchman service and return the response
+
+        This call will block until the response is returned.
+        If any unilateral responses are sent by the service in between
+        the request-response they will be buffered up in the client object
+        and NOT returned via this method.
+        """
+
+        log('calling client.query')
+        self._connect()
+        try:
+            self.sendConn.send(args)
+
+            res = self.receive()
+            while self.isUnilateralResponse(res):
+                res = self.receive()
+
+            return res
+        except CommandError as ex:
+            ex.setCommand(args)
+            raise ex
+
+    def capabilityCheck(self, optional=None, required=None):
+        """ Perform a server capability check """
+        res = self.query('version', {
+            'optional': optional or [],
+            'required': required or []
+        })
+
+        if not self._hasprop(res, 'capabilities'):
+            # Server doesn't support capabilities, so we need to
+            # synthesize the results based on the version
+            capabilities.synthesize(res, optional)
+            if 'error' in res:
+                raise CommandError(res['error'])
+
+        return res
+
+    def setTimeout(self, value):
+        self.recvConn.setTimeout(value)
+        self.sendConn.setTimeout(value)
+
+# no-check-code -- this is a 3rd party library