mercurial/streamclone.py
changeset 35756 cfdccd560b66
parent 35491 ded3a63f305b
child 35757 bbf7abd09ff0
--- a/mercurial/streamclone.py	Fri Jan 19 22:52:35 2018 +0100
+++ b/mercurial/streamclone.py	Thu Jan 18 00:48:56 2018 +0100
@@ -428,3 +428,115 @@
 
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
+
+def _emit(repo, entries, totalfilesize):
+    """actually emit the stream bundle"""
+    progress = repo.ui.progress
+    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
+    vfs = repo.svfs
+    try:
+        seen = 0
+        for name, size in entries:
+            yield util.uvarintencode(len(name))
+            fp = vfs(name)
+            try:
+                yield util.uvarintencode(size)
+                yield name
+                if size <= 65536:
+                    chunks = (fp.read(size),)
+                else:
+                    chunks = util.filechunkiter(fp, limit=size)
+                for chunk in chunks:
+                    seen += len(chunk)
+                    progress(_('bundle'), seen, total=totalfilesize,
+                             unit=_('bytes'))
+                    yield chunk
+            finally:
+                fp.close()
+    finally:
+        progress(_('bundle'), None)
+
+def generatev2(repo):
+    """Emit content for version 2 of a streaming clone.
+
+    the data stream consists the following entries:
+    1) A varint containing the length of the filename
+    2) A varint containing the length of file data
+    3) N bytes containing the filename (the internal, store-agnostic form)
+    4) N bytes containing the file data
+
+    Returns a 3-tuple of (file count, file size, data iterator).
+    """
+
+    with repo.lock():
+
+        entries = []
+        totalfilesize = 0
+
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                totalfilesize += size
+
+        chunks = _emit(repo, entries, totalfilesize)
+
+    return len(entries), totalfilesize, chunks
+
+def consumev2(repo, fp, filecount, filesize):
+    """Apply the contents from a version 2 streaming clone.
+
+    Data is read from an object that only needs to provide a ``read(size)``
+    method.
+    """
+    with repo.lock():
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (filecount, util.bytecount(filesize)))
+
+        start = util.timer()
+        handledbytes = 0
+        progress = repo.ui.progress
+
+        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
+
+        vfs = repo.svfs
+
+        with repo.transaction('clone'):
+            with vfs.backgroundclosing(repo.ui):
+                for i in range(filecount):
+                    namelen = util.uvarintdecodestream(fp)
+                    datalen = util.uvarintdecodestream(fp)
+
+                    name = fp.read(namelen)
+
+                    if repo.ui.debugflag:
+                        repo.ui.debug('adding %s (%s)\n' %
+                                      (name, util.bytecount(datalen)))
+
+                    with vfs(name, 'w') as ofp:
+                        for chunk in util.filechunkiter(fp, limit=datalen):
+                            handledbytes += len(chunk)
+                            progress(_('clone'), handledbytes, total=filesize,
+                                     unit=_('bytes'))
+                            ofp.write(chunk)
+
+            # force @filecache properties to be reloaded from
+            # streamclone-ed file at next access
+            repo.invalidate(clearfilecache=True)
+
+        elapsed = util.timer() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(handledbytes), elapsed,
+                        util.bytecount(handledbytes / elapsed)))
+
+def applybundlev2(repo, fp, filecount, filesize, requirements):
+    missingreqs = [r for r in requirements if r not in repo.supported]
+    if missingreqs:
+        raise error.Abort(_('unable to apply stream clone: '
+                            'unsupported format: %s') %
+                          ', '.join(sorted(missingreqs)))
+
+    consumev2(repo, fp, filecount, filesize)