hgext/remotefilelog/fileserverclient.py
# fileserverclient.py - client for communicating with the cache process
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import hashlib
import io
import os
import struct
import threading
import time

from mercurial.i18n import _
from mercurial.node import bin, hex, nullid
from mercurial import (
    error,
    revlog,
    sshpeer,
    util,
    wireprotov1peer,
)
from mercurial.utils import procutil

from . import (
    constants,
    contentstore,
    lz4wrapper,
    metadatastore,
    shallowutil,
    wirepack,
)

_sshv1peer = sshpeer.sshv1peer

# Statistics for debugging
fetchcost = 0
fetches = 0
fetched = 0
fetchmisses = 0

_lfsmod = None
_downloading = _('downloading')

def getcachekey(reponame, file, id):
    pathhash = hashlib.sha1(file).hexdigest()
    return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

def getlocalkey(file, id):
    pathhash = hashlib.sha1(file).hexdigest()
    return os.path.join(pathhash, id)
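
# Layout of the keys built above (derived from the code, for reference):
#   getcachekey: <reponame>/<sha1(path)[:2]>/<sha1(path)[2:]>/<hex node>
#   getlocalkey: <sha1(path)>/<hex node>
# The two-character split in the shared cache presumably keeps any single
# directory from accumulating one entry per tracked file.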
       
def peersetup(ui, peer):

    class remotefilepeer(peer.__class__):
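        # The @wireprotov1peer.batchable methods below are generators:
        # the first yield hands the wire arguments and a future to the
        # batcher, and the second yields the decoded result once the
        # future has been set.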
       
        @wireprotov1peer.batchable
        def getfile(self, file, node):
            if not self.capable('getfile'):
                raise error.Abort(
                    'configured remotefile server does not support getfile')
            f = wireprotov1peer.future()
            yield {'file': file, 'node': node}, f
            code, data = f.value.split('\0', 1)
            if int(code):
                raise error.LookupError(file, node, data)
            yield data

        @wireprotov1peer.batchable
        def getflogheads(self, path):
            if not self.capable('getflogheads'):
                raise error.Abort('configured remotefile server does not '
                                  'support getflogheads')
            f = wireprotov1peer.future()
            yield {'path': path}, f
            heads = f.value.split('\n') if f.value else []
            yield heads

        def _updatecallstreamopts(self, command, opts):
            if command != 'getbundle':
                return
            if 'remotefilelog' not in self.capabilities():
                return
            if not util.safehasattr(self, '_localrepo'):
                return
            if constants.REQUIREMENT not in self._localrepo.requirements:
                return

            bundlecaps = opts.get('bundlecaps')
            if bundlecaps:
                bundlecaps = [bundlecaps]
            else:
                bundlecaps = []

            # shallow, includepattern, and excludepattern are a hacky way of
            # carrying over data from the local repo to this getbundle
            # command. We need to do it this way because bundle1 getbundle
            # doesn't provide any other place we can hook in to manipulate
            # getbundle args before it goes across the wire. Once we get rid
            # of bundle1, we can use bundle2's _pullbundle2extraprepare to
            # do this more cleanly.
            bundlecaps.append('remotefilelog')
            if self._localrepo.includepattern:
                patterns = '\0'.join(self._localrepo.includepattern)
                includecap = "includepattern=" + patterns
                bundlecaps.append(includecap)
            if self._localrepo.excludepattern:
                patterns = '\0'.join(self._localrepo.excludepattern)
                excludecap = "excludepattern=" + patterns
                bundlecaps.append(excludecap)
            opts['bundlecaps'] = ','.join(bundlecaps)

        def _sendrequest(self, command, args, **opts):
            self._updatecallstreamopts(command, args)
            return super(remotefilepeer, self)._sendrequest(command, args,
                                                            **opts)

        def _callstream(self, command, **opts):
            supertype = super(remotefilepeer, self)
            if not util.safehasattr(supertype, '_sendrequest'):
                self._updatecallstreamopts(command, opts)
            return super(remotefilepeer, self)._callstream(command, **opts)

    peer.__class__ = remotefilepeer

class cacheconnection(object):
    """The connection for communicating with the remote cache. Performs
    gets and sets by communicating with an external process that has the
    cache-specific implementation.
    """
    def __init__(self):
        self.pipeo = self.pipei = self.pipee = None
        self.subprocess = None
        self.connected = False

    def connect(self, cachecommand):
        if self.pipeo:
            raise error.Abort(_("cache connection already open"))
        self.pipei, self.pipeo, self.pipee, self.subprocess = \
            procutil.popen4(cachecommand)
        self.connected = True

    def close(self):
        def tryclose(pipe):
            try:
                pipe.close()
            except Exception:
                pass
        if self.connected:
            try:
                self.pipei.write("exit\n")
            except Exception:
                pass
            tryclose(self.pipei)
            self.pipei = None
            tryclose(self.pipeo)
            self.pipeo = None
            tryclose(self.pipee)
            self.pipee = None
            try:
                # Wait for process to terminate, making sure to avoid deadlock.
                # See https://docs.python.org/2/library/subprocess.html for
                # warnings about wait() and deadlocking.
                self.subprocess.communicate()
            except Exception:
                pass
            self.subprocess = None
        self.connected = False

    def request(self, request, flush=True):
        if self.connected:
            try:
                self.pipei.write(request)
                if flush:
                    self.pipei.flush()
            except IOError:
                self.close()
       
    def receiveline(self):
        if not self.connected:
            return None
        # Initialize so an IOError before the read cannot trigger an
        # UnboundLocalError on the return below.
        result = None
        try:
            result = self.pipeo.readline()[:-1]
            if not result:
                self.close()
        except IOError:
            self.close()

        return result
       
def _getfilesbatch(
        remote, receivemissing, progresstick, missed, idmap, batchsize):
    # Over http(s), iterbatch is a streamy method and we can start
    # looking at results early. This means we send one (potentially
    # large) request, but then we show nice progress as we process
    # file results, rather than showing chunks of $batchsize in
    # progress.
    #
    # Over ssh, iterbatch isn't streamy because batch() wasn't
    # explicitly designed as a streaming method. In the future we
    # should probably introduce a streambatch() method upstream and
    # use that for this.
    with remote.commandexecutor() as e:
        futures = []
        for m in missed:
            futures.append(e.callcommand('getfile', {
                'file': idmap[m],
                'node': m[-40:]
            }))

        for i, m in enumerate(missed):
            r = futures[i].result()
            futures[i] = None  # release memory
            file_ = idmap[m]
            node = m[-40:]
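            # Re-wrap the raw getfile payload in the "<size>\n<data>"
            # framing that receivemissing() expects, so batch and
            # streaming responses are parsed by the same code path.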
       
            receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
            progresstick()

def _getfiles_optimistic(
    remote, receivemissing, progresstick, missed, idmap, step):
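    # "Optimistic" pipelining: write a window of up to `step` requests
    # before reading any responses, then drain that window. Each missed
    # id is a cache key whose last 40 characters are the hex node.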
       
    remote._callstream("getfiles")
    i = 0
    pipeo = remote._pipeo
    pipei = remote._pipei
    while i < len(missed):
        # issue a batch of requests
        start = i
        end = min(len(missed), start + step)
        i = end
        for missingid in missed[start:end]:
            # issue new request
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()

        # receive batch results
        for missingid in missed[start:end]:
            versionid = missingid[-40:]
            file = idmap[missingid]
            receivemissing(pipei, file, versionid)
            progresstick()

    # End the command
    pipeo.write('\n')
    pipeo.flush()

def _getfiles_threaded(
    remote, receivemissing, progresstick, missed, idmap, step):
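    # Same protocol as the optimistic variant, but a daemon writer thread
    # streams every request while this thread consumes responses, so the
    # two directions never block on each other's batching.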
       
    remote._callstream("getfiles")
    pipeo = remote._pipeo
    pipei = remote._pipei

    def writer():
        for missingid in missed:
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()
    writerthread = threading.Thread(target=writer)
    writerthread.daemon = True
    writerthread.start()

    for missingid in missed:
        versionid = missingid[-40:]
        file = idmap[missingid]
        receivemissing(pipei, file, versionid)
        progresstick()

    writerthread.join()
    # End the command
    pipeo.write('\n')
    pipeo.flush()

class fileserverclient(object):
    """A client for requesting files from the remote file server.
    """
    def __init__(self, repo):
        ui = repo.ui
        self.repo = repo
        self.ui = ui
        self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
        if self.cacheprocess:
            self.cacheprocess = util.expandpath(self.cacheprocess)

        # This option causes remotefilelog to pass the full file path to the
        # cacheprocess instead of a hashed key.
        self.cacheprocesspasspath = ui.configbool(
            "remotefilelog", "cacheprocess.includepath")

        self.debugoutput = ui.configbool("remotefilelog", "debug")

        self.remotecache = cacheconnection()

    def setstore(self, datastore, historystore, writedata, writehistory):
        self.datastore = datastore
        self.historystore = historystore
        self.writedata = writedata
        self.writehistory = writehistory

    def _connect(self):
        return self.repo.connectionpool.get(self.repo.fallbackpath)

    def request(self, fileids):
        """Takes a list of filename/node pairs and fetches them from the
        server. Files are stored in the local cache.
        A list of nodes that the server couldn't find is returned.
        If the connection fails, an exception is raised.
        """
        if not self.remotecache.connected:
            self.connect()
        cache = self.remotecache
        writedata = self.writedata

        if self.ui.configbool('remotefilelog', 'fetchpacks'):
            self.requestpack(fileids)
            return

        repo = self.repo
        count = len(fileids)
        request = "get\n%d\n" % count
        idmap = {}
        reponame = repo.name
        for file, id in fileids:
            fullid = getcachekey(reponame, file, id)
            if self.cacheprocesspasspath:
                request += file + '\0'
            request += fullid + "\n"
            idmap[fullid] = file
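
        # Wire format of the cache-process conversation (as implemented
        # here): we send "get\n<count>\n" followed by one key per line;
        # the process answers with one line per missing key, optional
        # "_hits_<n>_..." progress lines, and a terminating "0" line.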
       
        cache.request(request)

        total = count
        self.ui.progress(_downloading, 0, total=count)

        missed = []
        count = 0
        while True:
            missingid = cache.receiveline()
            if not missingid:
                missedset = set(missed)
                for missingid in idmap.iterkeys():
                    if missingid not in missedset:
                        missed.append(missingid)
                self.ui.warn(_("warning: cache connection closed early - " +
                    "falling back to server\n"))
                break
            if missingid == "0":
                break
            if missingid.startswith("_hits_"):
                # receive progress reports
                parts = missingid.split("_")
                count += int(parts[2])
                self.ui.progress(_downloading, count, total=total)
                continue

            missed.append(missingid)

        global fetchmisses
        fetchmisses += len(missed)

        # count is a one-element list so the nested progresstick() below
        # can mutate it (this predates py3's nonlocal).
        count = [total - len(missed)]
        fromcache = count[0]
        self.ui.progress(_downloading, count[0], total=total)
        self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
                    count[0], total, hit=count[0], total=total)

        oldumask = os.umask(0o002)
        try:
            # receive cache misses from master
            if missed:
                def progresstick():
                    count[0] += 1
                    self.ui.progress(_downloading, count[0], total=total)
                # When verbose is true, sshpeer prints 'running ssh...'
                # to stdout, which can interfere with some command
                # outputs
                verbose = self.ui.verbose
                self.ui.verbose = False
                try:
                    with self._connect() as conn:
                        remote = conn.peer
                        # TODO: deduplicate this with the constant in
                        #       shallowrepo
                        if remote.capable("remotefilelog"):
                            if not isinstance(remote, _sshv1peer):
                                raise error.Abort('remotefilelog requires ssh '
                                                  'servers')
                            step = self.ui.configint('remotefilelog',
                                                     'getfilesstep')
                            getfilestype = self.ui.config('remotefilelog',
                                                          'getfilestype')
                            if getfilestype == 'threaded':
                                _getfiles = _getfiles_threaded
                            else:
                                _getfiles = _getfiles_optimistic
                            _getfiles(remote, self.receivemissing, progresstick,
                                      missed, idmap, step)
                        elif remote.capable("getfile"):
                            if remote.capable('batch'):
                                batchdefault = 100
                            else:
                                batchdefault = 10
                            batchsize = self.ui.configint(
                                'remotefilelog', 'batchsize', batchdefault)
                            _getfilesbatch(
                                remote, self.receivemissing, progresstick,
                                missed, idmap, batchsize)
                        else:
                            raise error.Abort("configured remotefilelog server"
                                              " does not support remotefilelog")

                    self.ui.log("remotefilefetchlog",
                                "Success\n",
                                fetched_files = count[0] - fromcache,
                                total_to_fetch = total - fromcache)
                except Exception:
                    self.ui.log("remotefilefetchlog",
                                "Fail\n",
                                fetched_files = count[0] - fromcache,
                                total_to_fetch = total - fromcache)
                    raise
                finally:
                    self.ui.verbose = verbose
                # send to memcache
                count[0] = len(missed)
                request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
                cache.request(request)

            self.ui.progress(_downloading, None)

            # mark ourselves as a user of this cache
            writedata.markrepo(self.repo.path)
        finally:
            os.umask(oldumask)

    def receivemissing(self, pipe, filename, node):
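        # Each file answer on the pipe is framed as "<decimal size>\n"
        # followed by exactly <size> bytes of lz4-compressed content.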
       
        line = pipe.readline()[:-1]
        if not line:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("connection closed early"))
        size = int(line)
        data = pipe.read(size)
        if len(data) != size:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("only received %s of %s bytes")
                                      % (len(data), size))

        self.writedata.addremotefilelognode(filename, bin(node),
                                            lz4wrapper.lz4decompress(data))
       
    def requestpack(self, fileids):
        """Requests the given file revisions from the server in a pack format.

        See `remotefilelogserver.getpack` for the file format.
        """
        # Initialize before the try so the except-handler's log call cannot
        # hit an UnboundLocalError when _connect() itself fails.
        total = len(fileids)
        rcvd = 0
        try:
            with self._connect() as conn:
                remote = conn.peer
                remote._callstream("getpackv1")

                self._sendpackrequest(remote, fileids)

                packpath = shallowutil.getcachepackpath(
                    self.repo, constants.FILEPACK_CATEGORY)
                pipei = remote._pipei
                receiveddata, receivedhistory = wirepack.receivepack(
                    self.repo.ui, pipei, packpath)
                rcvd = len(receiveddata)

            self.ui.log("remotefilefetchlog",
                        "Success(pack)\n" if (rcvd==total) else "Fail(pack)\n",
                        fetched_files = rcvd,
                        total_to_fetch = total)
        except Exception:
            self.ui.log("remotefilefetchlog",
                        "Fail(pack)\n",
                        fetched_files = rcvd,
                        total_to_fetch = total)
            raise
       
    def _sendpackrequest(self, remote, fileids):
        """Formats and writes the given fileids to the remote as part of a
        getpackv1 call.
        """
        # Sort the requests by name, so we receive requests in batches by name
        grouped = {}
        for filename, node in fileids:
            grouped.setdefault(filename, set()).add(node)

        # Issue request
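        # Each group is framed as
        #   <filename length (FILENAMESTRUCT)><filename>
        #   <node count (PACKREQUESTCOUNTSTRUCT)><20-byte binary nodes>
        # and a zero-length filename terminates the request stream.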
       
        pipeo = remote._pipeo
        for filename, nodes in grouped.iteritems():
            filenamelen = struct.pack(constants.FILENAMESTRUCT, len(filename))
            countlen = struct.pack(constants.PACKREQUESTCOUNTSTRUCT, len(nodes))
            rawnodes = ''.join(bin(n) for n in nodes)

            pipeo.write('%s%s%s%s' % (filenamelen, filename, countlen,
                                      rawnodes))
            pipeo.flush()
        pipeo.write(struct.pack(constants.FILENAMESTRUCT, 0))
        pipeo.flush()

    def connect(self):
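        # The external process is spawned as "<cacheprocess> <cache path>"
        # and must speak the line protocol used by request(): "get"/"set"
        # commands on stdin, missing keys plus a "0" terminator on stdout,
        # and "exit" to shut down (see cacheconnection above).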
       
        if self.cacheprocess:
            cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
            self.remotecache.connect(cmd)
        else:
            # If no cache process is specified, we fake one that always
            # returns cache misses.  This enables tests to run easily
            # and may eventually allow us to be a drop-in replacement
            # for the largefiles extension.
            class simplecache(object):
                def __init__(self):
                    self.missingids = []
                    self.connected = True

                def close(self):
                    pass

                def request(self, value, flush=True):
                    lines = value.split("\n")
                    if lines[0] != "get":
                        return
                    self.missingids = lines[2:-1]
                    self.missingids.append('0')

                def receiveline(self):
                    if len(self.missingids) > 0:
                        return self.missingids.pop(0)
                    return None

            self.remotecache = simplecache()

    def close(self):
        if fetches:
            msg = ("%s files fetched over %d fetches - " +
                   "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                       fetched,
                       fetches,
                       fetchmisses,
                       float(fetched - fetchmisses) / float(fetched) * 100.0,
                       fetchcost)
            if self.debugoutput:
                self.ui.warn(msg)
            self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                remotefilelogfetched=fetched,
                remotefilelogfetches=fetches,
                remotefilelogfetchmisses=fetchmisses,
                remotefilelogfetchtime=fetchcost * 1000)

        if self.remotecache.connected:
            self.remotecache.close()

    def prefetch(self, fileids, force=False, fetchdata=True,
                 fetchhistory=False):
        """downloads the given file versions to the cache
        """
        repo = self.repo
        idstocheck = []
        for file, id in fileids:
            # hack
            # - we don't use .hgtags
            # - workingctx produces ids with length 42,
            #   which we skip since they aren't in any cache
            if (file == '.hgtags' or len(id) == 42
                or not repo.shallowmatch(file)):
                continue

            idstocheck.append((file, bin(id)))

        datastore = self.datastore
        historystore = self.historystore
        if force:
            datastore = contentstore.unioncontentstore(*repo.shareddatastores)
            historystore = metadatastore.unionmetadatastore(
                *repo.sharedhistorystores)

        missingids = set()
        if fetchdata:
            missingids.update(datastore.getmissing(idstocheck))
        if fetchhistory:
            missingids.update(historystore.getmissing(idstocheck))

        # partition missing nodes into nullid and not-nullid so we can
        # warn about this filtering potentially shadowing bugs.
        nullids = len([None for unused, id in missingids if id == nullid])
        if nullids:
            missingids = [(f, id) for f, id in missingids if id != nullid]
            repo.ui.develwarn(
                ('remotefilelog not fetching %d null revs'
                 ' - this is likely hiding bugs' % nullids),
                config='remotefilelog-ext')
        if missingids:
            global fetches, fetched, fetchcost
            fetches += 1

            # We want to be able to detect excess individual file downloads, so
            # let's log that information for debugging.
            if fetches >= 15 and fetches < 18:
                if fetches == 15:
                    fetchwarning = self.ui.config('remotefilelog',
                                                  'fetchwarning')
                    if fetchwarning:
                        self.ui.warn(fetchwarning + '\n')
                self.logstacktrace()
            missingids = [(file, hex(id)) for file, id in missingids]
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(missingids)
            if missingids:
                raise error.Abort(_("unable to download %d files") %
                                  len(missingids))
            fetchcost += time.time() - start
            self._lfsprefetch(fileids)

    def _lfsprefetch(self, fileids):
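        # Best-effort companion to prefetch(): download the LFS blobs
        # behind any just-fetched pointer revisions (REVIDX_EXTSTORED)
        # that are not already in the local blob store.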
       
        if not _lfsmod or not util.safehasattr(
                self.repo.svfs, 'lfslocalblobstore'):
            return
        if not _lfsmod.wrapper.candownload(self.repo):
            return
        pointers = []
        store = self.repo.svfs.lfslocalblobstore
        for file, id in fileids:
            node = bin(id)
            rlog = self.repo.file(file)
            if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                text = rlog.revision(node, raw=True)
                p = _lfsmod.pointer.deserialize(text)
                oid = p.oid()
                if not store.has(oid):
                    pointers.append(p)
        if len(pointers) > 0:
            self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
            assert all(store.has(p.oid()) for p in pointers)

    def logstacktrace(self):
        import traceback
        self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                    ''.join(traceback.format_stack()))