diff -r 57875cf423c9 -r 2372284d9457 hgext/remotefilelog/repack.py
--- a/hgext/remotefilelog/repack.py	Sat Oct 05 10:29:34 2019 -0400
+++ b/hgext/remotefilelog/repack.py	Sun Oct 06 09:45:02 2019 -0400
@@ -31,11 +31,14 @@
 
 osutil = policy.importmod(r'osutil')
 
+
 class RepackAlreadyRunning(error.Abort):
     pass
 
-def backgroundrepack(repo, incremental=True, packsonly=False,
-                     ensurestart=False):
+
+def backgroundrepack(
+    repo, incremental=True, packsonly=False, ensurestart=False
+):
     cmd = [procutil.hgexecutable(), '-R', repo.origroot, 'repack']
     msg = _("(running background repack)\n")
     if incremental:
@@ -47,21 +50,27 @@
     # We know this command will find a binary, so don't block on it starting.
     procutil.runbgcommand(cmd, encoding.environ, ensurestart=ensurestart)
 
+
 def fullrepack(repo, options=None):
     """If ``packsonly`` is True, stores creating only loose objects are skipped.
     """
     if util.safehasattr(repo, 'shareddatastores'):
-        datasource = contentstore.unioncontentstore(
-            *repo.shareddatastores)
+        datasource = contentstore.unioncontentstore(*repo.shareddatastores)
         historysource = metadatastore.unionmetadatastore(
-            *repo.sharedhistorystores,
-            allowincomplete=True)
+            *repo.sharedhistorystores, allowincomplete=True
+        )
 
         packpath = shallowutil.getcachepackpath(
+            repo, constants.FILEPACK_CATEGORY
+        )
+        _runrepack(
             repo,
-            constants.FILEPACK_CATEGORY)
-        _runrepack(repo, datasource, historysource, packpath,
-                   constants.FILEPACK_CATEGORY, options=options)
+            datasource,
+            historysource,
+            packpath,
+            constants.FILEPACK_CATEGORY,
+            options=options,
+        )
 
     if util.safehasattr(repo.manifestlog, 'datastore'):
         localdata, shareddata = _getmanifeststores(repo)
@@ -71,20 +80,33 @@
         # Repack the shared manifest store
         datasource = contentstore.unioncontentstore(*sdstores)
         historysource = metadatastore.unionmetadatastore(
-            *shstores,
-            allowincomplete=True)
-        _runrepack(repo, datasource, historysource, spackpath,
-                   constants.TREEPACK_CATEGORY, options=options)
+            *shstores, allowincomplete=True
+        )
+        _runrepack(
+            repo,
+            datasource,
+            historysource,
+            spackpath,
+            constants.TREEPACK_CATEGORY,
+            options=options,
+        )
 
         # Repack the local manifest store
         datasource = contentstore.unioncontentstore(
-            *ldstores,
-            allowincomplete=True)
+            *ldstores, allowincomplete=True
+        )
         historysource = metadatastore.unionmetadatastore(
-            *lhstores,
-            allowincomplete=True)
-        _runrepack(repo, datasource, historysource, lpackpath,
-                   constants.TREEPACK_CATEGORY, options=options)
+            *lhstores, allowincomplete=True
+        )
+        _runrepack(
+            repo,
+            datasource,
+            historysource,
+            lpackpath,
+            constants.TREEPACK_CATEGORY,
+            options=options,
+        )
+
 
 def incrementalrepack(repo, options=None):
     """This repacks the repo by looking at the distribution of pack files in the
@@ -92,14 +114,16 @@
     """
     if util.safehasattr(repo, 'shareddatastores'):
         packpath = shallowutil.getcachepackpath(
+            repo, constants.FILEPACK_CATEGORY
+        )
+        _incrementalrepack(
             repo,
-            constants.FILEPACK_CATEGORY)
-        _incrementalrepack(repo,
-                           repo.shareddatastores,
-                           repo.sharedhistorystores,
-                           packpath,
-                           constants.FILEPACK_CATEGORY,
-                           options=options)
+            repo.shareddatastores,
+            repo.sharedhistorystores,
+            packpath,
+            constants.FILEPACK_CATEGORY,
+            options=options,
+        )
 
     if util.safehasattr(repo.manifestlog, 'datastore'):
         localdata, shareddata = _getmanifeststores(repo)
@@ -107,21 +131,26 @@
         spackpath, sdstores, shstores = shareddata
 
         # Repack the shared manifest store
-        _incrementalrepack(repo,
-                           sdstores,
-                           shstores,
-                           spackpath,
-                           constants.TREEPACK_CATEGORY,
-                           options=options)
+        _incrementalrepack(
+            repo,
+            sdstores,
+            shstores,
+            spackpath,
+            constants.TREEPACK_CATEGORY,
+            options=options,
+        )
 
         # Repack the local manifest store
-        _incrementalrepack(repo,
-                           ldstores,
-                           lhstores,
-                           lpackpath,
-                           constants.TREEPACK_CATEGORY,
-                           allowincompletedata=True,
-                           options=options)
+        _incrementalrepack(
+            repo,
+            ldstores,
+            lhstores,
+            lpackpath,
+            constants.TREEPACK_CATEGORY,
+            allowincompletedata=True,
+            options=options,
+        )
+
 
 def _getmanifeststores(repo):
     shareddatastores = repo.manifestlog.shareddatastores
@@ -129,19 +158,25 @@
     sharedhistorystores = repo.manifestlog.sharedhistorystores
     localhistorystores = repo.manifestlog.localhistorystores
 
-    sharedpackpath = shallowutil.getcachepackpath(repo,
-                                                  constants.TREEPACK_CATEGORY)
-    localpackpath = shallowutil.getlocalpackpath(repo.svfs.vfs.base,
-                                                 constants.TREEPACK_CATEGORY)
+    sharedpackpath = shallowutil.getcachepackpath(
+        repo, constants.TREEPACK_CATEGORY
+    )
+    localpackpath = shallowutil.getlocalpackpath(
+        repo.svfs.vfs.base, constants.TREEPACK_CATEGORY
+    )
 
-    return ((localpackpath, localdatastores, localhistorystores),
-            (sharedpackpath, shareddatastores, sharedhistorystores))
+    return (
+        (localpackpath, localdatastores, localhistorystores),
+        (sharedpackpath, shareddatastores, sharedhistorystores),
+    )
+
 
 def _topacks(packpath, files, constructor):
     paths = list(os.path.join(packpath, p) for p in files)
     packs = list(constructor(p) for p in paths)
     return packs
 
+
 def _deletebigpacks(repo, folder, files):
     """Deletes packfiles that are bigger than ``packs.maxpacksize``.
 
@@ -156,96 +191,128 @@
 
     # Either an oversize index or datapack will trigger cleanup of the whole
     # pack:
-    oversized = {os.path.splitext(path)[0] for path, ftype, stat in files
-                 if (stat.st_size > maxsize and (os.path.splitext(path)[1]
-                                                 in VALIDEXTS))}
+    oversized = {
+        os.path.splitext(path)[0]
+        for path, ftype, stat in files
+        if (stat.st_size > maxsize and (os.path.splitext(path)[1] in VALIDEXTS))
+    }
 
     for rootfname in oversized:
         rootpath = os.path.join(folder, rootfname)
         for ext in VALIDEXTS:
             path = rootpath + ext
-            repo.ui.debug('removing oversize packfile %s (%s)\n' %
-                          (path, util.bytecount(os.stat(path).st_size)))
+            repo.ui.debug(
+                'removing oversize packfile %s (%s)\n'
+                % (path, util.bytecount(os.stat(path).st_size))
+            )
             os.unlink(path)
     return [row for row in files if os.path.basename(row[0]) not in oversized]
 
-def _incrementalrepack(repo, datastore, historystore, packpath, category,
-                       allowincompletedata=False, options=None):
+
+def _incrementalrepack(
+    repo,
+    datastore,
+    historystore,
+    packpath,
+    category,
+    allowincompletedata=False,
+    options=None,
+):
     shallowutil.mkstickygroupdir(repo.ui, packpath)
 
     files = osutil.listdir(packpath, stat=True)
     files = _deletebigpacks(repo, packpath, files)
-    datapacks = _topacks(packpath,
-                         _computeincrementaldatapack(repo.ui, files),
-                         datapack.datapack)
-    datapacks.extend(s for s in datastore
-                     if not isinstance(s, datapack.datapackstore))
+    datapacks = _topacks(
+        packpath, _computeincrementaldatapack(repo.ui, files), datapack.datapack
+    )
+    datapacks.extend(
+        s for s in datastore if not isinstance(s, datapack.datapackstore)
+    )
 
-    historypacks = _topacks(packpath,
+    historypacks = _topacks(
+        packpath,
         _computeincrementalhistorypack(repo.ui, files),
-        historypack.historypack)
-    historypacks.extend(s for s in historystore
-                        if not isinstance(s, historypack.historypackstore))
+        historypack.historypack,
+    )
+    historypacks.extend(
+        s
+        for s in historystore
+        if not isinstance(s, historypack.historypackstore)
+    )
 
     # ``allhistory{files,packs}`` contains all known history packs, even ones we
     # don't plan to repack. They are used during the datapack repack to ensure
     # good ordering of nodes.
-    allhistoryfiles = _allpackfileswithsuffix(files, historypack.PACKSUFFIX,
-                                              historypack.INDEXSUFFIX)
-    allhistorypacks = _topacks(packpath,
+    allhistoryfiles = _allpackfileswithsuffix(
+        files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX
+    )
+    allhistorypacks = _topacks(
+        packpath,
        (f for f, mode, stat in allhistoryfiles),
-        historypack.historypack)
-    allhistorypacks.extend(s for s in historystore
-                           if not isinstance(s, historypack.historypackstore))
-    _runrepack(repo,
-               contentstore.unioncontentstore(
-                   *datapacks,
-                   allowincomplete=allowincompletedata),
-               metadatastore.unionmetadatastore(
-                   *historypacks,
-                   allowincomplete=True),
-               packpath, category,
-               fullhistory=metadatastore.unionmetadatastore(
-                   *allhistorypacks,
-                   allowincomplete=True),
-               options=options)
+        historypack.historypack,
+    )
+    allhistorypacks.extend(
+        s
+        for s in historystore
+        if not isinstance(s, historypack.historypackstore)
+    )
+    _runrepack(
+        repo,
+        contentstore.unioncontentstore(
+            *datapacks, allowincomplete=allowincompletedata
+        ),
+        metadatastore.unionmetadatastore(*historypacks, allowincomplete=True),
+        packpath,
+        category,
+        fullhistory=metadatastore.unionmetadatastore(
+            *allhistorypacks, allowincomplete=True
+        ),
+        options=options,
+    )
+
 
 def _computeincrementaldatapack(ui, files):
     opts = {
-        'gencountlimit' : ui.configint(
-            'remotefilelog', 'data.gencountlimit'),
-        'generations' : ui.configlist(
-            'remotefilelog', 'data.generations'),
-        'maxrepackpacks' : ui.configint(
-            'remotefilelog', 'data.maxrepackpacks'),
-        'repackmaxpacksize' : ui.configbytes(
-            'remotefilelog', 'data.repackmaxpacksize'),
-        'repacksizelimit' : ui.configbytes(
-            'remotefilelog', 'data.repacksizelimit'),
+        'gencountlimit': ui.configint('remotefilelog', 'data.gencountlimit'),
+        'generations': ui.configlist('remotefilelog', 'data.generations'),
+        'maxrepackpacks': ui.configint('remotefilelog', 'data.maxrepackpacks'),
+        'repackmaxpacksize': ui.configbytes(
+            'remotefilelog', 'data.repackmaxpacksize'
+        ),
+        'repacksizelimit': ui.configbytes(
+            'remotefilelog', 'data.repacksizelimit'
+        ),
     }
 
     packfiles = _allpackfileswithsuffix(
-        files, datapack.PACKSUFFIX, datapack.INDEXSUFFIX)
+        files, datapack.PACKSUFFIX, datapack.INDEXSUFFIX
+    )
     return _computeincrementalpack(packfiles, opts)
 
+
 def _computeincrementalhistorypack(ui, files):
     opts = {
-        'gencountlimit' : ui.configint(
-            'remotefilelog', 'history.gencountlimit'),
-        'generations' : ui.configlist(
-            'remotefilelog', 'history.generations', ['100MB']),
-        'maxrepackpacks' : ui.configint(
-            'remotefilelog', 'history.maxrepackpacks'),
-        'repackmaxpacksize' : ui.configbytes(
-            'remotefilelog', 'history.repackmaxpacksize', '400MB'),
-        'repacksizelimit' : ui.configbytes(
-            'remotefilelog', 'history.repacksizelimit'),
+        'gencountlimit': ui.configint('remotefilelog', 'history.gencountlimit'),
+        'generations': ui.configlist(
+            'remotefilelog', 'history.generations', ['100MB']
+        ),
+        'maxrepackpacks': ui.configint(
+            'remotefilelog', 'history.maxrepackpacks'
+        ),
+        'repackmaxpacksize': ui.configbytes(
+            'remotefilelog', 'history.repackmaxpacksize', '400MB'
+        ),
+        'repacksizelimit': ui.configbytes(
+            'remotefilelog', 'history.repacksizelimit'
+        ),
     }
 
     packfiles = _allpackfileswithsuffix(
-        files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX)
+        files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX
+    )
     return _computeincrementalpack(packfiles, opts)
 
+
 def _allpackfileswithsuffix(files, packsuffix, indexsuffix):
     result = []
     fileset = set(fn for fn, mode, stat in files)
@@ -253,7 +320,7 @@
         if not filename.endswith(packsuffix):
             continue
 
-        prefix = filename[:-len(packsuffix)]
+        prefix = filename[: -len(packsuffix)]
 
         # Don't process a pack if it doesn't have an index.
         if (prefix + indexsuffix) not in fileset:
@@ -262,6 +329,7 @@
 
     return result
 
+
 def _computeincrementalpack(files, opts):
     """Given a set of pack files along with the configuration options, this
     function computes the list of files that should be packed as part of an
@@ -272,8 +340,9 @@
     over time).
     """
-    limits = list(sorted((util.sizetoint(s) for s in opts['generations']),
-                         reverse=True))
+    limits = list(
+        sorted((util.sizetoint(s) for s in opts['generations']), reverse=True)
+    )
     limits.append(0)
 
     # Group the packs by generation (i.e. by size)
@@ -303,23 +372,29 @@
     for i, limit in enumerate(limits):
         if len(generations[i]) > opts['gencountlimit']:
             # Sort to be smallest last, for easy popping later
-            genpacks.extend(sorted(generations[i], reverse=True,
-                                   key=lambda x: sizes[x]))
+            genpacks.extend(
+                sorted(generations[i], reverse=True, key=lambda x: sizes[x])
+            )
             break
 
     # Take as many packs from the generation as we can
     chosenpacks = genpacks[-3:]
     genpacks = genpacks[:-3]
     repacksize = sum(sizes[n] for n in chosenpacks)
-    while (repacksize < opts['repacksizelimit'] and genpacks and
-           len(chosenpacks) < opts['maxrepackpacks']):
+    while (
+        repacksize < opts['repacksizelimit']
+        and genpacks
+        and len(chosenpacks) < opts['maxrepackpacks']
+    ):
         chosenpacks.append(genpacks.pop())
         repacksize += sizes[chosenpacks[-1]]
 
     return chosenpacks
 
-def _runrepack(repo, data, history, packpath, category, fullhistory=None,
-               options=None):
+
+def _runrepack(
+    repo, data, history, packpath, category, fullhistory=None, options=None
+):
     shallowutil.mkstickygroupdir(repo.ui, packpath)
 
     def isold(repo, filename, node):
@@ -337,16 +412,26 @@
     garbagecollect = repo.ui.configbool('remotefilelog', 'gcrepack')
     if not fullhistory:
         fullhistory = history
-    packer = repacker(repo, data, history, fullhistory, category,
-                      gc=garbagecollect, isold=isold, options=options)
+    packer = repacker(
+        repo,
+        data,
+        history,
+        fullhistory,
+        category,
+        gc=garbagecollect,
+        isold=isold,
+        options=options,
+    )
 
     with datapack.mutabledatapack(repo.ui, packpath) as dpack:
         with historypack.mutablehistorypack(repo.ui, packpath) as hpack:
             try:
                 packer.run(dpack, hpack)
             except error.LockHeld:
-                raise RepackAlreadyRunning(_("skipping repack - another repack "
-                                             "is already running"))
+                raise RepackAlreadyRunning(
+                    _("skipping repack - another repack " "is already running")
+                )
+
 
 def keepset(repo, keyfn, lastkeepkeys=None):
     """Computes a keepset which is not garbage collected.
@@ -409,12 +494,23 @@
 
     return keepkeys
 
+
 class repacker(object):
     """Class for orchestrating the repack of data and history information into a
     new format.
     """
-    def __init__(self, repo, data, history, fullhistory, category, gc=False,
-                 isold=None, options=None):
+
+    def __init__(
+        self,
+        repo,
+        data,
+        history,
+        fullhistory,
+        category,
+        gc=False,
+        isold=None,
+        options=None,
+    ):
         self.repo = repo
         self.data = data
         self.history = history
@@ -426,14 +522,15 @@
             if not isold:
                 raise ValueError("Function 'isold' is not properly specified")
             # use (filename, node) tuple as a keepset key
-            self.keepkeys = keepset(repo, lambda f, n : (f, n))
+            self.keepkeys = keepset(repo, lambda f, n: (f, n))
             self.isold = isold
 
     def run(self, targetdata, targethistory):
         ledger = repackledger()
 
-        with lockmod.lock(repacklockvfs(self.repo), "repacklock", desc=None,
-                          timeout=0):
+        with lockmod.lock(
+            repacklockvfs(self.repo), "repacklock", desc=None, timeout=0
+        ):
             self.repo.hook('prerepack')
 
             # Populate ledger from source
@@ -473,8 +570,10 @@
         orphans = sorted(orphans)
         orphans = list(sorted(orphans, key=getsize, reverse=True))
         if ui.debugflag:
-            ui.debug("%s: orphan chain: %s\n" % (filename,
-                     ", ".join([short(s) for s in orphans])))
+            ui.debug(
+                "%s: orphan chain: %s\n"
+                % (filename, ", ".join([short(s) for s in orphans]))
+            )
 
         # Create one contiguous chain and reassign deltabases.
         for i, node in enumerate(orphans):
@@ -497,23 +596,28 @@
                 byfile.setdefault(entry.filename, {})[entry.node] = entry
 
         count = 0
-        repackprogress = ui.makeprogress(_("repacking data"), unit=self.unit,
-                                         total=len(byfile))
+        repackprogress = ui.makeprogress(
+            _("repacking data"), unit=self.unit, total=len(byfile)
+        )
         for filename, entries in sorted(byfile.iteritems()):
             repackprogress.update(count)
 
             ancestors = {}
             nodes = list(node for node in entries)
             nohistory = []
-            buildprogress = ui.makeprogress(_("building history"), unit='nodes',
-                                            total=len(nodes))
+            buildprogress = ui.makeprogress(
+                _("building history"), unit='nodes', total=len(nodes)
+            )
             for i, node in enumerate(nodes):
                 if node in ancestors:
                     continue
                 buildprogress.update(i)
                 try:
-                    ancestors.update(self.fullhistory.getancestors(filename,
-                        node, known=ancestors))
+                    ancestors.update(
+                        self.fullhistory.getancestors(
+                            filename, node, known=ancestors
+                        )
+                    )
                 except KeyError:
                     # Since we're packing data entries, we may not have the
                     # corresponding history entries for them. It's not a big
@@ -524,14 +628,16 @@
             # Order the nodes children first, so we can produce reverse deltas
             orderednodes = list(reversed(self._toposort(ancestors)))
             if len(nohistory) > 0:
-                ui.debug('repackdata: %d nodes without history\n' %
-                         len(nohistory))
+                ui.debug(
+                    'repackdata: %d nodes without history\n' % len(nohistory)
+                )
                 orderednodes.extend(sorted(nohistory))
 
             # Filter orderednodes to just the nodes we want to serialize (it
             # currently also has the edge nodes' ancestors).
-            orderednodes = list(filter(lambda node: node in nodes,
-                                       orderednodes))
+            orderednodes = list(
+                filter(lambda node: node in nodes, orderednodes)
+            )
 
             # Garbage collect old nodes:
             if self.garbagecollect:
@@ -539,8 +645,9 @@
                 for node in orderednodes:
                     # If the node is old and is not in the keepset, we skip it,
                     # and mark as garbage collected
-                    if ((filename, node) not in self.keepkeys and
-                        self.isold(self.repo, filename, node)):
+                    if (filename, node) not in self.keepkeys and self.isold(
+                        self.repo, filename, node
+                    ):
                         entries[node].gced = True
                         continue
                     neworderednodes.append(node)
@@ -551,9 +658,9 @@
             nobase = set()
             referenced = set()
             nodes = set(nodes)
-            processprogress = ui.makeprogress(_("processing nodes"),
-                                              unit='nodes',
-                                              total=len(orderednodes))
+            processprogress = ui.makeprogress(
+                _("processing nodes"), unit='nodes', total=len(orderednodes)
+            )
             for i, node in enumerate(orderednodes):
                 processprogress.update(i)
                 # Find delta base
@@ -593,8 +700,9 @@
             # experimental config: repack.chainorphansbysize
             if ui.configbool('repack', 'chainorphansbysize'):
                 orphans = nobase - referenced
-                orderednodes = self._chainorphans(ui, filename, orderednodes,
-                                                  orphans, deltabases)
+                orderednodes = self._chainorphans(
+                    ui, filename, orderednodes, orphans, deltabases
+                )
 
             # Compute deltas and write to the pack
             for i, node in enumerate(orderednodes):
@@ -607,8 +715,11 @@
                     deltaentry = self.data.getdelta(filename, node)
                     delta, deltabasename, origdeltabase, meta = deltaentry
                     size = meta.get(constants.METAKEYSIZE)
-                    if (deltabasename != filename or origdeltabase != deltabase
-                        or size is None):
+                    if (
+                        deltabasename != filename
+                        or origdeltabase != deltabase
+                        or size is None
+                    ):
                         deltabasetext = self.data.get(filename, deltabase)
                         original = self.data.get(filename, node)
                         size = len(original)
@@ -639,8 +750,9 @@
             if entry.historysource:
                 byfile.setdefault(entry.filename, {})[entry.node] = entry
 
-        progress = ui.makeprogress(_("repacking history"), unit=self.unit,
-                                   total=len(byfile))
+        progress = ui.makeprogress(
+            _("repacking history"), unit=self.unit, total=len(byfile)
+        )
         for filename, entries in sorted(byfile.iteritems()):
             ancestors = {}
             nodes = list(node for node in entries)
@@ -648,8 +760,9 @@
             for node in nodes:
                 if node in ancestors:
                     continue
-                ancestors.update(self.history.getancestors(filename, node,
-                                                           known=ancestors))
+                ancestors.update(
+                    self.history.getancestors(filename, node, known=ancestors)
+                )
 
             # Order the nodes children first
             orderednodes = reversed(self._toposort(ancestors))
@@ -702,11 +815,13 @@
         sortednodes = shallowutil.sortnodes(ancestors.keys(), parentfunc)
         return sortednodes
 
+
 class repackledger(object):
     """Storage for all the bookkeeping that happens during a repack. It contains
     the list of revisions being repacked, what happened to each revision, and
     which source store contained which revision originally (for later cleanup).
     """
+
     def __init__(self):
         self.entries = {}
         self.sources = {}
@@ -748,11 +863,21 @@
     def addcreated(self, value):
         self.created.add(value)
 
+
 class repackentry(object):
     """Simple class representing a single revision entry in the repackledger.
     """
-    __slots__ = (r'filename', r'node', r'datasource', r'historysource',
-                 r'datarepacked', r'historyrepacked', r'gced')
+
+    __slots__ = (
+        r'filename',
+        r'node',
+        r'datasource',
+        r'historysource',
+        r'datarepacked',
+        r'historyrepacked',
+        r'gced',
+    )
+
     def __init__(self, filename, node):
         self.filename = filename
         self.node = node
@@ -767,13 +892,14 @@
         # If garbage collected
        self.gced = False
 
+
 def repacklockvfs(repo):
     if util.safehasattr(repo, 'name'):
         # Lock in the shared cache so repacks across multiple copies of the same
         # repo are coordinated.
         sharedcachepath = shallowutil.getcachepackpath(
-            repo,
-            constants.FILEPACK_CATEGORY)
+            repo, constants.FILEPACK_CATEGORY
+        )
        return vfs.vfs(sharedcachepath)
     else:
         return repo.svfs