# HG changeset patch # User Yuya Nishihara # Date 1540991943 -32400 # Node ID 042ed354b9eb6209b09dbdd19018462431cb464c # Parent 038108a9811c472d38046adab8dba7bcac522140 commandserver: add IPC channel to teach repository path on command finished The idea is to load recently-used repositories first in the master process, and fork(). The forked worker can reuse a warm repository if it's preloaded. There are a couple of ways of in-memory repository caching. They have pros and cons: a. "preload by master" pros: can use a single cache dict, maximizing cache hit rate cons: need to reload a repo in master process (because worker process dies per command) b. "prefork" pros: can cache a repo without reloading (as worker processes persist) cons: lower cache hit rate since each worker has to maintain its own cache c. "shared memory" (or separate key-value store server) pros: no need to reload a repo in master process, ideally cons: need to serialize objects to sharable form Since my primary goal is to get rid of the cost of loading obsstore without massive rewrites, (c) doesn't work. (b) isn't ideal since it would require much more SDRAMs than (a). So I take (a). The idea credits to Jun Wu. diff -r 038108a9811c -r 042ed354b9eb mercurial/commandserver.py --- a/mercurial/commandserver.py Thu Dec 13 23:20:28 2018 -0800 +++ b/mercurial/commandserver.py Wed Oct 31 22:19:03 2018 +0900 @@ -506,12 +506,19 @@ raise error.Abort(_('no socket path specified with --address')) self._servicehandler = handler or unixservicehandler(ui) self._sock = None + self._mainipc = None + self._workeripc = None self._oldsigchldhandler = None self._workerpids = set() # updated by signal handler; do not iterate self._socketunlinked = None def init(self): self._sock = socket.socket(socket.AF_UNIX) + # IPC channel from many workers to one main process; this is actually + # a uni-directional pipe, but is backed by a DGRAM socket so each + # message can be easily separated. + o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM) + self._mainipc, self._workeripc = o self._servicehandler.bindsocket(self._sock, self.address) if util.safehasattr(procutil, 'unblocksignal'): procutil.unblocksignal(signal.SIGCHLD) @@ -527,6 +534,8 @@ def _cleanup(self): signal.signal(signal.SIGCHLD, self._oldsigchldhandler) self._sock.close() + self._mainipc.close() + self._workeripc.close() self._unlinksocket() # don't kill child processes as they have active clients, just wait self._reapworkers(0) @@ -543,6 +552,8 @@ selector = selectors.DefaultSelector() selector.register(self._sock, selectors.EVENT_READ, self._acceptnewconnection) + selector.register(self._mainipc, selectors.EVENT_READ, + self._handlemainipc) while True: if not exiting and h.shouldexit(): # clients can no longer connect() to the domain socket, so @@ -592,8 +603,10 @@ try: selector.close() sock.close() + self._mainipc.close() self._runworker(conn) conn.close() + self._workeripc.close() os._exit(0) except: # never return, hence no re-raises try: @@ -601,6 +614,17 @@ finally: os._exit(255) + def _handlemainipc(self, sock, selector): + """Process messages sent from a worker""" + try: + path = sock.recv(32768) # large enough to receive path + except socket.error as inst: + if inst.args[0] == errno.EINTR: + return + raise + + self.ui.log(b'cmdserver', b'repository: %s\n', path) + def _sigchldhandler(self, signal, frame): self._reapworkers(os.WNOHANG) @@ -628,6 +652,22 @@ h = self._servicehandler try: _serverequest(self.ui, self.repo, conn, h.createcmdserver, - prereposetups=None) # TODO: pass in hook functions + prereposetups=[self._reposetup]) finally: gc.collect() # trigger __del__ since worker process uses os._exit + + def _reposetup(self, ui, repo): + if not repo.local(): + return + + class unixcmdserverrepo(repo.__class__): + def close(self): + super(unixcmdserverrepo, self).close() + try: + self._cmdserveripc.send(self.root) + except socket.error: + self.ui.log(b'cmdserver', + b'failed to send repo root to master\n') + + repo.__class__ = unixcmdserverrepo + repo._cmdserveripc = self._workeripc diff -r 038108a9811c -r 042ed354b9eb tests/test-chg.t --- a/tests/test-chg.t Thu Dec 13 23:20:28 2018 -0800 +++ b/tests/test-chg.t Wed Oct 31 22:19:03 2018 +0900 @@ -230,7 +230,6 @@ preserved: $ cat log/server.log.1 log/server.log | tail -10 | filterlog - YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...) YYYY/MM/DD HH:MM:SS (PID)> setprocname: ... YYYY/MM/DD HH:MM:SS (PID)> received fds: ... YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload' @@ -238,5 +237,6 @@ YYYY/MM/DD HH:MM:SS (PID)> setenv: ... YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ... YYYY/MM/DD HH:MM:SS (PID)> validate: [] + YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...) YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.