mercurial/commandserver.py
changeset 40998 042ed354b9eb
parent 40878 2525faf4ecdb
child 40999 dcac24ec935b
equal deleted inserted replaced
40997:038108a9811c 40998:042ed354b9eb
   504             raise error.Abort(_('unsupported platform'))
   504             raise error.Abort(_('unsupported platform'))
   505         if not self.address:
   505         if not self.address:
   506             raise error.Abort(_('no socket path specified with --address'))
   506             raise error.Abort(_('no socket path specified with --address'))
   507         self._servicehandler = handler or unixservicehandler(ui)
   507         self._servicehandler = handler or unixservicehandler(ui)
   508         self._sock = None
   508         self._sock = None
       
   509         self._mainipc = None
       
   510         self._workeripc = None
   509         self._oldsigchldhandler = None
   511         self._oldsigchldhandler = None
   510         self._workerpids = set()  # updated by signal handler; do not iterate
   512         self._workerpids = set()  # updated by signal handler; do not iterate
   511         self._socketunlinked = None
   513         self._socketunlinked = None
   512 
   514 
   513     def init(self):
   515     def init(self):
   514         self._sock = socket.socket(socket.AF_UNIX)
   516         self._sock = socket.socket(socket.AF_UNIX)
       
   517         # IPC channel from many workers to one main process; this is actually
       
   518         # a uni-directional pipe, but is backed by a DGRAM socket so each
       
   519         # message can be easily separated.
       
   520         o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
       
   521         self._mainipc, self._workeripc = o
   515         self._servicehandler.bindsocket(self._sock, self.address)
   522         self._servicehandler.bindsocket(self._sock, self.address)
   516         if util.safehasattr(procutil, 'unblocksignal'):
   523         if util.safehasattr(procutil, 'unblocksignal'):
   517             procutil.unblocksignal(signal.SIGCHLD)
   524             procutil.unblocksignal(signal.SIGCHLD)
   518         o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
   525         o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
   519         self._oldsigchldhandler = o
   526         self._oldsigchldhandler = o
   525             self._socketunlinked = True
   532             self._socketunlinked = True
   526 
   533 
   527     def _cleanup(self):
   534     def _cleanup(self):
   528         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
   535         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
   529         self._sock.close()
   536         self._sock.close()
       
   537         self._mainipc.close()
       
   538         self._workeripc.close()
   530         self._unlinksocket()
   539         self._unlinksocket()
   531         # don't kill child processes as they have active clients, just wait
   540         # don't kill child processes as they have active clients, just wait
   532         self._reapworkers(0)
   541         self._reapworkers(0)
   533 
   542 
   534     def run(self):
   543     def run(self):
   541         exiting = False
   550         exiting = False
   542         h = self._servicehandler
   551         h = self._servicehandler
   543         selector = selectors.DefaultSelector()
   552         selector = selectors.DefaultSelector()
   544         selector.register(self._sock, selectors.EVENT_READ,
   553         selector.register(self._sock, selectors.EVENT_READ,
   545                           self._acceptnewconnection)
   554                           self._acceptnewconnection)
       
   555         selector.register(self._mainipc, selectors.EVENT_READ,
       
   556                           self._handlemainipc)
   546         while True:
   557         while True:
   547             if not exiting and h.shouldexit():
   558             if not exiting and h.shouldexit():
   548                 # clients can no longer connect() to the domain socket, so
   559                 # clients can no longer connect() to the domain socket, so
   549                 # we stop queuing new requests.
   560                 # we stop queuing new requests.
   550                 # for requests that are queued (connect()-ed, but haven't been
   561                 # for requests that are queued (connect()-ed, but haven't been
   590                 conn.close()  # release handle in parent process
   601                 conn.close()  # release handle in parent process
   591         else:
   602         else:
   592             try:
   603             try:
   593                 selector.close()
   604                 selector.close()
   594                 sock.close()
   605                 sock.close()
       
   606                 self._mainipc.close()
   595                 self._runworker(conn)
   607                 self._runworker(conn)
   596                 conn.close()
   608                 conn.close()
       
   609                 self._workeripc.close()
   597                 os._exit(0)
   610                 os._exit(0)
   598             except:  # never return, hence no re-raises
   611             except:  # never return, hence no re-raises
   599                 try:
   612                 try:
   600                     self.ui.traceback(force=True)
   613                     self.ui.traceback(force=True)
   601                 finally:
   614                 finally:
   602                     os._exit(255)
   615                     os._exit(255)
       
   616 
       
   617     def _handlemainipc(self, sock, selector):
       
   618         """Process messages sent from a worker"""
       
   619         try:
       
   620             path = sock.recv(32768)  # large enough to receive path
       
   621         except socket.error as inst:
       
   622             if inst.args[0] == errno.EINTR:
       
   623                 return
       
   624             raise
       
   625 
       
   626         self.ui.log(b'cmdserver', b'repository: %s\n', path)
   603 
   627 
   604     def _sigchldhandler(self, signal, frame):
   628     def _sigchldhandler(self, signal, frame):
   605         self._reapworkers(os.WNOHANG)
   629         self._reapworkers(os.WNOHANG)
   606 
   630 
   607     def _reapworkers(self, options):
   631     def _reapworkers(self, options):
   626         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
   650         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
   627         _initworkerprocess()
   651         _initworkerprocess()
   628         h = self._servicehandler
   652         h = self._servicehandler
   629         try:
   653         try:
   630             _serverequest(self.ui, self.repo, conn, h.createcmdserver,
   654             _serverequest(self.ui, self.repo, conn, h.createcmdserver,
   631                           prereposetups=None)  # TODO: pass in hook functions
   655                           prereposetups=[self._reposetup])
   632         finally:
   656         finally:
   633             gc.collect()  # trigger __del__ since worker process uses os._exit
   657             gc.collect()  # trigger __del__ since worker process uses os._exit
       
   658 
       
   659     def _reposetup(self, ui, repo):
       
   660         if not repo.local():
       
   661             return
       
   662 
       
   663         class unixcmdserverrepo(repo.__class__):
       
   664             def close(self):
       
   665                 super(unixcmdserverrepo, self).close()
       
   666                 try:
       
   667                     self._cmdserveripc.send(self.root)
       
   668                 except socket.error:
       
   669                     self.ui.log(b'cmdserver',
       
   670                                 b'failed to send repo root to master\n')
       
   671 
       
   672         repo.__class__ = unixcmdserverrepo
       
   673         repo._cmdserveripc = self._workeripc