streamclone: define first iteration of version 2 of stream format
authorBoris Feld <boris.feld@octobus.net>
Thu, 18 Jan 2018 00:48:56 +0100
changeset 35756 cfdccd560b66
parent 35755 2384523cee4d
child 35757 bbf7abd09ff0
streamclone: define first iteration of version 2 of stream format (This patch is based on a first draft from Gregory Szorc, with deeper rework) Version 1 of the stream clone format was invented many years ago and suffers from a few deficiencies: 1) Filenames are stored in store-encoded (on filesystem) form rather than in their internal form. This makes future compatibility with new store filename encodings more difficult. 2) File entry "headers" consist of a newline of the file name followed by the string file size. Converting strings to integers is avoidable overhead. We can't store filenames with newlines (manifests have this limitation as well, so it isn't a major concern). But the big concern here is the necessity for readline(). Scanning for newlines means reading ahead and that means extra buffer allocations and slicing (in Python) and this makes performance suffer. 3) Filenames aren't compressed optimally. Filenames should be compressed well since there is a lot of repeated data. However, since they are scattered all over the stream (with revlog data in between), they typically fall outside the window size of the compressor and don't compress. 4) It can only exchange stored based content, being able to exchange caches too would be nice. 5) It is limited to a stream-based protocol and isn't suitable for an on-disk format for general repository reading because the offset of individual file entries requires scanning the entire file to find file records. As part of enabling streaming clones to work in bundle2, #2 proved to have a significant negative impact on performance. Since bundle2 provides the opportunity to start fresh, Gregory Szorc figured he would take the opportunity to invent a new streaming clone data format. The new format devised in this series addresses #1, #2, and #4. It punts on #3 because it was complex without yielding a significant gain and on #5 because devising a new store format that "packs" multiple revlogs into a single "packed revlog" is massive scope bloat. However, this v2 format might be suitable for streaming into a "packed revlog" with minimal processing. If it works, great. If not, we can always invent stream format when it is needed. This patch only introduces the bases of the format. We'll get it usable through bundle2 first, then we'll extend the format in future patches to bring it to its full potential (especially #4).
mercurial/streamclone.py
--- a/mercurial/streamclone.py	Fri Jan 19 22:52:35 2018 +0100
+++ b/mercurial/streamclone.py	Thu Jan 18 00:48:56 2018 +0100
@@ -428,3 +428,115 @@
 
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
+
+def _emit(repo, entries, totalfilesize):
+    """actually emit the stream bundle"""
+    progress = repo.ui.progress
+    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
+    vfs = repo.svfs
+    try:
+        seen = 0
+        for name, size in entries:
+            yield util.uvarintencode(len(name))
+            fp = vfs(name)
+            try:
+                yield util.uvarintencode(size)
+                yield name
+                if size <= 65536:
+                    chunks = (fp.read(size),)
+                else:
+                    chunks = util.filechunkiter(fp, limit=size)
+                for chunk in chunks:
+                    seen += len(chunk)
+                    progress(_('bundle'), seen, total=totalfilesize,
+                             unit=_('bytes'))
+                    yield chunk
+            finally:
+                fp.close()
+    finally:
+        progress(_('bundle'), None)
+
+def generatev2(repo):
+    """Emit content for version 2 of a streaming clone.
+
+    the data stream consists the following entries:
+    1) A varint containing the length of the filename
+    2) A varint containing the length of file data
+    3) N bytes containing the filename (the internal, store-agnostic form)
+    4) N bytes containing the file data
+
+    Returns a 3-tuple of (file count, file size, data iterator).
+    """
+
+    with repo.lock():
+
+        entries = []
+        totalfilesize = 0
+
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                totalfilesize += size
+
+        chunks = _emit(repo, entries, totalfilesize)
+
+    return len(entries), totalfilesize, chunks
+
+def consumev2(repo, fp, filecount, filesize):
+    """Apply the contents from a version 2 streaming clone.
+
+    Data is read from an object that only needs to provide a ``read(size)``
+    method.
+    """
+    with repo.lock():
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (filecount, util.bytecount(filesize)))
+
+        start = util.timer()
+        handledbytes = 0
+        progress = repo.ui.progress
+
+        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
+
+        vfs = repo.svfs
+
+        with repo.transaction('clone'):
+            with vfs.backgroundclosing(repo.ui):
+                for i in range(filecount):
+                    namelen = util.uvarintdecodestream(fp)
+                    datalen = util.uvarintdecodestream(fp)
+
+                    name = fp.read(namelen)
+
+                    if repo.ui.debugflag:
+                        repo.ui.debug('adding %s (%s)\n' %
+                                      (name, util.bytecount(datalen)))
+
+                    with vfs(name, 'w') as ofp:
+                        for chunk in util.filechunkiter(fp, limit=datalen):
+                            handledbytes += len(chunk)
+                            progress(_('clone'), handledbytes, total=filesize,
+                                     unit=_('bytes'))
+                            ofp.write(chunk)
+
+            # force @filecache properties to be reloaded from
+            # streamclone-ed file at next access
+            repo.invalidate(clearfilecache=True)
+
+        elapsed = util.timer() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(handledbytes), elapsed,
+                        util.bytecount(handledbytes / elapsed)))
+
+def applybundlev2(repo, fp, filecount, filesize, requirements):
+    missingreqs = [r for r in requirements if r not in repo.supported]
+    if missingreqs:
+        raise error.Abort(_('unable to apply stream clone: '
+                            'unsupported format: %s') %
+                          ', '.join(sorted(missingreqs)))
+
+    consumev2(repo, fp, filecount, filesize)