--- a/hgext/fsmonitor/pywatchman/__init__.py Mon Nov 04 10:09:08 2019 +0100
+++ b/hgext/fsmonitor/pywatchman/__init__.py Sat Nov 02 12:42:23 2019 -0700
@@ -26,10 +26,8 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
# no unicode literals
+from __future__ import absolute_import, division, print_function
import inspect
import math
@@ -38,33 +36,22 @@
import subprocess
import time
+from . import capabilities, compat, encoding, load
+
+
# Sometimes it's really hard to get Python extensions to compile,
# so fall back to a pure Python implementation.
try:
from . import bser
+
# Demandimport causes modules to be loaded lazily. Force the load now
# so that we can fall back on pybser if bser doesn't exist
bser.pdu_info
except ImportError:
from . import pybser as bser
-from mercurial.utils import (
- procutil,
-)
-from mercurial import (
- pycompat,
-)
-
-from . import (
- capabilities,
- compat,
- encoding,
- load,
-)
-
-
-if os.name == 'nt':
+if os.name == "nt":
import ctypes
import ctypes.wintypes
@@ -73,7 +60,7 @@
GENERIC_WRITE = 0x40000000
FILE_FLAG_OVERLAPPED = 0x40000000
OPEN_EXISTING = 3
- INVALID_HANDLE_VALUE = -1
+ INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
@@ -92,9 +79,11 @@
class OVERLAPPED(ctypes.Structure):
_fields_ = [
- ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
- ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
- ("hEvent", wintypes.HANDLE)
+ ("Internal", ULONG_PTR),
+ ("InternalHigh", ULONG_PTR),
+ ("Offset", wintypes.DWORD),
+ ("OffsetHigh", wintypes.DWORD),
+ ("hEvent", wintypes.HANDLE),
]
def __init__(self):
@@ -107,9 +96,15 @@
LPDWORD = ctypes.POINTER(wintypes.DWORD)
CreateFile = ctypes.windll.kernel32.CreateFileA
- CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
- wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
- wintypes.HANDLE]
+ CreateFile.argtypes = [
+ wintypes.LPSTR,
+ wintypes.DWORD,
+ wintypes.DWORD,
+ wintypes.LPVOID,
+ wintypes.DWORD,
+ wintypes.DWORD,
+ wintypes.HANDLE,
+ ]
CreateFile.restype = wintypes.HANDLE
CloseHandle = ctypes.windll.kernel32.CloseHandle
@@ -117,13 +112,23 @@
CloseHandle.restype = wintypes.BOOL
ReadFile = ctypes.windll.kernel32.ReadFile
- ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
- LPDWORD, ctypes.POINTER(OVERLAPPED)]
+ ReadFile.argtypes = [
+ wintypes.HANDLE,
+ wintypes.LPVOID,
+ wintypes.DWORD,
+ LPDWORD,
+ ctypes.POINTER(OVERLAPPED),
+ ]
ReadFile.restype = wintypes.BOOL
WriteFile = ctypes.windll.kernel32.WriteFile
- WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
- LPDWORD, ctypes.POINTER(OVERLAPPED)]
+ WriteFile.argtypes = [
+ wintypes.HANDLE,
+ wintypes.LPVOID,
+ wintypes.DWORD,
+ LPDWORD,
+ ctypes.POINTER(OVERLAPPED),
+ ]
WriteFile.restype = wintypes.BOOL
GetLastError = ctypes.windll.kernel32.GetLastError
@@ -135,34 +140,56 @@
SetLastError.restype = None
FormatMessage = ctypes.windll.kernel32.FormatMessageA
- FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
- wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
- wintypes.DWORD, wintypes.LPVOID]
+ FormatMessage.argtypes = [
+ wintypes.DWORD,
+ wintypes.LPVOID,
+ wintypes.DWORD,
+ wintypes.DWORD,
+ ctypes.POINTER(wintypes.LPSTR),
+ wintypes.DWORD,
+ wintypes.LPVOID,
+ ]
FormatMessage.restype = wintypes.DWORD
LocalFree = ctypes.windll.kernel32.LocalFree
GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
- GetOverlappedResult.argtypes = [wintypes.HANDLE,
- ctypes.POINTER(OVERLAPPED), LPDWORD,
- wintypes.BOOL]
+ GetOverlappedResult.argtypes = [
+ wintypes.HANDLE,
+ ctypes.POINTER(OVERLAPPED),
+ LPDWORD,
+ wintypes.BOOL,
+ ]
GetOverlappedResult.restype = wintypes.BOOL
- GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
- 'GetOverlappedResultEx', None)
+ GetOverlappedResultEx = getattr(
+ ctypes.windll.kernel32, "GetOverlappedResultEx", None
+ )
if GetOverlappedResultEx is not None:
- GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
- ctypes.POINTER(OVERLAPPED), LPDWORD,
- wintypes.DWORD, wintypes.BOOL]
+ GetOverlappedResultEx.argtypes = [
+ wintypes.HANDLE,
+ ctypes.POINTER(OVERLAPPED),
+ LPDWORD,
+ wintypes.DWORD,
+ wintypes.BOOL,
+ ]
GetOverlappedResultEx.restype = wintypes.BOOL
WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
- WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
+ WaitForSingleObjectEx.argtypes = [
+ wintypes.HANDLE,
+ wintypes.DWORD,
+ wintypes.BOOL,
+ ]
WaitForSingleObjectEx.restype = wintypes.DWORD
CreateEvent = ctypes.windll.kernel32.CreateEventA
- CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
- wintypes.LPSTR]
+ CreateEvent.argtypes = [
+ LPDWORD,
+ wintypes.BOOL,
+ wintypes.BOOL,
+ wintypes.LPSTR,
+ ]
CreateEvent.restype = wintypes.HANDLE
# Windows Vista is the minimum supported client for CancelIoEx.
@@ -178,9 +205,15 @@
if _debugging:
def log(fmt, *args):
- print('[%s] %s' %
- (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
- fmt % args[:]))
+ print(
+ "[%s] %s"
+ % (
+ time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
+ fmt % args[:],
+ )
+ )
+
+
else:
def log(fmt, *args):
@@ -193,8 +226,16 @@
# FormatMessage will allocate memory and assign it here
buf = ctypes.c_char_p()
FormatMessage(
- FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
- | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
+ FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER
+ | FORMAT_MESSAGE_IGNORE_INSERTS,
+ None,
+ err,
+ 0,
+ buf,
+ 0,
+ None,
+ )
try:
return buf.value
finally:
@@ -211,21 +252,30 @@
def __str__(self):
if self.cmd:
- return '%s, while executing %s' % (self.msg, self.cmd)
+ return "%s, while executing %s" % (self.msg, self.cmd)
return self.msg
+class BSERv1Unsupported(WatchmanError):
+ pass
+
+
+class UseAfterFork(WatchmanError):
+ pass
+
+
class WatchmanEnvironmentError(WatchmanError):
def __init__(self, msg, errno, errmsg, cmd=None):
super(WatchmanEnvironmentError, self).__init__(
- '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
- cmd)
+ "{0}: errno={1} errmsg={2}".format(msg, errno, errmsg), cmd
+ )
class SocketConnectError(WatchmanError):
def __init__(self, sockpath, exc):
super(SocketConnectError, self).__init__(
- 'unable to connect to %s: %s' % (sockpath, exc))
+ "unable to connect to %s: %s" % (sockpath, exc)
+ )
self.sockpath = sockpath
self.exc = exc
@@ -245,15 +295,16 @@
self.msg is the message returned by watchman.
"""
+
def __init__(self, msg, cmd=None):
super(CommandError, self).__init__(
- 'watchman command error: %s' % (msg, ),
- cmd,
+ "watchman command error: %s" % (msg,), cmd
)
class Transport(object):
""" communication transport to the watchman server """
+
buf = None
def close(self):
@@ -289,7 +340,7 @@
while True:
b = self.readBytes(4096)
if b"\n" in b:
- result = b''.join(self.buf)
+ result = b"".join(self.buf)
(line, b) = b.split(b"\n", 1)
self.buf = [b]
return result + line
@@ -298,6 +349,7 @@
class Codec(object):
""" communication encoding for the watchman server """
+
transport = None
def __init__(self, transport):
@@ -315,9 +367,10 @@
class UnixSocketTransport(Transport):
""" local unix domain socket transport """
+
sock = None
- def __init__(self, sockpath, timeout, watchman_exe):
+ def __init__(self, sockpath, timeout):
self.sockpath = sockpath
self.timeout = timeout
@@ -331,8 +384,9 @@
raise SocketConnectError(self.sockpath, e)
def close(self):
- self.sock.close()
- self.sock = None
+ if self.sock:
+ self.sock.close()
+ self.sock = None
def setTimeout(self, value):
self.timeout = value
@@ -342,16 +396,16 @@
try:
buf = [self.sock.recv(size)]
if not buf[0]:
- raise WatchmanError('empty watchman response')
+ raise WatchmanError("empty watchman response")
return buf[0]
except socket.timeout:
- raise SocketTimeout('timed out waiting for response')
+ raise SocketTimeout("timed out waiting for response")
def write(self, data):
try:
self.sock.sendall(data)
except socket.timeout:
- raise SocketTimeout('timed out sending query command')
+ raise SocketTimeout("timed out sending query command")
def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
@@ -364,7 +418,7 @@
source code (see get_overlapped_result_ex_impl in stream_win.c). This
way, maintenance should be simplified.
"""
- log('Preparing to wait for maximum %dms', millis )
+ log("Preparing to wait for maximum %dms", millis)
if millis != 0:
waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
if waitReturnCode == WAIT_OBJECT_0:
@@ -383,12 +437,12 @@
elif waitReturnCode == WAIT_FAILED:
# something went wrong calling WaitForSingleObjectEx
err = GetLastError()
- log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
+ log("WaitForSingleObjectEx failed: %s", _win32_strerror(err))
return False
else:
# unexpected situation deserving investigation.
err = GetLastError()
- log('Unexpected error: %s', _win32_strerror(err))
+ log("Unexpected error: %s", _win32_strerror(err))
return False
return GetOverlappedResult(pipe, olap, nbytes, False)
@@ -397,36 +451,52 @@
class WindowsNamedPipeTransport(Transport):
""" connect to a named pipe """
- def __init__(self, sockpath, timeout, watchman_exe):
+ def __init__(self, sockpath, timeout):
self.sockpath = sockpath
self.timeout = int(math.ceil(timeout * 1000))
self._iobuf = None
- self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
- OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
+ if compat.PYTHON3:
+ sockpath = os.fsencode(sockpath)
+ self.pipe = CreateFile(
+ sockpath,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ None,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ None,
+ )
- if self.pipe == INVALID_HANDLE_VALUE:
+ err = GetLastError()
+ if self.pipe == INVALID_HANDLE_VALUE or self.pipe == 0:
self.pipe = None
- self._raise_win_err('failed to open pipe %s' % sockpath,
- GetLastError())
+ raise SocketConnectError(self.sockpath, self._make_win_err("", err))
# event for the overlapped I/O operations
self._waitable = CreateEvent(None, True, False, None)
+ err = GetLastError()
if self._waitable is None:
- self._raise_win_err('CreateEvent failed', GetLastError())
+ self._raise_win_err("CreateEvent failed", err)
self._get_overlapped_result_ex = GetOverlappedResultEx
- if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
- self._get_overlapped_result_ex is None):
+ if (
+ os.getenv("WATCHMAN_WIN7_COMPAT") == "1"
+ or self._get_overlapped_result_ex is None
+ ):
self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
def _raise_win_err(self, msg, err):
- raise IOError('%s win32 error code: %d %s' %
- (msg, err, _win32_strerror(err)))
+ raise self._make_win_err(msg, err)
+
+ def _make_win_err(self, msg, err):
+ return IOError(
+ "%s win32 error code: %d %s" % (msg, err, _win32_strerror(err))
+ )
def close(self):
if self.pipe:
- log('Closing pipe')
+ log("Closing pipe")
CloseHandle(self.pipe)
self.pipe = None
@@ -460,7 +530,7 @@
olap = OVERLAPPED()
olap.hEvent = self._waitable
- log('made read buff of size %d', size)
+ log("made read buff of size %d", size)
# ReadFile docs warn against sending in the nread parameter for async
# operations, so we always collect it via GetOverlappedResultEx
@@ -469,23 +539,23 @@
if not immediate:
err = GetLastError()
if err != ERROR_IO_PENDING:
- self._raise_win_err('failed to read %d bytes' % size,
- GetLastError())
+ self._raise_win_err("failed to read %d bytes" % size, err)
nread = wintypes.DWORD()
- if not self._get_overlapped_result_ex(self.pipe, olap, nread,
- 0 if immediate else self.timeout,
- True):
+ if not self._get_overlapped_result_ex(
+ self.pipe, olap, nread, 0 if immediate else self.timeout, True
+ ):
err = GetLastError()
CancelIoEx(self.pipe, olap)
if err == WAIT_TIMEOUT:
- log('GetOverlappedResultEx timedout')
- raise SocketTimeout('timed out after waiting %dms for read' %
- self.timeout)
+ log("GetOverlappedResultEx timedout")
+ raise SocketTimeout(
+ "timed out after waiting %dms for read" % self.timeout
+ )
- log('GetOverlappedResultEx reports error %d', err)
- self._raise_win_err('error while waiting for read', err)
+ log("GetOverlappedResultEx reports error %d", err)
+ self._raise_win_err("error while waiting for read", err)
nread = nread.value
if nread == 0:
@@ -494,7 +564,7 @@
# other way this shows up is if the client has gotten in a weird
# state, so let's bail out
CancelIoEx(self.pipe, olap)
- raise IOError('Async read yielded 0 bytes; unpossible!')
+ raise IOError("Async read yielded 0 bytes; unpossible!")
# Holds precisely the bytes that we read from the prior request
buf = buf[:nread]
@@ -511,21 +581,25 @@
olap = OVERLAPPED()
olap.hEvent = self._waitable
- immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
- None, olap)
+ immediate = WriteFile(
+ self.pipe, ctypes.c_char_p(data), len(data), None, olap
+ )
if not immediate:
err = GetLastError()
if err != ERROR_IO_PENDING:
- self._raise_win_err('failed to write %d bytes' % len(data),
- GetLastError())
+ self._raise_win_err(
+ "failed to write %d bytes to handle %r"
+ % (len(data), self.pipe),
+ err,
+ )
# Obtain results, waiting if needed
nwrote = wintypes.DWORD()
- if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
- 0 if immediate else self.timeout,
- True):
- log('made write of %d bytes', nwrote.value)
+ if self._get_overlapped_result_ex(
+ self.pipe, olap, nwrote, 0 if immediate else self.timeout, True
+ ):
+ log("made write of %d bytes", nwrote.value)
return nwrote.value
err = GetLastError()
@@ -535,10 +609,21 @@
CancelIoEx(self.pipe, olap)
if err == WAIT_TIMEOUT:
- raise SocketTimeout('timed out after waiting %dms for write' %
- self.timeout)
- self._raise_win_err('error while waiting for write of %d bytes' %
- len(data), err)
+ raise SocketTimeout(
+ "timed out after waiting %dms for write" % self.timeout
+ )
+ self._raise_win_err(
+ "error while waiting for write of %d bytes" % len(data), err
+ )
+
+
+def _default_binpath(binpath=None):
+ if binpath:
+ return binpath
+ # The test harness sets WATCHMAN_BINARY to the binary under test,
+ # so we use that by default, otherwise, allow resolving watchman
+ # from the users PATH.
+ return os.environ.get("WATCHMAN_BINARY", "watchman")
class CLIProcessTransport(Transport):
@@ -560,13 +645,14 @@
It is the responsibility of the caller to set the send and
receive codecs appropriately.
"""
+
proc = None
closed = True
- def __init__(self, sockpath, timeout, watchman_exe):
+ def __init__(self, sockpath, timeout, binpath=None):
self.sockpath = sockpath
self.timeout = timeout
- self.watchman_exe = watchman_exe
+ self.binpath = _default_binpath(binpath)
def close(self):
if self.proc:
@@ -574,32 +660,32 @@
self.proc.kill()
self.proc.stdin.close()
self.proc.stdout.close()
+ self.proc.wait()
self.proc = None
def _connect(self):
if self.proc:
return self.proc
args = [
- self.watchman_exe,
- '--sockname={0}'.format(self.sockpath),
- '--logfile=/BOGUS',
- '--statefile=/BOGUS',
- '--no-spawn',
- '--no-local',
- '--no-pretty',
- '-j',
+ self.binpath,
+ "--sockname={0}".format(self.sockpath),
+ "--logfile=/BOGUS",
+ "--statefile=/BOGUS",
+ "--no-spawn",
+ "--no-local",
+ "--no-pretty",
+ "-j",
]
- self.proc = subprocess.Popen(pycompat.rapply(procutil.tonativestr,
- args),
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE)
+ self.proc = subprocess.Popen(
+ args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
+ )
return self.proc
def readBytes(self, size):
self._connect()
res = self.proc.stdout.read(size)
- if res == '':
- raise WatchmanError('EOF on CLI process transport')
+ if not res:
+ raise WatchmanError("EOF on CLI process transport")
return res
def write(self, data):
@@ -616,13 +702,22 @@
class BserCodec(Codec):
""" use the BSER encoding. This is the default, preferred codec """
+ def __init__(self, transport, value_encoding, value_errors):
+ super(BserCodec, self).__init__(transport)
+ self._value_encoding = value_encoding
+ self._value_errors = value_errors
+
def _loads(self, response):
- return bser.loads(response) # Defaults to BSER v1
+ return bser.loads(
+ response,
+ value_encoding=self._value_encoding,
+ value_errors=self._value_errors,
+ )
def receive(self):
buf = [self.transport.readBytes(sniff_len)]
if not buf[0]:
- raise WatchmanError('empty watchman response')
+ raise WatchmanError("empty watchman response")
_1, _2, elen = bser.pdu_info(buf[0])
@@ -631,15 +726,15 @@
buf.append(self.transport.readBytes(elen - rlen))
rlen += len(buf[-1])
- response = b''.join(buf)
+ response = b"".join(buf)
try:
res = self._loads(response)
return res
except ValueError as e:
- raise WatchmanError('watchman response decode error: %s' % e)
+ raise WatchmanError("watchman response decode error: %s" % e)
def send(self, *args):
- cmd = bser.dumps(*args) # Defaults to BSER v1
+ cmd = bser.dumps(*args) # Defaults to BSER v1
self.transport.write(cmd)
@@ -648,74 +743,96 @@
immutable object support """
def _loads(self, response):
- return bser.loads(response, False) # Defaults to BSER v1
+ return bser.loads(
+ response,
+ False,
+ value_encoding=self._value_encoding,
+ value_errors=self._value_errors,
+ )
class Bser2WithFallbackCodec(BserCodec):
""" use BSER v2 encoding """
- def __init__(self, transport):
- super(Bser2WithFallbackCodec, self).__init__(transport)
- # Once the server advertises support for bser-v2 we should switch this
- # to 'required' on Python 3.
- self.send(["version", {"optional": ["bser-v2"]}])
+ def __init__(self, transport, value_encoding, value_errors):
+ super(Bser2WithFallbackCodec, self).__init__(
+ transport, value_encoding, value_errors
+ )
+ if compat.PYTHON3:
+ bserv2_key = "required"
+ else:
+ bserv2_key = "optional"
+
+ self.send(["version", {bserv2_key: ["bser-v2"]}])
capabilities = self.receive()
- if 'error' in capabilities:
- raise Exception('Unsupported BSER version')
+ if "error" in capabilities:
+ raise BSERv1Unsupported(
+ "The watchman server version does not support Python 3. Please "
+ "upgrade your watchman server."
+ )
- if capabilities['capabilities']['bser-v2']:
+ if capabilities["capabilities"]["bser-v2"]:
self.bser_version = 2
self.bser_capabilities = 0
else:
self.bser_version = 1
self.bser_capabilities = 0
- def _loads(self, response):
- return bser.loads(response)
-
def receive(self):
buf = [self.transport.readBytes(sniff_len)]
if not buf[0]:
- raise WatchmanError('empty watchman response')
+ raise WatchmanError("empty watchman response")
recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
- if hasattr(self, 'bser_version'):
- # Readjust BSER version and capabilities if necessary
- self.bser_version = max(self.bser_version, recv_bser_version)
- self.capabilities = self.bser_capabilities & recv_bser_capabilities
+ if hasattr(self, "bser_version"):
+ # Readjust BSER version and capabilities if necessary
+ self.bser_version = max(self.bser_version, recv_bser_version)
+ self.capabilities = self.bser_capabilities & recv_bser_capabilities
rlen = len(buf[0])
while elen > rlen:
buf.append(self.transport.readBytes(elen - rlen))
rlen += len(buf[-1])
- response = b''.join(buf)
+ response = b"".join(buf)
try:
res = self._loads(response)
return res
except ValueError as e:
- raise WatchmanError('watchman response decode error: %s' % e)
+ raise WatchmanError("watchman response decode error: %s" % e)
def send(self, *args):
- if hasattr(self, 'bser_version'):
- cmd = bser.dumps(*args, version=self.bser_version,
- capabilities=self.bser_capabilities)
+ if hasattr(self, "bser_version"):
+ cmd = bser.dumps(
+ *args,
+ version=self.bser_version,
+ capabilities=self.bser_capabilities
+ )
else:
cmd = bser.dumps(*args)
self.transport.write(cmd)
+class ImmutableBser2Codec(Bser2WithFallbackCodec, ImmutableBserCodec):
+ """ use the BSER encoding, decoding values using the newer
+ immutable object support """
+
+ pass
+
+
class JsonCodec(Codec):
""" Use json codec. This is here primarily for testing purposes """
+
json = None
def __init__(self, transport):
super(JsonCodec, self).__init__(transport)
# optional dep on json, only if JsonCodec is used
import json
+
self.json = json
def receive(self):
@@ -727,7 +844,7 @@
# but it's possible we might get non-ASCII bytes that are valid
# UTF-8.
if compat.PYTHON3:
- line = line.decode('utf-8')
+ line = line.decode("utf-8")
return self.json.loads(line)
except Exception as e:
print(e, line)
@@ -739,12 +856,13 @@
# containing Unicode strings to Unicode string. Even with (the default)
# ensure_ascii=True, dumps returns a Unicode string.
if compat.PYTHON3:
- cmd = cmd.encode('ascii')
+ cmd = cmd.encode("ascii")
self.transport.write(cmd + b"\n")
class client(object):
""" Handles the communication with the watchman service """
+
sockpath = None
transport = None
sendCodec = None
@@ -754,60 +872,100 @@
subs = {} # Keyed by subscription name
sub_by_root = {} # Keyed by root, then by subscription name
logs = [] # When log level is raised
- unilateral = ['log', 'subscription']
+ unilateral = ["log", "subscription"]
tport = None
useImmutableBser = None
- watchman_exe = None
+ pid = None
- def __init__(self,
- sockpath=None,
- timeout=1.0,
- transport=None,
- sendEncoding=None,
- recvEncoding=None,
- useImmutableBser=False,
- watchman_exe=None):
+ def __init__(
+ self,
+ sockpath=None,
+ timeout=1.0,
+ transport=None,
+ sendEncoding=None,
+ recvEncoding=None,
+ useImmutableBser=False,
+ # use False for these two because None has a special
+ # meaning
+ valueEncoding=False,
+ valueErrors=False,
+ binpath=None,
+ ):
self.sockpath = sockpath
self.timeout = timeout
self.useImmutableBser = useImmutableBser
- self.watchman_exe = watchman_exe
+ self.binpath = _default_binpath(binpath)
if inspect.isclass(transport) and issubclass(transport, Transport):
self.transport = transport
else:
- transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
- if transport == 'local' and os.name == 'nt':
+ transport = transport or os.getenv("WATCHMAN_TRANSPORT") or "local"
+ if transport == "local" and os.name == "nt":
self.transport = WindowsNamedPipeTransport
- elif transport == 'local':
+ elif transport == "local":
self.transport = UnixSocketTransport
- elif transport == 'cli':
+ elif transport == "cli":
self.transport = CLIProcessTransport
if sendEncoding is None:
- sendEncoding = 'json'
+ sendEncoding = "json"
if recvEncoding is None:
recvEncoding = sendEncoding
else:
- raise WatchmanError('invalid transport %s' % transport)
+ raise WatchmanError("invalid transport %s" % transport)
- sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
- 'bser')
- recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
- 'bser')
+ sendEncoding = str(
+ sendEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
+ )
+ recvEncoding = str(
+ recvEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
+ )
self.recvCodec = self._parseEncoding(recvEncoding)
self.sendCodec = self._parseEncoding(sendEncoding)
+ # We want to act like the native OS methods as much as possible. This
+ # means returning bytestrings on Python 2 by default and Unicode
+ # strings on Python 3. However we take an optional argument that lets
+ # users override this.
+ if valueEncoding is False:
+ if compat.PYTHON3:
+ self.valueEncoding = encoding.get_local_encoding()
+ self.valueErrors = encoding.default_local_errors
+ else:
+ self.valueEncoding = None
+ self.valueErrors = None
+ else:
+ self.valueEncoding = valueEncoding
+ if valueErrors is False:
+ self.valueErrors = encoding.default_local_errors
+ else:
+ self.valueErrors = valueErrors
+
+ def _makeBSERCodec(self, codec):
+ def make_codec(transport):
+ return codec(transport, self.valueEncoding, self.valueErrors)
+
+ return make_codec
+
def _parseEncoding(self, enc):
- if enc == 'bser':
+ if enc == "bser":
if self.useImmutableBser:
- return ImmutableBserCodec
- return BserCodec
- elif enc == 'experimental-bser-v2':
- return Bser2WithFallbackCodec
- elif enc == 'json':
+ return self._makeBSERCodec(ImmutableBser2Codec)
+ return self._makeBSERCodec(Bser2WithFallbackCodec)
+ elif enc == "bser-v1":
+ if compat.PYTHON3:
+ raise BSERv1Unsupported(
+ "Python 3 does not support the BSER v1 encoding: specify "
+ '"bser" or omit the sendEncoding and recvEncoding '
+ "arguments"
+ )
+ if self.useImmutableBser:
+ return self._makeBSERCodec(ImmutableBserCodec)
+ return self._makeBSERCodec(BserCodec)
+ elif enc == "json":
return JsonCodec
else:
- raise WatchmanError('invalid encoding %s' % enc)
+ raise WatchmanError("invalid encoding %s" % enc)
def _hasprop(self, result, name):
if self.useImmutableBser:
@@ -817,29 +975,28 @@
def _resolvesockname(self):
# if invoked via a trigger, watchman will set this env var; we
# should use it unless explicitly set otherwise
- path = os.getenv('WATCHMAN_SOCK')
+ path = os.getenv("WATCHMAN_SOCK")
if path:
return path
- cmd = [self.watchman_exe, '--output-encoding=bser', 'get-sockname']
+ cmd = [self.binpath, "--output-encoding=bser", "get-sockname"]
try:
- args = dict(stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- close_fds=os.name != 'nt')
+ args = dict(
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ ) # noqa: C408
- if os.name == 'nt':
+ if os.name == "nt":
# if invoked via an application with graphical user interface,
# this call will cause a brief command window pop-up.
# Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
- args['startupinfo'] = startupinfo
+ args["startupinfo"] = startupinfo
- p = subprocess.Popen(pycompat.rapply(procutil.tonativestr, cmd),
- **args)
+ p = subprocess.Popen(cmd, **args)
except OSError as e:
- raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
+ raise WatchmanError('"watchman" executable not in PATH (%s)', e)
stdout, stderr = p.communicate()
exitcode = p.poll()
@@ -848,27 +1005,43 @@
raise WatchmanError("watchman exited with code %d" % exitcode)
result = bser.loads(stdout)
- if b'error' in result:
- raise WatchmanError('get-sockname error: %s' % result['error'])
+ if "error" in result:
+ raise WatchmanError("get-sockname error: %s" % result["error"])
- return result[b'sockname']
+ return result["sockname"]
def _connect(self):
""" establish transport connection """
if self.recvConn:
+ if self.pid != os.getpid():
+ raise UseAfterFork(
+ "do not re-use a connection after fork; open a new client instead"
+ )
return
if self.sockpath is None:
self.sockpath = self._resolvesockname()
- self.tport = self.transport(self.sockpath, self.timeout, self.watchman_exe)
+ kwargs = {}
+ if self.transport == CLIProcessTransport:
+ kwargs["binpath"] = self.binpath
+
+ self.tport = self.transport(self.sockpath, self.timeout, **kwargs)
self.sendConn = self.sendCodec(self.tport)
self.recvConn = self.recvCodec(self.tport)
+ self.pid = os.getpid()
def __del__(self):
self.close()
+ def __enter__(self):
+ self._connect()
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_traceback):
+ self.close()
+
def close(self):
if self.tport:
self.tport.close()
@@ -893,26 +1066,20 @@
self._connect()
result = self.recvConn.receive()
- if self._hasprop(result, 'error'):
- error = result['error']
- if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
- error = result['error'].decode('utf-8', 'surrogateescape')
- raise CommandError(error)
+ if self._hasprop(result, "error"):
+ raise CommandError(result["error"])
- if self._hasprop(result, 'log'):
- log = result['log']
- if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
- log = log.decode('utf-8', 'surrogateescape')
- self.logs.append(log)
+ if self._hasprop(result, "log"):
+ self.logs.append(result["log"])
- if self._hasprop(result, 'subscription'):
- sub = result['subscription']
+ if self._hasprop(result, "subscription"):
+ sub = result["subscription"]
if not (sub in self.subs):
self.subs[sub] = []
self.subs[sub].append(result)
# also accumulate in {root,sub} keyed store
- root = os.path.normcase(result['root'])
+ root = os.path.normpath(os.path.normcase(result["root"]))
if not root in self.sub_by_root:
self.sub_by_root[root] = {}
if not sub in self.sub_by_root[root]:
@@ -922,7 +1089,7 @@
return result
def isUnilateralResponse(self, res):
- if 'unilateral' in res and res['unilateral']:
+ if "unilateral" in res and res["unilateral"]:
return True
# Fall back to checking for known unilateral responses
for k in self.unilateral:
@@ -955,18 +1122,11 @@
remove processing impacts both the unscoped and scoped stores
for the subscription data.
"""
- if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
- # People may pass in Unicode strings here -- but currently BSER only
- # returns bytestrings. Deal with that.
- if isinstance(root, str):
- root = encoding.encode_local(root)
- if isinstance(name, str):
- name = name.encode('utf-8')
-
if root is not None:
- if not root in self.sub_by_root:
+ root = os.path.normpath(os.path.normcase(root))
+ if root not in self.sub_by_root:
return None
- if not name in self.sub_by_root[root]:
+ if name not in self.sub_by_root[root]:
return None
sub = self.sub_by_root[root][name]
if remove:
@@ -976,7 +1136,7 @@
del self.subs[name]
return sub
- if not (name in self.subs):
+ if name not in self.subs:
return None
sub = self.subs[name]
if remove:
@@ -992,7 +1152,7 @@
and NOT returned via this method.
"""
- log('calling client.query')
+ log("calling client.query")
self._connect()
try:
self.sendConn.send(args)
@@ -1006,27 +1166,27 @@
# When we can depend on Python 3, we can use PEP 3134
# exception chaining here.
raise WatchmanEnvironmentError(
- 'I/O error communicating with watchman daemon',
+ "I/O error communicating with watchman daemon",
ee.errno,
ee.strerror,
- args)
+ args,
+ )
except WatchmanError as ex:
ex.setCommand(args)
raise
def capabilityCheck(self, optional=None, required=None):
""" Perform a server capability check """
- res = self.query('version', {
- 'optional': optional or [],
- 'required': required or []
- })
+ res = self.query(
+ "version", {"optional": optional or [], "required": required or []}
+ )
- if not self._hasprop(res, 'capabilities'):
+ if not self._hasprop(res, "capabilities"):
# Server doesn't support capabilities, so we need to
# synthesize the results based on the version
capabilities.synthesize(res, optional)
- if 'error' in res:
- raise CommandError(res['error'])
+ if "error" in res:
+ raise CommandError(res["error"])
return res