stream-clone: add a v3 version of the protocol
authorArseniy Alekseyev <aalekseyev@janestreet.com>
Thu, 01 Jun 2023 17:39:22 +0100
changeset 50706 0452af304808
parent 50705 0bba730005df
child 50707 36478b2ab8b3
stream-clone: add a v3 version of the protocol This new version is less rigid regarding the extract number of files and number of bytes to be actually transfered, it also lays the groundwork for other improvements. The format stays experimental, but this is an interesting base to build upon.
mercurial/bundle2.py
mercurial/store.py
mercurial/streamclone.py
tests/test-clone-stream-revlog-split.t
tests/test-clone-stream.t
tests/test-stream-bundle-v2.t
--- a/mercurial/bundle2.py	Thu Jun 01 18:20:28 2023 +0100
+++ b/mercurial/bundle2.py	Thu Jun 01 17:39:22 2023 +0100
@@ -1952,14 +1952,12 @@
         part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
         part.addparam(b'requirements', requirements, mandatory=True)
     elif version == b"v3-exp":
-        filecount, bytecount, it = streamclone.generatev2(
+        it = streamclone.generatev3(
             repo, includepats, excludepats, includeobsmarkers
         )
         requirements = streamclone.streamed_requirements(repo)
         requirements = _formatrequirementsspec(requirements)
         part = bundler.newpart(b'stream3-exp', data=it)
-        part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
-        part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
         part.addparam(b'requirements', requirements, mandatory=True)
 
 
@@ -2616,9 +2614,18 @@
     streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
 
 
-@parthandler(b'stream3-exp', (b'requirements', b'filecount', b'bytecount'))
+@parthandler(b'stream3-exp', (b'requirements',))
 def handlestreamv3bundle(op, part):
-    return handlestreamv2bundle(op, part)
+    requirements = urlreq.unquote(part.params[b'requirements'])
+    requirements = requirements.split(b',') if requirements else []
+
+    repo = op.repo
+    if len(repo):
+        msg = _(b'cannot apply stream clone to non empty repository')
+        raise error.Abort(msg)
+
+    repo.ui.debug(b'applying stream bundle\n')
+    streamclone.applybundlev3(repo, part, requirements)
 
 
 def widen_bundle(
--- a/mercurial/store.py	Thu Jun 01 18:20:28 2023 +0100
+++ b/mercurial/store.py	Thu Jun 01 17:39:22 2023 +0100
@@ -490,6 +490,7 @@
         vfs=None,
         copies=None,
         max_changeset=None,
+        preserve_file_count=False,
     ):
         """return a list of data stream associated to files for this entry
 
@@ -599,6 +600,7 @@
         vfs=None,
         copies=None,
         max_changeset=None,
+        preserve_file_count=False,
     ):
         if (
             repo is None
@@ -613,7 +615,18 @@
                 vfs=vfs,
                 copies=copies,
                 max_changeset=max_changeset,
+                preserve_file_count=preserve_file_count,
             )
+        elif not preserve_file_count:
+            stream = [
+                f.get_stream(vfs, copies)
+                for f in self.files()
+                if not f.unencoded_path.endswith((b'.i', b'.d'))
+            ]
+            rl = self.get_revlog_instance(repo).get_revlog()
+            rl_stream = rl.get_streams(max_changeset)
+            stream.extend(rl_stream)
+            return stream
 
         name_to_size = {}
         for f in self.files():
--- a/mercurial/streamclone.py	Thu Jun 01 18:20:28 2023 +0100
+++ b/mercurial/streamclone.py	Thu Jun 01 17:39:22 2023 +0100
@@ -668,7 +668,11 @@
 
         for src, vfs, e in entries:
             entry_streams = e.get_streams(
-                repo=repo, vfs=vfs, copies=copy, max_changeset=max_linkrev
+                repo=repo,
+                vfs=vfs,
+                copies=copy,
+                max_changeset=max_linkrev,
+                preserve_file_count=True,
             )
             for name, stream, size in entry_streams:
                 yield src
@@ -691,6 +695,59 @@
                     raise error.Abort(msg % (bytecount, name, size))
 
 
+def _emit3(repo, entries):
+    """actually emit the stream bundle (v3)"""
+    vfsmap = _makemap(repo)
+    # we keep repo.vfs out of the map on purpose, ther are too many dangers
+    # there (eg: .hg/hgrc),
+    #
+    # this assert is duplicated (from _makemap) as authors might think this is
+    # fine, while this is really not fine.
+    if repo.vfs in vfsmap.values():
+        raise error.ProgrammingError(
+            b'repo.vfs must not be added to vfsmap for security reasons'
+        )
+
+    # translate the vfs once
+    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    total_entry_count = len(entries)
+
+    max_linkrev = len(repo)
+    progress = repo.ui.makeprogress(
+        _(b'bundle'),
+        total=total_entry_count,
+        unit=_(b'entry'),
+    )
+    progress.update(0)
+    with TempCopyManager() as copy, progress:
+        # create a copy of volatile files
+        for k, vfs, e in entries:
+            for f in e.files():
+                f.file_size(vfs)  # record the expected size under lock
+                if f.is_volatile:
+                    copy(vfs.join(f.unencoded_path))
+        # the first yield release the lock on the repository
+        yield None
+
+        yield util.uvarintencode(total_entry_count)
+
+        for src, vfs, e in entries:
+            entry_streams = e.get_streams(
+                repo=repo,
+                vfs=vfs,
+                copies=copy,
+                max_changeset=max_linkrev,
+            )
+            yield util.uvarintencode(len(entry_streams))
+            for name, stream, size in entry_streams:
+                yield src
+                yield util.uvarintencode(len(name))
+                yield util.uvarintencode(size)
+                yield name
+                yield from stream
+            progress.increment()
+
+
 def _test_sync_point_walk_1(repo):
     """a function for synchronisation during tests"""
 
@@ -766,7 +823,47 @@
 
 
 def generatev3(repo, includes, excludes, includeobsmarkers):
-    return generatev2(repo, includes, excludes, includeobsmarkers)
+    """Emit content for version 3 of a streaming clone.
+
+    the data stream consists the following:
+    1) A varint E containing the number of entries (can be 0), then E entries follow
+    2) For each entry:
+    2.1) The number of files in this entry (can be 0, but typically 1 or 2)
+    2.2) For each file:
+    2.2.1) A char representing the file destination (eg: store or cache)
+    2.2.2) A varint N containing the length of the filename
+    2.2.3) A varint M containing the length of file data
+    2.2.4) N bytes containing the filename (the internal, store-agnostic form)
+    2.2.5) M bytes containing the file data
+
+    Returns the data iterator.
+
+    XXX This format is experimental and subject to change. Here is a
+    XXX non-exhaustive list of things this format could do or change:
+
+    - making it easier to write files in parallel
+    - holding the lock for a shorter time
+    - improving progress information
+    - ways to adjust the number of expected entries/files ?
+    """
+
+    with repo.lock():
+
+        repo.ui.debug(b'scanning\n')
+
+        entries = _entries_walk(
+            repo,
+            includes=includes,
+            excludes=excludes,
+            includeobsmarkers=includeobsmarkers,
+        )
+        chunks = _emit3(repo, list(entries))
+        first = next(chunks)
+        assert first is None
+        _test_sync_point_walk_1(repo)
+    _test_sync_point_walk_2(repo)
+
+    return chunks
 
 
 @contextlib.contextmanager
@@ -850,6 +947,80 @@
         progress.complete()
 
 
+def consumev3(repo, fp):
+    """Apply the contents from a version 3 streaming clone.
+
+    Data is read from an object that only needs to provide a ``read(size)``
+    method.
+    """
+    with repo.lock():
+        start = util.timer()
+
+        entrycount = util.uvarintdecodestream(fp)
+        repo.ui.status(_(b'%d entries to transfer\n') % (entrycount))
+
+        progress = repo.ui.makeprogress(
+            _(b'clone'),
+            total=entrycount,
+            unit=_(b'entries'),
+        )
+        progress.update(0)
+        bytes_transferred = 0
+
+        vfsmap = _makemap(repo)
+        # we keep repo.vfs out of the on purpose, there are too many dangers
+        # there (eg: .hg/hgrc),
+        #
+        # this assert is duplicated (from _makemap) as authors might think this
+        # is fine, while this is really not fine.
+        if repo.vfs in vfsmap.values():
+            raise error.ProgrammingError(
+                b'repo.vfs must not be added to vfsmap for security reasons'
+            )
+
+        with repo.transaction(b'clone'):
+            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
+            with nested(*ctxs):
+
+                for i in range(entrycount):
+                    filecount = util.uvarintdecodestream(fp)
+                    if filecount == 0:
+                        if repo.ui.debugflag:
+                            repo.ui.debug(b'entry with no files [%d]\n' % (i))
+                    for i in range(filecount):
+                        src = util.readexactly(fp, 1)
+                        vfs = vfsmap[src]
+                        namelen = util.uvarintdecodestream(fp)
+                        datalen = util.uvarintdecodestream(fp)
+
+                        name = util.readexactly(fp, namelen)
+
+                        if repo.ui.debugflag:
+                            msg = b'adding [%s] %s (%s)\n'
+                            msg %= (src, name, util.bytecount(datalen))
+                            repo.ui.debug(msg)
+                        bytes_transferred += datalen
+
+                        with vfs(name, b'w') as ofp:
+                            for chunk in util.filechunkiter(fp, limit=datalen):
+                                ofp.write(chunk)
+                    progress.increment(step=1)
+
+            # force @filecache properties to be reloaded from
+            # streamclone-ed file at next access
+            repo.invalidate(clearfilecache=True)
+
+        elapsed = util.timer() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
+        byte_count = util.bytecount(bytes_transferred)
+        bytes_sec = util.bytecount(bytes_transferred / elapsed)
+        msg %= (byte_count, elapsed, bytes_sec)
+        repo.ui.status(msg)
+        progress.complete()
+
+
 def applybundlev2(repo, fp, filecount, filesize, requirements):
     from . import localrepo
 
@@ -873,6 +1044,28 @@
     nodemap.post_stream_cleanup(repo)
 
 
+def applybundlev3(repo, fp, requirements):
+    from . import localrepo
+
+    missingreqs = [r for r in requirements if r not in repo.supported]
+    if missingreqs:
+        msg = _(b'unable to apply stream clone: unsupported format: %s')
+        msg %= b', '.join(sorted(missingreqs))
+        raise error.Abort(msg)
+
+    consumev3(repo, fp)
+
+    repo.requirements = new_stream_clone_requirements(
+        repo.requirements,
+        requirements,
+    )
+    repo.svfs.options = localrepo.resolvestorevfsoptions(
+        repo.ui, repo.requirements, repo.features
+    )
+    scmutil.writereporequirements(repo)
+    nodemap.post_stream_cleanup(repo)
+
+
 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
     hardlink = [True]
 
--- a/tests/test-clone-stream-revlog-split.t	Thu Jun 01 18:20:28 2023 +0100
+++ b/tests/test-clone-stream-revlog-split.t	Thu Jun 01 17:39:22 2023 +0100
@@ -100,10 +100,13 @@
   sending getbundle command
   bundle2-input-bundle: with-transaction
   bundle2-input-part: "stream2" (params: 3 mandatory) supported (stream-bundle2-v2 !)
-  bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported (stream-bundle2-v3 !)
+  bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported (stream-bundle2-v3 !)
   applying stream bundle
-  7 files to transfer, 2.11 KB of data
-  adding [s] data/some-file.i (1.23 KB)
+  7 files to transfer, 2.11 KB of data (stream-bundle2-v2 !)
+  adding [s] data/some-file.i (1.23 KB) (stream-bundle2-v2 !)
+  7 entries to transfer (stream-bundle2-v3 !)
+  adding [s] data/some-file.d (1.04 KB) (stream-bundle2-v3 !)
+  adding [s] data/some-file.i (192 bytes) (stream-bundle2-v3 !)
   adding [s] phaseroots (43 bytes)
   adding [s] 00manifest.i (348 bytes)
   adding [s] 00changelog.i (381 bytes)
@@ -112,7 +115,8 @@
   adding [c] rbc-revs-v1 (24 bytes)
   updating the branch cache
   transferred 2.11 KB in * seconds (* */sec) (glob)
-  bundle2-input-part: total payload size 2268
+  bundle2-input-part: total payload size 2268 (stream-bundle2-v2 !)
+  bundle2-input-part: total payload size 2296 (stream-bundle2-v3 !)
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
   bundle2-input-bundle: 2 parts total
   checking for updated bookmarks
--- a/tests/test-clone-stream.t	Thu Jun 01 18:20:28 2023 +0100
+++ b/tests/test-clone-stream.t	Thu Jun 01 17:39:22 2023 +0100
@@ -360,9 +360,8 @@
 #if stream-bundle2-v3
   $ hg clone --stream -U http://localhost:$HGPORT clone1
   streaming all changes
-  1093 files to transfer, 102 KB of data (no-zstd !)
+  1093 entries to transfer
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  1093 files to transfer, 98.9 KB of data (zstd !)
   transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !)
 
   $ ls -1 clone1/.hg/cache
@@ -492,9 +491,8 @@
 #if stream-bundle2-v3
   $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
   streaming all changes
-  1093 files to transfer, 102 KB of data (no-zstd !)
+  1093 entries to transfer
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  1093 files to transfer, 98.9 KB of data (zstd !)
   transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !)
 #endif
 
@@ -564,18 +562,17 @@
   streaming all changes
   sending getbundle command
   bundle2-input-bundle: with-transaction
-  bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported
+  bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported
   applying stream bundle
-  1093 files to transfer, 102 KB of data (no-zstd !)
-  1093 files to transfer, 98.9 KB of data (zstd !)
+  1093 entries to transfer
   starting 4 threads for background file closing
   starting 4 threads for background file closing
   updating the branch cache
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  bundle2-input-part: total payload size 118984 (no-zstd !)
+  bundle2-input-part: total payload size 120079 (no-zstd !)
   transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !)
-  bundle2-input-part: total payload size 116145 (zstd no-bigendian !)
-  bundle2-input-part: total payload size 116140 (zstd bigendian !)
+  bundle2-input-part: total payload size 117240 (zstd no-bigendian !)
+  bundle2-input-part: total payload size 116138 (zstd bigendian !)
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
   bundle2-input-bundle: 2 parts total
   checking for updated bookmarks
@@ -625,9 +622,8 @@
 #if stream-bundle2-v3
   $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
   streaming all changes
-  1093 files to transfer, 102 KB of data (no-zstd !)
+  1093 entries to transfer
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  1093 files to transfer, 98.9 KB of data (zstd !)
   transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !)
 #endif
 
@@ -755,9 +751,8 @@
 #if stream-bundle2-v3
   $ hg clone --stream http://localhost:$HGPORT with-bookmarks
   streaming all changes
-  1096 files to transfer, 102 KB of data (no-zstd !)
+  1096 entries to transfer
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  1096 files to transfer, 99.1 KB of data (zstd !)
   transferred 99.1 KB in * seconds (* */sec) (glob) (zstd !)
   updating to branch default
   1088 files updated, 0 files merged, 0 files removed, 0 files unresolved
@@ -801,9 +796,8 @@
 #if stream-bundle2-v3
   $ hg clone --stream http://localhost:$HGPORT phase-publish
   streaming all changes
-  1096 files to transfer, 102 KB of data (no-zstd !)
+  1096 entries to transfer
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  1096 files to transfer, 99.1 KB of data (zstd !)
   transferred 99.1 KB in * seconds (* */sec) (glob) (zstd !)
   updating to branch default
   1088 files updated, 0 files merged, 0 files removed, 0 files unresolved
@@ -861,9 +855,8 @@
 #if stream-bundle2-v3
   $ hg clone --stream http://localhost:$HGPORT phase-no-publish
   streaming all changes
-  1097 files to transfer, 102 KB of data (no-zstd !)
+  1097 entries to transfer
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  1097 files to transfer, 99.1 KB of data (zstd !)
   transferred 99.1 KB in * seconds (* */sec) (glob) (zstd !)
   updating to branch default
   1088 files updated, 0 files merged, 0 files removed, 0 files unresolved
@@ -961,9 +954,8 @@
 
   $ hg clone -U --stream http://localhost:$HGPORT with-obsolescence
   streaming all changes
-  1098 files to transfer, 102 KB of data (no-zstd !)
+  1098 entries to transfer
   transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !)
-  1098 files to transfer, 99.5 KB of data (zstd !)
   transferred 99.5 KB in * seconds (* */sec) (glob) (zstd !)
   $ hg -R with-obsolescence log -T '{rev}: {phase}\n'
   2: draft
--- a/tests/test-stream-bundle-v2.t	Thu Jun 01 18:20:28 2023 +0100
+++ b/tests/test-stream-bundle-v2.t	Thu Jun 01 17:39:22 2023 +0100
@@ -63,9 +63,9 @@
   stream2 -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v2 no-zstd !)
   stream2 -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v2 zstd no-rust !)
   stream2 -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v2 rust !)
-  stream3-exp -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 no-zstd !)
-  stream3-exp -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 zstd no-rust !)
-  stream3-exp -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 rust !)
+  stream3-exp -- {requirements: generaldelta%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 no-zstd !)
+  stream3-exp -- {requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 zstd no-rust !)
+  stream3-exp -- {requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 rust !)
   $ hg debugbundle --spec bundle.hg
   none-v2;stream=v2;requirements%3Dgeneraldelta%2Crevlogv1%2Csparserevlog (stream-v2 no-zstd !)
   none-v2;stream=v2;requirements%3Dgeneraldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog (stream-v2 zstd no-rust !)
@@ -94,9 +94,10 @@
   applying clone bundle from http://localhost:$HGPORT1/bundle.hg
   bundle2-input-bundle: with-transaction
   bundle2-input-part: "stream2" (params: 3 mandatory) supported (stream-v2 !)
-  bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported (stream-v3 !)
+  bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported (stream-v3 !)
   applying stream bundle
-  11 files to transfer, 1.65 KB of data
+  11 files to transfer, 1.65 KB of data (stream-v2 !)
+  11 entries to transfer (stream-v3 !)
   starting 4 threads for background file closing (?)
   starting 4 threads for background file closing (?)
   adding [s] data/A.i (66 bytes)
@@ -111,7 +112,8 @@
   adding [c] rbc-names-v1 (7 bytes)
   adding [c] rbc-revs-v1 (40 bytes)
   transferred 1.65 KB in * seconds (* */sec) (glob)
-  bundle2-input-part: total payload size 1840
+  bundle2-input-part: total payload size 1840 (stream-v2 !)
+  bundle2-input-part: total payload size 1852 (stream-v3 !)
   bundle2-input-bundle: 1 parts total
   updating the branch cache
   finished applying clone bundle
@@ -152,9 +154,10 @@
   applying clone bundle from http://localhost:$HGPORT1/bundle.hg
   bundle2-input-bundle: with-transaction
   bundle2-input-part: "stream2" (params: 3 mandatory) supported (stream-v2 !)
-  bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported (stream-v3 !)
+  bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported (stream-v3 !)
   applying stream bundle
-  11 files to transfer, 1.65 KB of data
+  11 files to transfer, 1.65 KB of data (stream-v2 !)
+  11 entries to transfer (stream-v3 !)
   starting 4 threads for background file closing (?)
   starting 4 threads for background file closing (?)
   adding [s] data/A.i (66 bytes)
@@ -169,7 +172,8 @@
   adding [c] rbc-names-v1 (7 bytes)
   adding [c] rbc-revs-v1 (40 bytes)
   transferred 1.65 KB in * seconds (* */sec) (glob)
-  bundle2-input-part: total payload size 1840
+  bundle2-input-part: total payload size 1840 (stream-v2 !)
+  bundle2-input-part: total payload size 1852 (stream-v3 !)
   bundle2-input-bundle: 1 parts total
   updating the branch cache
   finished applying clone bundle