hgext/remotefilelog/fileserverclient.py
changeset 43076 2372284d9457
parent 42771 fdeb4c1d23d5
child 43077 687b865b95ad
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
    41 fetched = 0
    41 fetched = 0
    42 fetchmisses = 0
    42 fetchmisses = 0
    43 
    43 
    44 _lfsmod = None
    44 _lfsmod = None
    45 
    45 
       
    46 
    46 def getcachekey(reponame, file, id):
    47 def getcachekey(reponame, file, id):
    47     pathhash = node.hex(hashlib.sha1(file).digest())
    48     pathhash = node.hex(hashlib.sha1(file).digest())
    48     return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
    49     return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
    49 
    50 
       
    51 
    50 def getlocalkey(file, id):
    52 def getlocalkey(file, id):
    51     pathhash = node.hex(hashlib.sha1(file).digest())
    53     pathhash = node.hex(hashlib.sha1(file).digest())
    52     return os.path.join(pathhash, id)
    54     return os.path.join(pathhash, id)
    53 
    55 
       
    56 
    54 def peersetup(ui, peer):
    57 def peersetup(ui, peer):
    55 
       
    56     class remotefilepeer(peer.__class__):
    58     class remotefilepeer(peer.__class__):
    57         @wireprotov1peer.batchable
    59         @wireprotov1peer.batchable
    58         def x_rfl_getfile(self, file, node):
    60         def x_rfl_getfile(self, file, node):
    59             if not self.capable('x_rfl_getfile'):
    61             if not self.capable('x_rfl_getfile'):
    60                 raise error.Abort(
    62                 raise error.Abort(
    61                     'configured remotefile server does not support getfile')
    63                     'configured remotefile server does not support getfile'
       
    64                 )
    62             f = wireprotov1peer.future()
    65             f = wireprotov1peer.future()
    63             yield {'file': file, 'node': node}, f
    66             yield {'file': file, 'node': node}, f
    64             code, data = f.value.split('\0', 1)
    67             code, data = f.value.split('\0', 1)
    65             if int(code):
    68             if int(code):
    66                 raise error.LookupError(file, node, data)
    69                 raise error.LookupError(file, node, data)
    67             yield data
    70             yield data
    68 
    71 
    69         @wireprotov1peer.batchable
    72         @wireprotov1peer.batchable
    70         def x_rfl_getflogheads(self, path):
    73         def x_rfl_getflogheads(self, path):
    71             if not self.capable('x_rfl_getflogheads'):
    74             if not self.capable('x_rfl_getflogheads'):
    72                 raise error.Abort('configured remotefile server does not '
    75                 raise error.Abort(
    73                                   'support getflogheads')
    76                     'configured remotefile server does not '
       
    77                     'support getflogheads'
       
    78                 )
    74             f = wireprotov1peer.future()
    79             f = wireprotov1peer.future()
    75             yield {'path': path}, f
    80             yield {'path': path}, f
    76             heads = f.value.split('\n') if f.value else []
    81             heads = f.value.split('\n') if f.value else []
    77             yield heads
    82             yield heads
    78 
    83 
    79         def _updatecallstreamopts(self, command, opts):
    84         def _updatecallstreamopts(self, command, opts):
    80             if command != 'getbundle':
    85             if command != 'getbundle':
    81                 return
    86                 return
    82             if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
    87             if (
    83                 not in self.capabilities()):
    88                 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
       
    89                 not in self.capabilities()
       
    90             ):
    84                 return
    91                 return
    85             if not util.safehasattr(self, '_localrepo'):
    92             if not util.safehasattr(self, '_localrepo'):
    86                 return
    93                 return
    87             if (constants.SHALLOWREPO_REQUIREMENT
    94             if (
    88                 not in self._localrepo.requirements):
    95                 constants.SHALLOWREPO_REQUIREMENT
       
    96                 not in self._localrepo.requirements
       
    97             ):
    89                 return
    98                 return
    90 
    99 
    91             bundlecaps = opts.get('bundlecaps')
   100             bundlecaps = opts.get('bundlecaps')
    92             if bundlecaps:
   101             if bundlecaps:
    93                 bundlecaps = [bundlecaps]
   102                 bundlecaps = [bundlecaps]
   112                 bundlecaps.append(excludecap)
   121                 bundlecaps.append(excludecap)
   113             opts['bundlecaps'] = ','.join(bundlecaps)
   122             opts['bundlecaps'] = ','.join(bundlecaps)
   114 
   123 
   115         def _sendrequest(self, command, args, **opts):
   124         def _sendrequest(self, command, args, **opts):
   116             self._updatecallstreamopts(command, args)
   125             self._updatecallstreamopts(command, args)
   117             return super(remotefilepeer, self)._sendrequest(command, args,
   126             return super(remotefilepeer, self)._sendrequest(
   118                                                             **opts)
   127                 command, args, **opts
       
   128             )
   119 
   129 
   120         def _callstream(self, command, **opts):
   130         def _callstream(self, command, **opts):
   121             supertype = super(remotefilepeer, self)
   131             supertype = super(remotefilepeer, self)
   122             if not util.safehasattr(supertype, '_sendrequest'):
   132             if not util.safehasattr(supertype, '_sendrequest'):
   123                 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
   133                 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
   124             return super(remotefilepeer, self)._callstream(command, **opts)
   134             return super(remotefilepeer, self)._callstream(command, **opts)
   125 
   135 
   126     peer.__class__ = remotefilepeer
   136     peer.__class__ = remotefilepeer
   127 
   137 
       
   138 
   128 class cacheconnection(object):
   139 class cacheconnection(object):
   129     """The connection for communicating with the remote cache. Performs
   140     """The connection for communicating with the remote cache. Performs
   130     gets and sets by communicating with an external process that has the
   141     gets and sets by communicating with an external process that has the
   131     cache-specific implementation.
   142     cache-specific implementation.
   132     """
   143     """
       
   144 
   133     def __init__(self):
   145     def __init__(self):
   134         self.pipeo = self.pipei = self.pipee = None
   146         self.pipeo = self.pipei = self.pipee = None
   135         self.subprocess = None
   147         self.subprocess = None
   136         self.connected = False
   148         self.connected = False
   137 
   149 
   138     def connect(self, cachecommand):
   150     def connect(self, cachecommand):
   139         if self.pipeo:
   151         if self.pipeo:
   140             raise error.Abort(_("cache connection already open"))
   152             raise error.Abort(_("cache connection already open"))
   141         self.pipei, self.pipeo, self.pipee, self.subprocess = (
   153         self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4(
   142             procutil.popen4(cachecommand))
   154             cachecommand
       
   155         )
   143         self.connected = True
   156         self.connected = True
   144 
   157 
   145     def close(self):
   158     def close(self):
   146         def tryclose(pipe):
   159         def tryclose(pipe):
   147             try:
   160             try:
   148                 pipe.close()
   161                 pipe.close()
   149             except Exception:
   162             except Exception:
   150                 pass
   163                 pass
       
   164 
   151         if self.connected:
   165         if self.connected:
   152             try:
   166             try:
   153                 self.pipei.write("exit\n")
   167                 self.pipei.write("exit\n")
   154             except Exception:
   168             except Exception:
   155                 pass
   169                 pass
   188         except IOError:
   202         except IOError:
   189             self.close()
   203             self.close()
   190 
   204 
   191         return result
   205         return result
   192 
   206 
       
   207 
   193 def _getfilesbatch(
   208 def _getfilesbatch(
   194         remote, receivemissing, progresstick, missed, idmap, batchsize):
   209     remote, receivemissing, progresstick, missed, idmap, batchsize
       
   210 ):
   195     # Over http(s), iterbatch is a streamy method and we can start
   211     # Over http(s), iterbatch is a streamy method and we can start
   196     # looking at results early. This means we send one (potentially
   212     # looking at results early. This means we send one (potentially
   197     # large) request, but then we show nice progress as we process
   213     # large) request, but then we show nice progress as we process
   198     # file results, rather than showing chunks of $batchsize in
   214     # file results, rather than showing chunks of $batchsize in
   199     # progress.
   215     # progress.
   203     # should probably introduce a streambatch() method upstream and
   219     # should probably introduce a streambatch() method upstream and
   204     # use that for this.
   220     # use that for this.
   205     with remote.commandexecutor() as e:
   221     with remote.commandexecutor() as e:
   206         futures = []
   222         futures = []
   207         for m in missed:
   223         for m in missed:
   208             futures.append(e.callcommand('x_rfl_getfile', {
   224             futures.append(
   209                 'file': idmap[m],
   225                 e.callcommand(
   210                 'node': m[-40:]
   226                     'x_rfl_getfile', {'file': idmap[m], 'node': m[-40:]}
   211             }))
   227                 )
       
   228             )
   212 
   229 
   213         for i, m in enumerate(missed):
   230         for i, m in enumerate(missed):
   214             r = futures[i].result()
   231             r = futures[i].result()
   215             futures[i] = None  # release memory
   232             futures[i] = None  # release memory
   216             file_ = idmap[m]
   233             file_ = idmap[m]
   217             node = m[-40:]
   234             node = m[-40:]
   218             receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
   235             receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
   219             progresstick()
   236             progresstick()
   220 
   237 
       
   238 
   221 def _getfiles_optimistic(
   239 def _getfiles_optimistic(
   222     remote, receivemissing, progresstick, missed, idmap, step):
   240     remote, receivemissing, progresstick, missed, idmap, step
       
   241 ):
   223     remote._callstream("x_rfl_getfiles")
   242     remote._callstream("x_rfl_getfiles")
   224     i = 0
   243     i = 0
   225     pipeo = remote._pipeo
   244     pipeo = remote._pipeo
   226     pipei = remote._pipei
   245     pipei = remote._pipei
   227     while i < len(missed):
   246     while i < len(missed):
   246 
   265 
   247     # End the command
   266     # End the command
   248     pipeo.write('\n')
   267     pipeo.write('\n')
   249     pipeo.flush()
   268     pipeo.flush()
   250 
   269 
       
   270 
   251 def _getfiles_threaded(
   271 def _getfiles_threaded(
   252     remote, receivemissing, progresstick, missed, idmap, step):
   272     remote, receivemissing, progresstick, missed, idmap, step
       
   273 ):
   253     remote._callstream("getfiles")
   274     remote._callstream("getfiles")
   254     pipeo = remote._pipeo
   275     pipeo = remote._pipeo
   255     pipei = remote._pipei
   276     pipei = remote._pipei
   256 
   277 
   257     def writer():
   278     def writer():
   259             versionid = missingid[-40:]
   280             versionid = missingid[-40:]
   260             file = idmap[missingid]
   281             file = idmap[missingid]
   261             sshrequest = "%s%s\n" % (versionid, file)
   282             sshrequest = "%s%s\n" % (versionid, file)
   262             pipeo.write(sshrequest)
   283             pipeo.write(sshrequest)
   263         pipeo.flush()
   284         pipeo.flush()
       
   285 
   264     writerthread = threading.Thread(target=writer)
   286     writerthread = threading.Thread(target=writer)
   265     writerthread.daemon = True
   287     writerthread.daemon = True
   266     writerthread.start()
   288     writerthread.start()
   267 
   289 
   268     for missingid in missed:
   290     for missingid in missed:
   274     writerthread.join()
   296     writerthread.join()
   275     # End the command
   297     # End the command
   276     pipeo.write('\n')
   298     pipeo.write('\n')
   277     pipeo.flush()
   299     pipeo.flush()
   278 
   300 
       
   301 
   279 class fileserverclient(object):
   302 class fileserverclient(object):
   280     """A client for requesting files from the remote file server.
   303     """A client for requesting files from the remote file server.
   281     """
   304     """
       
   305 
   282     def __init__(self, repo):
   306     def __init__(self, repo):
   283         ui = repo.ui
   307         ui = repo.ui
   284         self.repo = repo
   308         self.repo = repo
   285         self.ui = ui
   309         self.ui = ui
   286         self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
   310         self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
   288             self.cacheprocess = util.expandpath(self.cacheprocess)
   312             self.cacheprocess = util.expandpath(self.cacheprocess)
   289 
   313 
   290         # This option causes remotefilelog to pass the full file path to the
   314         # This option causes remotefilelog to pass the full file path to the
   291         # cacheprocess instead of a hashed key.
   315         # cacheprocess instead of a hashed key.
   292         self.cacheprocesspasspath = ui.configbool(
   316         self.cacheprocesspasspath = ui.configbool(
   293             "remotefilelog", "cacheprocess.includepath")
   317             "remotefilelog", "cacheprocess.includepath"
       
   318         )
   294 
   319 
   295         self.debugoutput = ui.configbool("remotefilelog", "debug")
   320         self.debugoutput = ui.configbool("remotefilelog", "debug")
   296 
   321 
   297         self.remotecache = cacheconnection()
   322         self.remotecache = cacheconnection()
   298 
   323 
   339             if not missingid:
   364             if not missingid:
   340                 missedset = set(missed)
   365                 missedset = set(missed)
   341                 for missingid in idmap:
   366                 for missingid in idmap:
   342                     if not missingid in missedset:
   367                     if not missingid in missedset:
   343                         missed.append(missingid)
   368                         missed.append(missingid)
   344                 self.ui.warn(_("warning: cache connection closed early - " +
   369                 self.ui.warn(
   345                     "falling back to server\n"))
   370                     _(
       
   371                         "warning: cache connection closed early - "
       
   372                         + "falling back to server\n"
       
   373                     )
       
   374                 )
   346                 break
   375                 break
   347             if missingid == "0":
   376             if missingid == "0":
   348                 break
   377                 break
   349             if missingid.startswith("_hits_"):
   378             if missingid.startswith("_hits_"):
   350                 # receive progress reports
   379                 # receive progress reports
   357         global fetchmisses
   386         global fetchmisses
   358         fetchmisses += len(missed)
   387         fetchmisses += len(missed)
   359 
   388 
   360         fromcache = total - len(missed)
   389         fromcache = total - len(missed)
   361         progress.update(fromcache, total=total)
   390         progress.update(fromcache, total=total)
   362         self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
   391         self.ui.log(
   363                     fromcache, total, hit=fromcache, total=total)
   392             "remotefilelog",
       
   393             "remote cache hit rate is %r of %r\n",
       
   394             fromcache,
       
   395             total,
       
   396             hit=fromcache,
       
   397             total=total,
       
   398         )
   364 
   399 
   365         oldumask = os.umask(0o002)
   400         oldumask = os.umask(0o002)
   366         try:
   401         try:
   367             # receive cache misses from master
   402             # receive cache misses from master
   368             if missed:
   403             if missed:
   373                 self.ui.verbose = False
   408                 self.ui.verbose = False
   374                 try:
   409                 try:
   375                     with self._connect() as conn:
   410                     with self._connect() as conn:
   376                         remote = conn.peer
   411                         remote = conn.peer
   377                         if remote.capable(
   412                         if remote.capable(
   378                                 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
   413                             constants.NETWORK_CAP_LEGACY_SSH_GETFILES
       
   414                         ):
   379                             if not isinstance(remote, _sshv1peer):
   415                             if not isinstance(remote, _sshv1peer):
   380                                 raise error.Abort('remotefilelog requires ssh '
   416                                 raise error.Abort(
   381                                                   'servers')
   417                                     'remotefilelog requires ssh ' 'servers'
   382                             step = self.ui.configint('remotefilelog',
   418                                 )
   383                                                      'getfilesstep')
   419                             step = self.ui.configint(
   384                             getfilestype = self.ui.config('remotefilelog',
   420                                 'remotefilelog', 'getfilesstep'
   385                                                           'getfilestype')
   421                             )
       
   422                             getfilestype = self.ui.config(
       
   423                                 'remotefilelog', 'getfilestype'
       
   424                             )
   386                             if getfilestype == 'threaded':
   425                             if getfilestype == 'threaded':
   387                                 _getfiles = _getfiles_threaded
   426                                 _getfiles = _getfiles_threaded
   388                             else:
   427                             else:
   389                                 _getfiles = _getfiles_optimistic
   428                                 _getfiles = _getfiles_optimistic
   390                             _getfiles(remote, self.receivemissing,
   429                             _getfiles(
   391                                       progress.increment, missed, idmap, step)
   430                                 remote,
       
   431                                 self.receivemissing,
       
   432                                 progress.increment,
       
   433                                 missed,
       
   434                                 idmap,
       
   435                                 step,
       
   436                             )
   392                         elif remote.capable("x_rfl_getfile"):
   437                         elif remote.capable("x_rfl_getfile"):
   393                             if remote.capable('batch'):
   438                             if remote.capable('batch'):
   394                                 batchdefault = 100
   439                                 batchdefault = 100
   395                             else:
   440                             else:
   396                                 batchdefault = 10
   441                                 batchdefault = 10
   397                             batchsize = self.ui.configint(
   442                             batchsize = self.ui.configint(
   398                                 'remotefilelog', 'batchsize', batchdefault)
   443                                 'remotefilelog', 'batchsize', batchdefault
       
   444                             )
   399                             self.ui.debug(
   445                             self.ui.debug(
   400                                 b'requesting %d files from '
   446                                 b'requesting %d files from '
   401                                 b'remotefilelog server...\n' % len(missed))
   447                                 b'remotefilelog server...\n' % len(missed)
       
   448                             )
   402                             _getfilesbatch(
   449                             _getfilesbatch(
   403                                 remote, self.receivemissing, progress.increment,
   450                                 remote,
   404                                 missed, idmap, batchsize)
   451                                 self.receivemissing,
       
   452                                 progress.increment,
       
   453                                 missed,
       
   454                                 idmap,
       
   455                                 batchsize,
       
   456                             )
   405                         else:
   457                         else:
   406                             raise error.Abort("configured remotefilelog server"
   458                             raise error.Abort(
   407                                              " does not support remotefilelog")
   459                                 "configured remotefilelog server"
   408 
   460                                 " does not support remotefilelog"
   409                     self.ui.log("remotefilefetchlog",
   461                             )
   410                                 "Success\n",
   462 
   411                                 fetched_files = progress.pos - fromcache,
   463                     self.ui.log(
   412                                 total_to_fetch = total - fromcache)
   464                         "remotefilefetchlog",
       
   465                         "Success\n",
       
   466                         fetched_files=progress.pos - fromcache,
       
   467                         total_to_fetch=total - fromcache,
       
   468                     )
   413                 except Exception:
   469                 except Exception:
   414                     self.ui.log("remotefilefetchlog",
   470                     self.ui.log(
   415                                 "Fail\n",
   471                         "remotefilefetchlog",
   416                                 fetched_files = progress.pos - fromcache,
   472                         "Fail\n",
   417                                 total_to_fetch = total - fromcache)
   473                         fetched_files=progress.pos - fromcache,
       
   474                         total_to_fetch=total - fromcache,
       
   475                     )
   418                     raise
   476                     raise
   419                 finally:
   477                 finally:
   420                     self.ui.verbose = verbose
   478                     self.ui.verbose = verbose
   421                 # send to memcache
   479                 # send to memcache
   422                 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
   480                 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
   430             os.umask(oldumask)
   488             os.umask(oldumask)
   431 
   489 
   432     def receivemissing(self, pipe, filename, node):
   490     def receivemissing(self, pipe, filename, node):
   433         line = pipe.readline()[:-1]
   491         line = pipe.readline()[:-1]
   434         if not line:
   492         if not line:
   435             raise error.ResponseError(_("error downloading file contents:"),
   493             raise error.ResponseError(
   436                                       _("connection closed early"))
   494                 _("error downloading file contents:"),
       
   495                 _("connection closed early"),
       
   496             )
   437         size = int(line)
   497         size = int(line)
   438         data = pipe.read(size)
   498         data = pipe.read(size)
   439         if len(data) != size:
   499         if len(data) != size:
   440             raise error.ResponseError(_("error downloading file contents:"),
   500             raise error.ResponseError(
   441                                       _("only received %s of %s bytes")
   501                 _("error downloading file contents:"),
   442                                       % (len(data), size))
   502                 _("only received %s of %s bytes") % (len(data), size),
   443 
   503             )
   444         self.writedata.addremotefilelognode(filename, bin(node),
   504 
   445                                              zlib.decompress(data))
   505         self.writedata.addremotefilelognode(
       
   506             filename, bin(node), zlib.decompress(data)
       
   507         )
   446 
   508 
   447     def connect(self):
   509     def connect(self):
   448         if self.cacheprocess:
   510         if self.cacheprocess:
   449             cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
   511             cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
   450             self.remotecache.connect(cmd)
   512             self.remotecache.connect(cmd)
   475 
   537 
   476             self.remotecache = simplecache()
   538             self.remotecache = simplecache()
   477 
   539 
   478     def close(self):
   540     def close(self):
   479         if fetches:
   541         if fetches:
   480             msg = ("%d files fetched over %d fetches - " +
   542             msg = (
   481                    "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
   543                 "%d files fetched over %d fetches - "
   482                        fetched,
   544                 + "(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
   483                        fetches,
   545             ) % (
   484                        fetchmisses,
   546                 fetched,
   485                        float(fetched - fetchmisses) / float(fetched) * 100.0,
   547                 fetches,
   486                        fetchcost)
   548                 fetchmisses,
       
   549                 float(fetched - fetchmisses) / float(fetched) * 100.0,
       
   550                 fetchcost,
       
   551             )
   487             if self.debugoutput:
   552             if self.debugoutput:
   488                 self.ui.warn(msg)
   553                 self.ui.warn(msg)
   489             self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
   554             self.ui.log(
       
   555                 "remotefilelog.prefetch",
       
   556                 msg.replace("%", "%%"),
   490                 remotefilelogfetched=fetched,
   557                 remotefilelogfetched=fetched,
   491                 remotefilelogfetches=fetches,
   558                 remotefilelogfetches=fetches,
   492                 remotefilelogfetchmisses=fetchmisses,
   559                 remotefilelogfetchmisses=fetchmisses,
   493                 remotefilelogfetchtime=fetchcost * 1000)
   560                 remotefilelogfetchtime=fetchcost * 1000,
       
   561             )
   494 
   562 
   495         if self.remotecache.connected:
   563         if self.remotecache.connected:
   496             self.remotecache.close()
   564             self.remotecache.close()
   497 
   565 
   498     def prefetch(self, fileids, force=False, fetchdata=True,
   566     def prefetch(
   499                  fetchhistory=False):
   567         self, fileids, force=False, fetchdata=True, fetchhistory=False
       
   568     ):
   500         """downloads the given file versions to the cache
   569         """downloads the given file versions to the cache
   501         """
   570         """
   502         repo = self.repo
   571         repo = self.repo
   503         idstocheck = []
   572         idstocheck = []
   504         for file, id in fileids:
   573         for file, id in fileids:
   505             # hack
   574             # hack
   506             # - we don't use .hgtags
   575             # - we don't use .hgtags
   507             # - workingctx produces ids with length 42,
   576             # - workingctx produces ids with length 42,
   508             #   which we skip since they aren't in any cache
   577             #   which we skip since they aren't in any cache
   509             if (file == '.hgtags' or len(id) == 42
   578             if (
   510                 or not repo.shallowmatch(file)):
   579                 file == '.hgtags'
       
   580                 or len(id) == 42
       
   581                 or not repo.shallowmatch(file)
       
   582             ):
   511                 continue
   583                 continue
   512 
   584 
   513             idstocheck.append((file, bin(id)))
   585             idstocheck.append((file, bin(id)))
   514 
   586 
   515         datastore = self.datastore
   587         datastore = self.datastore
   516         historystore = self.historystore
   588         historystore = self.historystore
   517         if force:
   589         if force:
   518             datastore = contentstore.unioncontentstore(*repo.shareddatastores)
   590             datastore = contentstore.unioncontentstore(*repo.shareddatastores)
   519             historystore = metadatastore.unionmetadatastore(
   591             historystore = metadatastore.unionmetadatastore(
   520                 *repo.sharedhistorystores)
   592                 *repo.sharedhistorystores
       
   593             )
   521 
   594 
   522         missingids = set()
   595         missingids = set()
   523         if fetchdata:
   596         if fetchdata:
   524             missingids.update(datastore.getmissing(idstocheck))
   597             missingids.update(datastore.getmissing(idstocheck))
   525         if fetchhistory:
   598         if fetchhistory:
   529         # warn about this filtering potentially shadowing bugs.
   602         # warn about this filtering potentially shadowing bugs.
   530         nullids = len([None for unused, id in missingids if id == nullid])
   603         nullids = len([None for unused, id in missingids if id == nullid])
   531         if nullids:
   604         if nullids:
   532             missingids = [(f, id) for f, id in missingids if id != nullid]
   605             missingids = [(f, id) for f, id in missingids if id != nullid]
   533             repo.ui.develwarn(
   606             repo.ui.develwarn(
   534                 ('remotefilelog not fetching %d null revs'
   607                 (
   535                  ' - this is likely hiding bugs' % nullids),
   608                     'remotefilelog not fetching %d null revs'
   536                 config='remotefilelog-ext')
   609                     ' - this is likely hiding bugs' % nullids
       
   610                 ),
       
   611                 config='remotefilelog-ext',
       
   612             )
   537         if missingids:
   613         if missingids:
   538             global fetches, fetched, fetchcost
   614             global fetches, fetched, fetchcost
   539             fetches += 1
   615             fetches += 1
   540 
   616 
   541             # We want to be able to detect excess individual file downloads, so
   617             # We want to be able to detect excess individual file downloads, so
   542             # let's log that information for debugging.
   618             # let's log that information for debugging.
   543             if fetches >= 15 and fetches < 18:
   619             if fetches >= 15 and fetches < 18:
   544                 if fetches == 15:
   620                 if fetches == 15:
   545                     fetchwarning = self.ui.config('remotefilelog',
   621                     fetchwarning = self.ui.config(
   546                                                   'fetchwarning')
   622                         'remotefilelog', 'fetchwarning'
       
   623                     )
   547                     if fetchwarning:
   624                     if fetchwarning:
   548                         self.ui.warn(fetchwarning + '\n')
   625                         self.ui.warn(fetchwarning + '\n')
   549                 self.logstacktrace()
   626                 self.logstacktrace()
   550             missingids = [(file, hex(id)) for file, id in sorted(missingids)]
   627             missingids = [(file, hex(id)) for file, id in sorted(missingids)]
   551             fetched += len(missingids)
   628             fetched += len(missingids)
   552             start = time.time()
   629             start = time.time()
   553             missingids = self.request(missingids)
   630             missingids = self.request(missingids)
   554             if missingids:
   631             if missingids:
   555                 raise error.Abort(_("unable to download %d files") %
   632                 raise error.Abort(
   556                                   len(missingids))
   633                     _("unable to download %d files") % len(missingids)
       
   634                 )
   557             fetchcost += time.time() - start
   635             fetchcost += time.time() - start
   558             self._lfsprefetch(fileids)
   636             self._lfsprefetch(fileids)
   559 
   637 
   560     def _lfsprefetch(self, fileids):
   638     def _lfsprefetch(self, fileids):
   561         if not _lfsmod or not util.safehasattr(
   639         if not _lfsmod or not util.safehasattr(
   562                 self.repo.svfs, 'lfslocalblobstore'):
   640             self.repo.svfs, 'lfslocalblobstore'
       
   641         ):
   563             return
   642             return
   564         if not _lfsmod.wrapper.candownload(self.repo):
   643         if not _lfsmod.wrapper.candownload(self.repo):
   565             return
   644             return
   566         pointers = []
   645         pointers = []
   567         store = self.repo.svfs.lfslocalblobstore
   646         store = self.repo.svfs.lfslocalblobstore
   578             self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
   657             self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
   579             assert all(store.has(p.oid()) for p in pointers)
   658             assert all(store.has(p.oid()) for p in pointers)
   580 
   659 
   581     def logstacktrace(self):
   660     def logstacktrace(self):
   582         import traceback
   661         import traceback
   583         self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
   662 
   584                     ''.join(traceback.format_stack()))
   663         self.ui.log(
       
   664             'remotefilelog',
       
   665             'excess remotefilelog fetching:\n%s\n',
       
   666             ''.join(traceback.format_stack()),
       
   667         )