revlog: introduce a mandatory `_writing` context to update revlog content
authorPierre-Yves David <pierre-yves.david@octobus.net>
Mon, 03 May 2021 12:27:42 +0200
changeset 47214 906a7bcaac86
parent 47213 100f061d88f6
child 47215 042388bba644
revlog: introduce a mandatory `_writing` context to update revlog content Before this change, various revlog methods were managing the opening and closing of the revlog files manually and passing the file descriptors along the call path. To simplify the tracking of write operations by a future docket, we need something more organised. As a result, we introduce a `revlog._writing` context manager that will wrap each revlog update operation. The file descriptors are kept in the existing `revlog._writinghandles` attribute that was already used by the `addgroup` logic. This change is entirely internal to the revlog; the "public" interface is not affected. The `addrevision` and `addgroup` logic are still responsible for setting up this context. However, this new context gives us multiple benefits: * all writers use the same, unified logic, * this context is programmatically enforced, * each write "session" has a clearly identified start and end. The post-pull sidedata update logic is still doing its writing by hand and will be adjusted in a later changeset. This change affects the concurrency checker test, because we now register the state of the file in the transaction sooner in `addrevision` (about as early as `addgroup` would), so the abort is rolling back the other commit. I don't want to weaken the current main logic. Differential Revision: https://phab.mercurial-scm.org/D10605
mercurial/changelog.py
mercurial/revlog.py
tests/test-racy-mutations.t
tests/test-revlog-raw.py
--- a/mercurial/changelog.py	Mon May 03 12:27:31 2021 +0200
+++ b/mercurial/changelog.py	Mon May 03 12:27:42 2021 +0200
@@ -506,9 +506,9 @@
 
         return False
 
-    def _enforceinlinesize(self, tr, fp=None):
+    def _enforceinlinesize(self, tr):
         if not self._delayed:
-            revlog.revlog._enforceinlinesize(self, tr, fp)
+            revlog.revlog._enforceinlinesize(self, tr)
 
     def read(self, nodeorrev):
         """Obtain data from a parsed changelog revision.
--- a/mercurial/revlog.py	Mon May 03 12:27:31 2021 +0200
+++ b/mercurial/revlog.py	Mon May 03 12:27:42 2021 +0200
@@ -360,6 +360,8 @@
 
         # 2-tuple of file handles being used for active writing.
         self._writinghandles = None
+        # prevent nesting of addgroup
+        self._adding_group = None
 
         self._loadindex()
 
@@ -1955,7 +1957,7 @@
                 raise error.CensoredNodeError(self.display_id, node, text)
             raise
 
-    def _enforceinlinesize(self, tr, fp=None):
+    def _enforceinlinesize(self, tr):
         """Check if the revlog is too big for inline and convert if so.
 
         This should be called after revisions are added to the revlog. If the
@@ -1975,21 +1977,27 @@
         trindex = 0
         tr.add(self._datafile, 0)
 
-        if fp:
+        existing_handles = False
+        if self._writinghandles is not None:
+            existing_handles = True
+            fp = self._writinghandles[0]
             fp.flush()
             fp.close()
             # We can't use the cached file handle after close(). So prevent
             # its usage.
             self._writinghandles = None
 
-        if True:
-            with self._indexfp(b'r') as ifh, self._datafp(b'w') as dfh:
+        new_dfh = self._datafp(b'w+')
+        new_dfh.truncate(0)  # drop any potentially existing data
+        try:
+            with self._indexfp(b'r') as read_ifh:
                 for r in self:
-                    dfh.write(self._getsegmentforrevs(r, r, df=ifh)[1])
+                    new_dfh.write(self._getsegmentforrevs(r, r, df=read_ifh)[1])
                     if troffset <= self.start(r):
                         trindex = r
-
-            with self._indexfp(b'w') as fp:
+                new_dfh.flush()
+
+            with self.opener(self._indexfile, mode=b'w', atomictemp=True) as fp:
                 self._format_flags &= ~FLAG_INLINE_DATA
                 self._inline = False
                 for i in self:
@@ -1999,7 +2007,6 @@
                         header = self.index.pack_header(header)
                         e = header + e
                     fp.write(e)
-
                 # the temp file replace the real index when we exit the context
                 # manager
 
@@ -2007,9 +2014,50 @@
             nodemaputil.setup_persistent_nodemap(tr, self)
             self._chunkclear()
 
+            if existing_handles:
+                # switched from inline to conventional reopen the index
+                ifh = self._indexfp(b"a+")
+                self._writinghandles = (ifh, new_dfh)
+                new_dfh = None
+        finally:
+            if new_dfh is not None:
+                new_dfh.close()
+
     def _nodeduplicatecallback(self, transaction, node):
         """called when trying to add a node already stored."""
 
+    @contextlib.contextmanager
+    def _writing(self, transaction):
+        if self._writinghandles is not None:
+            yield
+        else:
+            r = len(self)
+            dsize = 0
+            if r:
+                dsize = self.end(r - 1)
+            dfh = None
+            if not self._inline:
+                dfh = self._datafp(b"a+")
+                transaction.add(self._datafile, dsize)
+            try:
+                isize = r * self.index.entry_size
+                ifh = self._indexfp(b"a+")
+                if self._inline:
+                    transaction.add(self._indexfile, dsize + isize)
+                else:
+                    transaction.add(self._indexfile, isize)
+                try:
+                    self._writinghandles = (ifh, dfh)
+                    try:
+                        yield
+                    finally:
+                        self._writinghandles = None
+                finally:
+                    ifh.close()
+            finally:
+                if dfh is not None:
+                    dfh.close()
+
     def addrevision(
         self,
         text,
@@ -2105,11 +2153,7 @@
         useful when reusing a revision not stored in this revlog (ex: received
         over wire, or read from an external bundle).
         """
-        dfh = None
-        if not self._inline:
-            dfh = self._datafp(b"a+")
-        ifh = self._indexfp(b"a+")
-        try:
+        with self._writing(transaction):
             return self._addrevision(
                 node,
                 rawtext,
@@ -2119,15 +2163,9 @@
                 p2,
                 flags,
                 cachedelta,
-                ifh,
-                dfh,
                 deltacomputer=deltacomputer,
                 sidedata=sidedata,
             )
-        finally:
-            if dfh:
-                dfh.close()
-            ifh.close()
 
     def compress(self, data):
         """Generate a possibly-compressed representation of data."""
@@ -2214,8 +2252,6 @@
         p2,
         flags,
         cachedelta,
-        ifh,
-        dfh,
         alwayscache=False,
         deltacomputer=None,
         sidedata=None,
@@ -2244,11 +2280,14 @@
             raise error.RevlogError(
                 _(b"%s: attempt to add wdir revision") % self.display_id
             )
+        if self._writinghandles is None:
+            msg = b'adding revision outside `revlog._writing` context'
+            raise error.ProgrammingError(msg)
 
         if self._inline:
-            fh = ifh
+            fh = self._writinghandles[0]
         else:
-            fh = dfh
+            fh = self._writinghandles[1]
 
         btext = [rawtext]
 
@@ -2258,6 +2297,7 @@
         offset = self._get_data_offset(prev)
 
         if self._concurrencychecker:
+            ifh, dfh = self._writinghandles
             if self._inline:
                 # offset is "as if" it were in the .d file, so we need to add on
                 # the size of the entry metadata.
@@ -2323,8 +2363,6 @@
             entry = header + entry
         self._writeentry(
             transaction,
-            ifh,
-            dfh,
             entry,
             deltainfo.data,
             link,
@@ -2362,9 +2400,7 @@
             offset = max(self.end(rev), offset, sidedata_end)
         return offset
 
-    def _writeentry(
-        self, transaction, ifh, dfh, entry, data, link, offset, sidedata
-    ):
+    def _writeentry(self, transaction, entry, data, link, offset, sidedata):
         # Files opened in a+ mode have inconsistent behavior on various
         # platforms. Windows requires that a file positioning call be made
         # when the file handle transitions between reads and writes. See
@@ -2377,6 +2413,10 @@
         # Note: This is likely not necessary on Python 3. However, because
         # the file handle is reused for reads and may be seeked there, we need
         # to be careful before changing this.
+        if self._writinghandles is None:
+            msg = b'adding revision outside `revlog._writing` context'
+            raise error.ProgrammingError(msg)
+        ifh, dfh = self._writinghandles
         ifh.seek(0, os.SEEK_END)
         if dfh:
             dfh.seek(0, os.SEEK_END)
@@ -2399,7 +2439,7 @@
             ifh.write(data[1])
             if sidedata:
                 ifh.write(sidedata)
-            self._enforceinlinesize(transaction, ifh)
+            self._enforceinlinesize(transaction)
         nodemaputil.setup_persistent_nodemap(transaction, self)
 
     def addgroup(
@@ -2422,28 +2462,13 @@
         this revlog and the node that was added.
         """
 
-        if self._writinghandles:
+        if self._adding_group:
             raise error.ProgrammingError(b'cannot nest addgroup() calls')
 
-        r = len(self)
-        end = 0
-        if r:
-            end = self.end(r - 1)
-        ifh = self._indexfp(b"a+")
-        isize = r * self.index.entry_size
-        if self._inline:
-            transaction.add(self._indexfile, end + isize)
-            dfh = None
-        else:
-            transaction.add(self._indexfile, isize)
-            transaction.add(self._datafile, end)
-            dfh = self._datafp(b"a+")
-
-        self._writinghandles = (ifh, dfh)
+        self._adding_group = True
         empty = True
-
         try:
-            if True:
+            with self._writing(transaction):
                 deltacomputer = deltautil.deltacomputer(self)
                 # loop through our set of deltas
                 for data in deltas:
@@ -2514,8 +2539,6 @@
                         p2,
                         flags,
                         (baserev, delta),
-                        ifh,
-                        dfh,
                         alwayscache=alwayscache,
                         deltacomputer=deltacomputer,
                         sidedata=sidedata,
@@ -2524,20 +2547,8 @@
                     if addrevisioncb:
                         addrevisioncb(self, rev)
                     empty = False
-
-                    if not dfh and not self._inline:
-                        # addrevision switched from inline to conventional
-                        # reopen the index
-                        ifh.close()
-                        dfh = self._datafp(b"a+")
-                        ifh = self._indexfp(b"a+")
-                        self._writinghandles = (ifh, dfh)
         finally:
-            self._writinghandles = None
-
-            if dfh:
-                dfh.close()
-            ifh.close()
+            self._adding_group = False
         return not empty
 
     def iscensored(self, rev):
@@ -2868,13 +2879,7 @@
                     )
                     flags = flags | new_flags[0] & ~new_flags[1]
 
-                ifh = destrevlog.opener(
-                    destrevlog._indexfile, b'a+', checkambig=False
-                )
-                dfh = None
-                if not destrevlog._inline:
-                    dfh = destrevlog.opener(destrevlog._datafile, b'a+')
-                try:
+                with destrevlog._writing(tr):
                     destrevlog._addrevision(
                         node,
                         rawtext,
@@ -2884,15 +2889,9 @@
                         p2,
                         flags,
                         cachedelta,
-                        ifh,
-                        dfh,
                         deltacomputer=deltacomputer,
                         sidedata=sidedata,
                     )
-                finally:
-                    if dfh:
-                        dfh.close()
-                    ifh.close()
 
             if addrevisioncb:
                 addrevisioncb(self, rev, node)
--- a/tests/test-racy-mutations.t	Mon May 03 12:27:31 2021 +0200
+++ b/tests/test-racy-mutations.t	Mon May 03 12:27:42 2021 +0200
@@ -91,7 +91,7 @@
   $ hg debugrevlogindex -c
      rev linkrev nodeid       p1           p2
        0       0 222799e2f90b 000000000000 000000000000
-       1       1 6f124f6007a0 222799e2f90b 000000000000
+       1       1 6f124f6007a0 222799e2f90b 000000000000 (missing-correct-output !)
 And, because of transactions, there's none in the manifestlog either.
   $ hg debugrevlogindex -m
      rev linkrev nodeid       p1           p2
--- a/tests/test-revlog-raw.py	Mon May 03 12:27:31 2021 +0200
+++ b/tests/test-revlog-raw.py	Mon May 03 12:27:42 2021 +0200
@@ -19,6 +19,32 @@
     flagutil,
 )
 
+
+class _NoTransaction(object):
+    """transaction like object to update the nodemap outside a transaction"""
+
+    def __init__(self):
+        self._postclose = {}
+
+    def addpostclose(self, callback_id, callback_func):
+        self._postclose[callback_id] = callback_func
+
+    def registertmp(self, *args, **kwargs):
+        pass
+
+    def addbackup(self, *args, **kwargs):
+        pass
+
+    def add(self, *args, **kwargs):
+        pass
+
+    def addabort(self, *args, **kwargs):
+        pass
+
+    def _report(self, *args):
+        pass
+
+
 # TESTTMP is optional. This makes it convenient to run without run-tests.py
 tvfs = vfs.vfs(encoding.environ.get(b'TESTTMP', b'/tmp'))
 
@@ -201,19 +227,17 @@
             text = None
             cachedelta = (deltaparent, rlog.revdiff(deltaparent, r))
         flags = rlog.flags(r)
-        ifh = dfh = None
-        try:
-            ifh = dlog.opener(dlog._indexfile, b'a+')
-            if not dlog._inline:
-                dfh = dlog.opener(dlog._datafile, b'a+')
+        with dlog._writing(_NoTransaction()):
             dlog._addrevision(
-                rlog.node(r), text, tr, r, p1, p2, flags, cachedelta, ifh, dfh
+                rlog.node(r),
+                text,
+                tr,
+                r,
+                p1,
+                p2,
+                flags,
+                cachedelta,
             )
-        finally:
-            if dfh is not None:
-                dfh.close()
-            if ifh is not None:
-                ifh.close()
     return dlog