hgext/fsmonitor/pywatchman/__init__.py
changeset 30656 16f4b341288d
parent 28432 2377c4ac4eec
child 33769 dd35abc409ee
equal deleted inserted replaced
30655:f35397fe0c04 30656:16f4b341288d
    24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    28 
    28 
       
    29 from __future__ import absolute_import
       
    30 from __future__ import division
       
    31 from __future__ import print_function
       
    32 # no unicode literals
       
    33 
       
    34 import inspect
       
    35 import math
    29 import os
    36 import os
    30 import errno
       
    31 import math
       
    32 import socket
    37 import socket
    33 import subprocess
    38 import subprocess
    34 import time
    39 import time
    35 
    40 
    36 # Sometimes it's really hard to get Python extensions to compile,
    41 # Sometimes it's really hard to get Python extensions to compile,
    37 # so fall back to a pure Python implementation.
    42 # so fall back to a pure Python implementation.
    38 try:
    43 try:
    39     import bser
    44     from . import bser
       
    45     # Demandimport causes modules to be loaded lazily. Force the load now
       
    46     # so that we can fall back on pybser if bser doesn't exist
       
    47     bser.pdu_info
    40 except ImportError:
    48 except ImportError:
    41     import pybser as bser
    49     from . import pybser as bser
    42 
    50 
    43 import capabilities
    51 from . import (
       
    52     capabilities,
       
    53     compat,
       
    54     encoding,
       
    55     load,
       
    56 )
       
    57 
    44 
    58 
    45 if os.name == 'nt':
    59 if os.name == 'nt':
    46     import ctypes
    60     import ctypes
    47     import ctypes.wintypes
    61     import ctypes.wintypes
    48 
    62 
    53     OPEN_EXISTING = 3
    67     OPEN_EXISTING = 3
    54     INVALID_HANDLE_VALUE = -1
    68     INVALID_HANDLE_VALUE = -1
    55     FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
    69     FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
    56     FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
    70     FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
    57     FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
    71     FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
       
    72     WAIT_FAILED = 0xFFFFFFFF
    58     WAIT_TIMEOUT = 0x00000102
    73     WAIT_TIMEOUT = 0x00000102
    59     WAIT_OBJECT_0 = 0x00000000
    74     WAIT_OBJECT_0 = 0x00000000
    60     ERROR_IO_PENDING = 997
    75     WAIT_IO_COMPLETION = 0x000000C0
       
    76     INFINITE = 0xFFFFFFFF
       
    77 
       
    78     # Overlapped I/O operation is in progress. (997)
       
    79     ERROR_IO_PENDING = 0x000003E5
       
    80 
       
    81     # The pointer size follows the architecture
       
    82     # We use WPARAM since this type is already conditionally defined
       
    83     ULONG_PTR = ctypes.wintypes.WPARAM
    61 
    84 
    62     class OVERLAPPED(ctypes.Structure):
    85     class OVERLAPPED(ctypes.Structure):
    63         _fields_ = [
    86         _fields_ = [
    64             ("Internal", wintypes.ULONG), ("InternalHigh", wintypes.ULONG),
    87             ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
    65             ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
    88             ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
    66             ("hEvent", wintypes.HANDLE)
    89             ("hEvent", wintypes.HANDLE)
    67         ]
    90         ]
    68 
    91 
    69         def __init__(self):
    92         def __init__(self):
       
    93             self.Internal = 0
       
    94             self.InternalHigh = 0
    70             self.Offset = 0
    95             self.Offset = 0
    71             self.OffsetHigh = 0
    96             self.OffsetHigh = 0
    72             self.hEvent = 0
    97             self.hEvent = 0
    73 
    98 
    74     LPDWORD = ctypes.POINTER(wintypes.DWORD)
    99     LPDWORD = ctypes.POINTER(wintypes.DWORD)
    95 
   120 
    96     GetLastError = ctypes.windll.kernel32.GetLastError
   121     GetLastError = ctypes.windll.kernel32.GetLastError
    97     GetLastError.argtypes = []
   122     GetLastError.argtypes = []
    98     GetLastError.restype = wintypes.DWORD
   123     GetLastError.restype = wintypes.DWORD
    99 
   124 
       
   125     SetLastError = ctypes.windll.kernel32.SetLastError
       
   126     SetLastError.argtypes = [wintypes.DWORD]
       
   127     SetLastError.restype = None
       
   128 
   100     FormatMessage = ctypes.windll.kernel32.FormatMessageA
   129     FormatMessage = ctypes.windll.kernel32.FormatMessageA
   101     FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
   130     FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
   102                               wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
   131                               wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
   103                               wintypes.DWORD, wintypes.LPVOID]
   132                               wintypes.DWORD, wintypes.LPVOID]
   104     FormatMessage.restype = wintypes.DWORD
   133     FormatMessage.restype = wintypes.DWORD
   105 
   134 
   106     LocalFree = ctypes.windll.kernel32.LocalFree
   135     LocalFree = ctypes.windll.kernel32.LocalFree
   107 
   136 
   108     GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx
   137     GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
   109     GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
   138     GetOverlappedResult.argtypes = [wintypes.HANDLE,
   110                                       ctypes.POINTER(OVERLAPPED), LPDWORD,
   139                                     ctypes.POINTER(OVERLAPPED), LPDWORD,
   111                                       wintypes.DWORD, wintypes.BOOL]
   140                                     wintypes.BOOL]
   112     GetOverlappedResultEx.restype = wintypes.BOOL
   141     GetOverlappedResult.restype = wintypes.BOOL
   113 
   142 
       
   143     GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
       
   144                                     'GetOverlappedResultEx', None)
       
   145     if GetOverlappedResultEx is not None:
       
   146         GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
       
   147                                           ctypes.POINTER(OVERLAPPED), LPDWORD,
       
   148                                           wintypes.DWORD, wintypes.BOOL]
       
   149         GetOverlappedResultEx.restype = wintypes.BOOL
       
   150 
       
   151     WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
       
   152     WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
       
   153     WaitForSingleObjectEx.restype = wintypes.DWORD
       
   154 
       
   155     CreateEvent = ctypes.windll.kernel32.CreateEventA
       
   156     CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
       
   157                             wintypes.LPSTR]
       
   158     CreateEvent.restype = wintypes.HANDLE
       
   159 
       
   160     # Windows Vista is the minimum supported client for CancelIoEx.
   114     CancelIoEx = ctypes.windll.kernel32.CancelIoEx
   161     CancelIoEx = ctypes.windll.kernel32.CancelIoEx
   115     CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
   162     CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
   116     CancelIoEx.restype = wintypes.BOOL
   163     CancelIoEx.restype = wintypes.BOOL
   117 
   164 
   118 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
   165 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
   130 
   177 
   131     def log(fmt, *args):
   178     def log(fmt, *args):
   132         pass
   179         pass
   133 
   180 
   134 
   181 
       
   182 def _win32_strerror(err):
       
   183     """ expand a win32 error code into a human readable message """
       
   184 
       
   185     # FormatMessage will allocate memory and assign it here
       
   186     buf = ctypes.c_char_p()
       
   187     FormatMessage(
       
   188         FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
       
   189         | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
       
   190     try:
       
   191         return buf.value
       
   192     finally:
       
   193         LocalFree(buf)
       
   194 
       
   195 
   135 class WatchmanError(Exception):
   196 class WatchmanError(Exception):
   136     pass
   197     def __init__(self, msg=None, cmd=None):
       
   198         self.msg = msg
       
   199         self.cmd = cmd
       
   200 
       
   201     def setCommand(self, cmd):
       
   202         self.cmd = cmd
       
   203 
       
   204     def __str__(self):
       
   205         if self.cmd:
       
   206             return '%s, while executing %s' % (self.msg, self.cmd)
       
   207         return self.msg
       
   208 
       
   209 
       
   210 class WatchmanEnvironmentError(WatchmanError):
       
   211     def __init__(self, msg, errno, errmsg, cmd=None):
       
   212         super(WatchmanEnvironmentError, self).__init__(
       
   213             '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
       
   214             cmd)
       
   215 
       
   216 
       
   217 class SocketConnectError(WatchmanError):
       
   218     def __init__(self, sockpath, exc):
       
   219         super(SocketConnectError, self).__init__(
       
   220             'unable to connect to %s: %s' % (sockpath, exc))
       
   221         self.sockpath = sockpath
       
   222         self.exc = exc
   137 
   223 
   138 
   224 
   139 class SocketTimeout(WatchmanError):
   225 class SocketTimeout(WatchmanError):
   140     """A specialized exception raised for socket timeouts during communication to/from watchman.
   226     """A specialized exception raised for socket timeouts during communication to/from watchman.
   141        This makes it easier to implement non-blocking loops as callers can easily distinguish
   227        This makes it easier to implement non-blocking loops as callers can easily distinguish
   149 class CommandError(WatchmanError):
   235 class CommandError(WatchmanError):
   150     """error returned by watchman
   236     """error returned by watchman
   151 
   237 
   152     self.msg is the message returned by watchman.
   238     self.msg is the message returned by watchman.
   153     """
   239     """
   154 
       
   155     def __init__(self, msg, cmd=None):
   240     def __init__(self, msg, cmd=None):
   156         self.msg = msg
   241         super(CommandError, self).__init__(
   157         self.cmd = cmd
   242             'watchman command error: %s' % (msg, ),
   158         super(CommandError, self).__init__('watchman command error: %s' % msg)
   243             cmd,
   159 
   244         )
   160     def setCommand(self, cmd):
       
   161         self.cmd = cmd
       
   162 
       
   163     def __str__(self):
       
   164         if self.cmd:
       
   165             return '%s, while executing %s' % (self.msg, self.cmd)
       
   166         return self.msg
       
   167 
   245 
   168 
   246 
   169 class Transport(object):
   247 class Transport(object):
   170     """ communication transport to the watchman server """
   248     """ communication transport to the watchman server """
   171     buf = None
   249     buf = None
   193         if self.buf is None:
   271         if self.buf is None:
   194             self.buf = []
   272             self.buf = []
   195 
   273 
   196         # Buffer may already have a line if we've received unilateral
   274         # Buffer may already have a line if we've received unilateral
   197         # response(s) from the server
   275         # response(s) from the server
   198         if len(self.buf) == 1 and "\n" in self.buf[0]:
   276         if len(self.buf) == 1 and b"\n" in self.buf[0]:
   199             (line, b) = self.buf[0].split("\n", 1)
   277             (line, b) = self.buf[0].split(b"\n", 1)
   200             self.buf = [b]
   278             self.buf = [b]
   201             return line
   279             return line
   202 
   280 
   203         while True:
   281         while True:
   204             b = self.readBytes(4096)
   282             b = self.readBytes(4096)
   205             if "\n" in b:
   283             if b"\n" in b:
   206                 result = ''.join(self.buf)
   284                 result = b''.join(self.buf)
   207                 (line, b) = b.split("\n", 1)
   285                 (line, b) = b.split(b"\n", 1)
   208                 self.buf = [b]
   286                 self.buf = [b]
   209                 return result + line
   287                 return result + line
   210             self.buf.append(b)
   288             self.buf.append(b)
   211 
   289 
   212 
   290 
   239         try:
   317         try:
   240             sock.settimeout(self.timeout)
   318             sock.settimeout(self.timeout)
   241             sock.connect(self.sockpath)
   319             sock.connect(self.sockpath)
   242             self.sock = sock
   320             self.sock = sock
   243         except socket.error as e:
   321         except socket.error as e:
   244             raise WatchmanError('unable to connect to %s: %s' %
   322             sock.close()
   245                                 (self.sockpath, e))
   323             raise SocketConnectError(self.sockpath, e)
   246 
   324 
   247     def close(self):
   325     def close(self):
   248         self.sock.close()
   326         self.sock.close()
   249         self.sock = None
   327         self.sock = None
   250 
   328 
   266             self.sock.sendall(data)
   344             self.sock.sendall(data)
   267         except socket.timeout:
   345         except socket.timeout:
   268             raise SocketTimeout('timed out sending query command')
   346             raise SocketTimeout('timed out sending query command')
   269 
   347 
   270 
   348 
       
   349 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
       
   350     """ Windows 7 and earlier does not support GetOverlappedResultEx. The
       
   351     alternative is to use GetOverlappedResult and wait for read or write
       
   352     operation to complete. This is done be using CreateEvent and
       
   353     WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
       
   354     and GetOverlappedResult are all part of Windows API since WindowsXP.
       
   355     This is the exact same implementation that can be found in the watchman
       
   356     source code (see get_overlapped_result_ex_impl in stream_win.c). This
       
   357     way, maintenance should be simplified.
       
   358     """
       
   359     log('Preparing to wait for maximum %dms', millis )
       
   360     if millis != 0:
       
   361         waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
       
   362         if waitReturnCode == WAIT_OBJECT_0:
       
   363             # Event is signaled, overlapped IO operation result should be available.
       
   364             pass
       
   365         elif waitReturnCode == WAIT_IO_COMPLETION:
       
   366             # WaitForSingleObjectEx returnes because the system added an I/O completion
       
   367             # routine or an asynchronous procedure call (APC) to the thread queue.
       
   368             SetLastError(WAIT_IO_COMPLETION)
       
   369             pass
       
   370         elif waitReturnCode == WAIT_TIMEOUT:
       
   371             # We reached the maximum allowed wait time, the IO operation failed
       
   372             # to complete in timely fashion.
       
   373             SetLastError(WAIT_TIMEOUT)
       
   374             return False
       
   375         elif waitReturnCode == WAIT_FAILED:
       
   376             # something went wrong calling WaitForSingleObjectEx
       
   377             err = GetLastError()
       
   378             log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
       
   379             return False
       
   380         else:
       
   381             # unexpected situation deserving investigation.
       
   382             err = GetLastError()
       
   383             log('Unexpected error: %s', _win32_strerror(err))
       
   384             return False
       
   385 
       
   386     return GetOverlappedResult(pipe, olap, nbytes, False)
       
   387 
       
   388 
   271 class WindowsNamedPipeTransport(Transport):
   389 class WindowsNamedPipeTransport(Transport):
   272     """ connect to a named pipe """
   390     """ connect to a named pipe """
   273 
   391 
   274     def __init__(self, sockpath, timeout):
   392     def __init__(self, sockpath, timeout):
   275         self.sockpath = sockpath
   393         self.sockpath = sockpath
   282         if self.pipe == INVALID_HANDLE_VALUE:
   400         if self.pipe == INVALID_HANDLE_VALUE:
   283             self.pipe = None
   401             self.pipe = None
   284             self._raise_win_err('failed to open pipe %s' % sockpath,
   402             self._raise_win_err('failed to open pipe %s' % sockpath,
   285                                 GetLastError())
   403                                 GetLastError())
   286 
   404 
   287     def _win32_strerror(self, err):
   405         # event for the overlapped I/O operations
   288         """ expand a win32 error code into a human readable message """
   406         self._waitable = CreateEvent(None, True, False, None)
   289 
   407         if self._waitable is None:
   290         # FormatMessage will allocate memory and assign it here
   408             self._raise_win_err('CreateEvent failed', GetLastError())
   291         buf = ctypes.c_char_p()
   409 
   292         FormatMessage(
   410         self._get_overlapped_result_ex = GetOverlappedResultEx
   293             FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
   411         if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
   294             | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
   412             self._get_overlapped_result_ex is None):
   295         try:
   413             self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
   296             return buf.value
       
   297         finally:
       
   298             LocalFree(buf)
       
   299 
   414 
   300     def _raise_win_err(self, msg, err):
   415     def _raise_win_err(self, msg, err):
   301         raise IOError('%s win32 error code: %d %s' %
   416         raise IOError('%s win32 error code: %d %s' %
   302                       (msg, err, self._win32_strerror(err)))
   417                       (msg, err, _win32_strerror(err)))
   303 
   418 
   304     def close(self):
   419     def close(self):
   305         if self.pipe:
   420         if self.pipe:
       
   421             log('Closing pipe')
   306             CloseHandle(self.pipe)
   422             CloseHandle(self.pipe)
   307         self.pipe = None
   423         self.pipe = None
       
   424 
       
   425         if self._waitable is not None:
       
   426             # We release the handle for the event
       
   427             CloseHandle(self._waitable)
       
   428         self._waitable = None
       
   429 
       
   430     def setTimeout(self, value):
       
   431         # convert to milliseconds
       
   432         self.timeout = int(value * 1000)
   308 
   433 
   309     def readBytes(self, size):
   434     def readBytes(self, size):
   310         """ A read can block for an unbounded amount of time, even if the
   435         """ A read can block for an unbounded amount of time, even if the
   311             kernel reports that the pipe handle is signalled, so we need to
   436             kernel reports that the pipe handle is signalled, so we need to
   312             always perform our reads asynchronously
   437             always perform our reads asynchronously
   323             return res
   448             return res
   324 
   449 
   325         # We need to initiate a read
   450         # We need to initiate a read
   326         buf = ctypes.create_string_buffer(size)
   451         buf = ctypes.create_string_buffer(size)
   327         olap = OVERLAPPED()
   452         olap = OVERLAPPED()
       
   453         olap.hEvent = self._waitable
   328 
   454 
   329         log('made read buff of size %d', size)
   455         log('made read buff of size %d', size)
   330 
   456 
   331         # ReadFile docs warn against sending in the nread parameter for async
   457         # ReadFile docs warn against sending in the nread parameter for async
   332         # operations, so we always collect it via GetOverlappedResultEx
   458         # operations, so we always collect it via GetOverlappedResultEx
   337             if err != ERROR_IO_PENDING:
   463             if err != ERROR_IO_PENDING:
   338                 self._raise_win_err('failed to read %d bytes' % size,
   464                 self._raise_win_err('failed to read %d bytes' % size,
   339                                     GetLastError())
   465                                     GetLastError())
   340 
   466 
   341         nread = wintypes.DWORD()
   467         nread = wintypes.DWORD()
   342         if not GetOverlappedResultEx(self.pipe, olap, nread,
   468         if not self._get_overlapped_result_ex(self.pipe, olap, nread,
   343                                      0 if immediate else self.timeout, True):
   469                                               0 if immediate else self.timeout,
       
   470                                               True):
   344             err = GetLastError()
   471             err = GetLastError()
   345             CancelIoEx(self.pipe, olap)
   472             CancelIoEx(self.pipe, olap)
   346 
   473 
   347             if err == WAIT_TIMEOUT:
   474             if err == WAIT_TIMEOUT:
   348                 log('GetOverlappedResultEx timedout')
   475                 log('GetOverlappedResultEx timedout')
   372         self._iobuf = buf[returned_size:]
   499         self._iobuf = buf[returned_size:]
   373         return buf[:returned_size]
   500         return buf[:returned_size]
   374 
   501 
   375     def write(self, data):
   502     def write(self, data):
   376         olap = OVERLAPPED()
   503         olap = OVERLAPPED()
       
   504         olap.hEvent = self._waitable
       
   505 
   377         immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
   506         immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
   378                               None, olap)
   507                               None, olap)
   379 
   508 
   380         if not immediate:
   509         if not immediate:
   381             err = GetLastError()
   510             err = GetLastError()
   383                 self._raise_win_err('failed to write %d bytes' % len(data),
   512                 self._raise_win_err('failed to write %d bytes' % len(data),
   384                                     GetLastError())
   513                                     GetLastError())
   385 
   514 
   386         # Obtain results, waiting if needed
   515         # Obtain results, waiting if needed
   387         nwrote = wintypes.DWORD()
   516         nwrote = wintypes.DWORD()
   388         if GetOverlappedResultEx(self.pipe, olap, nwrote, 0 if immediate else
   517         if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
   389                                  self.timeout, True):
   518                                           0 if immediate else self.timeout,
       
   519                                           True):
       
   520             log('made write of %d bytes', nwrote.value)
   390             return nwrote.value
   521             return nwrote.value
   391 
   522 
   392         err = GetLastError()
   523         err = GetLastError()
   393 
   524 
   394         # It's potentially unsafe to allow the write to continue after
   525         # It's potentially unsafe to allow the write to continue after
   428         self.sockpath = sockpath
   559         self.sockpath = sockpath
   429         self.timeout = timeout
   560         self.timeout = timeout
   430 
   561 
   431     def close(self):
   562     def close(self):
   432         if self.proc:
   563         if self.proc:
   433             self.proc.kill()
   564             if self.proc.pid is not None:
       
   565                 self.proc.kill()
       
   566             self.proc.stdin.close()
       
   567             self.proc.stdout.close()
   434             self.proc = None
   568             self.proc = None
   435 
   569 
   436     def _connect(self):
   570     def _connect(self):
   437         if self.proc:
   571         if self.proc:
   438             return self.proc
   572             return self.proc
   439         args = [
   573         args = [
   440             'watchman',
   574             'watchman',
   441             '--sockname={}'.format(self.sockpath),
   575             '--sockname={0}'.format(self.sockpath),
   442             '--logfile=/BOGUS',
   576             '--logfile=/BOGUS',
   443             '--statefile=/BOGUS',
   577             '--statefile=/BOGUS',
   444             '--no-spawn',
   578             '--no-spawn',
   445             '--no-local',
   579             '--no-local',
   446             '--no-pretty',
   580             '--no-pretty',
   458             raise WatchmanError('EOF on CLI process transport')
   592             raise WatchmanError('EOF on CLI process transport')
   459         return res
   593         return res
   460 
   594 
   461     def write(self, data):
   595     def write(self, data):
   462         if self.closed:
   596         if self.closed:
       
   597             self.close()
   463             self.closed = False
   598             self.closed = False
   464             self.proc = None
       
   465         self._connect()
   599         self._connect()
   466         res = self.proc.stdin.write(data)
   600         res = self.proc.stdin.write(data)
   467         self.proc.stdin.close()
   601         self.proc.stdin.close()
   468         self.closed = True
   602         self.closed = True
   469         return res
   603         return res
   471 
   605 
   472 class BserCodec(Codec):
   606 class BserCodec(Codec):
   473     """ use the BSER encoding.  This is the default, preferred codec """
   607     """ use the BSER encoding.  This is the default, preferred codec """
   474 
   608 
   475     def _loads(self, response):
   609     def _loads(self, response):
   476         return bser.loads(response)
   610         return bser.loads(response) # Defaults to BSER v1
   477 
   611 
   478     def receive(self):
   612     def receive(self):
   479         buf = [self.transport.readBytes(sniff_len)]
   613         buf = [self.transport.readBytes(sniff_len)]
   480         if not buf[0]:
   614         if not buf[0]:
   481             raise WatchmanError('empty watchman response')
   615             raise WatchmanError('empty watchman response')
   482 
   616 
   483         elen = bser.pdu_len(buf[0])
   617         _1, _2, elen = bser.pdu_info(buf[0])
   484 
   618 
   485         rlen = len(buf[0])
   619         rlen = len(buf[0])
   486         while elen > rlen:
   620         while elen > rlen:
   487             buf.append(self.transport.readBytes(elen - rlen))
   621             buf.append(self.transport.readBytes(elen - rlen))
   488             rlen += len(buf[-1])
   622             rlen += len(buf[-1])
   489 
   623 
   490         response = ''.join(buf)
   624         response = b''.join(buf)
   491         try:
   625         try:
   492             res = self._loads(response)
   626             res = self._loads(response)
   493             return res
   627             return res
   494         except ValueError as e:
   628         except ValueError as e:
   495             raise WatchmanError('watchman response decode error: %s' % e)
   629             raise WatchmanError('watchman response decode error: %s' % e)
   496 
   630 
   497     def send(self, *args):
   631     def send(self, *args):
   498         cmd = bser.dumps(*args)
   632         cmd = bser.dumps(*args) # Defaults to BSER v1
   499         self.transport.write(cmd)
   633         self.transport.write(cmd)
   500 
   634 
   501 
   635 
   502 class ImmutableBserCodec(BserCodec):
   636 class ImmutableBserCodec(BserCodec):
   503     """ use the BSER encoding, decoding values using the newer
   637     """ use the BSER encoding, decoding values using the newer
   504         immutable object support """
   638         immutable object support """
   505 
   639 
   506     def _loads(self, response):
   640     def _loads(self, response):
   507         return bser.loads(response, False)
   641         return bser.loads(response, False) # Defaults to BSER v1
       
   642 
       
   643 
       
   644 class Bser2WithFallbackCodec(BserCodec):
       
   645     """ use BSER v2 encoding """
       
   646 
       
   647     def __init__(self, transport):
       
   648         super(Bser2WithFallbackCodec, self).__init__(transport)
       
   649         # Once the server advertises support for bser-v2 we should switch this
       
   650         # to 'required' on Python 3.
       
   651         self.send(["version", {"optional": ["bser-v2"]}])
       
   652 
       
   653         capabilities = self.receive()
       
   654 
       
   655         if 'error' in capabilities:
       
   656           raise Exception('Unsupported BSER version')
       
   657 
       
   658         if capabilities['capabilities']['bser-v2']:
       
   659             self.bser_version = 2
       
   660             self.bser_capabilities = 0
       
   661         else:
       
   662             self.bser_version = 1
       
   663             self.bser_capabilities = 0
       
   664 
       
   665     def _loads(self, response):
       
   666         return bser.loads(response)
       
   667 
       
   668     def receive(self):
       
   669         buf = [self.transport.readBytes(sniff_len)]
       
   670         if not buf[0]:
       
   671             raise WatchmanError('empty watchman response')
       
   672 
       
   673         recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
       
   674 
       
   675         if hasattr(self, 'bser_version'):
       
   676           # Readjust BSER version and capabilities if necessary
       
   677           self.bser_version = max(self.bser_version, recv_bser_version)
       
   678           self.capabilities = self.bser_capabilities & recv_bser_capabilities
       
   679 
       
   680         rlen = len(buf[0])
       
   681         while elen > rlen:
       
   682             buf.append(self.transport.readBytes(elen - rlen))
       
   683             rlen += len(buf[-1])
       
   684 
       
   685         response = b''.join(buf)
       
   686         try:
       
   687             res = self._loads(response)
       
   688             return res
       
   689         except ValueError as e:
       
   690             raise WatchmanError('watchman response decode error: %s' % e)
       
   691 
       
   692     def send(self, *args):
       
   693         if hasattr(self, 'bser_version'):
       
   694             cmd = bser.dumps(*args, version=self.bser_version,
       
   695                 capabilities=self.bser_capabilities)
       
   696         else:
       
   697             cmd = bser.dumps(*args)
       
   698         self.transport.write(cmd)
   508 
   699 
   509 
   700 
   510 class JsonCodec(Codec):
   701 class JsonCodec(Codec):
   511     """ Use json codec.  This is here primarily for testing purposes """
   702     """ Use json codec.  This is here primarily for testing purposes """
   512     json = None
   703     json = None
   518         self.json = json
   709         self.json = json
   519 
   710 
   520     def receive(self):
   711     def receive(self):
   521         line = self.transport.readLine()
   712         line = self.transport.readLine()
   522         try:
   713         try:
       
   714             # In Python 3, json.loads is a transformation from Unicode string to
       
   715             # objects possibly containing Unicode strings. We typically expect
       
   716             # the JSON blob to be ASCII-only with non-ASCII characters escaped,
       
   717             # but it's possible we might get non-ASCII bytes that are valid
       
   718             # UTF-8.
       
   719             if compat.PYTHON3:
       
   720                 line = line.decode('utf-8')
   523             return self.json.loads(line)
   721             return self.json.loads(line)
   524         except Exception as e:
   722         except Exception as e:
   525             print(e, line)
   723             print(e, line)
   526             raise
   724             raise
   527 
   725 
   528     def send(self, *args):
   726     def send(self, *args):
   529         cmd = self.json.dumps(*args)
   727         cmd = self.json.dumps(*args)
   530         self.transport.write(cmd + "\n")
   728         # In Python 3, json.dumps is a transformation from objects possibly
       
   729         # containing Unicode strings to Unicode string. Even with (the default)
       
   730         # ensure_ascii=True, dumps returns a Unicode string.
       
   731         if compat.PYTHON3:
       
   732             cmd = cmd.encode('ascii')
       
   733         self.transport.write(cmd + b"\n")
   531 
   734 
   532 
   735 
   533 class client(object):
   736 class client(object):
   534     """ Handles the communication with the watchman service """
   737     """ Handles the communication with the watchman service """
   535     sockpath = None
   738     sockpath = None
   554                  useImmutableBser=False):
   757                  useImmutableBser=False):
   555         self.sockpath = sockpath
   758         self.sockpath = sockpath
   556         self.timeout = timeout
   759         self.timeout = timeout
   557         self.useImmutableBser = useImmutableBser
   760         self.useImmutableBser = useImmutableBser
   558 
   761 
   559         transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
   762         if inspect.isclass(transport) and issubclass(transport, Transport):
   560         if transport == 'local' and os.name == 'nt':
   763             self.transport = transport
   561             self.transport = WindowsNamedPipeTransport
       
   562         elif transport == 'local':
       
   563             self.transport = UnixSocketTransport
       
   564         elif transport == 'cli':
       
   565             self.transport = CLIProcessTransport
       
   566             if sendEncoding is None:
       
   567                 sendEncoding = 'json'
       
   568             if recvEncoding is None:
       
   569                 recvEncoding = sendEncoding
       
   570         else:
   764         else:
   571             raise WatchmanError('invalid transport %s' % transport)
   765             transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
   572 
   766             if transport == 'local' and os.name == 'nt':
   573         sendEncoding = sendEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
   767                 self.transport = WindowsNamedPipeTransport
   574         recvEncoding = recvEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
   768             elif transport == 'local':
       
   769                 self.transport = UnixSocketTransport
       
   770             elif transport == 'cli':
       
   771                 self.transport = CLIProcessTransport
       
   772                 if sendEncoding is None:
       
   773                     sendEncoding = 'json'
       
   774                 if recvEncoding is None:
       
   775                     recvEncoding = sendEncoding
       
   776             else:
       
   777                 raise WatchmanError('invalid transport %s' % transport)
       
   778 
       
   779         sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
       
   780                            'bser')
       
   781         recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
       
   782                            'bser')
   575 
   783 
   576         self.recvCodec = self._parseEncoding(recvEncoding)
   784         self.recvCodec = self._parseEncoding(recvEncoding)
   577         self.sendCodec = self._parseEncoding(sendEncoding)
   785         self.sendCodec = self._parseEncoding(sendEncoding)
   578 
   786 
   579     def _parseEncoding(self, enc):
   787     def _parseEncoding(self, enc):
   580         if enc == 'bser':
   788         if enc == 'bser':
   581             if self.useImmutableBser:
   789             if self.useImmutableBser:
   582                 return ImmutableBserCodec
   790                 return ImmutableBserCodec
   583             return BserCodec
   791             return BserCodec
       
   792         elif enc == 'experimental-bser-v2':
       
   793           return Bser2WithFallbackCodec
   584         elif enc == 'json':
   794         elif enc == 'json':
   585             return JsonCodec
   795             return JsonCodec
   586         else:
   796         else:
   587             raise WatchmanError('invalid encoding %s' % enc)
   797             raise WatchmanError('invalid encoding %s' % enc)
   588 
   798 
   598         if path:
   808         if path:
   599             return path
   809             return path
   600 
   810 
   601         cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
   811         cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
   602         try:
   812         try:
   603             p = subprocess.Popen(cmd,
   813             args = dict(stdout=subprocess.PIPE,
   604                                  stdout=subprocess.PIPE,
   814                         stderr=subprocess.PIPE,
   605                                  stderr=subprocess.PIPE,
   815                         close_fds=os.name != 'nt')
   606                                  close_fds=os.name != 'nt')
   816 
       
   817             if os.name == 'nt':
       
   818                 # if invoked via an application with graphical user interface,
       
   819                 # this call will cause a brief command window pop-up.
       
   820                 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
       
   821                 startupinfo = subprocess.STARTUPINFO()
       
   822                 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
       
   823                 args['startupinfo'] = startupinfo
       
   824 
       
   825             p = subprocess.Popen(cmd, **args)
       
   826 
   607         except OSError as e:
   827         except OSError as e:
   608             raise WatchmanError('"watchman" executable not in PATH (%s)', e)
   828             raise WatchmanError('"watchman" executable not in PATH (%s)', e)
   609 
   829 
   610         stdout, stderr = p.communicate()
   830         stdout, stderr = p.communicate()
   611         exitcode = p.poll()
   831         exitcode = p.poll()
   612 
   832 
   613         if exitcode:
   833         if exitcode:
   614             raise WatchmanError("watchman exited with code %d" % exitcode)
   834             raise WatchmanError("watchman exited with code %d" % exitcode)
   615 
   835 
   616         result = bser.loads(stdout)
   836         result = bser.loads(stdout)
   617         if 'error' in result:
   837         if b'error' in result:
   618             raise WatchmanError('get-sockname error: %s' % result['error'])
   838             raise WatchmanError('get-sockname error: %s' % result['error'])
   619 
   839 
   620         return result['sockname']
   840         return result[b'sockname']
   621 
   841 
   622     def _connect(self):
   842     def _connect(self):
   623         """ establish transport connection """
   843         """ establish transport connection """
   624 
   844 
   625         if self.recvConn:
   845         if self.recvConn:
   658         """
   878         """
   659 
   879 
   660         self._connect()
   880         self._connect()
   661         result = self.recvConn.receive()
   881         result = self.recvConn.receive()
   662         if self._hasprop(result, 'error'):
   882         if self._hasprop(result, 'error'):
   663             raise CommandError(result['error'])
   883             error = result['error']
       
   884             if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
       
   885                 error = result['error'].decode('utf-8', 'surrogateescape')
       
   886             raise CommandError(error)
   664 
   887 
   665         if self._hasprop(result, 'log'):
   888         if self._hasprop(result, 'log'):
   666             self.logs.append(result['log'])
   889             log = result['log']
       
   890             if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
       
   891                 log = log.decode('utf-8', 'surrogateescape')
       
   892             self.logs.append(log)
   667 
   893 
   668         if self._hasprop(result, 'subscription'):
   894         if self._hasprop(result, 'subscription'):
   669             sub = result['subscription']
   895             sub = result['subscription']
   670             if not (sub in self.subs):
   896             if not (sub in self.subs):
   671                 self.subs[sub] = []
   897                 self.subs[sub] = []
   680             self.sub_by_root[root][sub].append(result)
   906             self.sub_by_root[root][sub].append(result)
   681 
   907 
   682         return result
   908         return result
   683 
   909 
   684     def isUnilateralResponse(self, res):
   910     def isUnilateralResponse(self, res):
       
   911         if 'unilateral' in res and res['unilateral']:
       
   912             return True
       
   913         # Fall back to checking for known unilateral responses
   685         for k in self.unilateral:
   914         for k in self.unilateral:
   686             if k in res:
   915             if k in res:
   687                 return True
   916                 return True
   688         return False
   917         return False
   689 
   918 
   710         If root is not None, then only return the subscription
   939         If root is not None, then only return the subscription
   711         data that matches both root and name.  When used in this way,
   940         data that matches both root and name.  When used in this way,
   712         remove processing impacts both the unscoped and scoped stores
   941         remove processing impacts both the unscoped and scoped stores
   713         for the subscription data.
   942         for the subscription data.
   714         """
   943         """
       
   944         if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
       
   945             # People may pass in Unicode strings here -- but currently BSER only
       
   946             # returns bytestrings. Deal with that.
       
   947             if isinstance(root, str):
       
   948                 root = encoding.encode_local(root)
       
   949             if isinstance(name, str):
       
   950                 name = name.encode('utf-8')
   715 
   951 
   716         if root is not None:
   952         if root is not None:
   717             if not root in self.sub_by_root:
   953             if not root in self.sub_by_root:
   718                 return None
   954                 return None
   719             if not name in self.sub_by_root[root]:
   955             if not name in self.sub_by_root[root]:
   750             res = self.receive()
   986             res = self.receive()
   751             while self.isUnilateralResponse(res):
   987             while self.isUnilateralResponse(res):
   752                 res = self.receive()
   988                 res = self.receive()
   753 
   989 
   754             return res
   990             return res
   755         except CommandError as ex:
   991         except EnvironmentError as ee:
       
   992             # When we can depend on Python 3, we can use PEP 3134
       
   993             # exception chaining here.
       
   994             raise WatchmanEnvironmentError(
       
   995                 'I/O error communicating with watchman daemon',
       
   996                 ee.errno,
       
   997                 ee.strerror,
       
   998                 args)
       
   999         except WatchmanError as ex:
   756             ex.setCommand(args)
  1000             ex.setCommand(args)
   757             raise ex
  1001             raise
   758 
  1002 
   759     def capabilityCheck(self, optional=None, required=None):
  1003     def capabilityCheck(self, optional=None, required=None):
   760         """ Perform a server capability check """
  1004         """ Perform a server capability check """
   761         res = self.query('version', {
  1005         res = self.query('version', {
   762             'optional': optional or [],
  1006             'optional': optional or [],
   773         return res
  1017         return res
   774 
  1018 
   775     def setTimeout(self, value):
  1019     def setTimeout(self, value):
   776         self.recvConn.setTimeout(value)
  1020         self.recvConn.setTimeout(value)
   777         self.sendConn.setTimeout(value)
  1021         self.sendConn.setTimeout(value)
   778 
       
   779 # no-check-code -- this is a 3rd party library