hgext/fix.py
changeset 48178 f12a19d03d2c
parent 48116 5ced12cfa41b
child 48246 2f7caef017d9
--- a/hgext/fix.py	Thu Sep 02 14:07:55 2021 -0700
+++ b/hgext/fix.py	Thu Sep 02 14:08:45 2021 -0700
@@ -284,20 +284,29 @@
         # There are no data dependencies between the workers fixing each file
         # revision, so we can use all available parallelism.
         def getfixes(items):
-            for rev, path in items:
-                ctx = repo[rev]
+            for srcrev, path, dstrevs in items:
+                ctx = repo[srcrev]
                 olddata = ctx[path].data()
                 metadata, newdata = fixfile(
-                    ui, repo, opts, fixers, ctx, path, basepaths, basectxs[rev]
+                    ui,
+                    repo,
+                    opts,
+                    fixers,
+                    ctx,
+                    path,
+                    basepaths,
+                    basectxs[srcrev],
                 )
-                # Don't waste memory/time passing unchanged content back, but
-                # produce one result per item either way.
-                yield (
-                    rev,
-                    path,
-                    metadata,
-                    newdata if newdata != olddata else None,
-                )
+                # We ungroup the work items now, because the code that consumes
+                # these results has to handle each dstrev separately, and in
+                # topological order. Since a result may be held in memory until
+                # its dstrev is reached, it's important to pass around
+                # references to "newdata" instead of copies of it. Otherwise,
+                # we would keep more copies of file content in memory at a time
+                # than if we hadn't bothered to group/deduplicate the work items.
+                data = newdata if newdata != olddata else None
+                for dstrev in dstrevs:
+                    yield (dstrev, path, metadata, data)
 
         results = worker.worker(
             ui, 1.0, getfixes, tuple(), workqueue, threadsafe=False
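
A minimal, self-contained sketch of the fan-out above, with the repository and
fixfile() machinery replaced by a plain dict and a toy fixer (both hypothetical
stand-ins), showing that every dstrev's result shares one reference to the
fixed bytes:

    def getfixes(items, contents, fixer):
        """contents maps (rev, path) -> bytes; fixer returns (metadata, bytes)."""
        for srcrev, path, dstrevs in items:
            olddata = contents[(srcrev, path)]
            metadata, newdata = fixer(olddata)
            # One fix computation per work item; every dstrev's result tuple
            # shares the same "data" object, so grouping never raises peak
            # memory use.
            data = newdata if newdata != olddata else None
            for dstrev in dstrevs:
                yield (dstrev, path, metadata, data)

    contents = {(1, b"foo/bar.txt"): b"x=1\n"}
    fixer = lambda data: ({}, data.upper())
    results = list(getfixes([(1, b"foo/bar.txt", (1, 2, 3))], contents, fixer))
    assert [r[0] for r in results] == [1, 2, 3]
    assert results[0][3] is results[2][3]  # one shared buffer, not three copies
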
@@ -377,23 +386,32 @@
 
 
 def getworkqueue(ui, repo, pats, opts, revstofix, basectxs):
-    """Constructs the list of files to be fixed at specific revisions
+    """Constructs a list of files to fix and which revisions each fix applies to
 
-    It is up to the caller how to consume the work items, and the only
-    dependence between them is that replacement revisions must be committed in
-    topological order. Each work item represents a file in the working copy or
-    in some revision that should be fixed and written back to the working copy
-    or into a replacement revision.
+    To avoid duplicating work, there is usually only one work item for each file
+    revision that might need to be fixed. There can be multiple work items per
+    file revision if the same file needs to be fixed in multiple changesets with
+    different baserevs. Each work item also contains a list of changesets where
+    the file's data should be replaced with the fixed data. The work items for
+    earlier changesets come earlier in the work queue, to improve pipelining by
+    allowing the first changeset to be replaced while fixes are still being
+    computed for later changesets.
 
-    Work items for the same revision are grouped together, so that a worker
-    pool starting with the first N items in parallel is likely to finish the
-    first revision's work before other revisions. This can allow us to write
-    the result to disk and reduce memory footprint. At time of writing, the
-    partition strategy in worker.py seems favorable to this. We also sort the
-    items by ascending revision number to match the order in which we commit
-    the fixes later.
+    Also returned is a map from each changeset to the number of work items that
+    might affect it. This is used later to determine when all of a changeset's
+    work items have been finished, without having to inspect the remaining work
+    queue in each worker subprocess.
+
+    The example work item (1, "foo/bar.txt", (1, 2, 3)) means that the data of
+    bar.txt should be read from revision 1, then fixed, and written back to
+    revisions 1, 2 and 3. Revision 1 is called the "srcrev" and the list of
+    revisions is called the "dstrevs". In practice the srcrev is always one of
+    the dstrevs, and we make that choice when constructing the work item so that
+    the choice can't be made inconsistently later on. The dstrevs should all
+    have the same file revision for the given path, so the choice of srcrev is
+    arbitrary. The wdirrev can appear as both a dstrev and a srcrev.
     """
-    workqueue = []
+    dstrevmap = collections.defaultdict(list)
     numitems = collections.defaultdict(int)
     maxfilesize = ui.configbytes(b'fix', b'maxfilesize')
     for rev in sorted(revstofix):
@@ -411,8 +429,21 @@
                     % (util.bytecount(maxfilesize), path)
                 )
                 continue
-            workqueue.append((rev, path))
+            baserevs = tuple(ctx.rev() for ctx in basectxs[rev])
+            dstrevmap[(fctx.filerev(), baserevs, path)].append(rev)
             numitems[rev] += 1
+    workqueue = [
+        (min(dstrevs), path, dstrevs)
+        for (filerev, baserevs, path), dstrevs in dstrevmap.items()
+    ]
+    # Move work items for earlier changesets to the front of the queue, so we
+    # might be able to replace those changesets (in topological order) while
+    # we're still processing later work items. Because each item begins with
+    # min(dstrevs), a plain tuple sort suffices and no custom comparator is
+    # needed. Including the path in the sort key keeps the output order stable.
+    # In some situations this doesn't help much, but in others it lets us
+    # buffer O(1) files instead of O(n) files.
+    workqueue.sort()
     return workqueue, numitems
 
 
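A self-contained sketch of the grouping and sort described in the docstring
above, with the repository replaced by hypothetical lookup tables ("filerevs"
and "baserevs" stand in for fctx.filerev() and basectxs); revisions 1-3 share
an unchanged copy of bar.txt, so one tool execution covers all three:

    import collections

    def getworkqueue(revstofix, paths, filerevs, baserevs):
        dstrevmap = collections.defaultdict(list)
        numitems = collections.defaultdict(int)
        for rev in sorted(revstofix):
            for path in paths:
                # Changesets with identical file content and identical bases
                # land in the same bucket and become one work item.
                key = (filerevs[(rev, path)], baserevs[rev], path)
                dstrevmap[key].append(rev)
                numitems[rev] += 1
        workqueue = [
            (min(dstrevs), path, tuple(dstrevs))
            for (filerev, bases, path), dstrevs in dstrevmap.items()
        ]
        workqueue.sort()  # min(dstrevs) first puts earlier changesets first
        return workqueue, numitems

    filerevs = {(r, b"foo/bar.txt"): 0 for r in (1, 2, 3)}
    baserevs = {1: (0,), 2: (0,), 3: (0,)}
    wq, n = getworkqueue({1, 2, 3}, [b"foo/bar.txt"], filerevs, baserevs)
    assert wq == [(1, b"foo/bar.txt", (1, 2, 3))]
    assert n == {1: 1, 2: 1, 3: 1}
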
@@ -517,9 +548,9 @@
         return {}
 
     basepaths = {}
-    for rev, path in workqueue:
-        fixctx = repo[rev]
-        for basectx in basectxs[rev]:
+    for srcrev, path, _dstrevs in workqueue:
+        fixctx = repo[srcrev]
+        for basectx in basectxs[srcrev]:
             basepath = copies.pathcopies(basectx, fixctx).get(path, path)
             if basepath in basectx:
                 basepaths[(basectx.rev(), fixctx.rev(), path)] = basepath
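
The same loop, sketched without a repository: here "pathcopies" is a
hypothetical stand-in for the mapping copies.pathcopies(basectx, fixctx)
computes (each renamed path in the fixed revision mapped back to its name in
the base), and "basefiles" stands in for the `basepath in basectx` membership
test:

    def getbasepaths(workqueue, bases, pathcopies, basefiles):
        basepaths = {}
        for srcrev, path, _dstrevs in workqueue:
            for baserev in bases[srcrev]:
                # Follow a rename back to the base, defaulting to the same name.
                basepath = pathcopies.get((baserev, srcrev), {}).get(path, path)
                if basepath in basefiles[baserev]:
                    basepaths[(baserev, srcrev, path)] = basepath
        return basepaths

    pathcopies = {(0, 1): {b"new.txt": b"old.txt"}}  # rev 1 renamed old -> new
    basefiles = {0: {b"old.txt"}}
    bp = getbasepaths([(1, b"new.txt", [1])], {1: [0]}, pathcopies, basefiles)
    assert bp == {(0, 1, b"new.txt"): b"old.txt"}
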
@@ -642,10 +673,10 @@
     toprefetch = set()
 
     # Prefetch the files that will be fixed.
-    for rev, path in workqueue:
-        if rev == wdirrev:
+    for srcrev, path, _dstrevs in workqueue:
+        if srcrev == wdirrev:
             continue
-        toprefetch.add((rev, path))
+        toprefetch.add((srcrev, path))
 
     # Prefetch the base contents for lineranges().
     for (baserev, fixrev, path), basepath in basepaths.items():