stream-clone: make it the responsability of the store entry to stream content
authorPierre-Yves David <pierre-yves.david@octobus.net>
Sun, 28 May 2023 05:52:58 +0200
changeset 50639 5e60abf811f3
parent 50638 8fc10bfd9887
child 50640 9caa860dcbec
stream-clone: make it the responsability of the store entry to stream content The store entry has more context, this will especially be true when it comes to revlogs. So we move the details of how to retrieve binary content to the StoreEntry. The stream clone code now focus on the protocol bits.
mercurial/store.py
mercurial/streamclone.py
--- a/mercurial/store.py	Mon May 29 11:42:16 2023 +0200
+++ b/mercurial/store.py	Sun May 28 05:52:58 2023 +0200
@@ -469,12 +469,36 @@
 
     def file_size(self, vfs):
         if self._file_size is None:
+            if vfs is None:
+                msg = b"calling vfs-less file_size without prior call: %s"
+                msg %= self.unencoded_path
+                raise error.ProgrammingError(msg)
             try:
                 self._file_size = vfs.stat(self.unencoded_path).st_size
             except FileNotFoundError:
                 self._file_size = 0
         return self._file_size
 
+    def get_stream(self, vfs, copies):
+        """return data "stream" information for this file
+
+        (unencoded_file_path, content_iterator, content_size)
+        """
+        size = self.file_size(None)
+
+        def get_stream():
+            actual_path = copies[vfs.join(self.unencoded_path)]
+            with open(actual_path, 'rb') as fp:
+                yield None  # ready to stream
+                if size <= 65536:
+                    yield fp.read(size)
+                else:
+                    yield from util.filechunkiter(fp, limit=size)
+
+        s = get_stream()
+        next(s)
+        return (self.unencoded_path, s, size)
+
 
 @attr.s(slots=True, init=False)
 class BaseStoreEntry:
@@ -485,6 +509,14 @@
     def files(self) -> List[StoreFile]:
         raise NotImplementedError
 
+    def get_streams(self, vfs, copies=None):
+        """return a list of data stream associated to files for this entry
+
+        return [(unencoded_file_path, content_iterator, content_size), …]
+        """
+        assert vfs is not None
+        return [f.get_stream(vfs, copies) for f in self.files()]
+
 
 @attr.s(slots=True, init=False)
 class SimpleStoreEntry(BaseStoreEntry):
--- a/mercurial/streamclone.py	Mon May 29 11:42:16 2023 +0200
+++ b/mercurial/streamclone.py	Sun May 28 05:52:58 2023 +0200
@@ -11,7 +11,6 @@
 import struct
 
 from .i18n import _
-from .pycompat import open
 from .interfaces import repository
 from . import (
     bookmarks,
@@ -658,36 +657,25 @@
         totalbytecount = 0
 
         for src, vfs, e in entries:
-            for f in e.files():
+            for name, stream, size in e.get_streams(vfs, copies=copy):
                 yield src
-                name = f.unencoded_path
                 yield util.uvarintencode(len(name))
-                actual_path = copy[vfs.join(name)]
-                fp = open(actual_path, b'rb')
-                size = f.file_size(vfs)
+                yield util.uvarintencode(size)
+                yield name
                 bytecount = 0
-                try:
-                    yield util.uvarintencode(size)
-                    yield name
-                    if size <= 65536:
-                        chunks = (fp.read(size),)
-                    else:
-                        chunks = util.filechunkiter(fp, limit=size)
-                    for chunk in chunks:
-                        bytecount += len(chunk)
-                        totalbytecount += len(chunk)
-                        progress.update(totalbytecount)
-                        yield chunk
-                    if bytecount != size:
-                        # Would most likely be caused by a race due to `hg
-                        # strip` or a revlog split
-                        msg = _(
-                            b'clone could only read %d bytes from %s, but '
-                            b'expected %d bytes'
-                        )
-                        raise error.Abort(msg % (bytecount, name, size))
-                finally:
-                    fp.close()
+                for chunk in stream:
+                    bytecount += len(chunk)
+                    totalbytecount += len(chunk)
+                    progress.update(totalbytecount)
+                    yield chunk
+                if bytecount != size:
+                    # Would most likely be caused by a race due to `hg
+                    # strip` or a revlog split
+                    msg = _(
+                        b'clone could only read %d bytes from %s, but '
+                        b'expected %d bytes'
+                    )
+                    raise error.Abort(msg % (bytecount, name, size))
 
 
 def _test_sync_point_walk_1(repo):