hgext/fsmonitor/pywatchman/__init__.py
changeset 28432 2377c4ac4eec
child 30656 16f4b341288d
equal deleted inserted replaced
28431:a7e3b72cf756 28432:2377c4ac4eec
       
     1 # Copyright 2014-present Facebook, Inc.
       
     2 # All rights reserved.
       
     3 #
       
     4 # Redistribution and use in source and binary forms, with or without
       
     5 # modification, are permitted provided that the following conditions are met:
       
     6 #
       
     7 #  * Redistributions of source code must retain the above copyright notice,
       
     8 #    this list of conditions and the following disclaimer.
       
     9 #
       
    10 #  * Redistributions in binary form must reproduce the above copyright notice,
       
    11 #    this list of conditions and the following disclaimer in the documentation
       
    12 #    and/or other materials provided with the distribution.
       
    13 #
       
    14 #  * Neither the name Facebook nor the names of its contributors may be used to
       
    15 #    endorse or promote products derived from this software without specific
       
    16 #    prior written permission.
       
    17 #
       
    18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
       
    19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
       
    20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
       
    21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
       
    22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
       
    23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
       
    24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
       
    25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
       
    26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
       
    27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
       
    28 
       
    29 import os
       
    30 import errno
       
    31 import math
       
    32 import socket
       
    33 import subprocess
       
    34 import time
       
    35 
       
    36 # Sometimes it's really hard to get Python extensions to compile,
       
    37 # so fall back to a pure Python implementation.
       
    38 try:
       
    39     import bser
       
    40 except ImportError:
       
    41     import pybser as bser
       
    42 
       
    43 import capabilities
       
    44 
       
    45 if os.name == 'nt':
       
    46     import ctypes
       
    47     import ctypes.wintypes
       
    48 
       
    49     wintypes = ctypes.wintypes
       
    50     GENERIC_READ = 0x80000000
       
    51     GENERIC_WRITE = 0x40000000
       
    52     FILE_FLAG_OVERLAPPED = 0x40000000
       
    53     OPEN_EXISTING = 3
       
    54     INVALID_HANDLE_VALUE = -1
       
    55     FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
       
    56     FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
       
    57     FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
       
    58     WAIT_TIMEOUT = 0x00000102
       
    59     WAIT_OBJECT_0 = 0x00000000
       
    60     ERROR_IO_PENDING = 997
       
    61 
       
    62     class OVERLAPPED(ctypes.Structure):
       
    63         _fields_ = [
       
    64             ("Internal", wintypes.ULONG), ("InternalHigh", wintypes.ULONG),
       
    65             ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
       
    66             ("hEvent", wintypes.HANDLE)
       
    67         ]
       
    68 
       
    69         def __init__(self):
       
    70             self.Offset = 0
       
    71             self.OffsetHigh = 0
       
    72             self.hEvent = 0
       
    73 
       
    74     LPDWORD = ctypes.POINTER(wintypes.DWORD)
       
    75 
       
    76     CreateFile = ctypes.windll.kernel32.CreateFileA
       
    77     CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
       
    78                            wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
       
    79                            wintypes.HANDLE]
       
    80     CreateFile.restype = wintypes.HANDLE
       
    81 
       
    82     CloseHandle = ctypes.windll.kernel32.CloseHandle
       
    83     CloseHandle.argtypes = [wintypes.HANDLE]
       
    84     CloseHandle.restype = wintypes.BOOL
       
    85 
       
    86     ReadFile = ctypes.windll.kernel32.ReadFile
       
    87     ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
       
    88                          LPDWORD, ctypes.POINTER(OVERLAPPED)]
       
    89     ReadFile.restype = wintypes.BOOL
       
    90 
       
    91     WriteFile = ctypes.windll.kernel32.WriteFile
       
    92     WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
       
    93                           LPDWORD, ctypes.POINTER(OVERLAPPED)]
       
    94     WriteFile.restype = wintypes.BOOL
       
    95 
       
    96     GetLastError = ctypes.windll.kernel32.GetLastError
       
    97     GetLastError.argtypes = []
       
    98     GetLastError.restype = wintypes.DWORD
       
    99 
       
   100     FormatMessage = ctypes.windll.kernel32.FormatMessageA
       
   101     FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
       
   102                               wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
       
   103                               wintypes.DWORD, wintypes.LPVOID]
       
   104     FormatMessage.restype = wintypes.DWORD
       
   105 
       
   106     LocalFree = ctypes.windll.kernel32.LocalFree
       
   107 
       
   108     GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx
       
   109     GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
       
   110                                       ctypes.POINTER(OVERLAPPED), LPDWORD,
       
   111                                       wintypes.DWORD, wintypes.BOOL]
       
   112     GetOverlappedResultEx.restype = wintypes.BOOL
       
   113 
       
   114     CancelIoEx = ctypes.windll.kernel32.CancelIoEx
       
   115     CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
       
   116     CancelIoEx.restype = wintypes.BOOL
       
   117 
       
   118 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
       
   119 sniff_len = 13
       
   120 
       
   121 # This is a helper for debugging the client.
       
   122 _debugging = False
       
   123 if _debugging:
       
   124 
       
   125     def log(fmt, *args):
       
   126         print('[%s] %s' %
       
   127               (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
       
   128                fmt % args[:]))
       
   129 else:
       
   130 
       
   131     def log(fmt, *args):
       
   132         pass
       
   133 
       
   134 
       
   135 class WatchmanError(Exception):
       
   136     pass
       
   137 
       
   138 
       
   139 class SocketTimeout(WatchmanError):
       
   140     """A specialized exception raised for socket timeouts during communication to/from watchman.
       
   141        This makes it easier to implement non-blocking loops as callers can easily distinguish
       
   142        between a routine timeout and an actual error condition.
       
   143 
       
   144        Note that catching WatchmanError will also catch this as it is a super-class, so backwards
       
   145        compatibility in exception handling is preserved.
       
   146     """
       
   147 
       
   148 
       
   149 class CommandError(WatchmanError):
       
   150     """error returned by watchman
       
   151 
       
   152     self.msg is the message returned by watchman.
       
   153     """
       
   154 
       
   155     def __init__(self, msg, cmd=None):
       
   156         self.msg = msg
       
   157         self.cmd = cmd
       
   158         super(CommandError, self).__init__('watchman command error: %s' % msg)
       
   159 
       
   160     def setCommand(self, cmd):
       
   161         self.cmd = cmd
       
   162 
       
   163     def __str__(self):
       
   164         if self.cmd:
       
   165             return '%s, while executing %s' % (self.msg, self.cmd)
       
   166         return self.msg
       
   167 
       
   168 
       
   169 class Transport(object):
       
   170     """ communication transport to the watchman server """
       
   171     buf = None
       
   172 
       
   173     def close(self):
       
   174         """ tear it down """
       
   175         raise NotImplementedError()
       
   176 
       
   177     def readBytes(self, size):
       
   178         """ read size bytes """
       
   179         raise NotImplementedError()
       
   180 
       
   181     def write(self, buf):
       
   182         """ write some data """
       
   183         raise NotImplementedError()
       
   184 
       
   185     def setTimeout(self, value):
       
   186         pass
       
   187 
       
   188     def readLine(self):
       
   189         """ read a line
       
   190         Maintains its own buffer, callers of the transport should not mix
       
   191         calls to readBytes and readLine.
       
   192         """
       
   193         if self.buf is None:
       
   194             self.buf = []
       
   195 
       
   196         # Buffer may already have a line if we've received unilateral
       
   197         # response(s) from the server
       
   198         if len(self.buf) == 1 and "\n" in self.buf[0]:
       
   199             (line, b) = self.buf[0].split("\n", 1)
       
   200             self.buf = [b]
       
   201             return line
       
   202 
       
   203         while True:
       
   204             b = self.readBytes(4096)
       
   205             if "\n" in b:
       
   206                 result = ''.join(self.buf)
       
   207                 (line, b) = b.split("\n", 1)
       
   208                 self.buf = [b]
       
   209                 return result + line
       
   210             self.buf.append(b)
       
   211 
       
   212 
       
   213 class Codec(object):
       
   214     """ communication encoding for the watchman server """
       
   215     transport = None
       
   216 
       
   217     def __init__(self, transport):
       
   218         self.transport = transport
       
   219 
       
   220     def receive(self):
       
   221         raise NotImplementedError()
       
   222 
       
   223     def send(self, *args):
       
   224         raise NotImplementedError()
       
   225 
       
   226     def setTimeout(self, value):
       
   227         self.transport.setTimeout(value)
       
   228 
       
   229 
       
   230 class UnixSocketTransport(Transport):
       
   231     """ local unix domain socket transport """
       
   232     sock = None
       
   233 
       
   234     def __init__(self, sockpath, timeout):
       
   235         self.sockpath = sockpath
       
   236         self.timeout = timeout
       
   237 
       
   238         sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
       
   239         try:
       
   240             sock.settimeout(self.timeout)
       
   241             sock.connect(self.sockpath)
       
   242             self.sock = sock
       
   243         except socket.error as e:
       
   244             raise WatchmanError('unable to connect to %s: %s' %
       
   245                                 (self.sockpath, e))
       
   246 
       
   247     def close(self):
       
   248         self.sock.close()
       
   249         self.sock = None
       
   250 
       
   251     def setTimeout(self, value):
       
   252         self.timeout = value
       
   253         self.sock.settimeout(self.timeout)
       
   254 
       
   255     def readBytes(self, size):
       
   256         try:
       
   257             buf = [self.sock.recv(size)]
       
   258             if not buf[0]:
       
   259                 raise WatchmanError('empty watchman response')
       
   260             return buf[0]
       
   261         except socket.timeout:
       
   262             raise SocketTimeout('timed out waiting for response')
       
   263 
       
   264     def write(self, data):
       
   265         try:
       
   266             self.sock.sendall(data)
       
   267         except socket.timeout:
       
   268             raise SocketTimeout('timed out sending query command')
       
   269 
       
   270 
       
   271 class WindowsNamedPipeTransport(Transport):
       
   272     """ connect to a named pipe """
       
   273 
       
   274     def __init__(self, sockpath, timeout):
       
   275         self.sockpath = sockpath
       
   276         self.timeout = int(math.ceil(timeout * 1000))
       
   277         self._iobuf = None
       
   278 
       
   279         self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
       
   280                                OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
       
   281 
       
   282         if self.pipe == INVALID_HANDLE_VALUE:
       
   283             self.pipe = None
       
   284             self._raise_win_err('failed to open pipe %s' % sockpath,
       
   285                                 GetLastError())
       
   286 
       
   287     def _win32_strerror(self, err):
       
   288         """ expand a win32 error code into a human readable message """
       
   289 
       
   290         # FormatMessage will allocate memory and assign it here
       
   291         buf = ctypes.c_char_p()
       
   292         FormatMessage(
       
   293             FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
       
   294             | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
       
   295         try:
       
   296             return buf.value
       
   297         finally:
       
   298             LocalFree(buf)
       
   299 
       
   300     def _raise_win_err(self, msg, err):
       
   301         raise IOError('%s win32 error code: %d %s' %
       
   302                       (msg, err, self._win32_strerror(err)))
       
   303 
       
   304     def close(self):
       
   305         if self.pipe:
       
   306             CloseHandle(self.pipe)
       
   307         self.pipe = None
       
   308 
       
   309     def readBytes(self, size):
       
   310         """ A read can block for an unbounded amount of time, even if the
       
   311             kernel reports that the pipe handle is signalled, so we need to
       
   312             always perform our reads asynchronously
       
   313         """
       
   314 
       
   315         # try to satisfy the read from any buffered data
       
   316         if self._iobuf:
       
   317             if size >= len(self._iobuf):
       
   318                 res = self._iobuf
       
   319                 self.buf = None
       
   320                 return res
       
   321             res = self._iobuf[:size]
       
   322             self._iobuf = self._iobuf[size:]
       
   323             return res
       
   324 
       
   325         # We need to initiate a read
       
   326         buf = ctypes.create_string_buffer(size)
       
   327         olap = OVERLAPPED()
       
   328 
       
   329         log('made read buff of size %d', size)
       
   330 
       
   331         # ReadFile docs warn against sending in the nread parameter for async
       
   332         # operations, so we always collect it via GetOverlappedResultEx
       
   333         immediate = ReadFile(self.pipe, buf, size, None, olap)
       
   334 
       
   335         if not immediate:
       
   336             err = GetLastError()
       
   337             if err != ERROR_IO_PENDING:
       
   338                 self._raise_win_err('failed to read %d bytes' % size,
       
   339                                     GetLastError())
       
   340 
       
   341         nread = wintypes.DWORD()
       
   342         if not GetOverlappedResultEx(self.pipe, olap, nread,
       
   343                                      0 if immediate else self.timeout, True):
       
   344             err = GetLastError()
       
   345             CancelIoEx(self.pipe, olap)
       
   346 
       
   347             if err == WAIT_TIMEOUT:
       
   348                 log('GetOverlappedResultEx timedout')
       
   349                 raise SocketTimeout('timed out after waiting %dms for read' %
       
   350                                     self.timeout)
       
   351 
       
   352             log('GetOverlappedResultEx reports error %d', err)
       
   353             self._raise_win_err('error while waiting for read', err)
       
   354 
       
   355         nread = nread.value
       
   356         if nread == 0:
       
   357             # Docs say that named pipes return 0 byte when the other end did
       
   358             # a zero byte write.  Since we don't ever do that, the only
       
   359             # other way this shows up is if the client has gotten in a weird
       
   360             # state, so let's bail out
       
   361             CancelIoEx(self.pipe, olap)
       
   362             raise IOError('Async read yielded 0 bytes; unpossible!')
       
   363 
       
   364         # Holds precisely the bytes that we read from the prior request
       
   365         buf = buf[:nread]
       
   366 
       
   367         returned_size = min(nread, size)
       
   368         if returned_size == nread:
       
   369             return buf
       
   370 
       
   371         # keep any left-overs around for a later read to consume
       
   372         self._iobuf = buf[returned_size:]
       
   373         return buf[:returned_size]
       
   374 
       
   375     def write(self, data):
       
   376         olap = OVERLAPPED()
       
   377         immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
       
   378                               None, olap)
       
   379 
       
   380         if not immediate:
       
   381             err = GetLastError()
       
   382             if err != ERROR_IO_PENDING:
       
   383                 self._raise_win_err('failed to write %d bytes' % len(data),
       
   384                                     GetLastError())
       
   385 
       
   386         # Obtain results, waiting if needed
       
   387         nwrote = wintypes.DWORD()
       
   388         if GetOverlappedResultEx(self.pipe, olap, nwrote, 0 if immediate else
       
   389                                  self.timeout, True):
       
   390             return nwrote.value
       
   391 
       
   392         err = GetLastError()
       
   393 
       
   394         # It's potentially unsafe to allow the write to continue after
       
   395         # we unwind, so let's make a best effort to avoid that happening
       
   396         CancelIoEx(self.pipe, olap)
       
   397 
       
   398         if err == WAIT_TIMEOUT:
       
   399             raise SocketTimeout('timed out after waiting %dms for write' %
       
   400                                 self.timeout)
       
   401         self._raise_win_err('error while waiting for write of %d bytes' %
       
   402                             len(data), err)
       
   403 
       
   404 
       
   405 class CLIProcessTransport(Transport):
       
   406     """ open a pipe to the cli to talk to the service
       
   407     This intended to be used only in the test harness!
       
   408 
       
   409     The CLI is an oddball because we only support JSON input
       
   410     and cannot send multiple commands through the same instance,
       
   411     so we spawn a new process for each command.
       
   412 
       
   413     We disable server spawning for this implementation, again, because
       
   414     it is intended to be used only in our test harness.  You really
       
   415     should not need to use the CLI transport for anything real.
       
   416 
       
   417     While the CLI can output in BSER, our Transport interface doesn't
       
   418     support telling this instance that it should do so.  That effectively
       
   419     limits this implementation to JSON input and output only at this time.
       
   420 
       
   421     It is the responsibility of the caller to set the send and
       
   422     receive codecs appropriately.
       
   423     """
       
   424     proc = None
       
   425     closed = True
       
   426 
       
   427     def __init__(self, sockpath, timeout):
       
   428         self.sockpath = sockpath
       
   429         self.timeout = timeout
       
   430 
       
   431     def close(self):
       
   432         if self.proc:
       
   433             self.proc.kill()
       
   434             self.proc = None
       
   435 
       
   436     def _connect(self):
       
   437         if self.proc:
       
   438             return self.proc
       
   439         args = [
       
   440             'watchman',
       
   441             '--sockname={}'.format(self.sockpath),
       
   442             '--logfile=/BOGUS',
       
   443             '--statefile=/BOGUS',
       
   444             '--no-spawn',
       
   445             '--no-local',
       
   446             '--no-pretty',
       
   447             '-j',
       
   448         ]
       
   449         self.proc = subprocess.Popen(args,
       
   450                                      stdin=subprocess.PIPE,
       
   451                                      stdout=subprocess.PIPE)
       
   452         return self.proc
       
   453 
       
   454     def readBytes(self, size):
       
   455         self._connect()
       
   456         res = self.proc.stdout.read(size)
       
   457         if res == '':
       
   458             raise WatchmanError('EOF on CLI process transport')
       
   459         return res
       
   460 
       
   461     def write(self, data):
       
   462         if self.closed:
       
   463             self.closed = False
       
   464             self.proc = None
       
   465         self._connect()
       
   466         res = self.proc.stdin.write(data)
       
   467         self.proc.stdin.close()
       
   468         self.closed = True
       
   469         return res
       
   470 
       
   471 
       
   472 class BserCodec(Codec):
       
   473     """ use the BSER encoding.  This is the default, preferred codec """
       
   474 
       
   475     def _loads(self, response):
       
   476         return bser.loads(response)
       
   477 
       
   478     def receive(self):
       
   479         buf = [self.transport.readBytes(sniff_len)]
       
   480         if not buf[0]:
       
   481             raise WatchmanError('empty watchman response')
       
   482 
       
   483         elen = bser.pdu_len(buf[0])
       
   484 
       
   485         rlen = len(buf[0])
       
   486         while elen > rlen:
       
   487             buf.append(self.transport.readBytes(elen - rlen))
       
   488             rlen += len(buf[-1])
       
   489 
       
   490         response = ''.join(buf)
       
   491         try:
       
   492             res = self._loads(response)
       
   493             return res
       
   494         except ValueError as e:
       
   495             raise WatchmanError('watchman response decode error: %s' % e)
       
   496 
       
   497     def send(self, *args):
       
   498         cmd = bser.dumps(*args)
       
   499         self.transport.write(cmd)
       
   500 
       
   501 
       
   502 class ImmutableBserCodec(BserCodec):
       
   503     """ use the BSER encoding, decoding values using the newer
       
   504         immutable object support """
       
   505 
       
   506     def _loads(self, response):
       
   507         return bser.loads(response, False)
       
   508 
       
   509 
       
   510 class JsonCodec(Codec):
       
   511     """ Use json codec.  This is here primarily for testing purposes """
       
   512     json = None
       
   513 
       
   514     def __init__(self, transport):
       
   515         super(JsonCodec, self).__init__(transport)
       
   516         # optional dep on json, only if JsonCodec is used
       
   517         import json
       
   518         self.json = json
       
   519 
       
   520     def receive(self):
       
   521         line = self.transport.readLine()
       
   522         try:
       
   523             return self.json.loads(line)
       
   524         except Exception as e:
       
   525             print(e, line)
       
   526             raise
       
   527 
       
   528     def send(self, *args):
       
   529         cmd = self.json.dumps(*args)
       
   530         self.transport.write(cmd + "\n")
       
   531 
       
   532 
       
   533 class client(object):
       
   534     """ Handles the communication with the watchman service """
       
    # Path of the watchman socket; resolved by _connect when left as None
    sockpath = None
    # Transport class selected by __init__
    transport = None
    # Codec classes selected by __init__
    sendCodec = None
    recvCodec = None
    # Codec instances created by _connect
    sendConn = None
    recvConn = None
    # NOTE: subs, sub_by_root and logs are mutable objects defined on the
    # class; an instance that does not rebind them shares them with every
    # other instance.
    subs = {}  # Keyed by subscription name
    sub_by_root = {}  # Keyed by root, then by subscription name
    logs = []  # When log level is raised
    # Response keys that isUnilateralResponse looks for
    unilateral = ['log', 'subscription']
    # Transport instance created by _connect
    tport = None
    useImmutableBser = None
       
   547 
       
   548     def __init__(self,
       
   549                  sockpath=None,
       
   550                  timeout=1.0,
       
   551                  transport=None,
       
   552                  sendEncoding=None,
       
   553                  recvEncoding=None,
       
   554                  useImmutableBser=False):
       
   555         self.sockpath = sockpath
       
   556         self.timeout = timeout
       
   557         self.useImmutableBser = useImmutableBser
       
   558 
       
   559         transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
       
   560         if transport == 'local' and os.name == 'nt':
       
   561             self.transport = WindowsNamedPipeTransport
       
   562         elif transport == 'local':
       
   563             self.transport = UnixSocketTransport
       
   564         elif transport == 'cli':
       
   565             self.transport = CLIProcessTransport
       
   566             if sendEncoding is None:
       
   567                 sendEncoding = 'json'
       
   568             if recvEncoding is None:
       
   569                 recvEncoding = sendEncoding
       
   570         else:
       
   571             raise WatchmanError('invalid transport %s' % transport)
       
   572 
       
   573         sendEncoding = sendEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
       
   574         recvEncoding = recvEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
       
   575 
       
   576         self.recvCodec = self._parseEncoding(recvEncoding)
       
   577         self.sendCodec = self._parseEncoding(sendEncoding)
       
   578 
       
   579     def _parseEncoding(self, enc):
       
   580         if enc == 'bser':
       
   581             if self.useImmutableBser:
       
   582                 return ImmutableBserCodec
       
   583             return BserCodec
       
   584         elif enc == 'json':
       
   585             return JsonCodec
       
   586         else:
       
   587             raise WatchmanError('invalid encoding %s' % enc)
       
   588 
       
   589     def _hasprop(self, result, name):
       
   590         if self.useImmutableBser:
       
   591             return hasattr(result, name)
       
   592         return name in result
       
   593 
       
   594     def _resolvesockname(self):
       
   595         # if invoked via a trigger, watchman will set this env var; we
       
   596         # should use it unless explicitly set otherwise
       
   597         path = os.getenv('WATCHMAN_SOCK')
       
   598         if path:
       
   599             return path
       
   600 
       
   601         cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
       
   602         try:
       
   603             p = subprocess.Popen(cmd,
       
   604                                  stdout=subprocess.PIPE,
       
   605                                  stderr=subprocess.PIPE,
       
   606                                  close_fds=os.name != 'nt')
       
   607         except OSError as e:
       
   608             raise WatchmanError('"watchman" executable not in PATH (%s)', e)
       
   609 
       
   610         stdout, stderr = p.communicate()
       
   611         exitcode = p.poll()
       
   612 
       
   613         if exitcode:
       
   614             raise WatchmanError("watchman exited with code %d" % exitcode)
       
   615 
       
   616         result = bser.loads(stdout)
       
   617         if 'error' in result:
       
   618             raise WatchmanError('get-sockname error: %s' % result['error'])
       
   619 
       
   620         return result['sockname']
       
   621 
       
   622     def _connect(self):
       
   623         """ establish transport connection """
       
   624 
       
   625         if self.recvConn:
       
   626             return
       
   627 
       
   628         if self.sockpath is None:
       
   629             self.sockpath = self._resolvesockname()
       
   630 
       
   631         self.tport = self.transport(self.sockpath, self.timeout)
       
   632         self.sendConn = self.sendCodec(self.tport)
       
   633         self.recvConn = self.recvCodec(self.tport)
       
   634 
       
   635     def __del__(self):
       
   636         self.close()
       
   637 
       
   638     def close(self):
       
   639         if self.tport:
       
   640             self.tport.close()
       
   641             self.tport = None
       
   642             self.recvConn = None
       
   643             self.sendConn = None
       
   644 
       
   645     def receive(self):
       
   646         """ receive the next PDU from the watchman service
       
   647 
       
   648         If the client has activated subscriptions or logs then
       
   649         this PDU may be a unilateral PDU sent by the service to
       
   650         inform the client of a log event or subscription change.
       
   651 
       
   652         It may also simply be the response portion of a request
       
   653         initiated by query.
       
   654 
       
   655         There are clients in production that subscribe and call
       
   656         this in a loop to retrieve all subscription responses,
       
   657         so care should be taken when making changes here.
       
   658         """
       
   659 
       
   660         self._connect()
       
   661         result = self.recvConn.receive()
       
   662         if self._hasprop(result, 'error'):
       
   663             raise CommandError(result['error'])
       
   664 
       
   665         if self._hasprop(result, 'log'):
       
   666             self.logs.append(result['log'])
       
   667 
       
   668         if self._hasprop(result, 'subscription'):
       
   669             sub = result['subscription']
       
   670             if not (sub in self.subs):
       
   671                 self.subs[sub] = []
       
   672             self.subs[sub].append(result)
       
   673 
       
   674             # also accumulate in {root,sub} keyed store
       
   675             root = os.path.normcase(result['root'])
       
   676             if not root in self.sub_by_root:
       
   677                 self.sub_by_root[root] = {}
       
   678             if not sub in self.sub_by_root[root]:
       
   679                 self.sub_by_root[root][sub] = []
       
   680             self.sub_by_root[root][sub].append(result)
       
   681 
       
   682         return result
       
   683 
       
   684     def isUnilateralResponse(self, res):
       
   685         for k in self.unilateral:
       
   686             if k in res:
       
   687                 return True
       
   688         return False
       
   689 
       
   690     def getLog(self, remove=True):
       
   691         """ Retrieve buffered log data
       
   692 
       
   693         If remove is true the data will be removed from the buffer.
       
   694         Otherwise it will be left in the buffer
       
   695         """
       
   696         res = self.logs
       
   697         if remove:
       
   698             self.logs = []
       
   699         return res
       
   700 
       
   701     def getSubscription(self, name, remove=True, root=None):
       
   702         """ Retrieve the data associated with a named subscription
       
   703 
       
   704         If remove is True (the default), the subscription data is removed
       
   705         from the buffer.  Otherwise the data is returned but left in
       
   706         the buffer.
       
   707 
       
   708         Returns None if there is no data associated with `name`
       
   709 
       
   710         If root is not None, then only return the subscription
       
   711         data that matches both root and name.  When used in this way,
       
   712         remove processing impacts both the unscoped and scoped stores
       
   713         for the subscription data.
       
   714         """
       
   715 
       
   716         if root is not None:
       
   717             if not root in self.sub_by_root:
       
   718                 return None
       
   719             if not name in self.sub_by_root[root]:
       
   720                 return None
       
   721             sub = self.sub_by_root[root][name]
       
   722             if remove:
       
   723                 del self.sub_by_root[root][name]
       
   724                 # don't let this grow unbounded
       
   725                 if name in self.subs:
       
   726                     del self.subs[name]
       
   727             return sub
       
   728 
       
   729         if not (name in self.subs):
       
   730             return None
       
   731         sub = self.subs[name]
       
   732         if remove:
       
   733             del self.subs[name]
       
   734         return sub
       
   735 
       
   736     def query(self, *args):
       
   737         """ Send a query to the watchman service and return the response
       
   738 
       
   739         This call will block until the response is returned.
       
   740         If any unilateral responses are sent by the service in between
       
   741         the request-response they will be buffered up in the client object
       
   742         and NOT returned via this method.
       
   743         """
       
   744 
       
   745         log('calling client.query')
       
   746         self._connect()
       
   747         try:
       
   748             self.sendConn.send(args)
       
   749 
       
   750             res = self.receive()
       
   751             while self.isUnilateralResponse(res):
       
   752                 res = self.receive()
       
   753 
       
   754             return res
       
   755         except CommandError as ex:
       
   756             ex.setCommand(args)
       
   757             raise ex
       
   758 
       
   759     def capabilityCheck(self, optional=None, required=None):
       
   760         """ Perform a server capability check """
       
   761         res = self.query('version', {
       
   762             'optional': optional or [],
       
   763             'required': required or []
       
   764         })
       
   765 
       
   766         if not self._hasprop(res, 'capabilities'):
       
   767             # Server doesn't support capabilities, so we need to
       
   768             # synthesize the results based on the version
       
   769             capabilities.synthesize(res, optional)
       
   770             if 'error' in res:
       
   771                 raise CommandError(res['error'])
       
   772 
       
   773         return res
       
   774 
       
   775     def setTimeout(self, value):
       
   776         self.recvConn.setTimeout(value)
       
   777         self.sendConn.setTimeout(value)
       
   778 
       
   779 # no-check-code -- this is a 3rd party library