504 raise error.Abort(_('unsupported platform')) |
504 raise error.Abort(_('unsupported platform')) |
505 if not self.address: |
505 if not self.address: |
506 raise error.Abort(_('no socket path specified with --address')) |
506 raise error.Abort(_('no socket path specified with --address')) |
507 self._servicehandler = handler or unixservicehandler(ui) |
507 self._servicehandler = handler or unixservicehandler(ui) |
508 self._sock = None |
508 self._sock = None |
|
509 self._mainipc = None |
|
510 self._workeripc = None |
509 self._oldsigchldhandler = None |
511 self._oldsigchldhandler = None |
510 self._workerpids = set() # updated by signal handler; do not iterate |
512 self._workerpids = set() # updated by signal handler; do not iterate |
511 self._socketunlinked = None |
513 self._socketunlinked = None |
512 |
514 |
513 def init(self): |
515 def init(self): |
514 self._sock = socket.socket(socket.AF_UNIX) |
516 self._sock = socket.socket(socket.AF_UNIX) |
|
517 # IPC channel from many workers to one main process; this is actually |
|
518 # a uni-directional pipe, but is backed by a DGRAM socket so each |
|
519 # message can be easily separated. |
|
520 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM) |
|
521 self._mainipc, self._workeripc = o |
515 self._servicehandler.bindsocket(self._sock, self.address) |
522 self._servicehandler.bindsocket(self._sock, self.address) |
516 if util.safehasattr(procutil, 'unblocksignal'): |
523 if util.safehasattr(procutil, 'unblocksignal'): |
517 procutil.unblocksignal(signal.SIGCHLD) |
524 procutil.unblocksignal(signal.SIGCHLD) |
518 o = signal.signal(signal.SIGCHLD, self._sigchldhandler) |
525 o = signal.signal(signal.SIGCHLD, self._sigchldhandler) |
519 self._oldsigchldhandler = o |
526 self._oldsigchldhandler = o |
525 self._socketunlinked = True |
532 self._socketunlinked = True |
526 |
533 |
527 def _cleanup(self): |
534 def _cleanup(self): |
528 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
535 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
529 self._sock.close() |
536 self._sock.close() |
|
537 self._mainipc.close() |
|
538 self._workeripc.close() |
530 self._unlinksocket() |
539 self._unlinksocket() |
531 # don't kill child processes as they have active clients, just wait |
540 # don't kill child processes as they have active clients, just wait |
532 self._reapworkers(0) |
541 self._reapworkers(0) |
533 |
542 |
534 def run(self): |
543 def run(self): |
541 exiting = False |
550 exiting = False |
542 h = self._servicehandler |
551 h = self._servicehandler |
543 selector = selectors.DefaultSelector() |
552 selector = selectors.DefaultSelector() |
544 selector.register(self._sock, selectors.EVENT_READ, |
553 selector.register(self._sock, selectors.EVENT_READ, |
545 self._acceptnewconnection) |
554 self._acceptnewconnection) |
|
555 selector.register(self._mainipc, selectors.EVENT_READ, |
|
556 self._handlemainipc) |
546 while True: |
557 while True: |
547 if not exiting and h.shouldexit(): |
558 if not exiting and h.shouldexit(): |
548 # clients can no longer connect() to the domain socket, so |
559 # clients can no longer connect() to the domain socket, so |
549 # we stop queuing new requests. |
560 # we stop queuing new requests. |
550 # for requests that are queued (connect()-ed, but haven't been |
561 # for requests that are queued (connect()-ed, but haven't been |
590 conn.close() # release handle in parent process |
601 conn.close() # release handle in parent process |
591 else: |
602 else: |
592 try: |
603 try: |
593 selector.close() |
604 selector.close() |
594 sock.close() |
605 sock.close() |
|
606 self._mainipc.close() |
595 self._runworker(conn) |
607 self._runworker(conn) |
596 conn.close() |
608 conn.close() |
|
609 self._workeripc.close() |
597 os._exit(0) |
610 os._exit(0) |
598 except: # never return, hence no re-raises |
611 except: # never return, hence no re-raises |
599 try: |
612 try: |
600 self.ui.traceback(force=True) |
613 self.ui.traceback(force=True) |
601 finally: |
614 finally: |
602 os._exit(255) |
615 os._exit(255) |
|
616 |
|
617 def _handlemainipc(self, sock, selector): |
|
618 """Process messages sent from a worker""" |
|
619 try: |
|
620 path = sock.recv(32768) # large enough to receive path |
|
621 except socket.error as inst: |
|
622 if inst.args[0] == errno.EINTR: |
|
623 return |
|
624 raise |
|
625 |
|
626 self.ui.log(b'cmdserver', b'repository: %s\n', path) |
603 |
627 |
604 def _sigchldhandler(self, signal, frame): |
628 def _sigchldhandler(self, signal, frame): |
605 self._reapworkers(os.WNOHANG) |
629 self._reapworkers(os.WNOHANG) |
606 |
630 |
607 def _reapworkers(self, options): |
631 def _reapworkers(self, options): |
626 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
650 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
627 _initworkerprocess() |
651 _initworkerprocess() |
628 h = self._servicehandler |
652 h = self._servicehandler |
629 try: |
653 try: |
630 _serverequest(self.ui, self.repo, conn, h.createcmdserver, |
654 _serverequest(self.ui, self.repo, conn, h.createcmdserver, |
631 prereposetups=None) # TODO: pass in hook functions |
655 prereposetups=[self._reposetup]) |
632 finally: |
656 finally: |
633 gc.collect() # trigger __del__ since worker process uses os._exit |
657 gc.collect() # trigger __del__ since worker process uses os._exit |
|
658 |
|
659 def _reposetup(self, ui, repo): |
|
660 if not repo.local(): |
|
661 return |
|
662 |
|
663 class unixcmdserverrepo(repo.__class__): |
|
664 def close(self): |
|
665 super(unixcmdserverrepo, self).close() |
|
666 try: |
|
667 self._cmdserveripc.send(self.root) |
|
668 except socket.error: |
|
669 self.ui.log(b'cmdserver', |
|
670 b'failed to send repo root to master\n') |
|
671 |
|
672 repo.__class__ = unixcmdserverrepo |
|
673 repo._cmdserveripc = self._workeripc |