localrepo: extract stream clone application into reusable function
author Gregory Szorc <gregory.szorc@gmail.com>
Thu, 21 May 2015 10:41:06 -0700
changeset 25237 7504a7325e4c
parent 25236 5095059340dc
child 25238 5a55ad6e8e24
localrepo: extract stream clone application into reusable function

The existing stream_in method assumes a streaming clone is applied via the wire protocol. Previous patches have enabled streaming clone data to be produced and consumed outside the context of the wire protocol. However, the consuming part was incomplete because it didn't deal with things like updating the branch caches or writing out a requirements file.

This patch finishes the separation of stream clone handling from the wire protocol. After this patch, it is possible to consume stream clones from arbitrary sources, including files.

Mozilla plans to leverage this to serve pre-generated stream clone files to consumers, drastically reducing the wall-clock and CPU time required to clone large repositories. This will enable clones to be nearly as fast as `tar`.
mercurial/localrepo.py
--- a/mercurial/localrepo.py	Thu May 21 10:27:45 2015 -0700
+++ b/mercurial/localrepo.py	Thu May 21 10:41:06 2015 -0700
@@ -1755,28 +1755,40 @@
         return util.hooks()
 
     def stream_in(self, remote, remotereqs):
+        # Save remote branchmap. We will use it later
+        # to speed up branchcache creation
+        rbranchmap = None
+        if remote.capable("branchmap"):
+            rbranchmap = remote.branchmap()
+
+        fp = remote.stream_out()
+        l = fp.readline()
+        try:
+            resp = int(l)
+        except ValueError:
+            raise error.ResponseError(
+                _('unexpected response from remote server:'), l)
+        if resp == 1:
+            raise util.Abort(_('operation forbidden by server'))
+        elif resp == 2:
+            raise util.Abort(_('locking the remote repository failed'))
+        elif resp != 0:
+            raise util.Abort(_('the server sent an unknown error code'))
+
+        self.applystreamclone(remotereqs, rbranchmap, fp)
+        return len(self.heads()) + 1
+
+    def applystreamclone(self, remotereqs, remotebranchmap, fp):
+        """Apply stream clone data to this repository.
+
+        "remotereqs" is a set of requirements to handle the incoming data.
+        "remotebranchmap" is the result of a branchmap lookup on the remote. It
+        can be None.
+        "fp" is a file object containing the raw stream data, suitable for
+        feeding into exchange.consumestreamclone.
+        """
         lock = self.lock()
         try:
-            # Save remote branchmap. We will use it later
-            # to speed up branchcache creation
-            rbranchmap = None
-            if remote.capable("branchmap"):
-                rbranchmap = remote.branchmap()
-
-            fp = remote.stream_out()
-            l = fp.readline()
-            try:
-                resp = int(l)
-            except ValueError:
-                raise error.ResponseError(
-                    _('unexpected response from remote server:'), l)
-            if resp == 1:
-                raise util.Abort(_('operation forbidden by server'))
-            elif resp == 2:
-                raise util.Abort(_('locking the remote repository failed'))
-            elif resp != 0:
-                raise util.Abort(_('the server sent an unknown error code'))
-
             exchange.consumestreamclone(self, fp)
 
             # new requirements = old non-format requirements +
@@ -1787,10 +1799,10 @@
             self._applyopenerreqs()
             self._writerequirements()
 
-            if rbranchmap:
+            if remotebranchmap:
                 rbheads = []
                 closed = []
-                for bheads in rbranchmap.itervalues():
+                for bheads in remotebranchmap.itervalues():
                     rbheads.extend(bheads)
                     for h in bheads:
                         r = self.changelog.rev(h)
@@ -1801,7 +1813,7 @@
                 if rbheads:
                     rtiprev = max((int(self.changelog.rev(node))
                             for node in rbheads))
-                    cache = branchmap.branchcache(rbranchmap,
+                    cache = branchmap.branchcache(remotebranchmap,
                                                   self[rtiprev].node(),
                                                   rtiprev,
                                                   closednodes=closed)
@@ -1814,7 +1826,6 @@
                             cache.write(rview)
                             break
             self.invalidate()
-            return len(self.heads()) + 1
         finally:
             lock.release()