hgext/remotefilelog/repack.py
changeset: 43076:2372284d9457
parent:    42943:5fadf6103790
child:     43077:687b865b95ad
--- a/hgext/remotefilelog/repack.py	Sat Oct 05 10:29:34 2019 -0400
+++ b/hgext/remotefilelog/repack.py	Sun Oct 06 09:45:02 2019 -0400
@@ -31,11 +31,14 @@
 
 osutil = policy.importmod(r'osutil')
 
+
 class RepackAlreadyRunning(error.Abort):
     pass
 
-def backgroundrepack(repo, incremental=True, packsonly=False,
-                     ensurestart=False):
+
+def backgroundrepack(
+    repo, incremental=True, packsonly=False, ensurestart=False
+):
     cmd = [procutil.hgexecutable(), '-R', repo.origroot, 'repack']
     msg = _("(running background repack)\n")
     if incremental:
@@ -47,21 +50,27 @@
     # We know this command will find a binary, so don't block on it starting.
     procutil.runbgcommand(cmd, encoding.environ, ensurestart=ensurestart)
 
+
 def fullrepack(repo, options=None):
     """If ``packsonly`` is True, stores creating only loose objects are skipped.
     """
     if util.safehasattr(repo, 'shareddatastores'):
-        datasource = contentstore.unioncontentstore(
-            *repo.shareddatastores)
+        datasource = contentstore.unioncontentstore(*repo.shareddatastores)
         historysource = metadatastore.unionmetadatastore(
-            *repo.sharedhistorystores,
-            allowincomplete=True)
+            *repo.sharedhistorystores, allowincomplete=True
+        )
 
         packpath = shallowutil.getcachepackpath(
+            repo, constants.FILEPACK_CATEGORY
+        )
+        _runrepack(
             repo,
-            constants.FILEPACK_CATEGORY)
-        _runrepack(repo, datasource, historysource, packpath,
-                   constants.FILEPACK_CATEGORY, options=options)
+            datasource,
+            historysource,
+            packpath,
+            constants.FILEPACK_CATEGORY,
+            options=options,
+        )
 
     if util.safehasattr(repo.manifestlog, 'datastore'):
         localdata, shareddata = _getmanifeststores(repo)
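
For illustration only (not part of this changeset): the ``options`` dictionary that ``fullrepack`` and ``_runrepack`` thread through is how the ``packsonly`` flag from the docstring above reaches the packers. A hypothetical caller could look like the sketch below; the real command wiring lives outside this file.

# Hypothetical caller sketch; 'repo' stands in for an existing repository
# object and fullrepack is the function defined above.
options = {'packsonly': True}  # skip stores that only create loose objects
fullrepack(repo, options=options)
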
@@ -71,20 +80,33 @@
         # Repack the shared manifest store
         datasource = contentstore.unioncontentstore(*sdstores)
         historysource = metadatastore.unionmetadatastore(
-                        *shstores,
-                        allowincomplete=True)
-        _runrepack(repo, datasource, historysource, spackpath,
-                   constants.TREEPACK_CATEGORY, options=options)
+            *shstores, allowincomplete=True
+        )
+        _runrepack(
+            repo,
+            datasource,
+            historysource,
+            spackpath,
+            constants.TREEPACK_CATEGORY,
+            options=options,
+        )
 
         # Repack the local manifest store
         datasource = contentstore.unioncontentstore(
-                        *ldstores,
-                        allowincomplete=True)
+            *ldstores, allowincomplete=True
+        )
         historysource = metadatastore.unionmetadatastore(
-                        *lhstores,
-                        allowincomplete=True)
-        _runrepack(repo, datasource, historysource, lpackpath,
-                   constants.TREEPACK_CATEGORY, options=options)
+            *lhstores, allowincomplete=True
+        )
+        _runrepack(
+            repo,
+            datasource,
+            historysource,
+            lpackpath,
+            constants.TREEPACK_CATEGORY,
+            options=options,
+        )
+
 
 def incrementalrepack(repo, options=None):
     """This repacks the repo by looking at the distribution of pack files in the
@@ -92,14 +114,16 @@
     """
     if util.safehasattr(repo, 'shareddatastores'):
         packpath = shallowutil.getcachepackpath(
+            repo, constants.FILEPACK_CATEGORY
+        )
+        _incrementalrepack(
             repo,
-            constants.FILEPACK_CATEGORY)
-        _incrementalrepack(repo,
-                           repo.shareddatastores,
-                           repo.sharedhistorystores,
-                           packpath,
-                           constants.FILEPACK_CATEGORY,
-                           options=options)
+            repo.shareddatastores,
+            repo.sharedhistorystores,
+            packpath,
+            constants.FILEPACK_CATEGORY,
+            options=options,
+        )
 
     if util.safehasattr(repo.manifestlog, 'datastore'):
         localdata, shareddata = _getmanifeststores(repo)
@@ -107,21 +131,26 @@
         spackpath, sdstores, shstores = shareddata
 
         # Repack the shared manifest store
-        _incrementalrepack(repo,
-                           sdstores,
-                           shstores,
-                           spackpath,
-                           constants.TREEPACK_CATEGORY,
-                           options=options)
+        _incrementalrepack(
+            repo,
+            sdstores,
+            shstores,
+            spackpath,
+            constants.TREEPACK_CATEGORY,
+            options=options,
+        )
 
         # Repack the local manifest store
-        _incrementalrepack(repo,
-                           ldstores,
-                           lhstores,
-                           lpackpath,
-                           constants.TREEPACK_CATEGORY,
-                           allowincompletedata=True,
-                           options=options)
+        _incrementalrepack(
+            repo,
+            ldstores,
+            lhstores,
+            lpackpath,
+            constants.TREEPACK_CATEGORY,
+            allowincompletedata=True,
+            options=options,
+        )
+
 
 def _getmanifeststores(repo):
     shareddatastores = repo.manifestlog.shareddatastores
@@ -129,19 +158,25 @@
     sharedhistorystores = repo.manifestlog.sharedhistorystores
     localhistorystores = repo.manifestlog.localhistorystores
 
-    sharedpackpath = shallowutil.getcachepackpath(repo,
-                                            constants.TREEPACK_CATEGORY)
-    localpackpath = shallowutil.getlocalpackpath(repo.svfs.vfs.base,
-                                            constants.TREEPACK_CATEGORY)
+    sharedpackpath = shallowutil.getcachepackpath(
+        repo, constants.TREEPACK_CATEGORY
+    )
+    localpackpath = shallowutil.getlocalpackpath(
+        repo.svfs.vfs.base, constants.TREEPACK_CATEGORY
+    )
 
-    return ((localpackpath, localdatastores, localhistorystores),
-            (sharedpackpath, shareddatastores, sharedhistorystores))
+    return (
+        (localpackpath, localdatastores, localhistorystores),
+        (sharedpackpath, shareddatastores, sharedhistorystores),
+    )
+
 
 def _topacks(packpath, files, constructor):
     paths = list(os.path.join(packpath, p) for p in files)
     packs = list(constructor(p) for p in paths)
     return packs
 
+
 def _deletebigpacks(repo, folder, files):
     """Deletes packfiles that are bigger than ``packs.maxpacksize``.
 
@@ -156,96 +191,128 @@
 
     # Either an oversize index or datapack will trigger cleanup of the whole
     # pack:
-    oversized = {os.path.splitext(path)[0] for path, ftype, stat in files
-        if (stat.st_size > maxsize and (os.path.splitext(path)[1]
-                                        in VALIDEXTS))}
+    oversized = {
+        os.path.splitext(path)[0]
+        for path, ftype, stat in files
+        if (stat.st_size > maxsize and (os.path.splitext(path)[1] in VALIDEXTS))
+    }
 
     for rootfname in oversized:
         rootpath = os.path.join(folder, rootfname)
         for ext in VALIDEXTS:
             path = rootpath + ext
-            repo.ui.debug('removing oversize packfile %s (%s)\n' %
-                          (path, util.bytecount(os.stat(path).st_size)))
+            repo.ui.debug(
+                'removing oversize packfile %s (%s)\n'
+                % (path, util.bytecount(os.stat(path).st_size))
+            )
             os.unlink(path)
     return [row for row in files if os.path.basename(row[0]) not in oversized]
 
-def _incrementalrepack(repo, datastore, historystore, packpath, category,
-        allowincompletedata=False, options=None):
+
+def _incrementalrepack(
+    repo,
+    datastore,
+    historystore,
+    packpath,
+    category,
+    allowincompletedata=False,
+    options=None,
+):
     shallowutil.mkstickygroupdir(repo.ui, packpath)
 
     files = osutil.listdir(packpath, stat=True)
     files = _deletebigpacks(repo, packpath, files)
-    datapacks = _topacks(packpath,
-        _computeincrementaldatapack(repo.ui, files),
-        datapack.datapack)
-    datapacks.extend(s for s in datastore
-                     if not isinstance(s, datapack.datapackstore))
+    datapacks = _topacks(
+        packpath, _computeincrementaldatapack(repo.ui, files), datapack.datapack
+    )
+    datapacks.extend(
+        s for s in datastore if not isinstance(s, datapack.datapackstore)
+    )
 
-    historypacks = _topacks(packpath,
+    historypacks = _topacks(
+        packpath,
         _computeincrementalhistorypack(repo.ui, files),
-        historypack.historypack)
-    historypacks.extend(s for s in historystore
-                        if not isinstance(s, historypack.historypackstore))
+        historypack.historypack,
+    )
+    historypacks.extend(
+        s
+        for s in historystore
+        if not isinstance(s, historypack.historypackstore)
+    )
 
     # ``allhistory{files,packs}`` contains all known history packs, even ones we
     # don't plan to repack. They are used during the datapack repack to ensure
     # good ordering of nodes.
-    allhistoryfiles = _allpackfileswithsuffix(files, historypack.PACKSUFFIX,
-                            historypack.INDEXSUFFIX)
-    allhistorypacks = _topacks(packpath,
+    allhistoryfiles = _allpackfileswithsuffix(
+        files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX
+    )
+    allhistorypacks = _topacks(
+        packpath,
         (f for f, mode, stat in allhistoryfiles),
-        historypack.historypack)
-    allhistorypacks.extend(s for s in historystore
-                        if not isinstance(s, historypack.historypackstore))
-    _runrepack(repo,
-               contentstore.unioncontentstore(
-                   *datapacks,
-                   allowincomplete=allowincompletedata),
-               metadatastore.unionmetadatastore(
-                   *historypacks,
-                   allowincomplete=True),
-               packpath, category,
-               fullhistory=metadatastore.unionmetadatastore(
-                   *allhistorypacks,
-                   allowincomplete=True),
-                options=options)
+        historypack.historypack,
+    )
+    allhistorypacks.extend(
+        s
+        for s in historystore
+        if not isinstance(s, historypack.historypackstore)
+    )
+    _runrepack(
+        repo,
+        contentstore.unioncontentstore(
+            *datapacks, allowincomplete=allowincompletedata
+        ),
+        metadatastore.unionmetadatastore(*historypacks, allowincomplete=True),
+        packpath,
+        category,
+        fullhistory=metadatastore.unionmetadatastore(
+            *allhistorypacks, allowincomplete=True
+        ),
+        options=options,
+    )
+
 
 def _computeincrementaldatapack(ui, files):
     opts = {
-        'gencountlimit' : ui.configint(
-            'remotefilelog', 'data.gencountlimit'),
-        'generations' : ui.configlist(
-            'remotefilelog', 'data.generations'),
-        'maxrepackpacks' : ui.configint(
-            'remotefilelog', 'data.maxrepackpacks'),
-        'repackmaxpacksize' : ui.configbytes(
-            'remotefilelog', 'data.repackmaxpacksize'),
-        'repacksizelimit' : ui.configbytes(
-            'remotefilelog', 'data.repacksizelimit'),
+        'gencountlimit': ui.configint('remotefilelog', 'data.gencountlimit'),
+        'generations': ui.configlist('remotefilelog', 'data.generations'),
+        'maxrepackpacks': ui.configint('remotefilelog', 'data.maxrepackpacks'),
+        'repackmaxpacksize': ui.configbytes(
+            'remotefilelog', 'data.repackmaxpacksize'
+        ),
+        'repacksizelimit': ui.configbytes(
+            'remotefilelog', 'data.repacksizelimit'
+        ),
     }
 
     packfiles = _allpackfileswithsuffix(
-        files, datapack.PACKSUFFIX, datapack.INDEXSUFFIX)
+        files, datapack.PACKSUFFIX, datapack.INDEXSUFFIX
+    )
     return _computeincrementalpack(packfiles, opts)
 
+
 def _computeincrementalhistorypack(ui, files):
     opts = {
-        'gencountlimit' : ui.configint(
-            'remotefilelog', 'history.gencountlimit'),
-        'generations' : ui.configlist(
-            'remotefilelog', 'history.generations', ['100MB']),
-        'maxrepackpacks' : ui.configint(
-            'remotefilelog', 'history.maxrepackpacks'),
-        'repackmaxpacksize' : ui.configbytes(
-            'remotefilelog', 'history.repackmaxpacksize', '400MB'),
-        'repacksizelimit' : ui.configbytes(
-            'remotefilelog', 'history.repacksizelimit'),
+        'gencountlimit': ui.configint('remotefilelog', 'history.gencountlimit'),
+        'generations': ui.configlist(
+            'remotefilelog', 'history.generations', ['100MB']
+        ),
+        'maxrepackpacks': ui.configint(
+            'remotefilelog', 'history.maxrepackpacks'
+        ),
+        'repackmaxpacksize': ui.configbytes(
+            'remotefilelog', 'history.repackmaxpacksize', '400MB'
+        ),
+        'repacksizelimit': ui.configbytes(
+            'remotefilelog', 'history.repacksizelimit'
+        ),
     }
 
     packfiles = _allpackfileswithsuffix(
-        files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX)
+        files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX
+    )
     return _computeincrementalpack(packfiles, opts)
 
+
 def _allpackfileswithsuffix(files, packsuffix, indexsuffix):
     result = []
     fileset = set(fn for fn, mode, stat in files)
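
A self-contained toy run (illustrative only, not part of the changeset) of the oversize check in ``_deletebigpacks`` above: if either half of a pack crosses the ``packs.maxpacksize`` threshold, the whole pack is slated for removal. The 100-byte limit and the suffix names are assumptions invented for this example.

import collections
import os

St = collections.namedtuple('St', 'st_size')
maxsize = 100  # assumed packs.maxpacksize for this example
VALIDEXTS = ('.datapack', '.dataidx')  # assumed suffixes for this example
files = [
    ('abc.datapack', 0, St(50)),
    ('abc.dataidx', 0, St(150)),  # oversize index drags the whole pack out
    ('def.datapack', 0, St(40)),
    ('def.dataidx', 0, St(10)),
]
oversized = {
    os.path.splitext(path)[0]
    for path, ftype, stat in files
    if stat.st_size > maxsize and os.path.splitext(path)[1] in VALIDEXTS
}
assert oversized == {'abc'}  # both abc.* files would be unlinked
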
@@ -253,7 +320,7 @@
         if not filename.endswith(packsuffix):
             continue
 
-        prefix = filename[:-len(packsuffix)]
+        prefix = filename[: -len(packsuffix)]
 
         # Don't process a pack if it doesn't have an index.
         if (prefix + indexsuffix) not in fileset:
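
Illustrative only: the guard above means a pack without its companion index is ignored entirely. A minimal standalone version of that pairing check, with suffix names assumed for the example:

packsuffix, indexsuffix = '.histpack', '.histidx'  # assumed for the example
fileset = {'aa.histpack', 'aa.histidx', 'bb.histpack'}  # 'bb' has no index
usable = [
    fn[: -len(packsuffix)]
    for fn in fileset
    if fn.endswith(packsuffix)
    and fn[: -len(packsuffix)] + indexsuffix in fileset
]
assert usable == ['aa']
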
@@ -262,6 +329,7 @@
 
     return result
 
+
 def _computeincrementalpack(files, opts):
     """Given a set of pack files along with the configuration options, this
     function computes the list of files that should be packed as part of an
@@ -272,8 +340,9 @@
     over time).
     """
 
-    limits = list(sorted((util.sizetoint(s) for s in opts['generations']),
-                                reverse=True))
+    limits = list(
+        sorted((util.sizetoint(s) for s in opts['generations']), reverse=True)
+    )
     limits.append(0)
 
     # Group the packs by generation (i.e. by size)
@@ -303,23 +372,29 @@
     for i, limit in enumerate(limits):
         if len(generations[i]) > opts['gencountlimit']:
             # Sort to be smallest last, for easy popping later
-            genpacks.extend(sorted(generations[i], reverse=True,
-                                   key=lambda x: sizes[x]))
+            genpacks.extend(
+                sorted(generations[i], reverse=True, key=lambda x: sizes[x])
+            )
             break
 
     # Take as many packs from the generation as we can
     chosenpacks = genpacks[-3:]
     genpacks = genpacks[:-3]
     repacksize = sum(sizes[n] for n in chosenpacks)
-    while (repacksize < opts['repacksizelimit'] and genpacks and
-           len(chosenpacks) < opts['maxrepackpacks']):
+    while (
+        repacksize < opts['repacksizelimit']
+        and genpacks
+        and len(chosenpacks) < opts['maxrepackpacks']
+    ):
         chosenpacks.append(genpacks.pop())
         repacksize += sizes[chosenpacks[-1]]
 
     return chosenpacks
 
-def _runrepack(repo, data, history, packpath, category, fullhistory=None,
-               options=None):
+
+def _runrepack(
+    repo, data, history, packpath, category, fullhistory=None, options=None
+):
     shallowutil.mkstickygroupdir(repo.ui, packpath)
 
     def isold(repo, filename, node):
@@ -337,16 +412,26 @@
     garbagecollect = repo.ui.configbool('remotefilelog', 'gcrepack')
     if not fullhistory:
         fullhistory = history
-    packer = repacker(repo, data, history, fullhistory, category,
-                      gc=garbagecollect, isold=isold, options=options)
+    packer = repacker(
+        repo,
+        data,
+        history,
+        fullhistory,
+        category,
+        gc=garbagecollect,
+        isold=isold,
+        options=options,
+    )
 
     with datapack.mutabledatapack(repo.ui, packpath) as dpack:
         with historypack.mutablehistorypack(repo.ui, packpath) as hpack:
             try:
                 packer.run(dpack, hpack)
             except error.LockHeld:
-                raise RepackAlreadyRunning(_("skipping repack - another repack "
-                                             "is already running"))
+                raise RepackAlreadyRunning(
+                    _("skipping repack - another repack " "is already running")
+                )
+
 
 def keepset(repo, keyfn, lastkeepkeys=None):
     """Computes a keepset which is not garbage collected.
@@ -409,12 +494,23 @@
 
     return keepkeys
 
+
 class repacker(object):
     """Class for orchestrating the repack of data and history information into a
     new format.
     """
-    def __init__(self, repo, data, history, fullhistory, category, gc=False,
-                 isold=None, options=None):
+
+    def __init__(
+        self,
+        repo,
+        data,
+        history,
+        fullhistory,
+        category,
+        gc=False,
+        isold=None,
+        options=None,
+    ):
         self.repo = repo
         self.data = data
         self.history = history
@@ -426,14 +522,15 @@
             if not isold:
                 raise ValueError("Function 'isold' is not properly specified")
             # use (filename, node) tuple as a keepset key
-            self.keepkeys = keepset(repo, lambda f, n : (f, n))
+            self.keepkeys = keepset(repo, lambda f, n: (f, n))
             self.isold = isold
 
     def run(self, targetdata, targethistory):
         ledger = repackledger()
 
-        with lockmod.lock(repacklockvfs(self.repo), "repacklock", desc=None,
-                          timeout=0):
+        with lockmod.lock(
+            repacklockvfs(self.repo), "repacklock", desc=None, timeout=0
+        ):
             self.repo.hook('prerepack')
 
             # Populate ledger from source
@@ -473,8 +570,10 @@
         orphans = sorted(orphans)
         orphans = list(sorted(orphans, key=getsize, reverse=True))
         if ui.debugflag:
-            ui.debug("%s: orphan chain: %s\n" % (filename,
-                ", ".join([short(s) for s in orphans])))
+            ui.debug(
+                "%s: orphan chain: %s\n"
+                % (filename, ", ".join([short(s) for s in orphans]))
+            )
 
         # Create one contiguous chain and reassign deltabases.
         for i, node in enumerate(orphans):
@@ -497,23 +596,28 @@
                 byfile.setdefault(entry.filename, {})[entry.node] = entry
 
         count = 0
-        repackprogress = ui.makeprogress(_("repacking data"), unit=self.unit,
-                                            total=len(byfile))
+        repackprogress = ui.makeprogress(
+            _("repacking data"), unit=self.unit, total=len(byfile)
+        )
         for filename, entries in sorted(byfile.iteritems()):
             repackprogress.update(count)
 
             ancestors = {}
             nodes = list(node for node in entries)
             nohistory = []
-            buildprogress = ui.makeprogress(_("building history"), unit='nodes',
-                                            total=len(nodes))
+            buildprogress = ui.makeprogress(
+                _("building history"), unit='nodes', total=len(nodes)
+            )
             for i, node in enumerate(nodes):
                 if node in ancestors:
                     continue
                 buildprogress.update(i)
                 try:
-                    ancestors.update(self.fullhistory.getancestors(filename,
-                        node, known=ancestors))
+                    ancestors.update(
+                        self.fullhistory.getancestors(
+                            filename, node, known=ancestors
+                        )
+                    )
                 except KeyError:
                     # Since we're packing data entries, we may not have the
                     # corresponding history entries for them. It's not a big
@@ -524,14 +628,16 @@
             # Order the nodes children first, so we can produce reverse deltas
             orderednodes = list(reversed(self._toposort(ancestors)))
             if len(nohistory) > 0:
-                ui.debug('repackdata: %d nodes without history\n' %
-                         len(nohistory))
+                ui.debug(
+                    'repackdata: %d nodes without history\n' % len(nohistory)
+                )
             orderednodes.extend(sorted(nohistory))
 
             # Filter orderednodes to just the nodes we want to serialize (it
             # currently also has the edge nodes' ancestors).
-            orderednodes = list(filter(lambda node: node in nodes,
-                                orderednodes))
+            orderednodes = list(
+                filter(lambda node: node in nodes, orderednodes)
+            )
 
             # Garbage collect old nodes:
             if self.garbagecollect:
@@ -539,8 +645,9 @@
                 for node in orderednodes:
                     # If the node is old and is not in the keepset, we skip it,
                     # and mark as garbage collected
-                    if ((filename, node) not in self.keepkeys and
-                        self.isold(self.repo, filename, node)):
+                    if (filename, node) not in self.keepkeys and self.isold(
+                        self.repo, filename, node
+                    ):
                         entries[node].gced = True
                         continue
                     neworderednodes.append(node)
@@ -551,9 +658,9 @@
             nobase = set()
             referenced = set()
             nodes = set(nodes)
-            processprogress = ui.makeprogress(_("processing nodes"),
-                                              unit='nodes',
-                                              total=len(orderednodes))
+            processprogress = ui.makeprogress(
+                _("processing nodes"), unit='nodes', total=len(orderednodes)
+            )
             for i, node in enumerate(orderednodes):
                 processprogress.update(i)
                 # Find delta base
@@ -593,8 +700,9 @@
             # experimental config: repack.chainorphansbysize
             if ui.configbool('repack', 'chainorphansbysize'):
                 orphans = nobase - referenced
-                orderednodes = self._chainorphans(ui, filename, orderednodes,
-                    orphans, deltabases)
+                orderednodes = self._chainorphans(
+                    ui, filename, orderednodes, orphans, deltabases
+                )
 
             # Compute deltas and write to the pack
             for i, node in enumerate(orderednodes):
@@ -607,8 +715,11 @@
                     deltaentry = self.data.getdelta(filename, node)
                     delta, deltabasename, origdeltabase, meta = deltaentry
                     size = meta.get(constants.METAKEYSIZE)
-                    if (deltabasename != filename or origdeltabase != deltabase
-                        or size is None):
+                    if (
+                        deltabasename != filename
+                        or origdeltabase != deltabase
+                        or size is None
+                    ):
                         deltabasetext = self.data.get(filename, deltabase)
                         original = self.data.get(filename, node)
                         size = len(original)
@@ -639,8 +750,9 @@
             if entry.historysource:
                 byfile.setdefault(entry.filename, {})[entry.node] = entry
 
-        progress = ui.makeprogress(_("repacking history"), unit=self.unit,
-                                   total=len(byfile))
+        progress = ui.makeprogress(
+            _("repacking history"), unit=self.unit, total=len(byfile)
+        )
         for filename, entries in sorted(byfile.iteritems()):
             ancestors = {}
             nodes = list(node for node in entries)
@@ -648,8 +760,9 @@
             for node in nodes:
                 if node in ancestors:
                     continue
-                ancestors.update(self.history.getancestors(filename, node,
-                                                           known=ancestors))
+                ancestors.update(
+                    self.history.getancestors(filename, node, known=ancestors)
+                )
 
             # Order the nodes children first
             orderednodes = reversed(self._toposort(ancestors))
@@ -702,11 +815,13 @@
         sortednodes = shallowutil.sortnodes(ancestors.keys(), parentfunc)
         return sortednodes
 
+
 class repackledger(object):
     """Storage for all the bookkeeping that happens during a repack. It contains
     the list of revisions being repacked, what happened to each revision, and
     which source store contained which revision originally (for later cleanup).
     """
+
     def __init__(self):
         self.entries = {}
         self.sources = {}
@@ -748,11 +863,21 @@
     def addcreated(self, value):
         self.created.add(value)
 
+
 class repackentry(object):
     """Simple class representing a single revision entry in the repackledger.
     """
-    __slots__ = (r'filename', r'node', r'datasource', r'historysource',
-                 r'datarepacked', r'historyrepacked', r'gced')
+
+    __slots__ = (
+        r'filename',
+        r'node',
+        r'datasource',
+        r'historysource',
+        r'datarepacked',
+        r'historyrepacked',
+        r'gced',
+    )
+
     def __init__(self, filename, node):
         self.filename = filename
         self.node = node
@@ -767,13 +892,14 @@
         # If garbage collected
         self.gced = False
 
+
 def repacklockvfs(repo):
     if util.safehasattr(repo, 'name'):
         # Lock in the shared cache so repacks across multiple copies of the same
         # repo are coordinated.
         sharedcachepath = shallowutil.getcachepackpath(
-            repo,
-            constants.FILEPACK_CATEGORY)
+            repo, constants.FILEPACK_CATEGORY
+        )
         return vfs.vfs(sharedcachepath)
     else:
         return repo.svfs
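
The functions in this module appear to back the ``repack`` command that the remotefilelog extension registers elsewhere; the flag names below are inferred from ``backgroundrepack`` at the top of this file and are assumptions rather than part of this changeset.

# Assumed command-line entry points (the wiring lives in the extension setup):
#   hg repack                  -> fullrepack(repo)
#   hg repack --incremental    -> incrementalrepack(repo)
#   hg repack --background     -> backgroundrepack(repo, ...), which respawns
#                                 'hg -R <repo> repack' in the background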