revlog: new API to emit revision data
authorGregory Szorc <gregory.szorc@gmail.com>
Fri, 21 Sep 2018 14:28:21 -0700
changeset 39862 5a9ab91e0a45
parent 39861 db5501d93bcf
child 39863 c73f9f345ec0
revlog: new API to emit revision data I recently refactored changegroup generation code to make it more storage agnostic. I made significant progress. But there is still a bit of work to be done. Specifically: * Changegroup code is looking at low-level storage attributes to influence sorting. Sorting should be done at the storage layer. * The linknode lookup and sorting code for ellipsis is very complicated. * Linknodes are just generally wonky because e.g. file storage doesn't know how to translate a linkrev to a changelog node. * We regressed performance when introducing the request-response objects. Having thought about this problem a bit, I think I've come up with a better interface for emitting revision deltas. This commit defines and implements that interface. See the docstring in repository.py for more info. This API adds 3 notable features over the previous one. First, it defers node ordering to the storage implementation in the common case but allows overriding as necessary. We have a facility for requesting an exact ordering (used in ellipsis mode). We have another facility for storage order (used for changelog). Second, we have an argument specifying assumptions about parents revisions. This can be used to force a fulltext revision when we don't know the receiver has a parent revision to delta against. Third, we can control whether revision data is emitted. This makes the API suitable as a generic "index data retrieval" API as well as for producing revision deltas - possibly in the same operation! The new API is much simpler: we no longer need a complicated "request" object to encapsulate the delta generation request. I'm optimistic this will restore performance loss associated with emitrevisiondeltas(). Storage unit tests for the new API have been implemented. Future commits will port existing consumers of emitrevisiondeltas() to the new API then remove emitrevisiondeltas(). Differential Revision: https://phab.mercurial-scm.org/D4722
mercurial/filelog.py
mercurial/manifest.py
mercurial/repository.py
mercurial/revlog.py
mercurial/testing/storage.py
--- a/mercurial/filelog.py	Mon Sep 24 09:41:42 2018 -0700
+++ b/mercurial/filelog.py	Fri Sep 21 14:28:21 2018 -0700
@@ -96,6 +96,14 @@
     def emitrevisiondeltas(self, requests):
         return self._revlog.emitrevisiondeltas(requests)
 
+    def emitrevisions(self, nodes, nodesorder=None,
+                      revisiondata=False, assumehaveparentrevisions=False,
+                      deltaprevious=False):
+        return self._revlog.emitrevisions(
+            nodes, nodesorder=nodesorder, revisiondata=revisiondata,
+            assumehaveparentrevisions=assumehaveparentrevisions,
+            deltaprevious=deltaprevious)
+
     def addrevision(self, revisiondata, transaction, linkrev, p1, p2,
                     node=None, flags=revlog.REVIDX_DEFAULT_FLAGS,
                     cachedelta=None):
--- a/mercurial/manifest.py	Mon Sep 24 09:41:42 2018 -0700
+++ b/mercurial/manifest.py	Fri Sep 21 14:28:21 2018 -0700
@@ -1565,6 +1565,14 @@
     def emitrevisiondeltas(self, requests):
         return self._revlog.emitrevisiondeltas(requests)
 
+    def emitrevisions(self, nodes, nodesorder=None,
+                      revisiondata=False, assumehaveparentrevisions=False,
+                      deltaprevious=False):
+        return self._revlog.emitrevisions(
+            nodes, nodesorder=nodesorder, revisiondata=revisiondata,
+            assumehaveparentrevisions=assumehaveparentrevisions,
+            deltaprevious=deltaprevious)
+
     def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None):
         return self._revlog.addgroup(deltas, linkmapper, transaction,
                                      addrevisioncb=addrevisioncb)
--- a/mercurial/repository.py	Mon Sep 24 09:41:42 2018 -0700
+++ b/mercurial/repository.py	Fri Sep 21 14:28:21 2018 -0700
@@ -654,6 +654,59 @@
         delta against a censored revision.
         """
 
+    def emitrevisions(nodes,
+                      nodesorder=None,
+                      revisiondata=False,
+                      assumehaveparentrevisions=False,
+                      deltaprevious=False):
+        """Produce ``irevisiondelta`` for revisions.
+
+        Given an iterable of nodes, emits objects conforming to the
+        ``irevisiondelta`` interface that describe revisions in storage.
+
+        This method is a generator.
+
+        The input nodes may be unordered. Implementations must ensure that a
+        node's parents are emitted before the node itself. Transitively, this
+        means that a node may only be emitted once all its ancestors in
+        ``nodes`` have also been emitted.
+
+        By default, emits "index" data (the ``node``, ``p1node``, and
+        ``p2node`` attributes). If ``revisiondata`` is set, revision data
+        will also be present on the emitted objects.
+
+        With default argument values, implementations can choose to emit
+        either fulltext revision data or a delta. When emitting deltas,
+        implementations must consider whether the delta's base revision
+        fulltext is available to the receiver.
+
+        The base revision fulltext is guaranteed to be available if any of
+        the following are met:
+
+        * Its fulltext revision was emitted by this method call.
+        * A delta for that revision was emitted by this method call.
+        * ``assumehaveparentrevisions`` is True and the base revision is a
+          parent of the node.
+
+        ``nodesorder`` can be used to control the order that revisions are
+        emitted. By default, revisions can be reordered as long as they are
+        in DAG topological order (see above). If the value is ``nodes``,
+        the iteration order from ``nodes`` should be used. If the value is
+        ``storage``, then the native order from the backing storage layer
+        is used. (Not all storage layers will have strong ordering and behavior
+        of this mode is storage-dependent.) ``nodes`` ordering can force
+        revisions to be emitted before their ancestors, so consumers should
+        use it with care.
+
+        The ``linknode`` attribute on the returned ``irevisiondelta`` may not
+        be set and it is the caller's responsibility to resolve it, if needed.
+
+        If ``deltaprevious`` is True and revision data is requested, all
+        revision data should be emitted as deltas against the revision
+        emitted just prior. The initial revision should be a delta against
+        its 1st parent.
+        """
+
 class ifilemutation(interfaceutil.Interface):
     """Storage interface for mutation events of a tracked file."""
 
@@ -1127,6 +1180,15 @@
         See the documentation for ``ifiledata`` for more.
         """
 
+    def emitrevisions(nodes,
+                      nodesorder=None,
+                      revisiondata=False,
+                      assumehaveparentrevisions=False):
+        """Produce ``irevisiondelta`` describing revisions.
+
+        See the documentation for ``ifiledata`` for more.
+        """
+
     def addgroup(deltas, linkmapper, transaction, addrevisioncb=None):
         """Process a series of deltas for storage.
 
--- a/mercurial/revlog.py	Mon Sep 24 09:41:42 2018 -0700
+++ b/mercurial/revlog.py	Fri Sep 21 14:28:21 2018 -0700
@@ -59,6 +59,7 @@
 )
 from . import (
     ancestor,
+    dagop,
     error,
     mdiff,
     policy,
@@ -242,17 +243,17 @@
     flags = attr.ib()
 
 @interfaceutil.implementer(repository.irevisiondelta)
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True)
 class revlogrevisiondelta(object):
     node = attr.ib()
     p1node = attr.ib()
     p2node = attr.ib()
     basenode = attr.ib()
-    linknode = attr.ib()
     flags = attr.ib()
     baserevisionsize = attr.ib()
     revision = attr.ib()
     delta = attr.ib()
+    linknode = attr.ib(default=None)
 
 @interfaceutil.implementer(repository.iverifyproblem)
 @attr.s(frozen=True)
@@ -2374,6 +2375,122 @@
 
             prevrev = rev
 
+    def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
+                      assumehaveparentrevisions=False, deltaprevious=False):
+        if nodesorder not in ('nodes', 'storage', None):
+            raise error.ProgrammingError('unhandled value for nodesorder: %s' %
+                                         nodesorder)
+
+        if nodesorder is None and not self._generaldelta:
+            nodesorder = 'storage'
+
+        frev = self.rev
+        fnode = self.node
+
+        if nodesorder == 'nodes':
+            revs = [frev(n) for n in nodes]
+        elif nodesorder == 'storage':
+            revs = sorted(frev(n) for n in nodes)
+        else:
+            assert self._generaldelta
+            revs = set(frev(n) for n in nodes)
+            revs = dagop.linearize(revs, self.parentrevs)
+
+        prevrev = None
+
+        if deltaprevious or assumehaveparentrevisions:
+            prevrev = self.parentrevs(revs[0])[0]
+
+        # Set of revs available to delta against.
+        available = set()
+
+        for rev in revs:
+            if rev == nullrev:
+                continue
+
+            node = fnode(rev)
+            deltaparentrev = self.deltaparent(rev)
+            p1rev, p2rev = self.parentrevs(rev)
+
+            # Forced delta against previous mode.
+            if deltaprevious:
+                baserev = prevrev
+
+            # Revlog is configured to use full snapshots. Stick to that.
+            elif not self._storedeltachains:
+                baserev = nullrev
+
+            # There is a delta in storage. We try to use that because it
+            # amounts to effectively copying data from storage and is
+            # therefore the fastest.
+            elif deltaparentrev != nullrev:
+                # Base revision was already emitted in this group. We can
+                # always safely use the delta.
+                if deltaparentrev in available:
+                    baserev = deltaparentrev
+
+                # Base revision is a parent that hasn't been emitted already.
+                # Use it if we can assume the receiver has the parent revision.
+                elif (assumehaveparentrevisions
+                      and deltaparentrev in (p1rev, p2rev)):
+                    baserev = deltaparentrev
+
+                # No guarantee the receiver has the delta parent. Send delta
+                # against last revision (if possible), which in the common case
+                # should be similar enough to this revision that the delta is
+                # reasonable.
+                elif prevrev is not None:
+                    baserev = prevrev
+                else:
+                    baserev = nullrev
+
+            # Storage has a fulltext revision.
+
+            # Let's use the previous revision, which is as good a guess as any.
+            # There is definitely room to improve this logic.
+            elif prevrev is not None:
+                baserev = prevrev
+            else:
+                baserev = nullrev
+
+            # But we can't actually use our chosen delta base for whatever
+            # reason. Reset to fulltext.
+            if baserev != nullrev and not self.candelta(baserev, rev):
+                baserev = nullrev
+
+            revision = None
+            delta = None
+            baserevisionsize = None
+
+            if revisiondata:
+                if self.iscensored(baserev) or self.iscensored(rev):
+                    try:
+                        revision = self.revision(node, raw=True)
+                    except error.CensoredNodeError as e:
+                        revision = e.tombstone
+
+                    if baserev != nullrev:
+                        baserevisionsize = self.rawsize(baserev)
+
+                elif baserev == nullrev and not deltaprevious:
+                    revision = self.revision(node, raw=True)
+                    available.add(rev)
+                else:
+                    delta = self.revdiff(baserev, rev)
+                    available.add(rev)
+
+            yield revlogrevisiondelta(
+                node=node,
+                p1node=fnode(p1rev),
+                p2node=fnode(p2rev),
+                basenode=fnode(baserev),
+                flags=self.flags(rev),
+                baserevisionsize=baserevisionsize,
+                revision=revision,
+                delta=delta)
+
+            prevrev = rev
+
     DELTAREUSEALWAYS = 'always'
     DELTAREUSESAMEREVS = 'samerevs'
     DELTAREUSENEVER = 'never'
--- a/mercurial/testing/storage.py	Mon Sep 24 09:41:42 2018 -0700
+++ b/mercurial/testing/storage.py	Fri Sep 21 14:28:21 2018 -0700
@@ -500,6 +500,20 @@
         with self.assertRaises(StopIteration):
             next(gen)
 
+        # Emitting empty list is an empty generator.
+        gen = f.emitrevisions([])
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # Emitting null node yields nothing.
+        gen = f.emitrevisions([nullid])
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # Requesting unknown node fails.
+        with self.assertRaises(error.LookupError):
+            list(f.emitrevisions([b'\x01' * 20]))
+
     def testsinglerevision(self):
         fulltext = b'initial'
 
@@ -566,6 +580,42 @@
         with self.assertRaises(StopIteration):
             next(gen)
 
+        # Emitting a single revision works.
+        gen = f.emitrevisions([node])
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node)
+        self.assertEqual(rev.p1node, nullid)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertIsNone(rev.delta)
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # Requesting revision data works.
+        gen = f.emitrevisions([node], revisiondata=True)
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node)
+        self.assertEqual(rev.p1node, nullid)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertEqual(rev.revision, fulltext)
+        self.assertIsNone(rev.delta)
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # Emitting an unknown node after a known revision results in error.
+        with self.assertRaises(error.LookupError):
+            list(f.emitrevisions([node, b'\x01' * 20]))
+
     def testmultiplerevisions(self):
         fulltext0 = b'x' * 1024
         fulltext1 = fulltext0 + b'y'
@@ -697,6 +747,208 @@
         with self.assertRaises(StopIteration):
             next(gen)
 
+        # Nodes should be emitted in order.
+        gen = f.emitrevisions([node0, node1, node2], revisiondata=True)
+
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node0)
+        self.assertEqual(rev.p1node, nullid)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertEqual(rev.revision, fulltext0)
+        self.assertIsNone(rev.delta)
+
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node1)
+        self.assertEqual(rev.p1node, node0)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, node0)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x04\x01' +
+                         fulltext1)
+
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node2)
+        self.assertEqual(rev.p1node, node1)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, node1)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x04\x01\x00\x00\x04\x02' +
+                         fulltext2)
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # Request not in DAG order is reordered to be in DAG order.
+        gen = f.emitrevisions([node2, node1, node0], revisiondata=True)
+
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node0)
+        self.assertEqual(rev.p1node, nullid)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertEqual(rev.revision, fulltext0)
+        self.assertIsNone(rev.delta)
+
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node1)
+        self.assertEqual(rev.p1node, node0)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, node0)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x04\x01' +
+                         fulltext1)
+
+        rev = next(gen)
+
+        self.assertEqual(rev.node, node2)
+        self.assertEqual(rev.p1node, node1)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertIsNone(rev.linknode)
+        self.assertEqual(rev.basenode, node1)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x04\x01\x00\x00\x04\x02' +
+                         fulltext2)
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # Unrecognized nodesorder value raises ProgrammingError.
+        with self.assertRaises(error.ProgrammingError):
+            list(f.emitrevisions([], nodesorder='bad'))
+
+        # nodesorder=storage is recognized. But we can't test it thoroughly
+        # because behavior is storage-dependent.
+        res = list(f.emitrevisions([node2, node1, node0],
+                                         nodesorder='storage'))
+        self.assertEqual(len(res), 3)
+        self.assertEqual({o.node for o in res}, {node0, node1, node2})
+
+        # nodesorder=nodes forces the order.
+        gen = f.emitrevisions([node2, node0], nodesorder='nodes',
+                              revisiondata=True)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node2)
+        self.assertEqual(rev.p1node, node1)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertEqual(rev.revision, fulltext2)
+        self.assertIsNone(rev.delta)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node0)
+        self.assertEqual(rev.p1node, nullid)
+        self.assertEqual(rev.p2node, nullid)
+        # Delta behavior is storage dependent, so we can't easily test it.
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # assumehaveparentrevisions=False (the default) won't send a delta for
+        # the first revision.
+        gen = f.emitrevisions({node2, node1}, revisiondata=True)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node1)
+        self.assertEqual(rev.p1node, node0)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertEqual(rev.revision, fulltext1)
+        self.assertIsNone(rev.delta)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node2)
+        self.assertEqual(rev.p1node, node1)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertEqual(rev.basenode, node1)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x04\x01\x00\x00\x04\x02' +
+                         fulltext2)
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        # assumehaveparentrevisions=True allows delta against initial revision.
+        gen = f.emitrevisions([node2, node1],
+                              revisiondata=True, assumehaveparentrevisions=True)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node1)
+        self.assertEqual(rev.p1node, node0)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertEqual(rev.basenode, node0)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x04\x01' +
+                         fulltext1)
+
+        # forceprevious=True forces a delta against the previous revision.
+        # Special case for initial revision.
+        gen = f.emitrevisions([node0], revisiondata=True, deltaprevious=True)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node0)
+        self.assertEqual(rev.p1node, nullid)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00' +
+                         fulltext0)
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
+        gen = f.emitrevisions([node0, node2], revisiondata=True,
+                              deltaprevious=True)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node0)
+        self.assertEqual(rev.p1node, nullid)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertEqual(rev.basenode, nullid)
+        self.assertIsNone(rev.baserevisionsize)
+        self.assertIsNone(rev.revision)
+        self.assertEqual(rev.delta,
+                         b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00' +
+                         fulltext0)
+
+        rev = next(gen)
+        self.assertEqual(rev.node, node2)
+        self.assertEqual(rev.p1node, node1)
+        self.assertEqual(rev.p2node, nullid)
+        self.assertEqual(rev.basenode, node0)
+
+        with self.assertRaises(StopIteration):
+            next(gen)
+
     def testrenamed(self):
         fulltext0 = b'foo'
         fulltext1 = b'bar'