changelog-delay: move the delay/divert logic inside the (inner) revlog
authorPierre-Yves David <pierre-yves.david@octobus.net>
Tue, 24 Oct 2023 11:08:49 +0200
changeset 51106 d83d788590a8
parent 51105 af96fbb8f739
child 51107 c2d2e5b65def
changelog-delay: move the delay/divert logic inside the (inner) revlog Instead of hacking throught the vfs/opener, we implement the delay/divert logic inside the `_InnerRevlog` and `randomaccessfile` object. This will allow to an alternative implementation of the `_InnerRevlog` that does not need to use Python details. As a result, the new implementation can use the transaction less agressively and avoid some extra output since no data had been written yet. That seems like a good side effect.
mercurial/changelog.py
mercurial/repocache.py
mercurial/revlog.py
mercurial/revlogutils/randomaccessfile.py
tests/test-bundle2-exchange.t
tests/test-http-bad-server.t
--- a/mercurial/changelog.py	Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/changelog.py	Tue Oct 24 11:08:49 2023 +0200
@@ -27,7 +27,6 @@
 from .revlogutils import (
     constants as revlog_constants,
     flagutil,
-    randomaccessfile,
 )
 
 _defaultextra = {b'branch': b'default'}
@@ -92,38 +91,6 @@
     return b'\n'.join([l.rstrip() for l in desc.splitlines()]).strip(b'\n')
 
 
-class _divertopener:
-    def __init__(self, opener, target):
-        self._opener = opener
-        self._target = target
-
-    def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
-        if name != self._target:
-            return self._opener(name, mode, **kwargs)
-        return self._opener(name + b".a", mode, **kwargs)
-
-    def __getattr__(self, attr):
-        return getattr(self._opener, attr)
-
-
-class _delayopener:
-    """build an opener that stores chunks in 'buf' instead of 'target'"""
-
-    def __init__(self, opener, target, buf):
-        self._opener = opener
-        self._target = target
-        self._buf = buf
-
-    def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
-        if name != self._target:
-            return self._opener(name, mode, **kwargs)
-        assert not kwargs
-        return randomaccessfile.appender(self._opener, name, mode, self._buf)
-
-    def __getattr__(self, attr):
-        return getattr(self._opener, attr)
-
-
 @attr.s
 class _changelogrevision:
     # Extensions might modify _defaultextra, so let the constructor below pass
@@ -354,10 +321,7 @@
         # chains.
         self._storedeltachains = False
 
-        self._realopener = opener
-        self._delayed = False
-        self._delaybuf = None
-        self._divert = False
+        self._v2_delayed = False
         self._filteredrevs = frozenset()
         self._filteredrevs_hashcache = {}
         self._copiesstorage = opener.options.get(b'copies-storage')
@@ -374,90 +338,47 @@
         self._filteredrevs_hashcache = {}
 
     def _write_docket(self, tr):
-        if not self.is_delaying:
+        if not self._v2_delayed:
             super(changelog, self)._write_docket(tr)
 
-    @property
-    def is_delaying(self):
-        return self._delayed
-
     def delayupdate(self, tr):
         """delay visibility of index updates to other readers"""
         assert not self._inner.is_open
-        if self._docket is None and not self.is_delaying:
-            if len(self) == 0:
-                self._divert = True
-                if self._realopener.exists(self._indexfile + b'.a'):
-                    self._realopener.unlink(self._indexfile + b'.a')
-                self.opener = _divertopener(self._realopener, self._indexfile)
-            else:
-                self._delaybuf = []
-                self.opener = _delayopener(
-                    self._realopener, self._indexfile, self._delaybuf
-                )
-            self._inner.opener = self.opener
-            self._inner._segmentfile.opener = self.opener
-            self._inner._segmentfile_sidedata.opener = self.opener
-        self._delayed = True
+        if self._docket is not None:
+            self._v2_delayed = True
+        else:
+            new_index = self._inner.delay()
+            if new_index is not None:
+                self._indexfile = new_index
+                tr.registertmp(new_index)
         tr.addpending(b'cl-%i' % id(self), self._writepending)
         tr.addfinalize(b'cl-%i' % id(self), self._finalize)
 
     def _finalize(self, tr):
         """finalize index updates"""
         assert not self._inner.is_open
-        self._delayed = False
-        self.opener = self._realopener
-        self._inner.opener = self.opener
-        self._inner._segmentfile.opener = self.opener
-        self._inner._segmentfile_sidedata.opener = self.opener
-        # move redirected index data back into place
         if self._docket is not None:
-            self._write_docket(tr)
-        elif self._divert:
-            assert not self._delaybuf
-            tmpname = self._indexfile + b".a"
-            nfile = self.opener.open(tmpname)
-            nfile.close()
-            self.opener.rename(tmpname, self._indexfile, checkambig=True)
-        elif self._delaybuf:
-            fp = self.opener(self._indexfile, b'a', checkambig=True)
-            fp.write(b"".join(self._delaybuf))
-            fp.close()
-            self._delaybuf = None
-        self._divert = False
-        # split when we're done
-        self._enforceinlinesize(tr, side_write=False)
+            self._docket.write(tr)
+            self._v2_delayed = False
+        else:
+            new_index_file = self._inner.finalize_pending()
+            self._indexfile = new_index_file
+            # split when we're done
+            self._enforceinlinesize(tr, side_write=False)
 
     def _writepending(self, tr):
         """create a file containing the unfinalized state for
         pretxnchangegroup"""
         assert not self._inner.is_open
         if self._docket:
-            return self._docket.write(tr, pending=True)
-        if self._delaybuf:
-            # make a temporary copy of the index
-            fp1 = self._realopener(self._indexfile)
-            pendingfilename = self._indexfile + b".a"
-            # register as a temp file to ensure cleanup on failure
-            tr.registertmp(pendingfilename)
-            # write existing data
-            fp2 = self._realopener(pendingfilename, b"w")
-            fp2.write(fp1.read())
-            # add pending data
-            fp2.write(b"".join(self._delaybuf))
-            fp2.close()
-            # switch modes so finalize can simply rename
-            self._delaybuf = None
-            self._divert = True
-            self.opener = _divertopener(self._realopener, self._indexfile)
-            self._inner.opener = self.opener
-            self._inner._segmentfile.opener = self.opener
-            self._inner._segmentfile_sidedata.opener = self.opener
-
-        if self._divert:
-            return True
-
-        return False
+            any_pending = self._docket.write(tr, pending=True)
+            self._v2_delayed = False
+        else:
+            new_index, any_pending = self._inner.write_pending()
+            if new_index is not None:
+                self._indexfile = new_index
+                tr.registertmp(new_index)
+        return any_pending
 
     def _enforceinlinesize(self, tr, side_write=True):
         if not self.is_delaying:
--- a/mercurial/repocache.py	Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/repocache.py	Tue Oct 24 11:08:49 2023 +0200
@@ -129,7 +129,7 @@
     srcfilecache = srcrepo._filecache
     if b'changelog' in srcfilecache:
         destfilecache[b'changelog'] = ce = srcfilecache[b'changelog']
-        ce.obj.opener = ce.obj._realopener = destrepo.svfs
+        ce.obj.opener = ce.obj._inner.opener = destrepo.svfs
     if b'obsstore' in srcfilecache:
         destfilecache[b'obsstore'] = ce = srcfilecache[b'obsstore']
         ce.obj.svfs = destrepo.svfs
--- a/mercurial/revlog.py	Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/revlog.py	Tue Oct 24 11:08:49 2023 +0200
@@ -369,6 +369,9 @@
         self.delta_config = delta_config
         self.feature_config = feature_config
 
+        # used during diverted write.
+        self._orig_index_file = None
+
         self._default_compression_header = default_compression_header
 
         # index
@@ -393,6 +396,8 @@
         # 3-tuple of (node, rev, text) for a raw revision.
         self._revisioncache = None
 
+        self._delay_buffer = None
+
     @property
     def index_file(self):
         return self.__index_file
@@ -407,14 +412,27 @@
         return len(self.index)
 
     def clear_cache(self):
+        assert not self.is_delaying
         self._revisioncache = None
         self._segmentfile.clear_cache()
         self._segmentfile_sidedata.clear_cache()
 
     @property
     def canonical_index_file(self):
+        if self._orig_index_file is not None:
+            return self._orig_index_file
         return self.index_file
 
+    @property
+    def is_delaying(self):
+        """is the revlog is currently delaying the visibility of written data?
+
+        The delaying mechanism can be either in-memory or written on disk in a
+        side-file."""
+        return (self._delay_buffer is not None) or (
+            self._orig_index_file is not None
+        )
+
     # Derived from index values.
 
     def start(self, rev):
@@ -700,22 +718,36 @@
         You should not use this directly and use `_writing` instead
         """
         try:
-            f = self.opener(
-                self.index_file,
-                mode=b"r+",
-                checkambig=self.data_config.check_ambig,
-            )
+            if self._delay_buffer is None:
+                f = self.opener(
+                    self.index_file,
+                    mode=b"r+",
+                    checkambig=self.data_config.check_ambig,
+                )
+            else:
+                # check_ambig affect we way we open file for writing, however
+                # here, we do not actually open a file for writting as write
+                # will appened to a delay_buffer. So check_ambig is not
+                # meaningful and unneeded here.
+                f = randomaccessfile.appender(
+                    self.opener, self.index_file, b"r+", self._delay_buffer
+                )
             if index_end is None:
                 f.seek(0, os.SEEK_END)
             else:
                 f.seek(index_end, os.SEEK_SET)
             return f
         except FileNotFoundError:
-            return self.opener(
-                self.index_file,
-                mode=b"w+",
-                checkambig=self.data_config.check_ambig,
-            )
+            if self._delay_buffer is None:
+                return self.opener(
+                    self.index_file,
+                    mode=b"w+",
+                    checkambig=self.data_config.check_ambig,
+                )
+            else:
+                return randomaccessfile.appender(
+                    self.opener, self.index_file, b"w+", self._delay_buffer
+                )
 
     def __index_new_fp(self):
         """internal method to create a new index file for writing
@@ -1044,20 +1076,101 @@
             dfh.write(data[1])
             if sidedata:
                 sdfh.write(sidedata)
-            ifh.write(entry)
+            if self._delay_buffer is None:
+                ifh.write(entry)
+            else:
+                self._delay_buffer.append(entry)
         else:
             offset += curr * self.index.entry_size
             transaction.add(self.canonical_index_file, offset)
-            ifh.write(entry)
-            ifh.write(data[0])
-            ifh.write(data[1])
             assert not sidedata
+            if self._delay_buffer is None:
+                ifh.write(entry)
+                ifh.write(data[0])
+                ifh.write(data[1])
+            else:
+                self._delay_buffer.append(entry)
+                self._delay_buffer.append(data[0])
+                self._delay_buffer.append(data[1])
         return (
             ifh.tell(),
             dfh.tell() if dfh else None,
             sdfh.tell() if sdfh else None,
         )
 
+    def _divert_index(self):
+        return self.index_file + b'.a'
+
+    def delay(self):
+        assert not self.is_open
+        if self._delay_buffer is not None or self._orig_index_file is not None:
+            # delay or divert already in place
+            return None
+        elif len(self.index) == 0:
+            self._orig_index_file = self.index_file
+            self.index_file = self._divert_index()
+            self._segmentfile.filename = self.index_file
+            assert self._orig_index_file is not None
+            assert self.index_file is not None
+            if self.opener.exists(self.index_file):
+                self.opener.unlink(self.index_file)
+            return self.index_file
+        else:
+            self._segmentfile._delay_buffer = self._delay_buffer = []
+            return None
+
+    def write_pending(self):
+        assert not self.is_open
+        if self._orig_index_file is not None:
+            return None, True
+        any_pending = False
+        pending_index_file = self._divert_index()
+        if self.opener.exists(pending_index_file):
+            self.opener.unlink(pending_index_file)
+        util.copyfile(
+            self.opener.join(self.index_file),
+            self.opener.join(pending_index_file),
+        )
+        if self._delay_buffer:
+            with self.opener(pending_index_file, b'r+') as ifh:
+                ifh.seek(0, os.SEEK_END)
+                ifh.write(b"".join(self._delay_buffer))
+            any_pending = True
+        self._segmentfile._delay_buffer = self._delay_buffer = None
+        self._orig_index_file = self.index_file
+        self.index_file = pending_index_file
+        self._segmentfile.filename = self.index_file
+        return self.index_file, any_pending
+
+    def finalize_pending(self):
+        assert not self.is_open
+
+        delay = self._delay_buffer is not None
+        divert = self._orig_index_file is not None
+
+        if delay and divert:
+            assert False, "unreachable"
+        elif delay:
+            if self._delay_buffer:
+                with self.opener(self.index_file, b'r+') as ifh:
+                    ifh.seek(0, os.SEEK_END)
+                    ifh.write(b"".join(self._delay_buffer))
+            self._segmentfile._delay_buffer = self._delay_buffer = None
+        elif divert:
+            if self.opener.exists(self.index_file):
+                self.opener.rename(
+                    self.index_file,
+                    self._orig_index_file,
+                    checkambig=True,
+                )
+            self.index_file = self._orig_index_file
+            self._orig_index_file = None
+            self._segmentfile.filename = self.index_file
+        else:
+            msg = b"not delay or divert found on this revlog"
+            raise error.ProgrammingError(msg)
+        return self.canonical_index_file
+
 
 class revlog:
     """
@@ -2925,6 +3038,10 @@
                 if self._docket is not None:
                     self._write_docket(transaction)
 
+    @property
+    def is_delaying(self):
+        return self._inner.is_delaying
+
     def _write_docket(self, transaction):
         """write the current docket on disk
 
--- a/mercurial/revlogutils/randomaccessfile.py	Thu Oct 26 05:37:37 2023 +0200
+++ b/mercurial/revlogutils/randomaccessfile.py	Tue Oct 24 11:08:49 2023 +0200
@@ -116,6 +116,8 @@
         if initial_cache:
             self._cached_chunk_position, self._cached_chunk = initial_cache
 
+        self._delay_buffer = None
+
     def clear_cache(self):
         self._cached_chunk = b''
         self._cached_chunk_position = 0
@@ -131,7 +133,12 @@
 
     def _open(self, mode=b'r'):
         """Return a file object"""
-        return self.opener(self.filename, mode=mode)
+        if self._delay_buffer is None:
+            return self.opener(self.filename, mode=mode)
+        else:
+            return appender(
+                self.opener, self.filename, mode, self._delay_buffer
+            )
 
     @contextlib.contextmanager
     def _read_handle(self):
--- a/tests/test-bundle2-exchange.t	Thu Oct 26 05:37:37 2023 +0200
+++ b/tests/test-bundle2-exchange.t	Tue Oct 24 11:08:49 2023 +0200
@@ -1042,8 +1042,6 @@
   adding changesets
   remote: abort: incompatible Mercurial client; bundle2 required
   remote: (see https://www.mercurial-scm.org/wiki/IncompatibleClient)
-  transaction abort!
-  rollback completed
   abort: stream ended unexpectedly (got 0 bytes, expected 4)
   [255]
 
--- a/tests/test-http-bad-server.t	Thu Oct 26 05:37:37 2023 +0200
+++ b/tests/test-http-bad-server.t	Tue Oct 24 11:08:49 2023 +0200
@@ -725,8 +725,6 @@
   $ hg clone http://localhost:$HGPORT/ clone
   requesting all changes
   adding changesets
-  transaction abort!
-  rollback completed
   abort: HTTP request error (incomplete response)
   (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
   [255]
@@ -759,8 +757,6 @@
   $ hg clone http://localhost:$HGPORT/ clone
   requesting all changes
   adding changesets
-  transaction abort!
-  rollback completed
   abort: HTTP request error (incomplete response*) (glob)
   (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
   [255]
@@ -795,8 +791,6 @@
   $ hg clone http://localhost:$HGPORT/ clone
   requesting all changes
   adding changesets
-  transaction abort!
-  rollback completed
   abort: HTTP request error (incomplete response)
   (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
   [255]