35 from .utils import ( |
36 from .utils import ( |
36 cborutil, |
37 cborutil, |
37 procutil, |
38 procutil, |
38 ) |
39 ) |
39 |
40 |
|
41 |
40 class channeledoutput(object): |
42 class channeledoutput(object): |
41 """ |
43 """ |
42 Write data to out in the following format: |
44 Write data to out in the following format: |
43 |
45 |
44 data length (unsigned int), |
46 data length (unsigned int), |
45 data |
47 data |
46 """ |
48 """ |
|
49 |
47 def __init__(self, out, channel): |
50 def __init__(self, out, channel): |
48 self.out = out |
51 self.out = out |
49 self.channel = channel |
52 self.channel = channel |
50 |
53 |
51 @property |
54 @property |
62 def __getattr__(self, attr): |
65 def __getattr__(self, attr): |
63 if attr in (r'isatty', r'fileno', r'tell', r'seek'): |
66 if attr in (r'isatty', r'fileno', r'tell', r'seek'): |
64 raise AttributeError(attr) |
67 raise AttributeError(attr) |
65 return getattr(self.out, attr) |
68 return getattr(self.out, attr) |
66 |
69 |
|
70 |
67 class channeledmessage(object): |
71 class channeledmessage(object): |
68 """ |
72 """ |
69 Write encoded message and metadata to out in the following format: |
73 Write encoded message and metadata to out in the following format: |
70 |
74 |
71 data length (unsigned int), |
75 data length (unsigned int), |
89 opts[b'data'] = data |
93 opts[b'data'] = data |
90 self._cout.write(self._encodefn(opts)) |
94 self._cout.write(self._encodefn(opts)) |
91 |
95 |
92 def __getattr__(self, attr): |
96 def __getattr__(self, attr): |
93 return getattr(self._cout, attr) |
97 return getattr(self._cout, attr) |
|
98 |
94 |
99 |
95 class channeledinput(object): |
100 class channeledinput(object): |
96 """ |
101 """ |
97 Read data from in_. |
102 Read data from in_. |
98 |
103 |
176 def __getattr__(self, attr): |
181 def __getattr__(self, attr): |
177 if attr in (r'isatty', r'fileno', r'tell', r'seek'): |
182 if attr in (r'isatty', r'fileno', r'tell', r'seek'): |
178 raise AttributeError(attr) |
183 raise AttributeError(attr) |
179 return getattr(self.in_, attr) |
184 return getattr(self.in_, attr) |
180 |
185 |
|
186 |
def _encodecbor(v):
    """Encode ``v`` by joining the chunks yielded by cborutil.streamencode()."""
    return b''.join(cborutil.streamencode(v))


# maps a message-encoding name (bytes) to the function that encodes a message
_messageencoders = {
    b'cbor': _encodecbor,
}
|
190 |
184 |
191 |
def _selectmessageencoder(ui):
    """Pick the first configured message encoding that has an encoder.

    The candidate names come from the ``cmdserver.message-encodings`` config
    list and are tried in order. Returns a ``(name, encodefn)`` pair for the
    first name found in ``_messageencoders``; raises error.Abort listing the
    configured names when none of them is available.
    """
    # experimental config: cmdserver.message-encodings
    encnames = ui.configlist(b'cmdserver', b'message-encodings')
    candidates = ((n, _messageencoders.get(n)) for n in encnames)
    chosen = next((pair for pair in candidates if pair[1]), None)
    if chosen is not None:
        return chosen
    raise error.Abort(
        b'no supported message encodings: %s' % b' '.join(encnames)
    )
|
202 |
194 |
203 |
195 class server(object): |
204 class server(object): |
196 """ |
205 """ |
197 Listens for commands on fin, runs them and writes the output on a channel |
206 Listens for commands on fin, runs them and writes the output on a channel |
198 based stream to fout. |
207 based stream to fout. |
199 """ |
208 """ |
|
209 |
200 def __init__(self, ui, repo, fin, fout, prereposetups=None): |
210 def __init__(self, ui, repo, fin, fout, prereposetups=None): |
201 self.cwd = encoding.getcwd() |
211 self.cwd = encoding.getcwd() |
202 |
212 |
203 if repo: |
213 if repo: |
204 # the ui here is really the repo ui so take its baseui so we don't |
214 # the ui here is really the repo ui so take its baseui so we don't |
280 uis = [copiedui] |
290 uis = [copiedui] |
281 if self.repo: |
291 if self.repo: |
282 self.repo.baseui = copiedui |
292 self.repo.baseui = copiedui |
283 # clone ui without using ui.copy because this is protected |
293 # clone ui without using ui.copy because this is protected |
284 repoui = self.repoui.__class__(self.repoui) |
294 repoui = self.repoui.__class__(self.repoui) |
285 repoui.copy = copiedui.copy # redo copy protection |
295 repoui.copy = copiedui.copy # redo copy protection |
286 uis.append(repoui) |
296 uis.append(repoui) |
287 self.repo.ui = self.repo.dirstate._ui = repoui |
297 self.repo.ui = self.repo.dirstate._ui = repoui |
288 self.repo.invalidateall() |
298 self.repo.invalidateall() |
289 |
299 |
290 for ui in uis: |
300 for ui in uis: |
293 # replace channels by fully functional tty files. so nontty is |
303 # replace channels by fully functional tty files. so nontty is |
294 # enforced only if cin is a channel. |
304 # enforced only if cin is a channel. |
295 if not util.safehasattr(self.cin, 'fileno'): |
305 if not util.safehasattr(self.cin, 'fileno'): |
296 ui.setconfig('ui', 'nontty', 'true', 'commandserver') |
306 ui.setconfig('ui', 'nontty', 'true', 'commandserver') |
297 |
307 |
298 req = dispatch.request(args[:], copiedui, self.repo, self.cin, |
308 req = dispatch.request( |
299 self.cout, self.cerr, self.cmsg, |
309 args[:], |
300 prereposetups=self._prereposetups) |
310 copiedui, |
|
311 self.repo, |
|
312 self.cin, |
|
313 self.cout, |
|
314 self.cerr, |
|
315 self.cmsg, |
|
316 prereposetups=self._prereposetups, |
|
317 ) |
301 |
318 |
302 try: |
319 try: |
303 ret = dispatch.dispatch(req) & 255 |
320 ret = dispatch.dispatch(req) & 255 |
304 self.cresult.write(struct.pack('>i', int(ret))) |
321 self.cresult.write(struct.pack('>i', int(ret))) |
305 finally: |
322 finally: |
322 # looking at the servers capabilities |
339 # looking at the servers capabilities |
323 raise error.Abort(_('unknown command %s') % cmd) |
340 raise error.Abort(_('unknown command %s') % cmd) |
324 |
341 |
325 return cmd != '' |
342 return cmd != '' |
326 |
343 |
327 capabilities = {'runcommand': runcommand, |
344 capabilities = {'runcommand': runcommand, 'getencoding': getencoding} |
328 'getencoding': getencoding} |
|
329 |
345 |
330 def serve(self): |
346 def serve(self): |
331 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities)) |
347 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities)) |
332 hellomsg += '\n' |
348 hellomsg += '\n' |
333 hellomsg += 'encoding: ' + encoding.encoding |
349 hellomsg += 'encoding: ' + encoding.encoding |
350 # its request |
366 # its request |
351 return 1 |
367 return 1 |
352 |
368 |
353 return 0 |
369 return 0 |
354 |
370 |
|
371 |
355 def setuplogging(ui, repo=None, fp=None): |
372 def setuplogging(ui, repo=None, fp=None): |
356 """Set up server logging facility |
373 """Set up server logging facility |
357 |
374 |
358 If cmdserver.log is '-', log messages will be sent to the given fp. |
375 If cmdserver.log is '-', log messages will be sent to the given fp. |
359 It should be the 'd' channel while a client is connected, and otherwise |
376 It should be the 'd' channel while a client is connected, and otherwise |
375 # developer config: cmdserver.max-log-files |
392 # developer config: cmdserver.max-log-files |
376 maxfiles = ui.configint(b'cmdserver', b'max-log-files') |
393 maxfiles = ui.configint(b'cmdserver', b'max-log-files') |
377 # developer config: cmdserver.max-log-size |
394 # developer config: cmdserver.max-log-size |
378 maxsize = ui.configbytes(b'cmdserver', b'max-log-size') |
395 maxsize = ui.configbytes(b'cmdserver', b'max-log-size') |
379 vfs = vfsmod.vfs(os.path.dirname(logpath)) |
396 vfs = vfsmod.vfs(os.path.dirname(logpath)) |
380 logger = loggingutil.filelogger(vfs, os.path.basename(logpath), tracked, |
397 logger = loggingutil.filelogger( |
381 maxfiles=maxfiles, maxsize=maxsize) |
398 vfs, |
|
399 os.path.basename(logpath), |
|
400 tracked, |
|
401 maxfiles=maxfiles, |
|
402 maxsize=maxsize, |
|
403 ) |
382 |
404 |
383 targetuis = {ui} |
405 targetuis = {ui} |
384 if repo: |
406 if repo: |
385 targetuis.add(repo.baseui) |
407 targetuis.add(repo.baseui) |
386 targetuis.add(repo.ui) |
408 targetuis.add(repo.ui) |
387 for u in targetuis: |
409 for u in targetuis: |
388 u.setlogger(b'cmdserver', logger) |
410 u.setlogger(b'cmdserver', logger) |
|
411 |
389 |
412 |
390 class pipeservice(object): |
413 class pipeservice(object): |
391 def __init__(self, ui, repo, opts): |
414 def __init__(self, ui, repo, opts): |
392 self.ui = ui |
415 self.ui = ui |
393 self.repo = repo |
416 self.repo = repo |
403 sv = server(ui, self.repo, fin, fout) |
426 sv = server(ui, self.repo, fin, fout) |
404 try: |
427 try: |
405 return sv.serve() |
428 return sv.serve() |
406 finally: |
429 finally: |
407 sv.cleanup() |
430 sv.cleanup() |
|
431 |
408 |
432 |
409 def _initworkerprocess(): |
433 def _initworkerprocess(): |
410 # use a different process group from the master process, in order to: |
434 # use a different process group from the master process, in order to: |
411 # 1. make the current process group no longer "orphaned" (because the |
435 # 1. make the current process group no longer "orphaned" (because the |
412 # parent of this process is in a different process group while |
436 # parent of this process is in a different process group while |
420 # unrelated processes. |
444 # unrelated processes. |
421 os.setpgid(0, 0) |
445 os.setpgid(0, 0) |
422 # change random state otherwise forked request handlers would have a |
446 # change random state otherwise forked request handlers would have a |
423 # same state inherited from parent. |
447 # same state inherited from parent. |
424 random.seed() |
448 random.seed() |
|
449 |
425 |
450 |
426 def _serverequest(ui, repo, conn, createcmdserver, prereposetups): |
451 def _serverequest(ui, repo, conn, createcmdserver, prereposetups): |
427 fin = conn.makefile(r'rb') |
452 fin = conn.makefile(r'rb') |
428 fout = conn.makefile(r'wb') |
453 fout = conn.makefile(r'wb') |
429 sv = None |
454 sv = None |
457 fout.close() # implicit flush() may cause another EPIPE |
482 fout.close() # implicit flush() may cause another EPIPE |
458 except IOError as inst: |
483 except IOError as inst: |
459 if inst.errno != errno.EPIPE: |
484 if inst.errno != errno.EPIPE: |
460 raise |
485 raise |
461 |
486 |
|
487 |
462 class unixservicehandler(object): |
488 class unixservicehandler(object): |
463 """Set of pluggable operations for unix-mode services |
489 """Set of pluggable operations for unix-mode services |
464 |
490 |
465 Almost all methods except for createcmdserver() are called in the main |
491 Almost all methods except for createcmdserver() are called in the main |
466 process. You can't pass mutable resource back from createcmdserver(). |
492 process. You can't pass mutable resource back from createcmdserver(). |
489 |
515 |
490 def createcmdserver(self, repo, conn, fin, fout, prereposetups): |
516 def createcmdserver(self, repo, conn, fin, fout, prereposetups): |
491 """Create new command server instance; called in the process that |
517 """Create new command server instance; called in the process that |
492 serves for the current connection""" |
518 serves for the current connection""" |
493 return server(self.ui, repo, fin, fout, prereposetups) |
519 return server(self.ui, repo, fin, fout, prereposetups) |
|
520 |
494 |
521 |
495 class unixforkingservice(object): |
522 class unixforkingservice(object): |
496 """ |
523 """ |
497 Listens on unix domain socket and forks server per connection |
524 Listens on unix domain socket and forks server per connection |
498 """ |
525 """ |
556 |
583 |
557 def _mainloop(self): |
584 def _mainloop(self): |
558 exiting = False |
585 exiting = False |
559 h = self._servicehandler |
586 h = self._servicehandler |
560 selector = selectors.DefaultSelector() |
587 selector = selectors.DefaultSelector() |
561 selector.register(self._sock, selectors.EVENT_READ, |
588 selector.register( |
562 self._acceptnewconnection) |
589 self._sock, selectors.EVENT_READ, self._acceptnewconnection |
563 selector.register(self._mainipc, selectors.EVENT_READ, |
590 ) |
564 self._handlemainipc) |
591 selector.register( |
|
592 self._mainipc, selectors.EVENT_READ, self._handlemainipc |
|
593 ) |
565 while True: |
594 while True: |
566 if not exiting and h.shouldexit(): |
595 if not exiting and h.shouldexit(): |
567 # clients can no longer connect() to the domain socket, so |
596 # clients can no longer connect() to the domain socket, so |
568 # we stop queuing new requests. |
597 # we stop queuing new requests. |
569 # for requests that are queued (connect()-ed, but haven't been |
598 # for requests that are queued (connect()-ed, but haven't been |
603 # https://instagram-engineering.com/ |
632 # https://instagram-engineering.com/ |
604 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf |
633 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf |
605 pid = os.fork() |
634 pid = os.fork() |
606 if pid: |
635 if pid: |
607 try: |
636 try: |
608 self.ui.log(b'cmdserver', b'forked worker process (pid=%d)\n', |
637 self.ui.log( |
609 pid) |
638 b'cmdserver', b'forked worker process (pid=%d)\n', pid |
|
639 ) |
610 self._workerpids.add(pid) |
640 self._workerpids.add(pid) |
611 h.newconnection() |
641 h.newconnection() |
612 finally: |
642 finally: |
613 conn.close() # release handle in parent process |
643 conn.close() # release handle in parent process |
614 else: |
644 else: |
660 def _runworker(self, conn): |
690 def _runworker(self, conn): |
661 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
691 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
662 _initworkerprocess() |
692 _initworkerprocess() |
663 h = self._servicehandler |
693 h = self._servicehandler |
664 try: |
694 try: |
665 _serverequest(self.ui, self.repo, conn, h.createcmdserver, |
695 _serverequest( |
666 prereposetups=[self._reposetup]) |
696 self.ui, |
|
697 self.repo, |
|
698 conn, |
|
699 h.createcmdserver, |
|
700 prereposetups=[self._reposetup], |
|
701 ) |
667 finally: |
702 finally: |
668 gc.collect() # trigger __del__ since worker process uses os._exit |
703 gc.collect() # trigger __del__ since worker process uses os._exit |
669 |
704 |
670 def _reposetup(self, ui, repo): |
705 def _reposetup(self, ui, repo): |
671 if not repo.local(): |
706 if not repo.local(): |
675 def close(self): |
710 def close(self): |
676 super(unixcmdserverrepo, self).close() |
711 super(unixcmdserverrepo, self).close() |
677 try: |
712 try: |
678 self._cmdserveripc.send(self.root) |
713 self._cmdserveripc.send(self.root) |
679 except socket.error: |
714 except socket.error: |
680 self.ui.log(b'cmdserver', |
715 self.ui.log( |
681 b'failed to send repo root to master\n') |
716 b'cmdserver', b'failed to send repo root to master\n' |
|
717 ) |
682 |
718 |
683 repo.__class__ = unixcmdserverrepo |
719 repo.__class__ = unixcmdserverrepo |
684 repo._cmdserveripc = self._workeripc |
720 repo._cmdserveripc = self._workeripc |
685 |
721 |
686 cachedrepo = self._repoloader.get(repo.root) |
722 cachedrepo = self._repoloader.get(repo.root) |