changeset 40495 3a333a582d7b
child 40502 6d64e2abe8d3
equal deleted inserted replaced
40494:9aeb9e2d28a7 40495:3a333a582d7b
     1 # fileserverclient.py - client for communicating with the cache process
     2 #
     3 # Copyright 2013 Facebook, Inc.
     4 #
     5 # This software may be used and distributed according to the terms of the
     6 # GNU General Public License version 2 or any later version.
     8 from __future__ import absolute_import
    10 import hashlib
    11 import io
    12 import os
    13 import struct
    14 import threading
    15 import time
    17 from mercurial.i18n import _
    18 from mercurial.node import bin, hex, nullid
    19 from mercurial import (
    20     error,
    21     revlog,
    22     sshpeer,
    23     util,
    24     wireprotov1peer,
    25 )
    26 from mercurial.utils import procutil
    28 from . import (
    29     constants,
    30     contentstore,
    31     lz4wrapper,
    32     metadatastore,
    33     shallowutil,
    34     wirepack,
    35 )
# Alias for the ssh v1 peer class; request() uses it in an isinstance()
# check before speaking the streaming "getfiles" protocol.
_sshv1peer = sshpeer.sshv1peer
# Statistics for debugging
# Module-wide counters, updated through ``global`` statements in
# fileserverclient.request()/prefetch() and reported by
# fileserverclient.close().
# fetchcost: accumulated wall-clock seconds spent inside request().
fetchcost = 0
# fetches: number of prefetch() calls that actually had something to fetch.
fetches = 0
# fetched: total number of file revisions requested across those calls.
fetched = 0
# fetchmisses: number of ids the cache process did not serve.
fetchmisses = 0
# _lfsprefetch() does nothing while this is falsy.
# NOTE(review): presumably assigned by other code when the lfs extension is
# available - nothing in this file sets it; confirm.
_lfsmod = None
# Translated topic string passed to ui.progress() while fetching.
_downloading = _('downloading')
    48 def getcachekey(reponame, file, id):
    49     pathhash = hashlib.sha1(file).hexdigest()
    50     return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
    52 def getlocalkey(file, id):
    53     pathhash = hashlib.sha1(file).hexdigest()
    54     return os.path.join(pathhash, id)
    56 def peersetup(ui, peer):
    58     class remotefilepeer(peer.__class__):
    59         @wireprotov1peer.batchable
    60         def getfile(self, file, node):
    61             if not self.capable('getfile'):
    62                 raise error.Abort(
    63                     'configured remotefile server does not support getfile')
    64             f = wireprotov1peer.future()
    65             yield {'file': file, 'node': node}, f
    66             code, data = f.value.split('\0', 1)
    67             if int(code):
    68                 raise error.LookupError(file, node, data)
    69             yield data
    71         @wireprotov1peer.batchable
    72         def getflogheads(self, path):
    73             if not self.capable('getflogheads'):
    74                 raise error.Abort('configured remotefile server does not '
    75                                   'support getflogheads')
    76             f = wireprotov1peer.future()
    77             yield {'path': path}, f
    78             heads = f.value.split('\n') if f.value else []
    79             yield heads
    81         def _updatecallstreamopts(self, command, opts):
    82             if command != 'getbundle':
    83                 return
    84             if 'remotefilelog' not in self.capabilities():
    85                 return
    86             if not util.safehasattr(self, '_localrepo'):
    87                 return
    88             if constants.REQUIREMENT not in self._localrepo.requirements:
    89                 return
    91             bundlecaps = opts.get('bundlecaps')
    92             if bundlecaps:
    93                 bundlecaps = [bundlecaps]
    94             else:
    95                 bundlecaps = []
    97             # shallow, includepattern, and excludepattern are a hacky way of
    98             # carrying over data from the local repo to this getbundle
    99             # command. We need to do it this way because bundle1 getbundle
   100             # doesn't provide any other place we can hook in to manipulate
   101             # getbundle args before it goes across the wire. Once we get rid
   102             # of bundle1, we can use bundle2's _pullbundle2extraprepare to
   103             # do this more cleanly.
   104             bundlecaps.append('remotefilelog')
   105             if self._localrepo.includepattern:
   106                 patterns = '\0'.join(self._localrepo.includepattern)
   107                 includecap = "includepattern=" + patterns
   108                 bundlecaps.append(includecap)
   109             if self._localrepo.excludepattern:
   110                 patterns = '\0'.join(self._localrepo.excludepattern)
   111                 excludecap = "excludepattern=" + patterns
   112                 bundlecaps.append(excludecap)
   113             opts['bundlecaps'] = ','.join(bundlecaps)
   115         def _sendrequest(self, command, args, **opts):
   116             self._updatecallstreamopts(command, args)
   117             return super(remotefilepeer, self)._sendrequest(command, args,
   118                                                             **opts)
   120         def _callstream(self, command, **opts):
   121             supertype = super(remotefilepeer, self)
   122             if not util.safehasattr(supertype, '_sendrequest'):
   123                 self._updatecallstreamopts(command, opts)
   124             return super(remotefilepeer, self)._callstream(command, **opts)
   126     peer.__class__ = remotefilepeer
   128 class cacheconnection(object):
   129     """The connection for communicating with the remote cache. Performs
   130     gets and sets by communicating with an external process that has the
   131     cache-specific implementation.
   132     """
   133     def __init__(self):
   134         self.pipeo = self.pipei = self.pipee = None
   135         self.subprocess = None
   136         self.connected = False
   138     def connect(self, cachecommand):
   139         if self.pipeo:
   140             raise error.Abort(_("cache connection already open"))
   141         self.pipei, self.pipeo, self.pipee, self.subprocess = \
   142             procutil.popen4(cachecommand)
   143         self.connected = True
   145     def close(self):
   146         def tryclose(pipe):
   147             try:
   148                 pipe.close()
   149             except Exception:
   150                 pass
   151         if self.connected:
   152             try:
   153                 self.pipei.write("exit\n")
   154             except Exception:
   155                 pass
   156             tryclose(self.pipei)
   157             self.pipei = None
   158             tryclose(self.pipeo)
   159             self.pipeo = None
   160             tryclose(self.pipee)
   161             self.pipee = None
   162             try:
   163                 # Wait for process to terminate, making sure to avoid deadlock.
   164                 # See https://docs.python.org/2/library/subprocess.html for
   165                 # warnings about wait() and deadlocking.
   166                 self.subprocess.communicate()
   167             except Exception:
   168                 pass
   169             self.subprocess = None
   170         self.connected = False
   172     def request(self, request, flush=True):
   173         if self.connected:
   174             try:
   175                 self.pipei.write(request)
   176                 if flush:
   177                     self.pipei.flush()
   178             except IOError:
   179                 self.close()
   181     def receiveline(self):
   182         if not self.connected:
   183             return None
   184         try:
   185             result = self.pipeo.readline()[:-1]
   186             if not result:
   187                 self.close()
   188         except IOError:
   189             self.close()
   191         return result
   193 def _getfilesbatch(
   194         remote, receivemissing, progresstick, missed, idmap, batchsize):
   195     # Over http(s), iterbatch is a streamy method and we can start
   196     # looking at results early. This means we send one (potentially
   197     # large) request, but then we show nice progress as we process
   198     # file results, rather than showing chunks of $batchsize in
   199     # progress.
   200     #
   201     # Over ssh, iterbatch isn't streamy because batch() wasn't
   202     # explicitly designed as a streaming method. In the future we
   203     # should probably introduce a streambatch() method upstream and
   204     # use that for this.
   205     with remote.commandexecutor() as e:
   206         futures = []
   207         for m in missed:
   208             futures.append(e.callcommand('getfile', {
   209                 'file': idmap[m],
   210                 'node': m[-40:]
   211             }))
   213         for i, m in enumerate(missed):
   214             r = futures[i].result()
   215             futures[i] = None  # release memory
   216             file_ = idmap[m]
   217             node = m[-40:]
   218             receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
   219             progresstick()
   221 def _getfiles_optimistic(
   222     remote, receivemissing, progresstick, missed, idmap, step):
   223     remote._callstream("getfiles")
   224     i = 0
   225     pipeo = remote._pipeo
   226     pipei = remote._pipei
   227     while i < len(missed):
   228         # issue a batch of requests
   229         start = i
   230         end = min(len(missed), start + step)
   231         i = end
   232         for missingid in missed[start:end]:
   233             # issue new request
   234             versionid = missingid[-40:]
   235             file = idmap[missingid]
   236             sshrequest = "%s%s\n" % (versionid, file)
   237             pipeo.write(sshrequest)
   238         pipeo.flush()
   240         # receive batch results
   241         for missingid in missed[start:end]:
   242             versionid = missingid[-40:]
   243             file = idmap[missingid]
   244             receivemissing(pipei, file, versionid)
   245             progresstick()
   247     # End the command
   248     pipeo.write('\n')
   249     pipeo.flush()
   251 def _getfiles_threaded(
   252     remote, receivemissing, progresstick, missed, idmap, step):
   253     remote._callstream("getfiles")
   254     pipeo = remote._pipeo
   255     pipei = remote._pipei
   257     def writer():
   258         for missingid in missed:
   259             versionid = missingid[-40:]
   260             file = idmap[missingid]
   261             sshrequest = "%s%s\n" % (versionid, file)
   262             pipeo.write(sshrequest)
   263         pipeo.flush()
   264     writerthread = threading.Thread(target=writer)
   265     writerthread.daemon = True
   266     writerthread.start()
   268     for missingid in missed:
   269         versionid = missingid[-40:]
   270         file = idmap[missingid]
   271         receivemissing(pipei, file, versionid)
   272         progresstick()
   274     writerthread.join()
   275     # End the command
   276     pipeo.write('\n')
   277     pipeo.flush()
   279 class fileserverclient(object):
   280     """A client for requesting files from the remote file server.
   281     """
   282     def __init__(self, repo):
   283         ui = repo.ui
   284         self.repo = repo
   285         self.ui = ui
   286         self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
   287         if self.cacheprocess:
   288             self.cacheprocess = util.expandpath(self.cacheprocess)
   290         # This option causes remotefilelog to pass the full file path to the
   291         # cacheprocess instead of a hashed key.
   292         self.cacheprocesspasspath = ui.configbool(
   293             "remotefilelog", "cacheprocess.includepath")
   295         self.debugoutput = ui.configbool("remotefilelog", "debug")
   297         self.remotecache = cacheconnection()
   299     def setstore(self, datastore, historystore, writedata, writehistory):
   300         self.datastore = datastore
   301         self.historystore = historystore
   302         self.writedata = writedata
   303         self.writehistory = writehistory
   305     def _connect(self):
   306         return self.repo.connectionpool.get(self.repo.fallbackpath)
   308     def request(self, fileids):
   309         """Takes a list of filename/node pairs and fetches them from the
   310         server. Files are stored in the local cache.
   311         A list of nodes that the server couldn't find is returned.
   312         If the connection fails, an exception is raised.
   313         """
   314         if not self.remotecache.connected:
   315             self.connect()
   316         cache = self.remotecache
   317         writedata = self.writedata
   319         if self.ui.configbool('remotefilelog', 'fetchpacks'):
   320             self.requestpack(fileids)
   321             return
   323         repo = self.repo
   324         count = len(fileids)
   325         request = "get\n%d\n" % count
   326         idmap = {}
   327         reponame = repo.name
   328         for file, id in fileids:
   329             fullid = getcachekey(reponame, file, id)
   330             if self.cacheprocesspasspath:
   331                 request += file + '\0'
   332             request += fullid + "\n"
   333             idmap[fullid] = file
   335         cache.request(request)
   337         total = count
   338         self.ui.progress(_downloading, 0, total=count)
   340         missed = []
   341         count = 0
   342         while True:
   343             missingid = cache.receiveline()
   344             if not missingid:
   345                 missedset = set(missed)
   346                 for missingid in idmap.iterkeys():
   347                     if not missingid in missedset:
   348                         missed.append(missingid)
   349                 self.ui.warn(_("warning: cache connection closed early - " +
   350                     "falling back to server\n"))
   351                 break
   352             if missingid == "0":
   353                 break
   354             if missingid.startswith("_hits_"):
   355                 # receive progress reports
   356                 parts = missingid.split("_")
   357                 count += int(parts[2])
   358                 self.ui.progress(_downloading, count, total=total)
   359                 continue
   361             missed.append(missingid)
   363         global fetchmisses
   364         fetchmisses += len(missed)
   366         count = [total - len(missed)]
   367         fromcache = count[0]
   368         self.ui.progress(_downloading, count[0], total=total)
   369         self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
   370                     count[0], total, hit=count[0], total=total)
   372         oldumask = os.umask(0o002)
   373         try:
   374             # receive cache misses from master
   375             if missed:
   376                 def progresstick():
   377                     count[0] += 1
   378                     self.ui.progress(_downloading, count[0], total=total)
   379                 # When verbose is true, sshpeer prints 'running ssh...'
   380                 # to stdout, which can interfere with some command
   381                 # outputs
   382                 verbose = self.ui.verbose
   383                 self.ui.verbose = False
   384                 try:
   385                     with self._connect() as conn:
   386                         remote = conn.peer
   387                         # TODO: deduplicate this with the constant in
   388                         #       shallowrepo
   389                         if remote.capable("remotefilelog"):
   390                             if not isinstance(remote, _sshv1peer):
   391                                 raise error.Abort('remotefilelog requires ssh '
   392                                                   'servers')
   393                             step = self.ui.configint('remotefilelog',
   394                                                      'getfilesstep')
   395                             getfilestype = self.ui.config('remotefilelog',
   396                                                           'getfilestype')
   397                             if getfilestype == 'threaded':
   398                                 _getfiles = _getfiles_threaded
   399                             else:
   400                                 _getfiles = _getfiles_optimistic
   401                             _getfiles(remote, self.receivemissing, progresstick,
   402                                       missed, idmap, step)
   403                         elif remote.capable("getfile"):
   404                             if remote.capable('batch'):
   405                                 batchdefault = 100
   406                             else:
   407                                 batchdefault = 10
   408                             batchsize = self.ui.configint(
   409                                 'remotefilelog', 'batchsize', batchdefault)
   410                             _getfilesbatch(
   411                                 remote, self.receivemissing, progresstick,
   412                                 missed, idmap, batchsize)
   413                         else:
   414                             raise error.Abort("configured remotefilelog server"
   415                                              " does not support remotefilelog")
   417                     self.ui.log("remotefilefetchlog",
   418                                 "Success\n",
   419                                 fetched_files = count[0] - fromcache,
   420                                 total_to_fetch = total - fromcache)
   421                 except Exception:
   422                     self.ui.log("remotefilefetchlog",
   423                                 "Fail\n",
   424                                 fetched_files = count[0] - fromcache,
   425                                 total_to_fetch = total - fromcache)
   426                     raise
   427                 finally:
   428                     self.ui.verbose = verbose
   429                 # send to memcache
   430                 count[0] = len(missed)
   431                 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
   432                 cache.request(request)
   434             self.ui.progress(_downloading, None)
   436             # mark ourselves as a user of this cache
   437             writedata.markrepo(self.repo.path)
   438         finally:
   439             os.umask(oldumask)
   441     def receivemissing(self, pipe, filename, node):
   442         line = pipe.readline()[:-1]
   443         if not line:
   444             raise error.ResponseError(_("error downloading file contents:"),
   445                                       _("connection closed early"))
   446         size = int(line)
   447         data = pipe.read(size)
   448         if len(data) != size:
   449             raise error.ResponseError(_("error downloading file contents:"),
   450                                       _("only received %s of %s bytes")
   451                                       % (len(data), size))
   453         self.writedata.addremotefilelognode(filename, bin(node),
   454                                              lz4wrapper.lz4decompress(data))
   456     def requestpack(self, fileids):
   457         """Requests the given file revisions from the server in a pack format.
   459         See `remotefilelogserver.getpack` for the file format.
   460         """
   461         try:
   462             with self._connect() as conn:
   463                 total = len(fileids)
   464                 rcvd = 0
   466                 remote = conn.peer
   467                 remote._callstream("getpackv1")
   469                 self._sendpackrequest(remote, fileids)
   471                 packpath = shallowutil.getcachepackpath(
   472                     self.repo, constants.FILEPACK_CATEGORY)
   473                 pipei = remote._pipei
   474                 receiveddata, receivedhistory = wirepack.receivepack(
   475                     self.repo.ui, pipei, packpath)
   476                 rcvd = len(receiveddata)
   478             self.ui.log("remotefilefetchlog",
   479                         "Success(pack)\n" if (rcvd==total) else "Fail(pack)\n",
   480                         fetched_files = rcvd,
   481                         total_to_fetch = total)
   482         except Exception:
   483             self.ui.log("remotefilefetchlog",
   484                         "Fail(pack)\n",
   485                         fetched_files = rcvd,
   486                         total_to_fetch = total)
   487             raise
   489     def _sendpackrequest(self, remote, fileids):
   490         """Formats and writes the given fileids to the remote as part of a
   491         getpackv1 call.
   492         """
   493         # Sort the requests by name, so we receive requests in batches by name
   494         grouped = {}
   495         for filename, node in fileids:
   496             grouped.setdefault(filename, set()).add(node)
   498         # Issue request
   499         pipeo = remote._pipeo
   500         for filename, nodes in grouped.iteritems():
   501             filenamelen = struct.pack(constants.FILENAMESTRUCT, len(filename))
   502             countlen = struct.pack(constants.PACKREQUESTCOUNTSTRUCT, len(nodes))
   503             rawnodes = ''.join(bin(n) for n in nodes)
   505             pipeo.write('%s%s%s%s' % (filenamelen, filename, countlen,
   506                                       rawnodes))
   507             pipeo.flush()
   508         pipeo.write(struct.pack(constants.FILENAMESTRUCT, 0))
   509         pipeo.flush()
   511     def connect(self):
   512         if self.cacheprocess:
   513             cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
   514             self.remotecache.connect(cmd)
   515         else:
   516             # If no cache process is specified, we fake one that always
   517             # returns cache misses.  This enables tests to run easily
   518             # and may eventually allow us to be a drop in replacement
   519             # for the largefiles extension.
   520             class simplecache(object):
   521                 def __init__(self):
   522                     self.missingids = []
   523                     self.connected = True
   525                 def close(self):
   526                     pass
   528                 def request(self, value, flush=True):
   529                     lines = value.split("\n")
   530                     if lines[0] != "get":
   531                         return
   532                     self.missingids = lines[2:-1]
   533                     self.missingids.append('0')
   535                 def receiveline(self):
   536                     if len(self.missingids) > 0:
   537                         return self.missingids.pop(0)
   538                     return None
   540             self.remotecache = simplecache()
   542     def close(self):
   543         if fetches:
   544             msg = ("%s files fetched over %d fetches - " +
   545                    "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
   546                        fetched,
   547                        fetches,
   548                        fetchmisses,
   549                        float(fetched - fetchmisses) / float(fetched) * 100.0,
   550                        fetchcost)
   551             if self.debugoutput:
   552                 self.ui.warn(msg)
   553             self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
   554                 remotefilelogfetched=fetched,
   555                 remotefilelogfetches=fetches,
   556                 remotefilelogfetchmisses=fetchmisses,
   557                 remotefilelogfetchtime=fetchcost * 1000)
   559         if self.remotecache.connected:
   560             self.remotecache.close()
   562     def prefetch(self, fileids, force=False, fetchdata=True,
   563                  fetchhistory=False):
   564         """downloads the given file versions to the cache
   565         """
   566         repo = self.repo
   567         idstocheck = []
   568         for file, id in fileids:
   569             # hack
   570             # - we don't use .hgtags
   571             # - workingctx produces ids with length 42,
   572             #   which we skip since they aren't in any cache
   573             if (file == '.hgtags' or len(id) == 42
   574                 or not repo.shallowmatch(file)):
   575                 continue
   577             idstocheck.append((file, bin(id)))
   579         datastore = self.datastore
   580         historystore = self.historystore
   581         if force:
   582             datastore = contentstore.unioncontentstore(*repo.shareddatastores)
   583             historystore = metadatastore.unionmetadatastore(
   584                 *repo.sharedhistorystores)
   586         missingids = set()
   587         if fetchdata:
   588             missingids.update(datastore.getmissing(idstocheck))
   589         if fetchhistory:
   590             missingids.update(historystore.getmissing(idstocheck))
   592         # partition missing nodes into nullid and not-nullid so we can
   593         # warn about this filtering potentially shadowing bugs.
   594         nullids = len([None for unused, id in missingids if id == nullid])
   595         if nullids:
   596             missingids = [(f, id) for f, id in missingids if id != nullid]
   597             repo.ui.develwarn(
   598                 ('remotefilelog not fetching %d null revs'
   599                  ' - this is likely hiding bugs' % nullids),
   600                 config='remotefilelog-ext')
   601         if missingids:
   602             global fetches, fetched, fetchcost
   603             fetches += 1
   605             # We want to be able to detect excess individual file downloads, so
   606             # let's log that information for debugging.
   607             if fetches >= 15 and fetches < 18:
   608                 if fetches == 15:
   609                     fetchwarning = self.ui.config('remotefilelog',
   610                                                   'fetchwarning')
   611                     if fetchwarning:
   612                         self.ui.warn(fetchwarning + '\n')
   613                 self.logstacktrace()
   614             missingids = [(file, hex(id)) for file, id in missingids]
   615             fetched += len(missingids)
   616             start = time.time()
   617             missingids = self.request(missingids)
   618             if missingids:
   619                 raise error.Abort(_("unable to download %d files") %
   620                                   len(missingids))
   621             fetchcost += time.time() - start
   622             self._lfsprefetch(fileids)
   624     def _lfsprefetch(self, fileids):
   625         if not _lfsmod or not util.safehasattr(
   626                 self.repo.svfs, 'lfslocalblobstore'):
   627             return
   628         if not _lfsmod.wrapper.candownload(self.repo):
   629             return
   630         pointers = []
   631         store = self.repo.svfs.lfslocalblobstore
   632         for file, id in fileids:
   633             node = bin(id)
   634             rlog = self.repo.file(file)
   635             if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
   636                 text = rlog.revision(node, raw=True)
   637                 p = _lfsmod.pointer.deserialize(text)
   638                 oid = p.oid()
   639                 if not store.has(oid):
   640                     pointers.append(p)
   641         if len(pointers) > 0:
   642             self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
   643             assert all(store.has(p.oid()) for p in pointers)
   645     def logstacktrace(self):
   646         import traceback
   647         self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
   648                     ''.join(traceback.format_stack()))