diff -r 2384523cee4d -r cfdccd560b66 mercurial/streamclone.py
--- a/mercurial/streamclone.py Fri Jan 19 22:52:35 2018 +0100
+++ b/mercurial/streamclone.py Thu Jan 18 00:48:56 2018 +0100
@@ -428,3 +428,115 @@
 
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
+
+def _emit(repo, entries, totalfilesize):
+    """actually emit the stream bundle"""
+    progress = repo.ui.progress
+    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
+    vfs = repo.svfs
+    try:
+        seen = 0
+        for name, size in entries:
+            yield util.uvarintencode(len(name))
+            fp = vfs(name)
+            try:
+                yield util.uvarintencode(size)
+                yield name
+                if size <= 65536:
+                    chunks = (fp.read(size),)
+                else:
+                    chunks = util.filechunkiter(fp, limit=size)
+                for chunk in chunks:
+                    seen += len(chunk)
+                    progress(_('bundle'), seen, total=totalfilesize,
+                             unit=_('bytes'))
+                    yield chunk
+            finally:
+                fp.close()
+    finally:
+        progress(_('bundle'), None)
+
+def generatev2(repo):
+    """Emit content for version 2 of a streaming clone.
+
+    The data stream consists of the following entries:
+    1) A varint containing the length of the filename
+    2) A varint containing the length of file data
+    3) N bytes containing the filename (the internal, store-agnostic form)
+    4) N bytes containing the file data
+
+    Returns a 3-tuple of (file count, file size, data iterator).
+    """
+
+    with repo.lock():
+
+        entries = []
+        totalfilesize = 0
+
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                totalfilesize += size
+
+        chunks = _emit(repo, entries, totalfilesize)
+
+        return len(entries), totalfilesize, chunks
+
+def consumev2(repo, fp, filecount, filesize):
+    """Apply the contents from a version 2 streaming clone.
+
+    Data is read from an object that only needs to provide a ``read(size)``
+    method.
+    """
+    with repo.lock():
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (filecount, util.bytecount(filesize)))
+
+        start = util.timer()
+        handledbytes = 0
+        progress = repo.ui.progress
+
+        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
+
+        vfs = repo.svfs
+
+        with repo.transaction('clone'):
+            with vfs.backgroundclosing(repo.ui):
+                for i in range(filecount):
+                    namelen = util.uvarintdecodestream(fp)
+                    datalen = util.uvarintdecodestream(fp)
+
+                    name = fp.read(namelen)
+
+                    if repo.ui.debugflag:
+                        repo.ui.debug('adding %s (%s)\n' %
+                                      (name, util.bytecount(datalen)))
+
+                    with vfs(name, 'w') as ofp:
+                        for chunk in util.filechunkiter(fp, limit=datalen):
+                            handledbytes += len(chunk)
+                            progress(_('clone'), handledbytes, total=filesize,
+                                     unit=_('bytes'))
+                            ofp.write(chunk)
+
+        # force @filecache properties to be reloaded from
+        # streamclone-ed file at next access
+        repo.invalidate(clearfilecache=True)
+
+        elapsed = util.timer() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(handledbytes), elapsed,
+                        util.bytecount(handledbytes / elapsed)))
+
+def applybundlev2(repo, fp, filecount, filesize, requirements):
+    missingreqs = [r for r in requirements if r not in repo.supported]
+    if missingreqs:
+        raise error.Abort(_('unable to apply stream clone: '
+                            'unsupported format: %s') %
+                          ', '.join(sorted(missingreqs)))
+
+    consumev2(repo, fp, filecount, filesize)
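
The v2 format above leans on two small helpers, util.uvarintencode and
util.uvarintdecodestream. As a point of reference, here is a minimal
standalone sketch of the variable-length integer scheme such helpers
conventionally implement (unsigned LEB128: seven payload bits per byte,
with the high bit set while more bytes follow); the function names here
are made up for the example and are not Mercurial's API:

    import io

    def uvarint_encode(value):
        # Emit 7 bits per byte, least-significant group first; the high
        # bit of each byte signals that another byte follows.
        data = bytearray()
        while True:
            byte = value & 0x7f
            value >>= 7
            if value:
                data.append(byte | 0x80)
            else:
                data.append(byte)
                return bytes(data)

    def uvarint_decode_stream(fh):
        # Accumulate 7-bit groups until a byte without the continuation
        # bit is seen. Assumes a well-formed, non-truncated stream.
        result = 0
        shift = 0
        while True:
            byte = ord(fh.read(1))
            result |= (byte & 0x7f) << shift
            if not byte & 0x80:
                return result
            shift += 7

    assert uvarint_encode(300) == b'\xac\x02'
    assert uvarint_decode_stream(io.BytesIO(b'\xac\x02')) == 300

Small values such as typical filename lengths thus cost a single byte on
the wire, while arbitrarily large file sizes remain representable.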
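
Putting the four fields from the generatev2 docstring together, a
consumer-side parse loop (independent of consumev2, which additionally
handles locking, transactions, progress, and cache invalidation) reduces
to the following sketch. write_entry and read_streamv2 are hypothetical
names for illustration only, reusing uvarint_encode and
uvarint_decode_stream from the sketch above:

    def write_entry(buf, name, data):
        # Mirror _emit's on-wire order: name length, data length,
        # name bytes, then data bytes.
        buf.write(uvarint_encode(len(name)))
        buf.write(uvarint_encode(len(data)))
        buf.write(name)
        buf.write(data)

    def read_streamv2(fh, filecount):
        # Parse `filecount` (name, data) pairs in the order consumev2
        # expects them.
        for _unused in range(filecount):
            namelen = uvarint_decode_stream(fh)
            datalen = uvarint_decode_stream(fh)
            name = fh.read(namelen)   # store path, e.g. b'data/foo.i'
            data = fh.read(datalen)
            yield name, data

    # Round-trip two entries through an in-memory buffer.
    buf = io.BytesIO()
    write_entry(buf, b'data/foo.i', b'\x00' * 64)
    write_entry(buf, b'00changelog.i', b'\xff' * 32)
    buf.seek(0)
    for name, data in read_streamv2(buf, 2):
        print(name, len(data))

Note that a real consumer streams the data field in bounded chunks
(util.filechunkiter in consumev2) rather than reading it whole, since
individual store files can be large.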