|
1 # Copyright 2014-present Facebook, Inc. |
|
2 # All rights reserved. |
|
3 # |
|
4 # Redistribution and use in source and binary forms, with or without |
|
5 # modification, are permitted provided that the following conditions are met: |
|
6 # |
|
7 # * Redistributions of source code must retain the above copyright notice, |
|
8 # this list of conditions and the following disclaimer. |
|
9 # |
|
10 # * Redistributions in binary form must reproduce the above copyright notice, |
|
11 # this list of conditions and the following disclaimer in the documentation |
|
12 # and/or other materials provided with the distribution. |
|
13 # |
|
14 # * Neither the name Facebook nor the names of its contributors may be used to |
|
15 # endorse or promote products derived from this software without specific |
|
16 # prior written permission. |
|
17 # |
|
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
|
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
|
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE |
|
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
|
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR |
|
24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
|
25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, |
|
26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
|
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
28 |
|
29 import os |
|
30 import errno |
|
31 import math |
|
32 import socket |
|
33 import subprocess |
|
34 import time |
|
35 |
|
36 # Sometimes it's really hard to get Python extensions to compile, |
|
37 # so fall back to a pure Python implementation. |
|
38 try: |
|
39 import bser |
|
40 except ImportError: |
|
41 import pybser as bser |
|
42 |
|
43 import capabilities |
|
44 |
|
if os.name == 'nt':
    # Win32 constants, structures and kernel32 entry points used by the
    # named pipe transport.  Everything in this branch only exists on
    # Windows.
    import ctypes
    import ctypes.wintypes

    wintypes = ctypes.wintypes
    GENERIC_READ = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_FLAG_OVERLAPPED = 0x40000000
    OPEN_EXISTING = 3
    # CreateFile is declared below with restype HANDLE (a c_void_p), which
    # ctypes hands back as an *unsigned* integer.  A literal -1 would never
    # compare equal to that, so derive the sentinel through the same type.
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
    FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
    FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
    WAIT_TIMEOUT = 0x00000102
    WAIT_OBJECT_0 = 0x00000000
    ERROR_IO_PENDING = 997

    class OVERLAPPED(ctypes.Structure):
        """ ctypes mirror of the Win32 OVERLAPPED structure """
        _fields_ = [
            # Internal and InternalHigh are ULONG_PTR in the Win32 headers,
            # i.e. pointer sized.  c_size_t has that width on both 32-bit
            # and 64-bit Windows; a plain ULONG is too narrow on 64-bit and
            # would leave the structure smaller than the kernel expects.
            ("Internal", ctypes.c_size_t),
            ("InternalHigh", ctypes.c_size_t),
            ("Offset", wintypes.DWORD),
            ("OffsetHigh", wintypes.DWORD),
            ("hEvent", wintypes.HANDLE)
        ]

        def __init__(self):
            super(OVERLAPPED, self).__init__()
            self.Internal = 0
            self.InternalHigh = 0
            self.Offset = 0
            self.OffsetHigh = 0
            self.hEvent = 0

    LPDWORD = ctypes.POINTER(wintypes.DWORD)

    CreateFile = ctypes.windll.kernel32.CreateFileA
    CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
                           wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
                           wintypes.HANDLE]
    CreateFile.restype = wintypes.HANDLE

    CloseHandle = ctypes.windll.kernel32.CloseHandle
    CloseHandle.argtypes = [wintypes.HANDLE]
    CloseHandle.restype = wintypes.BOOL

    ReadFile = ctypes.windll.kernel32.ReadFile
    ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
                         LPDWORD, ctypes.POINTER(OVERLAPPED)]
    ReadFile.restype = wintypes.BOOL

    WriteFile = ctypes.windll.kernel32.WriteFile
    WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
                          LPDWORD, ctypes.POINTER(OVERLAPPED)]
    WriteFile.restype = wintypes.BOOL

    GetLastError = ctypes.windll.kernel32.GetLastError
    GetLastError.argtypes = []
    GetLastError.restype = wintypes.DWORD

    FormatMessage = ctypes.windll.kernel32.FormatMessageA
    FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
                              wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
                              wintypes.DWORD, wintypes.LPVOID]
    FormatMessage.restype = wintypes.DWORD

    LocalFree = ctypes.windll.kernel32.LocalFree

    GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx
    GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
                                      ctypes.POINTER(OVERLAPPED), LPDWORD,
                                      wintypes.DWORD, wintypes.BOOL]
    GetOverlappedResultEx.restype = wintypes.BOOL

    CancelIoEx = ctypes.windll.kernel32.CancelIoEx
    CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
    CancelIoEx.restype = wintypes.BOOL
|
117 |
|
# Number of bytes read up front to sniff a PDU header:
# 2 bytes marker, 1 byte int size, 8 bytes int64 value
# NOTE(review): the fields listed above add up to 11, not 13 -- confirm
# where the remaining two bytes come from.
sniff_len = 13

# Flip this to True to get timestamped traces of the client on stdout.
_debugging = False
if not _debugging:

    def log(fmt, *args):
        """ debugging is off: swallow the message """
        pass
else:

    def log(fmt, *args):
        """ print the %-formatted message prefixed with a GMT timestamp """
        stamp = time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime())
        print('[%s] %s' % (stamp, fmt % args[:]))
|
133 |
|
134 |
|
class WatchmanError(Exception):
    """ Root of the exception hierarchy used by the watchman client """
|
137 |
|
138 |
|
class SocketTimeout(WatchmanError):
    """Raised when talking to watchman times out.

    Having a dedicated type lets callers that run non-blocking loops tell a
    routine timeout apart from a genuine failure.

    This derives from WatchmanError, so existing handlers that catch
    WatchmanError keep catching timeouts too.
    """
|
147 |
|
148 |
|
class CommandError(WatchmanError):
    """An error response sent back by the watchman service.

    self.msg is the message returned by watchman.
    self.cmd is the command that provoked it, when known.
    """

    def __init__(self, msg, cmd=None):
        super(CommandError, self).__init__('watchman command error: %s' % msg)
        self.msg = msg
        self.cmd = cmd

    def setCommand(self, cmd):
        """ record the command that was being executed """
        self.cmd = cmd

    def __str__(self):
        # Without a (truthy) command there is nothing to add to the message
        if not self.cmd:
            return self.msg
        return '%s, while executing %s' % (self.msg, self.cmd)
|
167 |
|
168 |
|
class Transport(object):
    """ communication transport to the watchman server """
    # Pending data kept by readLine between calls; created lazily
    buf = None

    def close(self):
        """ tear it down """
        raise NotImplementedError()

    def readBytes(self, size):
        """ read size bytes """
        raise NotImplementedError()

    def write(self, buf):
        """ write some data """
        raise NotImplementedError()

    def setTimeout(self, value):
        """ transports without a timeout concept simply ignore this """
        pass

    def readLine(self):
        """ read a line (returned without its trailing newline)
        Maintains its own buffer, callers of the transport should not mix
        calls to readBytes and readLine.
        """
        if self.buf is None:
            self.buf = []

        # A previous read may have pulled in more than one line (for
        # example unilateral responses from the server); serve those first
        pending = self.buf
        if len(pending) == 1 and "\n" in pending[0]:
            line, _, rest = pending[0].partition("\n")
            self.buf = [rest]
            return line

        while True:
            chunk = self.readBytes(4096)
            head, newline, tail = chunk.partition("\n")
            if newline:
                # Everything buffered so far plus the start of this chunk
                # is the line; what follows the newline is kept for later
                prefix = ''.join(self.buf)
                self.buf = [tail]
                return prefix + head
            self.buf.append(chunk)
|
211 |
|
212 |
|
class Codec(object):
    """ communication encoding for the watchman server """
    # The transport that this codec reads from and writes to
    transport = None

    def __init__(self, transport):
        self.transport = transport

    def receive(self):
        """ decode the next response; provided by subclasses """
        raise NotImplementedError()

    def send(self, *args):
        """ encode and send a command; provided by subclasses """
        raise NotImplementedError()

    def setTimeout(self, value):
        """ forward the timeout to the underlying transport """
        tport = self.transport
        tport.setTimeout(value)
|
228 |
|
229 |
|
class UnixSocketTransport(Transport):
    """ local unix domain socket transport """
    # The connected socket, or None when not connected / already closed
    sock = None

    def __init__(self, sockpath, timeout):
        """ connect to the unix domain socket at sockpath

        timeout is handed to socket.settimeout and applies to the connect
        as well as to later reads and writes.
        Raises WatchmanError if the connection cannot be established.
        """
        self.sockpath = sockpath
        self.timeout = timeout

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect(self.sockpath)
        except socket.error as e:
            # Don't leak the descriptor when the connect fails
            sock.close()
            raise WatchmanError('unable to connect to %s: %s' %
                                (self.sockpath, e))
        self.sock = sock

    def close(self):
        """ close the socket; calling this more than once is harmless """
        if self.sock is not None:
            sock, self.sock = self.sock, None
            sock.close()

    def setTimeout(self, value):
        """ change the timeout used for subsequent socket operations """
        self.timeout = value
        self.sock.settimeout(self.timeout)

    def readBytes(self, size):
        """ read up to size bytes

        Raises SocketTimeout when the read times out and WatchmanError
        when the read comes back empty.
        """
        try:
            data = self.sock.recv(size)
        except socket.timeout:
            raise SocketTimeout('timed out waiting for response')
        if not data:
            raise WatchmanError('empty watchman response')
        return data

    def write(self, data):
        """ send all of data; raises SocketTimeout on timeout """
        try:
            self.sock.sendall(data)
        except socket.timeout:
            raise SocketTimeout('timed out sending query command')
|
269 |
|
270 |
|
class WindowsNamedPipeTransport(Transport):
    """ connect to a named pipe """

    def __init__(self, sockpath, timeout):
        """ open the named pipe at sockpath for overlapped I/O

        timeout is given in seconds and stored in milliseconds (rounded
        up), which is the unit GetOverlappedResultEx is called with.
        Raises IOError if the pipe cannot be opened.
        """
        self.sockpath = sockpath
        self.timeout = int(math.ceil(timeout * 1000))
        # Bytes already read from the pipe but not yet handed to a caller
        self._iobuf = None

        self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
                               OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)

        if self.pipe == INVALID_HANDLE_VALUE:
            self.pipe = None
            self._raise_win_err('failed to open pipe %s' % sockpath,
                                GetLastError())

    def _win32_strerror(self, err):
        """ expand a win32 error code into a human readable message """

        # FormatMessage will allocate memory and assign it here
        buf = ctypes.c_char_p()
        FormatMessage(
            FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
            | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
        try:
            return buf.value
        finally:
            LocalFree(buf)

    def _raise_win_err(self, msg, err):
        """ raise IOError carrying msg, the error code and its description """
        raise IOError('%s win32 error code: %d %s' %
                      (msg, err, self._win32_strerror(err)))

    def close(self):
        """ close the pipe handle if it is open """
        if self.pipe:
            CloseHandle(self.pipe)
        self.pipe = None

    def setTimeout(self, value):
        """ change the I/O timeout; value is in seconds

        Without this override the no-op from Transport would be used and
        a timeout change requested by the client would be silently ignored.
        """
        self.timeout = int(math.ceil(value * 1000))

    def readBytes(self, size):
        """ A read can block for an unbounded amount of time, even if the
            kernel reports that the pipe handle is signalled, so we need to
            always perform our reads asynchronously

            Raises SocketTimeout if the read does not complete in time and
            IOError for any other failure.
        """

        # try to satisfy the read from any buffered data
        if self._iobuf:
            if size >= len(self._iobuf):
                res = self._iobuf
                # The buffered bytes are consumed: forget them so they are
                # not returned again.  (self.buf belongs to readLine and
                # must not be touched here.)
                self._iobuf = None
                return res
            res = self._iobuf[:size]
            self._iobuf = self._iobuf[size:]
            return res

        # We need to initiate a read
        buf = ctypes.create_string_buffer(size)
        olap = OVERLAPPED()

        log('made read buff of size %d', size)

        # ReadFile docs warn against sending in the nread parameter for async
        # operations, so we always collect it via GetOverlappedResultEx
        immediate = ReadFile(self.pipe, buf, size, None, olap)

        if not immediate:
            err = GetLastError()
            if err != ERROR_IO_PENDING:
                # Report the code captured right after ReadFile rather
                # than querying GetLastError a second time
                self._raise_win_err('failed to read %d bytes' % size, err)

        nread = wintypes.DWORD()
        if not GetOverlappedResultEx(self.pipe, olap, nread,
                                     0 if immediate else self.timeout, True):
            err = GetLastError()
            CancelIoEx(self.pipe, olap)

            if err == WAIT_TIMEOUT:
                log('GetOverlappedResultEx timedout')
                raise SocketTimeout('timed out after waiting %dms for read' %
                                    self.timeout)

            log('GetOverlappedResultEx reports error %d', err)
            self._raise_win_err('error while waiting for read', err)

        nread = nread.value
        if nread == 0:
            # Docs say that named pipes return 0 byte when the other end did
            # a zero byte write.  Since we don't ever do that, the only
            # other way this shows up is if the client has gotten in a weird
            # state, so let's bail out
            CancelIoEx(self.pipe, olap)
            raise IOError('Async read yielded 0 bytes; unpossible!')

        # Holds precisely the bytes that we read from the prior request
        buf = buf[:nread]

        returned_size = min(nread, size)
        if returned_size == nread:
            return buf

        # keep any left-overs around for a later read to consume
        self._iobuf = buf[returned_size:]
        return buf[:returned_size]

    def write(self, data):
        """ write data to the pipe and return the number of bytes written

        Raises SocketTimeout if the write does not complete in time and
        IOError for any other failure.
        """
        olap = OVERLAPPED()
        immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
                              None, olap)

        if not immediate:
            err = GetLastError()
            if err != ERROR_IO_PENDING:
                self._raise_win_err('failed to write %d bytes' % len(data),
                                    err)

        # Obtain results, waiting if needed
        nwrote = wintypes.DWORD()
        if GetOverlappedResultEx(self.pipe, olap, nwrote, 0 if immediate else
                                 self.timeout, True):
            return nwrote.value

        err = GetLastError()

        # It's potentially unsafe to allow the write to continue after
        # we unwind, so let's make a best effort to avoid that happening
        CancelIoEx(self.pipe, olap)

        if err == WAIT_TIMEOUT:
            raise SocketTimeout('timed out after waiting %dms for write' %
                                self.timeout)
        self._raise_win_err('error while waiting for write of %d bytes' %
                            len(data), err)
|
403 |
|
404 |
|
class CLIProcessTransport(Transport):
    """ open a pipe to the cli to talk to the service
    This is intended to be used only in the test harness!

    The CLI is an oddball because we only support JSON input
    and cannot send multiple commands through the same instance,
    so we spawn a new process for each command.

    We disable server spawning for this implementation, again, because
    it is intended to be used only in our test harness.  You really
    should not need to use the CLI transport for anything real.

    While the CLI can output in BSER, our Transport interface doesn't
    support telling this instance that it should do so.  That effectively
    limits this implementation to JSON input and output only at this time.

    It is the responsibility of the caller to set the send and
    receive codecs appropriately.
    """
    # The CLI process serving the current command, if any
    proc = None
    # True once the current process' stdin has been closed (or before any
    # process was started), meaning the next write needs a new process
    closed = True

    def __init__(self, sockpath, timeout):
        self.sockpath = sockpath
        self.timeout = timeout

    def close(self):
        """ kill the CLI process, if any, and reap it """
        proc, self.proc = self.proc, None
        if proc is None:
            return
        # Only signal a process that is still running, then always wait so
        # that the child does not linger as a zombie
        if proc.poll() is None:
            proc.kill()
        proc.wait()

    def _connect(self):
        """ return the current CLI process, spawning one if needed """
        if self.proc:
            return self.proc
        args = [
            'watchman',
            '--sockname={}'.format(self.sockpath),
            '--logfile=/BOGUS',
            '--statefile=/BOGUS',
            '--no-spawn',
            '--no-local',
            '--no-pretty',
            '-j',
        ]
        self.proc = subprocess.Popen(args,
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE)
        return self.proc

    def readBytes(self, size):
        """ read up to size bytes from the CLI's stdout

        Raises WatchmanError when the process has no more output.
        """
        self._connect()
        res = self.proc.stdout.read(size)
        # An empty read means EOF.  Test for emptiness rather than
        # comparing against '' so that this also works when the pipe
        # yields bytes instead of str.
        if not res:
            raise WatchmanError('EOF on CLI process transport')
        return res

    def write(self, data):
        """ send one command to a fresh CLI process

        stdin is closed right after the write, so every command gets a
        process of its own.
        """
        if self.closed:
            self.closed = False
            self.proc = None
        self._connect()
        res = self.proc.stdin.write(data)
        self.proc.stdin.close()
        self.closed = True
        return res
|
470 |
|
471 |
|
class BserCodec(Codec):
    """ use the BSER encoding.  This is the default, preferred codec """

    def _loads(self, response):
        """ decode one complete PDU; overridden by subclasses """
        return bser.loads(response)

    def receive(self):
        """ read one PDU from the transport and return the decoded value

        Raises WatchmanError if nothing could be read or if the data
        cannot be decoded.
        """
        header = self.transport.readBytes(sniff_len)
        if not header:
            raise WatchmanError('empty watchman response')

        # The first bytes tell us how long the whole PDU is
        expected = bser.pdu_len(header)

        chunks = [header]
        received = len(header)
        while received < expected:
            chunk = self.transport.readBytes(expected - received)
            chunks.append(chunk)
            received += len(chunk)

        response = ''.join(chunks)
        try:
            return self._loads(response)
        except ValueError as e:
            raise WatchmanError('watchman response decode error: %s' % e)

    def send(self, *args):
        """ encode the arguments as BSER and write them to the transport """
        encoded = bser.dumps(*args)
        self.transport.write(encoded)
|
500 |
|
501 |
|
class ImmutableBserCodec(BserCodec):
    """ use the BSER encoding, decoding values using the newer
        immutable object support """

    def _loads(self, response):
        # The second positional argument of bser.loads is passed as False
        # here, where BserCodec leaves it at its default
        mutable = False
        return bser.loads(response, mutable)
|
508 |
|
509 |
|
class JsonCodec(Codec):
    """ Use json codec.  This is here primarily for testing purposes """
    # The json module, bound when an instance is created
    json = None

    def __init__(self, transport):
        super(JsonCodec, self).__init__(transport)
        # json is an optional dependency: only pull it in when a
        # JsonCodec is actually instantiated
        import json as json_module
        self.json = json_module

    def receive(self):
        """ read one line from the transport and decode it as JSON """
        line = self.transport.readLine()
        try:
            decoded = self.json.loads(line)
        except Exception as e:
            # Show what could not be decoded before propagating the error
            print(e, line)
            raise
        return decoded

    def send(self, *args):
        """ encode the arguments as one line of JSON and write it out """
        payload = self.json.dumps(*args)
        self.transport.write(payload + "\n")
|
531 |
|
532 |
|
class client(object):
    """ Handles the communication with the watchman service """
    sockpath = None
    transport = None
    sendCodec = None
    recvCodec = None
    sendConn = None
    recvConn = None
    # The three containers below are only class level defaults; __init__
    # gives every instance its own so that buffered data is never shared
    # between clients.
    subs = {}  # Keyed by subscription name
    sub_by_root = {}  # Keyed by root, then by subscription name
    logs = []  # When log level is raised
    unilateral = ['log', 'subscription']
    tport = None
    useImmutableBser = None

    def __init__(self,
                 sockpath=None,
                 timeout=1.0,
                 transport=None,
                 sendEncoding=None,
                 recvEncoding=None,
                 useImmutableBser=False):
        """ configure the client; no connection is made until it is needed

        sockpath: where the service listens; resolved on first use if None
        timeout: timeout, in seconds, handed to the transport
        transport: 'local' or 'cli'; defaults to $WATCHMAN_TRANSPORT and
                   then to 'local'
        sendEncoding / recvEncoding: 'bser' or 'json'; default to
                   $WATCHMAN_ENCODING and then to 'bser' (the cli transport
                   defaults to 'json')
        useImmutableBser: decode BSER responses with ImmutableBserCodec

        Raises WatchmanError for an unknown transport or encoding.
        """
        self.sockpath = sockpath
        self.timeout = timeout
        self.useImmutableBser = useImmutableBser

        # Per-instance buffers; mutating the class level containers would
        # leak subscription and log data from one client into another
        self.subs = {}
        self.sub_by_root = {}
        self.logs = []

        transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
        if transport == 'local' and os.name == 'nt':
            self.transport = WindowsNamedPipeTransport
        elif transport == 'local':
            self.transport = UnixSocketTransport
        elif transport == 'cli':
            self.transport = CLIProcessTransport
            if sendEncoding is None:
                sendEncoding = 'json'
            if recvEncoding is None:
                recvEncoding = sendEncoding
        else:
            raise WatchmanError('invalid transport %s' % transport)

        sendEncoding = sendEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'
        recvEncoding = recvEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser'

        self.recvCodec = self._parseEncoding(recvEncoding)
        self.sendCodec = self._parseEncoding(sendEncoding)

    def _parseEncoding(self, enc):
        """ map an encoding name to the codec class implementing it """
        if enc == 'bser':
            if self.useImmutableBser:
                return ImmutableBserCodec
            return BserCodec
        elif enc == 'json':
            return JsonCodec
        else:
            raise WatchmanError('invalid encoding %s' % enc)

    def _hasprop(self, result, name):
        """ test whether a decoded response carries the named field """
        if self.useImmutableBser:
            return hasattr(result, name)
        return name in result

    def _resolvesockname(self):
        """ work out the socket path to connect to

        Uses $WATCHMAN_SOCK when set, otherwise asks the watchman
        executable via `get-sockname`.  Raises WatchmanError if that fails.
        """
        # if invoked via a trigger, watchman will set this env var; we
        # should use it unless explicitly set otherwise
        path = os.getenv('WATCHMAN_SOCK')
        if path:
            return path

        cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
        try:
            p = subprocess.Popen(cmd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 close_fds=os.name != 'nt')
        except OSError as e:
            # Interpolate the OS error into the message (it used to be
            # passed as a second exception argument, leaving a literal
            # "%s" in the text)
            raise WatchmanError('"watchman" executable not in PATH (%s)' % e)

        stdout, stderr = p.communicate()
        exitcode = p.poll()

        if exitcode:
            raise WatchmanError("watchman exited with code %d" % exitcode)

        result = bser.loads(stdout)
        if 'error' in result:
            raise WatchmanError('get-sockname error: %s' % result['error'])

        return result['sockname']

    def _connect(self):
        """ establish transport connection """

        if self.recvConn:
            return

        if self.sockpath is None:
            self.sockpath = self._resolvesockname()

        self.tport = self.transport(self.sockpath, self.timeout)
        self.sendConn = self.sendCodec(self.tport)
        self.recvConn = self.recvCodec(self.tport)

    def __del__(self):
        self.close()

    def close(self):
        """ close the transport, if connected, and forget the codecs """
        if self.tport:
            self.tport.close()
            self.tport = None
            self.recvConn = None
            self.sendConn = None

    def receive(self):
        """ receive the next PDU from the watchman service

        If the client has activated subscriptions or logs then
        this PDU may be a unilateral PDU sent by the service to
        inform the client of a log event or subscription change.

        It may also simply be the response portion of a request
        initiated by query.

        There are clients in production that subscribe and call
        this in a loop to retrieve all subscription responses,
        so care should be taken when making changes here.

        Raises CommandError if the PDU carries an 'error' field.
        """

        self._connect()
        result = self.recvConn.receive()
        if self._hasprop(result, 'error'):
            raise CommandError(result['error'])

        if self._hasprop(result, 'log'):
            self.logs.append(result['log'])

        if self._hasprop(result, 'subscription'):
            sub = result['subscription']
            self.subs.setdefault(sub, []).append(result)

            # also accumulate in {root,sub} keyed store
            root = os.path.normcase(result['root'])
            by_name = self.sub_by_root.setdefault(root, {})
            by_name.setdefault(sub, []).append(result)

        return result

    def isUnilateralResponse(self, res):
        """ True if res contains any of the keys listed in self.unilateral """
        for k in self.unilateral:
            if k in res:
                return True
        return False

    def getLog(self, remove=True):
        """ Retrieve buffered log data

        If remove is true the data will be removed from the buffer.
        Otherwise it will be left in the buffer
        """
        res = self.logs
        if remove:
            self.logs = []
        return res

    def getSubscription(self, name, remove=True, root=None):
        """ Retrieve the data associated with a named subscription

        If remove is True (the default), the subscription data is removed
        from the buffer.  Otherwise the data is returned but left in
        the buffer.

        Returns None if there is no data associated with `name`

        If root is not None, then only return the subscription
        data that matches both root and name.  When used in this way,
        remove processing impacts both the unscoped and scoped stores
        for the subscription data.
        """

        if root is not None:
            # receive() files results under the normcase'd root, so the
            # lookup has to normalize the same way to find them
            root = os.path.normcase(root)
            by_name = self.sub_by_root.get(root)
            if by_name is None or name not in by_name:
                return None
            sub = by_name[name]
            if remove:
                del by_name[name]
                # don't let this grow unbounded
                self.subs.pop(name, None)
            return sub

        if name not in self.subs:
            return None
        sub = self.subs[name]
        if remove:
            del self.subs[name]
        return sub

    def query(self, *args):
        """ Send a query to the watchman service and return the response

        This call will block until the response is returned.
        If any unilateral responses are sent by the service in between
        the request-response they will be buffered up in the client object
        and NOT returned via this method.

        Raises CommandError, annotated with the command, if the service
        reports an error.
        """

        log('calling client.query')
        self._connect()
        try:
            self.sendConn.send(args)

            res = self.receive()
            while self.isUnilateralResponse(res):
                res = self.receive()

            return res
        except CommandError as ex:
            ex.setCommand(args)
            # bare raise keeps the original traceback
            raise

    def capabilityCheck(self, optional=None, required=None):
        """ Perform a server capability check """
        res = self.query('version', {
            'optional': optional or [],
            'required': required or []
        })

        if not self._hasprop(res, 'capabilities'):
            # Server doesn't support capabilities, so we need to
            # synthesize the results based on the version
            capabilities.synthesize(res, optional)
            if 'error' in res:
                raise CommandError(res['error'])

        return res

    def setTimeout(self, value):
        """ change the timeout, in seconds

        The value is remembered so that a connection established later
        uses it too; when already connected it is applied right away.
        """
        self.timeout = value
        if self.recvConn:
            self.recvConn.setTimeout(value)
        if self.sendConn:
            self.sendConn.setTimeout(value)
|
778 |
|
779 # no-check-code -- this is a 3rd party library |