mercurial/commandserver.py
changeset 43076 2372284d9457
parent 41284 b0e3f2d7c143
child 43077 687b865b95ad
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
    16 import struct
    16 import struct
    17 import traceback
    17 import traceback
    18 
    18 
    19 try:
    19 try:
    20     import selectors
    20     import selectors
       
    21 
    21     selectors.BaseSelector
    22     selectors.BaseSelector
    22 except ImportError:
    23 except ImportError:
    23     from .thirdparty import selectors2 as selectors
    24     from .thirdparty import selectors2 as selectors
    24 
    25 
    25 from .i18n import _
    26 from .i18n import _
    35 from .utils import (
    36 from .utils import (
    36     cborutil,
    37     cborutil,
    37     procutil,
    38     procutil,
    38 )
    39 )
    39 
    40 
       
    41 
    40 class channeledoutput(object):
    42 class channeledoutput(object):
    41     """
    43     """
    42     Write data to out in the following format:
    44     Write data to out in the following format:
    43 
    45 
    44     data length (unsigned int),
    46     data length (unsigned int),
    45     data
    47     data
    46     """
    48     """
       
    49 
    47     def __init__(self, out, channel):
    50     def __init__(self, out, channel):
    48         self.out = out
    51         self.out = out
    49         self.channel = channel
    52         self.channel = channel
    50 
    53 
    51     @property
    54     @property
    62     def __getattr__(self, attr):
    65     def __getattr__(self, attr):
    63         if attr in (r'isatty', r'fileno', r'tell', r'seek'):
    66         if attr in (r'isatty', r'fileno', r'tell', r'seek'):
    64             raise AttributeError(attr)
    67             raise AttributeError(attr)
    65         return getattr(self.out, attr)
    68         return getattr(self.out, attr)
    66 
    69 
       
    70 
    67 class channeledmessage(object):
    71 class channeledmessage(object):
    68     """
    72     """
    69     Write encoded message and metadata to out in the following format:
    73     Write encoded message and metadata to out in the following format:
    70 
    74 
    71     data length (unsigned int),
    75     data length (unsigned int),
    89             opts[b'data'] = data
    93             opts[b'data'] = data
    90         self._cout.write(self._encodefn(opts))
    94         self._cout.write(self._encodefn(opts))
    91 
    95 
    92     def __getattr__(self, attr):
    96     def __getattr__(self, attr):
    93         return getattr(self._cout, attr)
    97         return getattr(self._cout, attr)
       
    98 
    94 
    99 
    95 class channeledinput(object):
   100 class channeledinput(object):
    96     """
   101     """
    97     Read data from in_.
   102     Read data from in_.
    98 
   103 
   176     def __getattr__(self, attr):
   181     def __getattr__(self, attr):
   177         if attr in (r'isatty', r'fileno', r'tell', r'seek'):
   182         if attr in (r'isatty', r'fileno', r'tell', r'seek'):
   178             raise AttributeError(attr)
   183             raise AttributeError(attr)
   179         return getattr(self.in_, attr)
   184         return getattr(self.in_, attr)
   180 
   185 
       
   186 
   181 _messageencoders = {
   187 _messageencoders = {
   182     b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
   188     b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
   183 }
   189 }
       
   190 
   184 
   191 
   185 def _selectmessageencoder(ui):
   192 def _selectmessageencoder(ui):
   186     # experimental config: cmdserver.message-encodings
   193     # experimental config: cmdserver.message-encodings
   187     encnames = ui.configlist(b'cmdserver', b'message-encodings')
   194     encnames = ui.configlist(b'cmdserver', b'message-encodings')
   188     for n in encnames:
   195     for n in encnames:
   189         f = _messageencoders.get(n)
   196         f = _messageencoders.get(n)
   190         if f:
   197         if f:
   191             return n, f
   198             return n, f
   192     raise error.Abort(b'no supported message encodings: %s'
   199     raise error.Abort(
   193                       % b' '.join(encnames))
   200         b'no supported message encodings: %s' % b' '.join(encnames)
       
   201     )
       
   202 
   194 
   203 
   195 class server(object):
   204 class server(object):
   196     """
   205     """
   197     Listens for commands on fin, runs them and writes the output on a channel
   206     Listens for commands on fin, runs them and writes the output on a channel
   198     based stream to fout.
   207     based stream to fout.
   199     """
   208     """
       
   209 
   200     def __init__(self, ui, repo, fin, fout, prereposetups=None):
   210     def __init__(self, ui, repo, fin, fout, prereposetups=None):
   201         self.cwd = encoding.getcwd()
   211         self.cwd = encoding.getcwd()
   202 
   212 
   203         if repo:
   213         if repo:
   204             # the ui here is really the repo ui so take its baseui so we don't
   214             # the ui here is really the repo ui so take its baseui so we don't
   280         uis = [copiedui]
   290         uis = [copiedui]
   281         if self.repo:
   291         if self.repo:
   282             self.repo.baseui = copiedui
   292             self.repo.baseui = copiedui
   283             # clone ui without using ui.copy because this is protected
   293             # clone ui without using ui.copy because this is protected
   284             repoui = self.repoui.__class__(self.repoui)
   294             repoui = self.repoui.__class__(self.repoui)
   285             repoui.copy = copiedui.copy # redo copy protection
   295             repoui.copy = copiedui.copy  # redo copy protection
   286             uis.append(repoui)
   296             uis.append(repoui)
   287             self.repo.ui = self.repo.dirstate._ui = repoui
   297             self.repo.ui = self.repo.dirstate._ui = repoui
   288             self.repo.invalidateall()
   298             self.repo.invalidateall()
   289 
   299 
   290         for ui in uis:
   300         for ui in uis:
   293             # replace channels by fully functional tty files. so nontty is
   303             # replace channels by fully functional tty files. so nontty is
   294             # enforced only if cin is a channel.
   304             # enforced only if cin is a channel.
   295             if not util.safehasattr(self.cin, 'fileno'):
   305             if not util.safehasattr(self.cin, 'fileno'):
   296                 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
   306                 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
   297 
   307 
   298         req = dispatch.request(args[:], copiedui, self.repo, self.cin,
   308         req = dispatch.request(
   299                                self.cout, self.cerr, self.cmsg,
   309             args[:],
   300                                prereposetups=self._prereposetups)
   310             copiedui,
       
   311             self.repo,
       
   312             self.cin,
       
   313             self.cout,
       
   314             self.cerr,
       
   315             self.cmsg,
       
   316             prereposetups=self._prereposetups,
       
   317         )
   301 
   318 
   302         try:
   319         try:
   303             ret = dispatch.dispatch(req) & 255
   320             ret = dispatch.dispatch(req) & 255
   304             self.cresult.write(struct.pack('>i', int(ret)))
   321             self.cresult.write(struct.pack('>i', int(ret)))
   305         finally:
   322         finally:
   322                 # looking at the servers capabilities
   339                 # looking at the servers capabilities
   323                 raise error.Abort(_('unknown command %s') % cmd)
   340                 raise error.Abort(_('unknown command %s') % cmd)
   324 
   341 
   325         return cmd != ''
   342         return cmd != ''
   326 
   343 
   327     capabilities = {'runcommand': runcommand,
   344     capabilities = {'runcommand': runcommand, 'getencoding': getencoding}
   328                     'getencoding': getencoding}
       
   329 
   345 
   330     def serve(self):
   346     def serve(self):
   331         hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
   347         hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
   332         hellomsg += '\n'
   348         hellomsg += '\n'
   333         hellomsg += 'encoding: ' + encoding.encoding
   349         hellomsg += 'encoding: ' + encoding.encoding
   350             # its request
   366             # its request
   351             return 1
   367             return 1
   352 
   368 
   353         return 0
   369         return 0
   354 
   370 
       
   371 
   355 def setuplogging(ui, repo=None, fp=None):
   372 def setuplogging(ui, repo=None, fp=None):
   356     """Set up server logging facility
   373     """Set up server logging facility
   357 
   374 
   358     If cmdserver.log is '-', log messages will be sent to the given fp.
   375     If cmdserver.log is '-', log messages will be sent to the given fp.
   359     It should be the 'd' channel while a client is connected, and otherwise
   376     It should be the 'd' channel while a client is connected, and otherwise
   375         # developer config: cmdserver.max-log-files
   392         # developer config: cmdserver.max-log-files
   376         maxfiles = ui.configint(b'cmdserver', b'max-log-files')
   393         maxfiles = ui.configint(b'cmdserver', b'max-log-files')
   377         # developer config: cmdserver.max-log-size
   394         # developer config: cmdserver.max-log-size
   378         maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
   395         maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
   379         vfs = vfsmod.vfs(os.path.dirname(logpath))
   396         vfs = vfsmod.vfs(os.path.dirname(logpath))
   380         logger = loggingutil.filelogger(vfs, os.path.basename(logpath), tracked,
   397         logger = loggingutil.filelogger(
   381                                         maxfiles=maxfiles, maxsize=maxsize)
   398             vfs,
       
   399             os.path.basename(logpath),
       
   400             tracked,
       
   401             maxfiles=maxfiles,
       
   402             maxsize=maxsize,
       
   403         )
   382 
   404 
   383     targetuis = {ui}
   405     targetuis = {ui}
   384     if repo:
   406     if repo:
   385         targetuis.add(repo.baseui)
   407         targetuis.add(repo.baseui)
   386         targetuis.add(repo.ui)
   408         targetuis.add(repo.ui)
   387     for u in targetuis:
   409     for u in targetuis:
   388         u.setlogger(b'cmdserver', logger)
   410         u.setlogger(b'cmdserver', logger)
       
   411 
   389 
   412 
   390 class pipeservice(object):
   413 class pipeservice(object):
   391     def __init__(self, ui, repo, opts):
   414     def __init__(self, ui, repo, opts):
   392         self.ui = ui
   415         self.ui = ui
   393         self.repo = repo
   416         self.repo = repo
   403             sv = server(ui, self.repo, fin, fout)
   426             sv = server(ui, self.repo, fin, fout)
   404             try:
   427             try:
   405                 return sv.serve()
   428                 return sv.serve()
   406             finally:
   429             finally:
   407                 sv.cleanup()
   430                 sv.cleanup()
       
   431 
   408 
   432 
   409 def _initworkerprocess():
   433 def _initworkerprocess():
   410     # use a different process group from the master process, in order to:
   434     # use a different process group from the master process, in order to:
   411     # 1. make the current process group no longer "orphaned" (because the
   435     # 1. make the current process group no longer "orphaned" (because the
   412     #    parent of this process is in a different process group while
   436     #    parent of this process is in a different process group while
   420     #    unrelated processes.
   444     #    unrelated processes.
   421     os.setpgid(0, 0)
   445     os.setpgid(0, 0)
   422     # change random state otherwise forked request handlers would have a
   446     # change random state otherwise forked request handlers would have a
   423     # same state inherited from parent.
   447     # same state inherited from parent.
   424     random.seed()
   448     random.seed()
       
   449 
   425 
   450 
   426 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
   451 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
   427     fin = conn.makefile(r'rb')
   452     fin = conn.makefile(r'rb')
   428     fout = conn.makefile(r'wb')
   453     fout = conn.makefile(r'wb')
   429     sv = None
   454     sv = None
   440                 raise
   465                 raise
   441         except KeyboardInterrupt:
   466         except KeyboardInterrupt:
   442             pass
   467             pass
   443         finally:
   468         finally:
   444             sv.cleanup()
   469             sv.cleanup()
   445     except: # re-raises
   470     except:  # re-raises
   446         # also write traceback to error channel. otherwise client cannot
   471         # also write traceback to error channel. otherwise client cannot
   447         # see it because it is written to server's stderr by default.
   472         # see it because it is written to server's stderr by default.
   448         if sv:
   473         if sv:
   449             cerr = sv.cerr
   474             cerr = sv.cerr
   450         else:
   475         else:
   457             fout.close()  # implicit flush() may cause another EPIPE
   482             fout.close()  # implicit flush() may cause another EPIPE
   458         except IOError as inst:
   483         except IOError as inst:
   459             if inst.errno != errno.EPIPE:
   484             if inst.errno != errno.EPIPE:
   460                 raise
   485                 raise
   461 
   486 
       
   487 
   462 class unixservicehandler(object):
   488 class unixservicehandler(object):
   463     """Set of pluggable operations for unix-mode services
   489     """Set of pluggable operations for unix-mode services
   464 
   490 
   465     Almost all methods except for createcmdserver() are called in the main
   491     Almost all methods except for createcmdserver() are called in the main
   466     process. You can't pass mutable resource back from createcmdserver().
   492     process. You can't pass mutable resource back from createcmdserver().
   489 
   515 
   490     def createcmdserver(self, repo, conn, fin, fout, prereposetups):
   516     def createcmdserver(self, repo, conn, fin, fout, prereposetups):
   491         """Create new command server instance; called in the process that
   517         """Create new command server instance; called in the process that
   492         serves for the current connection"""
   518         serves for the current connection"""
   493         return server(self.ui, repo, fin, fout, prereposetups)
   519         return server(self.ui, repo, fin, fout, prereposetups)
       
   520 
   494 
   521 
   495 class unixforkingservice(object):
   522 class unixforkingservice(object):
   496     """
   523     """
   497     Listens on unix domain socket and forks server per connection
   524     Listens on unix domain socket and forks server per connection
   498     """
   525     """
   556 
   583 
   557     def _mainloop(self):
   584     def _mainloop(self):
   558         exiting = False
   585         exiting = False
   559         h = self._servicehandler
   586         h = self._servicehandler
   560         selector = selectors.DefaultSelector()
   587         selector = selectors.DefaultSelector()
   561         selector.register(self._sock, selectors.EVENT_READ,
   588         selector.register(
   562                           self._acceptnewconnection)
   589             self._sock, selectors.EVENT_READ, self._acceptnewconnection
   563         selector.register(self._mainipc, selectors.EVENT_READ,
   590         )
   564                           self._handlemainipc)
   591         selector.register(
       
   592             self._mainipc, selectors.EVENT_READ, self._handlemainipc
       
   593         )
   565         while True:
   594         while True:
   566             if not exiting and h.shouldexit():
   595             if not exiting and h.shouldexit():
   567                 # clients can no longer connect() to the domain socket, so
   596                 # clients can no longer connect() to the domain socket, so
   568                 # we stop queuing new requests.
   597                 # we stop queuing new requests.
   569                 # for requests that are queued (connect()-ed, but haven't been
   598                 # for requests that are queued (connect()-ed, but haven't been
   603         # https://instagram-engineering.com/
   632         # https://instagram-engineering.com/
   604         #   copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
   633         #   copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
   605         pid = os.fork()
   634         pid = os.fork()
   606         if pid:
   635         if pid:
   607             try:
   636             try:
   608                 self.ui.log(b'cmdserver', b'forked worker process (pid=%d)\n',
   637                 self.ui.log(
   609                             pid)
   638                     b'cmdserver', b'forked worker process (pid=%d)\n', pid
       
   639                 )
   610                 self._workerpids.add(pid)
   640                 self._workerpids.add(pid)
   611                 h.newconnection()
   641                 h.newconnection()
   612             finally:
   642             finally:
   613                 conn.close()  # release handle in parent process
   643                 conn.close()  # release handle in parent process
   614         else:
   644         else:
   660     def _runworker(self, conn):
   690     def _runworker(self, conn):
   661         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
   691         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
   662         _initworkerprocess()
   692         _initworkerprocess()
   663         h = self._servicehandler
   693         h = self._servicehandler
   664         try:
   694         try:
   665             _serverequest(self.ui, self.repo, conn, h.createcmdserver,
   695             _serverequest(
   666                           prereposetups=[self._reposetup])
   696                 self.ui,
       
   697                 self.repo,
       
   698                 conn,
       
   699                 h.createcmdserver,
       
   700                 prereposetups=[self._reposetup],
       
   701             )
   667         finally:
   702         finally:
   668             gc.collect()  # trigger __del__ since worker process uses os._exit
   703             gc.collect()  # trigger __del__ since worker process uses os._exit
   669 
   704 
   670     def _reposetup(self, ui, repo):
   705     def _reposetup(self, ui, repo):
   671         if not repo.local():
   706         if not repo.local():
   675             def close(self):
   710             def close(self):
   676                 super(unixcmdserverrepo, self).close()
   711                 super(unixcmdserverrepo, self).close()
   677                 try:
   712                 try:
   678                     self._cmdserveripc.send(self.root)
   713                     self._cmdserveripc.send(self.root)
   679                 except socket.error:
   714                 except socket.error:
   680                     self.ui.log(b'cmdserver',
   715                     self.ui.log(
   681                                 b'failed to send repo root to master\n')
   716                         b'cmdserver', b'failed to send repo root to master\n'
       
   717                     )
   682 
   718 
   683         repo.__class__ = unixcmdserverrepo
   719         repo.__class__ = unixcmdserverrepo
   684         repo._cmdserveripc = self._workeripc
   720         repo._cmdserveripc = self._workeripc
   685 
   721 
   686         cachedrepo = self._repoloader.get(repo.root)
   722         cachedrepo = self._repoloader.get(repo.root)