mercurial/streamclone.py
changeset 43077 687b865b95ad
parent 43076 2372284d9457
child 43085 eef9a2d67051
equal deleted inserted replaced
43076:2372284d9457 43077:687b865b95ad
    38     repo = pullop.repo
    38     repo = pullop.repo
    39     remote = pullop.remote
    39     remote = pullop.remote
    40 
    40 
    41     bundle2supported = False
    41     bundle2supported = False
    42     if pullop.canusebundle2:
    42     if pullop.canusebundle2:
    43         if 'v2' in pullop.remotebundle2caps.get('stream', []):
    43         if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
    44             bundle2supported = True
    44             bundle2supported = True
    45         # else
    45         # else
    46         # Server doesn't support bundle2 stream clone or doesn't support
    46         # Server doesn't support bundle2 stream clone or doesn't support
    47         # the versions we support. Fall back and possibly allow legacy.
    47         # the versions we support. Fall back and possibly allow legacy.
    48 
    48 
    65 
    65 
    66     # If we don't have a preference, let the server decide for us. This
    66     # If we don't have a preference, let the server decide for us. This
    67     # likely only comes into play in LANs.
    67     # likely only comes into play in LANs.
    68     if streamrequested is None:
    68     if streamrequested is None:
    69         # The server can advertise whether to prefer streaming clone.
    69         # The server can advertise whether to prefer streaming clone.
    70         streamrequested = remote.capable('stream-preferred')
    70         streamrequested = remote.capable(b'stream-preferred')
    71 
    71 
    72     if not streamrequested:
    72     if not streamrequested:
    73         return False, None
    73         return False, None
    74 
    74 
    75     # In order for stream clone to work, the client has to support all the
    75     # In order for stream clone to work, the client has to support all the
    78     # The server advertises its requirements via the "stream" and "streamreqs"
    78     # The server advertises its requirements via the "stream" and "streamreqs"
    79     # capability. "stream" (a value-less capability) is advertised if and only
    79     # capability. "stream" (a value-less capability) is advertised if and only
    80     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    80     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    81     # is advertised and contains a comma-delimited list of requirements.
    81     # is advertised and contains a comma-delimited list of requirements.
    82     requirements = set()
    82     requirements = set()
    83     if remote.capable('stream'):
    83     if remote.capable(b'stream'):
    84         requirements.add('revlogv1')
    84         requirements.add(b'revlogv1')
    85     else:
    85     else:
    86         streamreqs = remote.capable('streamreqs')
    86         streamreqs = remote.capable(b'streamreqs')
    87         # This is weird and shouldn't happen with modern servers.
    87         # This is weird and shouldn't happen with modern servers.
    88         if not streamreqs:
    88         if not streamreqs:
    89             pullop.repo.ui.warn(
    89             pullop.repo.ui.warn(
    90                 _(
    90                 _(
    91                     'warning: stream clone requested but server has them '
    91                     b'warning: stream clone requested but server has them '
    92                     'disabled\n'
    92                     b'disabled\n'
    93                 )
    93                 )
    94             )
    94             )
    95             return False, None
    95             return False, None
    96 
    96 
    97         streamreqs = set(streamreqs.split(','))
    97         streamreqs = set(streamreqs.split(b','))
    98         # Server requires something we don't support. Bail.
    98         # Server requires something we don't support. Bail.
    99         missingreqs = streamreqs - repo.supportedformats
    99         missingreqs = streamreqs - repo.supportedformats
   100         if missingreqs:
   100         if missingreqs:
   101             pullop.repo.ui.warn(
   101             pullop.repo.ui.warn(
   102                 _(
   102                 _(
   103                     'warning: stream clone requested but client is missing '
   103                     b'warning: stream clone requested but client is missing '
   104                     'requirements: %s\n'
   104                     b'requirements: %s\n'
   105                 )
   105                 )
   106                 % ', '.join(sorted(missingreqs))
   106                 % b', '.join(sorted(missingreqs))
   107             )
   107             )
   108             pullop.repo.ui.warn(
   108             pullop.repo.ui.warn(
   109                 _(
   109                 _(
   110                     '(see https://www.mercurial-scm.org/wiki/MissingRequirement '
   110                     b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
   111                     'for more information)\n'
   111                     b'for more information)\n'
   112                 )
   112                 )
   113             )
   113             )
   114             return False, None
   114             return False, None
   115         requirements = streamreqs
   115         requirements = streamreqs
   116 
   116 
   137     remote = pullop.remote
   137     remote = pullop.remote
   138 
   138 
   139     # Save remote branchmap. We will use it later to speed up branchcache
   139     # Save remote branchmap. We will use it later to speed up branchcache
   140     # creation.
   140     # creation.
   141     rbranchmap = None
   141     rbranchmap = None
   142     if remote.capable('branchmap'):
   142     if remote.capable(b'branchmap'):
   143         with remote.commandexecutor() as e:
   143         with remote.commandexecutor() as e:
   144             rbranchmap = e.callcommand('branchmap', {}).result()
   144             rbranchmap = e.callcommand(b'branchmap', {}).result()
   145 
   145 
   146     repo.ui.status(_('streaming all changes\n'))
   146     repo.ui.status(_(b'streaming all changes\n'))
   147 
   147 
   148     with remote.commandexecutor() as e:
   148     with remote.commandexecutor() as e:
   149         fp = e.callcommand('stream_out', {}).result()
   149         fp = e.callcommand(b'stream_out', {}).result()
   150 
   150 
   151     # TODO strictly speaking, this code should all be inside the context
   151     # TODO strictly speaking, this code should all be inside the context
   152     # manager because the context manager is supposed to ensure all wire state
   152     # manager because the context manager is supposed to ensure all wire state
   153     # is flushed when exiting. But the legacy peers don't do this, so it
   153     # is flushed when exiting. But the legacy peers don't do this, so it
   154     # doesn't matter.
   154     # doesn't matter.
   155     l = fp.readline()
   155     l = fp.readline()
   156     try:
   156     try:
   157         resp = int(l)
   157         resp = int(l)
   158     except ValueError:
   158     except ValueError:
   159         raise error.ResponseError(
   159         raise error.ResponseError(
   160             _('unexpected response from remote server:'), l
   160             _(b'unexpected response from remote server:'), l
   161         )
   161         )
   162     if resp == 1:
   162     if resp == 1:
   163         raise error.Abort(_('operation forbidden by server'))
   163         raise error.Abort(_(b'operation forbidden by server'))
   164     elif resp == 2:
   164     elif resp == 2:
   165         raise error.Abort(_('locking the remote repository failed'))
   165         raise error.Abort(_(b'locking the remote repository failed'))
   166     elif resp != 0:
   166     elif resp != 0:
   167         raise error.Abort(_('the server sent an unknown error code'))
   167         raise error.Abort(_(b'the server sent an unknown error code'))
   168 
   168 
   169     l = fp.readline()
   169     l = fp.readline()
   170     try:
   170     try:
   171         filecount, bytecount = map(int, l.split(' ', 1))
   171         filecount, bytecount = map(int, l.split(b' ', 1))
   172     except (ValueError, TypeError):
   172     except (ValueError, TypeError):
   173         raise error.ResponseError(
   173         raise error.ResponseError(
   174             _('unexpected response from remote server:'), l
   174             _(b'unexpected response from remote server:'), l
   175         )
   175         )
   176 
   176 
   177     with repo.lock():
   177     with repo.lock():
   178         consumev1(repo, fp, filecount, bytecount)
   178         consumev1(repo, fp, filecount, bytecount)
   179 
   179 
   197 def allowservergeneration(repo):
   197 def allowservergeneration(repo):
   198     """Whether streaming clones are allowed from the server."""
   198     """Whether streaming clones are allowed from the server."""
   199     if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
   199     if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
   200         return False
   200         return False
   201 
   201 
   202     if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
   202     if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
   203         return False
   203         return False
   204 
   204 
   205     # The way stream clone works makes it impossible to hide secret changesets.
   205     # The way stream clone works makes it impossible to hide secret changesets.
   206     # So don't allow this by default.
   206     # So don't allow this by default.
   207     secret = phases.hassecret(repo)
   207     secret = phases.hassecret(repo)
   208     if secret:
   208     if secret:
   209         return repo.ui.configbool('server', 'uncompressedallowsecret')
   209         return repo.ui.configbool(b'server', b'uncompressedallowsecret')
   210 
   210 
   211     return True
   211     return True
   212 
   212 
   213 
   213 
   214 # This is it's own function so extensions can override it.
   214 # This is it's own function so extensions can override it.
   237     """
   237     """
   238     entries = []
   238     entries = []
   239     total_bytes = 0
   239     total_bytes = 0
   240     # Get consistent snapshot of repo, lock during scan.
   240     # Get consistent snapshot of repo, lock during scan.
   241     with repo.lock():
   241     with repo.lock():
   242         repo.ui.debug('scanning\n')
   242         repo.ui.debug(b'scanning\n')
   243         for name, ename, size in _walkstreamfiles(repo):
   243         for name, ename, size in _walkstreamfiles(repo):
   244             if size:
   244             if size:
   245                 entries.append((name, size))
   245                 entries.append((name, size))
   246                 total_bytes += size
   246                 total_bytes += size
   247 
   247 
   248     repo.ui.debug(
   248     repo.ui.debug(
   249         '%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
   249         b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
   250     )
   250     )
   251 
   251 
   252     svfs = repo.svfs
   252     svfs = repo.svfs
   253     debugflag = repo.ui.debugflag
   253     debugflag = repo.ui.debugflag
   254 
   254 
   255     def emitrevlogdata():
   255     def emitrevlogdata():
   256         for name, size in entries:
   256         for name, size in entries:
   257             if debugflag:
   257             if debugflag:
   258                 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
   258                 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
   259             # partially encode name over the wire for backwards compat
   259             # partially encode name over the wire for backwards compat
   260             yield '%s\0%d\n' % (store.encodedir(name), size)
   260             yield b'%s\0%d\n' % (store.encodedir(name), size)
   261             # auditing at this stage is both pointless (paths are already
   261             # auditing at this stage is both pointless (paths are already
   262             # trusted by the local repo) and expensive
   262             # trusted by the local repo) and expensive
   263             with svfs(name, 'rb', auditpath=False) as fp:
   263             with svfs(name, b'rb', auditpath=False) as fp:
   264                 if size <= 65536:
   264                 if size <= 65536:
   265                     yield fp.read(size)
   265                     yield fp.read(size)
   266                 else:
   266                 else:
   267                     for chunk in util.filechunkiter(fp, limit=size):
   267                     for chunk in util.filechunkiter(fp, limit=size):
   268                         yield chunk
   268                         yield chunk
   280     The success line contains "0" for success, "1" for stream generation not
   280     The success line contains "0" for success, "1" for stream generation not
   281     allowed, and "2" for error locking the repository (possibly indicating
   281     allowed, and "2" for error locking the repository (possibly indicating
   282     a permissions error for the server process).
   282     a permissions error for the server process).
   283     """
   283     """
   284     if not allowservergeneration(repo):
   284     if not allowservergeneration(repo):
   285         yield '1\n'
   285         yield b'1\n'
   286         return
   286         return
   287 
   287 
   288     try:
   288     try:
   289         filecount, bytecount, it = generatev1(repo)
   289         filecount, bytecount, it = generatev1(repo)
   290     except error.LockError:
   290     except error.LockError:
   291         yield '2\n'
   291         yield b'2\n'
   292         return
   292         return
   293 
   293 
   294     # Indicates successful response.
   294     # Indicates successful response.
   295     yield '0\n'
   295     yield b'0\n'
   296     yield '%d %d\n' % (filecount, bytecount)
   296     yield b'%d %d\n' % (filecount, bytecount)
   297     for chunk in it:
   297     for chunk in it:
   298         yield chunk
   298         yield chunk
   299 
   299 
   300 
   300 
   301 def generatebundlev1(repo, compression='UN'):
   301 def generatebundlev1(repo, compression=b'UN'):
   302     """Emit content for version 1 of a stream clone bundle.
   302     """Emit content for version 1 of a stream clone bundle.
   303 
   303 
   304     The first 4 bytes of the output ("HGS1") denote this as stream clone
   304     The first 4 bytes of the output ("HGS1") denote this as stream clone
   305     bundle version 1.
   305     bundle version 1.
   306 
   306 
   318     The remaining content is the output of ``generatev1()`` (which may be
   318     The remaining content is the output of ``generatev1()`` (which may be
   319     compressed in the future).
   319     compressed in the future).
   320 
   320 
   321     Returns a tuple of (requirements, data generator).
   321     Returns a tuple of (requirements, data generator).
   322     """
   322     """
   323     if compression != 'UN':
   323     if compression != b'UN':
   324         raise ValueError('we do not support the compression argument yet')
   324         raise ValueError(b'we do not support the compression argument yet')
   325 
   325 
   326     requirements = repo.requirements & repo.supportedformats
   326     requirements = repo.requirements & repo.supportedformats
   327     requires = ','.join(sorted(requirements))
   327     requires = b','.join(sorted(requirements))
   328 
   328 
   329     def gen():
   329     def gen():
   330         yield 'HGS1'
   330         yield b'HGS1'
   331         yield compression
   331         yield compression
   332 
   332 
   333         filecount, bytecount, it = generatev1(repo)
   333         filecount, bytecount, it = generatev1(repo)
   334         repo.ui.status(
   334         repo.ui.status(
   335             _('writing %d bytes for %d files\n') % (bytecount, filecount)
   335             _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
   336         )
   336         )
   337 
   337 
   338         yield struct.pack('>QQ', filecount, bytecount)
   338         yield struct.pack(b'>QQ', filecount, bytecount)
   339         yield struct.pack('>H', len(requires) + 1)
   339         yield struct.pack(b'>H', len(requires) + 1)
   340         yield requires + '\0'
   340         yield requires + b'\0'
   341 
   341 
   342         # This is where we'll add compression in the future.
   342         # This is where we'll add compression in the future.
   343         assert compression == 'UN'
   343         assert compression == b'UN'
   344 
   344 
   345         progress = repo.ui.makeprogress(
   345         progress = repo.ui.makeprogress(
   346             _('bundle'), total=bytecount, unit=_('bytes')
   346             _(b'bundle'), total=bytecount, unit=_(b'bytes')
   347         )
   347         )
   348         progress.update(0)
   348         progress.update(0)
   349 
   349 
   350         for chunk in it:
   350         for chunk in it:
   351             progress.increment(step=len(chunk))
   351             progress.increment(step=len(chunk))
   365     Like "stream_out," the status line added by the wire protocol is not
   365     Like "stream_out," the status line added by the wire protocol is not
   366     handled by this function.
   366     handled by this function.
   367     """
   367     """
   368     with repo.lock():
   368     with repo.lock():
   369         repo.ui.status(
   369         repo.ui.status(
   370             _('%d files to transfer, %s of data\n')
   370             _(b'%d files to transfer, %s of data\n')
   371             % (filecount, util.bytecount(bytecount))
   371             % (filecount, util.bytecount(bytecount))
   372         )
   372         )
   373         progress = repo.ui.makeprogress(
   373         progress = repo.ui.makeprogress(
   374             _('clone'), total=bytecount, unit=_('bytes')
   374             _(b'clone'), total=bytecount, unit=_(b'bytes')
   375         )
   375         )
   376         progress.update(0)
   376         progress.update(0)
   377         start = util.timer()
   377         start = util.timer()
   378 
   378 
   379         # TODO: get rid of (potential) inconsistency
   379         # TODO: get rid of (potential) inconsistency
   388         #
   388         #
   389         # But transaction nesting can't be simply prohibited, because
   389         # But transaction nesting can't be simply prohibited, because
   390         # nesting occurs also in ordinary case (e.g. enabling
   390         # nesting occurs also in ordinary case (e.g. enabling
   391         # clonebundles).
   391         # clonebundles).
   392 
   392 
   393         with repo.transaction('clone'):
   393         with repo.transaction(b'clone'):
   394             with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
   394             with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
   395                 for i in pycompat.xrange(filecount):
   395                 for i in pycompat.xrange(filecount):
   396                     # XXX doesn't support '\n' or '\r' in filenames
   396                     # XXX doesn't support '\n' or '\r' in filenames
   397                     l = fp.readline()
   397                     l = fp.readline()
   398                     try:
   398                     try:
   399                         name, size = l.split('\0', 1)
   399                         name, size = l.split(b'\0', 1)
   400                         size = int(size)
   400                         size = int(size)
   401                     except (ValueError, TypeError):
   401                     except (ValueError, TypeError):
   402                         raise error.ResponseError(
   402                         raise error.ResponseError(
   403                             _('unexpected response from remote server:'), l
   403                             _(b'unexpected response from remote server:'), l
   404                         )
   404                         )
   405                     if repo.ui.debugflag:
   405                     if repo.ui.debugflag:
   406                         repo.ui.debug(
   406                         repo.ui.debug(
   407                             'adding %s (%s)\n' % (name, util.bytecount(size))
   407                             b'adding %s (%s)\n' % (name, util.bytecount(size))
   408                         )
   408                         )
   409                     # for backwards compat, name was partially encoded
   409                     # for backwards compat, name was partially encoded
   410                     path = store.decodedir(name)
   410                     path = store.decodedir(name)
   411                     with repo.svfs(path, 'w', backgroundclose=True) as ofp:
   411                     with repo.svfs(path, b'w', backgroundclose=True) as ofp:
   412                         for chunk in util.filechunkiter(fp, limit=size):
   412                         for chunk in util.filechunkiter(fp, limit=size):
   413                             progress.increment(step=len(chunk))
   413                             progress.increment(step=len(chunk))
   414                             ofp.write(chunk)
   414                             ofp.write(chunk)
   415 
   415 
   416             # force @filecache properties to be reloaded from
   416             # force @filecache properties to be reloaded from
   420         elapsed = util.timer() - start
   420         elapsed = util.timer() - start
   421         if elapsed <= 0:
   421         if elapsed <= 0:
   422             elapsed = 0.001
   422             elapsed = 0.001
   423         progress.complete()
   423         progress.complete()
   424         repo.ui.status(
   424         repo.ui.status(
   425             _('transferred %s in %.1f seconds (%s/sec)\n')
   425             _(b'transferred %s in %.1f seconds (%s/sec)\n')
   426             % (
   426             % (
   427                 util.bytecount(bytecount),
   427                 util.bytecount(bytecount),
   428                 elapsed,
   428                 elapsed,
   429                 util.bytecount(bytecount / elapsed),
   429                 util.bytecount(bytecount / elapsed),
   430             )
   430             )
   431         )
   431         )
   432 
   432 
   433 
   433 
   434 def readbundle1header(fp):
   434 def readbundle1header(fp):
   435     compression = fp.read(2)
   435     compression = fp.read(2)
   436     if compression != 'UN':
   436     if compression != b'UN':
   437         raise error.Abort(
       
   438             _('only uncompressed stream clone bundles are ' 'supported; got %s')
       
   439             % compression
       
   440         )
       
   441 
       
   442     filecount, bytecount = struct.unpack('>QQ', fp.read(16))
       
   443     requireslen = struct.unpack('>H', fp.read(2))[0]
       
   444     requires = fp.read(requireslen)
       
   445 
       
   446     if not requires.endswith('\0'):
       
   447         raise error.Abort(
   437         raise error.Abort(
   448             _(
   438             _(
   449                 'malformed stream clone bundle: '
   439                 b'only uncompressed stream clone bundles are '
   450                 'requirements not properly encoded'
   440                 b'supported; got %s'
   451             )
   441             )
   452         )
   442             % compression
   453 
   443         )
   454     requirements = set(requires.rstrip('\0').split(','))
   444 
       
   445     filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
       
   446     requireslen = struct.unpack(b'>H', fp.read(2))[0]
       
   447     requires = fp.read(requireslen)
       
   448 
       
   449     if not requires.endswith(b'\0'):
       
   450         raise error.Abort(
       
   451             _(
       
   452                 b'malformed stream clone bundle: '
       
   453                 b'requirements not properly encoded'
       
   454             )
       
   455         )
       
   456 
       
   457     requirements = set(requires.rstrip(b'\0').split(b','))
   455 
   458 
   456     return filecount, bytecount, requirements
   459     return filecount, bytecount, requirements
   457 
   460 
   458 
   461 
   459 def applybundlev1(repo, fp):
   462 def applybundlev1(repo, fp):
   462     We assume the 4 byte header has been read and validated and the file handle
   465     We assume the 4 byte header has been read and validated and the file handle
   463     is at the 2 byte compression identifier.
   466     is at the 2 byte compression identifier.
   464     """
   467     """
   465     if len(repo):
   468     if len(repo):
   466         raise error.Abort(
   469         raise error.Abort(
   467             _('cannot apply stream clone bundle on non-empty ' 'repo')
   470             _(b'cannot apply stream clone bundle on non-empty ' b'repo')
   468         )
   471         )
   469 
   472 
   470     filecount, bytecount, requirements = readbundle1header(fp)
   473     filecount, bytecount, requirements = readbundle1header(fp)
   471     missingreqs = requirements - repo.supportedformats
   474     missingreqs = requirements - repo.supportedformats
   472     if missingreqs:
   475     if missingreqs:
   473         raise error.Abort(
   476         raise error.Abort(
   474             _('unable to apply stream clone: ' 'unsupported format: %s')
   477             _(b'unable to apply stream clone: ' b'unsupported format: %s')
   475             % ', '.join(sorted(missingreqs))
   478             % b', '.join(sorted(missingreqs))
   476         )
   479         )
   477 
   480 
   478     consumev1(repo, fp, filecount, bytecount)
   481     consumev1(repo, fp, filecount, bytecount)
   479 
   482 
   480 
   483 
   495 # type of file to stream
   498 # type of file to stream
   496 _fileappend = 0  # append only file
   499 _fileappend = 0  # append only file
   497 _filefull = 1  # full snapshot file
   500 _filefull = 1  # full snapshot file
   498 
   501 
   499 # Source of the file
   502 # Source of the file
   500 _srcstore = 's'  # store (svfs)
   503 _srcstore = b's'  # store (svfs)
   501 _srccache = 'c'  # cache (cache)
   504 _srccache = b'c'  # cache (cache)
   502 
   505 
   503 # This is it's own function so extensions can override it.
   506 # This is it's own function so extensions can override it.
   504 def _walkstreamfullstorefiles(repo):
   507 def _walkstreamfullstorefiles(repo):
   505     """list snapshot file from the store"""
   508     """list snapshot file from the store"""
   506     fnames = []
   509     fnames = []
   507     if not repo.publishing():
   510     if not repo.publishing():
   508         fnames.append('phaseroots')
   511         fnames.append(b'phaseroots')
   509     return fnames
   512     return fnames
   510 
   513 
   511 
   514 
   512 def _filterfull(entry, copy, vfsmap):
   515 def _filterfull(entry, copy, vfsmap):
   513     """actually copy the snapshot files"""
   516     """actually copy the snapshot files"""
   551 
   554 
   552 def _emit2(repo, entries, totalfilesize):
   555 def _emit2(repo, entries, totalfilesize):
   553     """actually emit the stream bundle"""
   556     """actually emit the stream bundle"""
   554     vfsmap = _makemap(repo)
   557     vfsmap = _makemap(repo)
   555     progress = repo.ui.makeprogress(
   558     progress = repo.ui.makeprogress(
   556         _('bundle'), total=totalfilesize, unit=_('bytes')
   559         _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
   557     )
   560     )
   558     progress.update(0)
   561     progress.update(0)
   559     with maketempcopies() as copy, progress:
   562     with maketempcopies() as copy, progress:
   560         # copy is delayed until we are in the try
   563         # copy is delayed until we are in the try
   561         entries = [_filterfull(e, copy, vfsmap) for e in entries]
   564         entries = [_filterfull(e, copy, vfsmap) for e in entries]
   568             yield util.uvarintencode(len(name))
   571             yield util.uvarintencode(len(name))
   569             if ftype == _fileappend:
   572             if ftype == _fileappend:
   570                 fp = vfs(name)
   573                 fp = vfs(name)
   571                 size = data
   574                 size = data
   572             elif ftype == _filefull:
   575             elif ftype == _filefull:
   573                 fp = open(data, 'rb')
   576                 fp = open(data, b'rb')
   574                 size = util.fstat(fp).st_size
   577                 size = util.fstat(fp).st_size
   575             try:
   578             try:
   576                 yield util.uvarintencode(size)
   579                 yield util.uvarintencode(size)
   577                 yield name
   580                 yield name
   578                 if size <= 65536:
   581                 if size <= 65536:
   607 
   610 
   608         matcher = None
   611         matcher = None
   609         if includes or excludes:
   612         if includes or excludes:
   610             matcher = narrowspec.match(repo.root, includes, excludes)
   613             matcher = narrowspec.match(repo.root, includes, excludes)
   611 
   614 
   612         repo.ui.debug('scanning\n')
   615         repo.ui.debug(b'scanning\n')
   613         for name, ename, size in _walkstreamfiles(repo, matcher):
   616         for name, ename, size in _walkstreamfiles(repo, matcher):
   614             if size:
   617             if size:
   615                 entries.append((_srcstore, name, _fileappend, size))
   618                 entries.append((_srcstore, name, _fileappend, size))
   616                 totalfilesize += size
   619                 totalfilesize += size
   617         for name in _walkstreamfullstorefiles(repo):
   620         for name in _walkstreamfullstorefiles(repo):
   618             if repo.svfs.exists(name):
   621             if repo.svfs.exists(name):
   619                 totalfilesize += repo.svfs.lstat(name).st_size
   622                 totalfilesize += repo.svfs.lstat(name).st_size
   620                 entries.append((_srcstore, name, _filefull, None))
   623                 entries.append((_srcstore, name, _filefull, None))
   621         if includeobsmarkers and repo.svfs.exists('obsstore'):
   624         if includeobsmarkers and repo.svfs.exists(b'obsstore'):
   622             totalfilesize += repo.svfs.lstat('obsstore').st_size
   625             totalfilesize += repo.svfs.lstat(b'obsstore').st_size
   623             entries.append((_srcstore, 'obsstore', _filefull, None))
   626             entries.append((_srcstore, b'obsstore', _filefull, None))
   624         for name in cacheutil.cachetocopy(repo):
   627         for name in cacheutil.cachetocopy(repo):
   625             if repo.cachevfs.exists(name):
   628             if repo.cachevfs.exists(name):
   626                 totalfilesize += repo.cachevfs.lstat(name).st_size
   629                 totalfilesize += repo.cachevfs.lstat(name).st_size
   627                 entries.append((_srccache, name, _filefull, None))
   630                 entries.append((_srccache, name, _filefull, None))
   628 
   631 
   651     Data is read from an object that only needs to provide a ``read(size)``
   654     Data is read from an object that only needs to provide a ``read(size)``
   652     method.
   655     method.
   653     """
   656     """
   654     with repo.lock():
   657     with repo.lock():
   655         repo.ui.status(
   658         repo.ui.status(
   656             _('%d files to transfer, %s of data\n')
   659             _(b'%d files to transfer, %s of data\n')
   657             % (filecount, util.bytecount(filesize))
   660             % (filecount, util.bytecount(filesize))
   658         )
   661         )
   659 
   662 
   660         start = util.timer()
   663         start = util.timer()
   661         progress = repo.ui.makeprogress(
   664         progress = repo.ui.makeprogress(
   662             _('clone'), total=filesize, unit=_('bytes')
   665             _(b'clone'), total=filesize, unit=_(b'bytes')
   663         )
   666         )
   664         progress.update(0)
   667         progress.update(0)
   665 
   668 
   666         vfsmap = _makemap(repo)
   669         vfsmap = _makemap(repo)
   667 
   670 
   668         with repo.transaction('clone'):
   671         with repo.transaction(b'clone'):
   669             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
   672             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
   670             with nested(*ctxs):
   673             with nested(*ctxs):
   671                 for i in range(filecount):
   674                 for i in range(filecount):
   672                     src = util.readexactly(fp, 1)
   675                     src = util.readexactly(fp, 1)
   673                     vfs = vfsmap[src]
   676                     vfs = vfsmap[src]
   676 
   679 
   677                     name = util.readexactly(fp, namelen)
   680                     name = util.readexactly(fp, namelen)
   678 
   681 
   679                     if repo.ui.debugflag:
   682                     if repo.ui.debugflag:
   680                         repo.ui.debug(
   683                         repo.ui.debug(
   681                             'adding [%s] %s (%s)\n'
   684                             b'adding [%s] %s (%s)\n'
   682                             % (src, name, util.bytecount(datalen))
   685                             % (src, name, util.bytecount(datalen))
   683                         )
   686                         )
   684 
   687 
   685                     with vfs(name, 'w') as ofp:
   688                     with vfs(name, b'w') as ofp:
   686                         for chunk in util.filechunkiter(fp, limit=datalen):
   689                         for chunk in util.filechunkiter(fp, limit=datalen):
   687                             progress.increment(step=len(chunk))
   690                             progress.increment(step=len(chunk))
   688                             ofp.write(chunk)
   691                             ofp.write(chunk)
   689 
   692 
   690             # force @filecache properties to be reloaded from
   693             # force @filecache properties to be reloaded from
   693 
   696 
   694         elapsed = util.timer() - start
   697         elapsed = util.timer() - start
   695         if elapsed <= 0:
   698         if elapsed <= 0:
   696             elapsed = 0.001
   699             elapsed = 0.001
   697         repo.ui.status(
   700         repo.ui.status(
   698             _('transferred %s in %.1f seconds (%s/sec)\n')
   701             _(b'transferred %s in %.1f seconds (%s/sec)\n')
   699             % (
   702             % (
   700                 util.bytecount(progress.pos),
   703                 util.bytecount(progress.pos),
   701                 elapsed,
   704                 elapsed,
   702                 util.bytecount(progress.pos / elapsed),
   705                 util.bytecount(progress.pos / elapsed),
   703             )
   706             )
   709     from . import localrepo
   712     from . import localrepo
   710 
   713 
   711     missingreqs = [r for r in requirements if r not in repo.supported]
   714     missingreqs = [r for r in requirements if r not in repo.supported]
   712     if missingreqs:
   715     if missingreqs:
   713         raise error.Abort(
   716         raise error.Abort(
   714             _('unable to apply stream clone: ' 'unsupported format: %s')
   717             _(b'unable to apply stream clone: ' b'unsupported format: %s')
   715             % ', '.join(sorted(missingreqs))
   718             % b', '.join(sorted(missingreqs))
   716         )
   719         )
   717 
   720 
   718     consumev2(repo, fp, filecount, filesize)
   721     consumev2(repo, fp, filecount, filesize)
   719 
   722 
   720     # new requirements = old non-format requirements +
   723     # new requirements = old non-format requirements +