exchangev2: support for calling rawstorefiledata to retrieve raw files
author Gregory Szorc <gregory.szorc@gmail.com>
Wed, 17 Oct 2018 10:10:05 +0200
changeset 40330 00a4cd368e3f
parent 40329 ed55a0077490
child 40331 ac59de55c8b4
exchangev2: support for calling rawstorefiledata to retrieve raw files This is somewhat hacky. For that I apologize. At the 4.8 Sprint, we decided we wanted to land support in wireprotov2 for doing a partial clone with changelog and manifestlog bootstrapped from a "stream clone"-like primitive. This commit implements the client-side bits necessary to facilitate that. If the new server-side command for obtaining raw files data is available, we call it to get the raw files for the changelog and manifestlog. Then we fall through to an incremental pull. But when fetching files data, instead of using the list of changesets and manifests that we fetched via the "changesetdata" command, we do a linear scan of the repo and resolve the changeset and manifest nodes along with the manifest linkrevs. Differential Revision: https://phab.mercurial-scm.org/D5135
mercurial/exchangev2.py
tests/test-wireproto-exchangev2.t
--- a/mercurial/exchangev2.py	Tue Oct 16 21:31:21 2018 +0200
+++ b/mercurial/exchangev2.py	Wed Oct 17 10:10:05 2018 +0200
@@ -29,6 +29,18 @@
     """Pull using wire protocol version 2."""
     repo = pullop.repo
     remote = pullop.remote
+
+    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)
+
+    # If this is a clone and it was requested to perform a "stream clone",
+    # we obtain the raw files data from the remote then fall back to an
+    # incremental pull. This is somewhat hacky and is not nearly robust enough
+    # for long-term usage.
+    if usingrawchangelogandmanifest:
+        with repo.transaction('clone'):
+            _fetchrawstorefiles(repo, remote)
+            repo.invalidate(clearfilecache=True)
+
     tr = pullop.trmanager.transaction()
 
     # We don't use the repo's narrow matcher here because the patterns passed
@@ -79,11 +91,122 @@
 
     manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
 
+    # If obtaining the raw store files, we need to scan the full repo to
+    # derive all the changesets, manifests, and linkrevs.
+    if usingrawchangelogandmanifest:
+        csetsforfiles = []
+        mnodesforfiles = []
+        manifestlinkrevs = {}
+
+        for rev in repo:
+            ctx = repo[rev]
+            mnode = ctx.manifestnode()
+
+            csetsforfiles.append(ctx.node())
+            mnodesforfiles.append(mnode)
+            manifestlinkrevs[mnode] = rev
+
+    else:
+        csetsforfiles = csetres['added']
+        mnodesforfiles = manres['added']
+        manifestlinkrevs = manres['linkrevs']
+
     # Find all file nodes referenced by added manifests and fetch those
     # revisions.
-    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, manres['added'])
-    _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetres['added'],
-                         manres['linkrevs'])
+    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
+    _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles,
+                         manifestlinkrevs)
+
+def _checkuserawstorefiledata(pullop):
+    """Return True only if rawstorefiledata should be used for this pull."""
+
+    repo = pullop.repo
+    remote = pullop.remote
+
+    # The remote does not advertise the command to obtain raw store data.
+    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
+        return False
+
+    # Only honor if the user requested a stream clone operation.
+    if not pullop.streamclonerequested:
+        return False
+
+    # Only works on empty repos (the local repo must have no revisions).
+    if len(repo):
+        return False
+
+    # TODO This is super hacky. There needs to be a storage API for this. We
+    # also need to check for compatibility with the remote.
+    if b'revlogv1' not in repo.requirements:
+        return False
+
+    return True
+
+def _fetchrawstorefiles(repo, remote):
+    """Write raw changelog/manifestlog files from the remote into svfs."""
+    with remote.commandexecutor() as e:
+        objs = e.callcommand(b'rawstorefiledata', {
+            b'files': [b'changelog', b'manifestlog'],
+        }).result()
+        # First object is a summary of files data that follows.
+        overall = next(objs)
+
+        progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'],
+                                        unit=_('bytes'))
+        with progress:
+            progress.update(0)
+
+            # Next are pairs of file metadata, then that file's data chunks.
+            while True:
+                try:
+                    filemeta = next(objs)
+                except StopIteration:
+                    break
+
+                for k in (b'location', b'path', b'size'):
+                    if k not in filemeta:
+                        raise error.Abort(_(b'remote file data missing key: %s')
+                                          % k)
+
+                if filemeta[b'location'] == b'store':
+                    vfs = repo.svfs
+                else:
+                    raise error.Abort(_(b'invalid location for raw file data: '
+                                        b'%s') % filemeta[b'location'])
+
+                bytesremaining = filemeta[b'size']
+
+                with vfs.open(filemeta[b'path'], b'wb') as fh:
+                    while True:
+                        try:
+                            chunk = next(objs)
+                        except StopIteration:
+                            break
+
+                        bytesremaining -= len(chunk)
+
+                        if bytesremaining < 0:
+                            raise error.Abort(_(
+                                b'received invalid number of bytes for file '
+                                b'data; expected %d, got extra') %
+                                              filemeta[b'size'])
+
+                        progress.increment(step=len(chunk))
+                        fh.write(chunk)
+
+                        try:
+                            if chunk.islast:
+                                break
+                        except AttributeError:
+                            raise error.Abort(_(
+                                b'did not receive indefinite length bytestring '
+                                b'for file data'))
+
+                if bytesremaining:
+                    raise error.Abort(_(b'received invalid number of bytes for'
+                                        b' file data; expected %d got %d') %
+                                      (filemeta[b'size'],
+                                       filemeta[b'size'] - bytesremaining))
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
--- a/tests/test-wireproto-exchangev2.t	Tue Oct 16 21:31:21 2018 +0200
+++ b/tests/test-wireproto-exchangev2.t	Wed Oct 17 10:10:05 2018 +0200
@@ -963,3 +963,185 @@
   client-narrow-2/.hg/store/00manifest.i
   client-narrow-2/.hg/store/data/dir0/d.i
 #endif
+
+--stream will use rawstorefiledata to transfer changelog and manifestlog, then
+fall through to get files data
+
+  $ hg --debug clone --stream -U http://localhost:$HGPORT client-stream-0
+  using http://localhost:$HGPORT/
+  sending capabilities command
+  sending 1 commands
+  sending command rawstorefiledata: {
+    'files': [
+      'changelog',
+      'manifestlog'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=1275; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  updating the branch cache
+  query 1; heads
+  sending 2 commands
+  sending command heads: {}
+  sending command known: {
+    'nodes': [
+      '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=22; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=2; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
+  searching for changes
+  all remote heads known locally
+  sending 1 commands
+  sending command changesetdata: {
+    'fields': set([
+      'bookmarks',
+      'parents',
+      'phase',
+      'revision'
+    ]),
+    'revisions': [
+      {
+        'heads': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'roots': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetdagrange'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=13; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  checking for updated bookmarks
+  sending 1 commands
+  sending command filesdata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'haveparents': True,
+    'revisions': [
+      {
+        'nodes': [
+          '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+          '\xb7\t8\x08\x92\xb1\x93\xc1\t\x1d:\x81\x7fp`R\xe3F\x82\x1b',
+          'G\xfe\x01*\xb27\xa8\xc7\xfc\x0cx\xf9\xf2mXf\xee\xf3\xf8%',
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=1133; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)
+
+--stream + --include/--exclude will only obtain some files
+
+  $ hg --debug --config extensions.pullext=$TESTDIR/pullext.py clone --stream --include dir0/ -U http://localhost:$HGPORT client-stream-2
+  using http://localhost:$HGPORT/
+  sending capabilities command
+  sending 1 commands
+  sending command rawstorefiledata: {
+    'files': [
+      'changelog',
+      'manifestlog'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=1275; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  updating the branch cache
+  query 1; heads
+  sending 2 commands
+  sending command heads: {}
+  sending command known: {
+    'nodes': [
+      '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=22; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=2; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
+  searching for changes
+  all remote heads known locally
+  sending 1 commands
+  sending command changesetdata: {
+    'fields': set([
+      'bookmarks',
+      'parents',
+      'phase',
+      'revision'
+    ]),
+    'revisions': [
+      {
+        'heads': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'roots': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetdagrange'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=13; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  checking for updated bookmarks
+  sending 1 commands
+  sending command filesdata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'haveparents': True,
+    'pathfilter': {
+      'include': [
+        'path:dir0'
+      ]
+    },
+    'revisions': [
+      {
+        'nodes': [
+          '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+          '\xb7\t8\x08\x92\xb1\x93\xc1\t\x1d:\x81\x7fp`R\xe3F\x82\x1b',
+          'G\xfe\x01*\xb27\xa8\xc7\xfc\x0cx\xf9\xf2mXf\xee\xf3\xf8%',
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=449; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)
+
+#if reporevlogstore
+  $ find client-stream-2/.hg/store -type f -name '*.i' | sort
+  client-stream-2/.hg/store/00changelog.i
+  client-stream-2/.hg/store/00manifest.i
+  client-stream-2/.hg/store/data/dir0/c.i
+  client-stream-2/.hg/store/data/dir0/d.i
+#endif