# HG changeset patch # User Pierre-Yves David # Date 1698138529 -7200 # Node ID d83d788590a886b45bd83d469e4c263070564de3 # Parent af96fbb8f739444afa9105cb20fdd3c74a492a48 changelog-delay: move the delay/divert logic inside the (inner) revlog Instead of hacking throught the vfs/opener, we implement the delay/divert logic inside the `_InnerRevlog` and `randomaccessfile` object. This will allow to an alternative implementation of the `_InnerRevlog` that does not need to use Python details. As a result, the new implementation can use the transaction less agressively and avoid some extra output since no data had been written yet. That seems like a good side effect. diff -r af96fbb8f739 -r d83d788590a8 mercurial/changelog.py --- a/mercurial/changelog.py Thu Oct 26 05:37:37 2023 +0200 +++ b/mercurial/changelog.py Tue Oct 24 11:08:49 2023 +0200 @@ -27,7 +27,6 @@ from .revlogutils import ( constants as revlog_constants, flagutil, - randomaccessfile, ) _defaultextra = {b'branch': b'default'} @@ -92,38 +91,6 @@ return b'\n'.join([l.rstrip() for l in desc.splitlines()]).strip(b'\n') -class _divertopener: - def __init__(self, opener, target): - self._opener = opener - self._target = target - - def __call__(self, name, mode=b'r', checkambig=False, **kwargs): - if name != self._target: - return self._opener(name, mode, **kwargs) - return self._opener(name + b".a", mode, **kwargs) - - def __getattr__(self, attr): - return getattr(self._opener, attr) - - -class _delayopener: - """build an opener that stores chunks in 'buf' instead of 'target'""" - - def __init__(self, opener, target, buf): - self._opener = opener - self._target = target - self._buf = buf - - def __call__(self, name, mode=b'r', checkambig=False, **kwargs): - if name != self._target: - return self._opener(name, mode, **kwargs) - assert not kwargs - return randomaccessfile.appender(self._opener, name, mode, self._buf) - - def __getattr__(self, attr): - return getattr(self._opener, attr) - - @attr.s class _changelogrevision: # Extensions might modify _defaultextra, so let the constructor below pass @@ -354,10 +321,7 @@ # chains. self._storedeltachains = False - self._realopener = opener - self._delayed = False - self._delaybuf = None - self._divert = False + self._v2_delayed = False self._filteredrevs = frozenset() self._filteredrevs_hashcache = {} self._copiesstorage = opener.options.get(b'copies-storage') @@ -374,90 +338,47 @@ self._filteredrevs_hashcache = {} def _write_docket(self, tr): - if not self.is_delaying: + if not self._v2_delayed: super(changelog, self)._write_docket(tr) - @property - def is_delaying(self): - return self._delayed - def delayupdate(self, tr): """delay visibility of index updates to other readers""" assert not self._inner.is_open - if self._docket is None and not self.is_delaying: - if len(self) == 0: - self._divert = True - if self._realopener.exists(self._indexfile + b'.a'): - self._realopener.unlink(self._indexfile + b'.a') - self.opener = _divertopener(self._realopener, self._indexfile) - else: - self._delaybuf = [] - self.opener = _delayopener( - self._realopener, self._indexfile, self._delaybuf - ) - self._inner.opener = self.opener - self._inner._segmentfile.opener = self.opener - self._inner._segmentfile_sidedata.opener = self.opener - self._delayed = True + if self._docket is not None: + self._v2_delayed = True + else: + new_index = self._inner.delay() + if new_index is not None: + self._indexfile = new_index + tr.registertmp(new_index) tr.addpending(b'cl-%i' % id(self), self._writepending) tr.addfinalize(b'cl-%i' % id(self), self._finalize) def _finalize(self, tr): """finalize index updates""" assert not self._inner.is_open - self._delayed = False - self.opener = self._realopener - self._inner.opener = self.opener - self._inner._segmentfile.opener = self.opener - self._inner._segmentfile_sidedata.opener = self.opener - # move redirected index data back into place if self._docket is not None: - self._write_docket(tr) - elif self._divert: - assert not self._delaybuf - tmpname = self._indexfile + b".a" - nfile = self.opener.open(tmpname) - nfile.close() - self.opener.rename(tmpname, self._indexfile, checkambig=True) - elif self._delaybuf: - fp = self.opener(self._indexfile, b'a', checkambig=True) - fp.write(b"".join(self._delaybuf)) - fp.close() - self._delaybuf = None - self._divert = False - # split when we're done - self._enforceinlinesize(tr, side_write=False) + self._docket.write(tr) + self._v2_delayed = False + else: + new_index_file = self._inner.finalize_pending() + self._indexfile = new_index_file + # split when we're done + self._enforceinlinesize(tr, side_write=False) def _writepending(self, tr): """create a file containing the unfinalized state for pretxnchangegroup""" assert not self._inner.is_open if self._docket: - return self._docket.write(tr, pending=True) - if self._delaybuf: - # make a temporary copy of the index - fp1 = self._realopener(self._indexfile) - pendingfilename = self._indexfile + b".a" - # register as a temp file to ensure cleanup on failure - tr.registertmp(pendingfilename) - # write existing data - fp2 = self._realopener(pendingfilename, b"w") - fp2.write(fp1.read()) - # add pending data - fp2.write(b"".join(self._delaybuf)) - fp2.close() - # switch modes so finalize can simply rename - self._delaybuf = None - self._divert = True - self.opener = _divertopener(self._realopener, self._indexfile) - self._inner.opener = self.opener - self._inner._segmentfile.opener = self.opener - self._inner._segmentfile_sidedata.opener = self.opener - - if self._divert: - return True - - return False + any_pending = self._docket.write(tr, pending=True) + self._v2_delayed = False + else: + new_index, any_pending = self._inner.write_pending() + if new_index is not None: + self._indexfile = new_index + tr.registertmp(new_index) + return any_pending def _enforceinlinesize(self, tr, side_write=True): if not self.is_delaying: diff -r af96fbb8f739 -r d83d788590a8 mercurial/repocache.py --- a/mercurial/repocache.py Thu Oct 26 05:37:37 2023 +0200 +++ b/mercurial/repocache.py Tue Oct 24 11:08:49 2023 +0200 @@ -129,7 +129,7 @@ srcfilecache = srcrepo._filecache if b'changelog' in srcfilecache: destfilecache[b'changelog'] = ce = srcfilecache[b'changelog'] - ce.obj.opener = ce.obj._realopener = destrepo.svfs + ce.obj.opener = ce.obj._inner.opener = destrepo.svfs if b'obsstore' in srcfilecache: destfilecache[b'obsstore'] = ce = srcfilecache[b'obsstore'] ce.obj.svfs = destrepo.svfs diff -r af96fbb8f739 -r d83d788590a8 mercurial/revlog.py --- a/mercurial/revlog.py Thu Oct 26 05:37:37 2023 +0200 +++ b/mercurial/revlog.py Tue Oct 24 11:08:49 2023 +0200 @@ -369,6 +369,9 @@ self.delta_config = delta_config self.feature_config = feature_config + # used during diverted write. + self._orig_index_file = None + self._default_compression_header = default_compression_header # index @@ -393,6 +396,8 @@ # 3-tuple of (node, rev, text) for a raw revision. self._revisioncache = None + self._delay_buffer = None + @property def index_file(self): return self.__index_file @@ -407,14 +412,27 @@ return len(self.index) def clear_cache(self): + assert not self.is_delaying self._revisioncache = None self._segmentfile.clear_cache() self._segmentfile_sidedata.clear_cache() @property def canonical_index_file(self): + if self._orig_index_file is not None: + return self._orig_index_file return self.index_file + @property + def is_delaying(self): + """is the revlog is currently delaying the visibility of written data? + + The delaying mechanism can be either in-memory or written on disk in a + side-file.""" + return (self._delay_buffer is not None) or ( + self._orig_index_file is not None + ) + # Derived from index values. def start(self, rev): @@ -700,22 +718,36 @@ You should not use this directly and use `_writing` instead """ try: - f = self.opener( - self.index_file, - mode=b"r+", - checkambig=self.data_config.check_ambig, - ) + if self._delay_buffer is None: + f = self.opener( + self.index_file, + mode=b"r+", + checkambig=self.data_config.check_ambig, + ) + else: + # check_ambig affect we way we open file for writing, however + # here, we do not actually open a file for writting as write + # will appened to a delay_buffer. So check_ambig is not + # meaningful and unneeded here. + f = randomaccessfile.appender( + self.opener, self.index_file, b"r+", self._delay_buffer + ) if index_end is None: f.seek(0, os.SEEK_END) else: f.seek(index_end, os.SEEK_SET) return f except FileNotFoundError: - return self.opener( - self.index_file, - mode=b"w+", - checkambig=self.data_config.check_ambig, - ) + if self._delay_buffer is None: + return self.opener( + self.index_file, + mode=b"w+", + checkambig=self.data_config.check_ambig, + ) + else: + return randomaccessfile.appender( + self.opener, self.index_file, b"w+", self._delay_buffer + ) def __index_new_fp(self): """internal method to create a new index file for writing @@ -1044,20 +1076,101 @@ dfh.write(data[1]) if sidedata: sdfh.write(sidedata) - ifh.write(entry) + if self._delay_buffer is None: + ifh.write(entry) + else: + self._delay_buffer.append(entry) else: offset += curr * self.index.entry_size transaction.add(self.canonical_index_file, offset) - ifh.write(entry) - ifh.write(data[0]) - ifh.write(data[1]) assert not sidedata + if self._delay_buffer is None: + ifh.write(entry) + ifh.write(data[0]) + ifh.write(data[1]) + else: + self._delay_buffer.append(entry) + self._delay_buffer.append(data[0]) + self._delay_buffer.append(data[1]) return ( ifh.tell(), dfh.tell() if dfh else None, sdfh.tell() if sdfh else None, ) + def _divert_index(self): + return self.index_file + b'.a' + + def delay(self): + assert not self.is_open + if self._delay_buffer is not None or self._orig_index_file is not None: + # delay or divert already in place + return None + elif len(self.index) == 0: + self._orig_index_file = self.index_file + self.index_file = self._divert_index() + self._segmentfile.filename = self.index_file + assert self._orig_index_file is not None + assert self.index_file is not None + if self.opener.exists(self.index_file): + self.opener.unlink(self.index_file) + return self.index_file + else: + self._segmentfile._delay_buffer = self._delay_buffer = [] + return None + + def write_pending(self): + assert not self.is_open + if self._orig_index_file is not None: + return None, True + any_pending = False + pending_index_file = self._divert_index() + if self.opener.exists(pending_index_file): + self.opener.unlink(pending_index_file) + util.copyfile( + self.opener.join(self.index_file), + self.opener.join(pending_index_file), + ) + if self._delay_buffer: + with self.opener(pending_index_file, b'r+') as ifh: + ifh.seek(0, os.SEEK_END) + ifh.write(b"".join(self._delay_buffer)) + any_pending = True + self._segmentfile._delay_buffer = self._delay_buffer = None + self._orig_index_file = self.index_file + self.index_file = pending_index_file + self._segmentfile.filename = self.index_file + return self.index_file, any_pending + + def finalize_pending(self): + assert not self.is_open + + delay = self._delay_buffer is not None + divert = self._orig_index_file is not None + + if delay and divert: + assert False, "unreachable" + elif delay: + if self._delay_buffer: + with self.opener(self.index_file, b'r+') as ifh: + ifh.seek(0, os.SEEK_END) + ifh.write(b"".join(self._delay_buffer)) + self._segmentfile._delay_buffer = self._delay_buffer = None + elif divert: + if self.opener.exists(self.index_file): + self.opener.rename( + self.index_file, + self._orig_index_file, + checkambig=True, + ) + self.index_file = self._orig_index_file + self._orig_index_file = None + self._segmentfile.filename = self.index_file + else: + msg = b"not delay or divert found on this revlog" + raise error.ProgrammingError(msg) + return self.canonical_index_file + class revlog: """ @@ -2925,6 +3038,10 @@ if self._docket is not None: self._write_docket(transaction) + @property + def is_delaying(self): + return self._inner.is_delaying + def _write_docket(self, transaction): """write the current docket on disk diff -r af96fbb8f739 -r d83d788590a8 mercurial/revlogutils/randomaccessfile.py --- a/mercurial/revlogutils/randomaccessfile.py Thu Oct 26 05:37:37 2023 +0200 +++ b/mercurial/revlogutils/randomaccessfile.py Tue Oct 24 11:08:49 2023 +0200 @@ -116,6 +116,8 @@ if initial_cache: self._cached_chunk_position, self._cached_chunk = initial_cache + self._delay_buffer = None + def clear_cache(self): self._cached_chunk = b'' self._cached_chunk_position = 0 @@ -131,7 +133,12 @@ def _open(self, mode=b'r'): """Return a file object""" - return self.opener(self.filename, mode=mode) + if self._delay_buffer is None: + return self.opener(self.filename, mode=mode) + else: + return appender( + self.opener, self.filename, mode, self._delay_buffer + ) @contextlib.contextmanager def _read_handle(self): diff -r af96fbb8f739 -r d83d788590a8 tests/test-bundle2-exchange.t --- a/tests/test-bundle2-exchange.t Thu Oct 26 05:37:37 2023 +0200 +++ b/tests/test-bundle2-exchange.t Tue Oct 24 11:08:49 2023 +0200 @@ -1042,8 +1042,6 @@ adding changesets remote: abort: incompatible Mercurial client; bundle2 required remote: (see https://www.mercurial-scm.org/wiki/IncompatibleClient) - transaction abort! - rollback completed abort: stream ended unexpectedly (got 0 bytes, expected 4) [255] diff -r af96fbb8f739 -r d83d788590a8 tests/test-http-bad-server.t --- a/tests/test-http-bad-server.t Thu Oct 26 05:37:37 2023 +0200 +++ b/tests/test-http-bad-server.t Tue Oct 24 11:08:49 2023 +0200 @@ -725,8 +725,6 @@ $ hg clone http://localhost:$HGPORT/ clone requesting all changes adding changesets - transaction abort! - rollback completed abort: HTTP request error (incomplete response) (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator) [255] @@ -759,8 +757,6 @@ $ hg clone http://localhost:$HGPORT/ clone requesting all changes adding changesets - transaction abort! - rollback completed abort: HTTP request error (incomplete response*) (glob) (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator) [255] @@ -795,8 +791,6 @@ $ hg clone http://localhost:$HGPORT/ clone requesting all changes adding changesets - transaction abort! - rollback completed abort: HTTP request error (incomplete response) (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator) [255]