hgext/infinitepush/__init__.py
changeset 50803 609a3b8058c3
parent 50802 cf0502231d56
child 50806 337bc83c1275
equal deleted inserted replaced
50802:cf0502231d56 50803:609a3b8058c3
     1 # Infinite push
       
     2 #
       
     3 # Copyright 2016 Facebook, Inc.
       
     4 #
       
     5 # This software may be used and distributed according to the terms of the
       
     6 # GNU General Public License version 2 or any later version.
       
     7 """ store some pushes in a remote blob store on the server (EXPERIMENTAL)
       
     8 
       
     9 IMPORTANT: if you use this extension, please contact
       
    10 mercurial-devel@mercurial-scm.org ASAP. This extension is believed to
       
    11 be unused and barring learning of users of this functionality, we will
       
    12 delete this code at the end of 2020.
       
    13 
       
    14     [infinitepush]
       
    15     # Server-side and client-side option. Pattern of the infinitepush bookmark
       
    16     branchpattern = PATTERN
       
    17 
       
    18     # Server or client
       
    19     server = False
       
    20 
       
    21     # Server-side option. Possible values: 'disk' or 'sql'. Fails if not set
       
    22     indextype = disk
       
    23 
       
    24     # Server-side option. Used only if indextype=sql.
       
    25     # Format: 'IP:PORT:DB_NAME:USER:PASSWORD'
       
    26     sqlhost = IP:PORT:DB_NAME:USER:PASSWORD
       
    27 
       
    28     # Server-side option. Used only if indextype=disk.
       
    29     # Filesystem path to the index store
       
    30     indexpath = PATH
       
    31 
       
    32     # Server-side option. Possible values: 'disk' or 'external'
       
    33     # Fails if not set
       
    34     storetype = disk
       
    35 
       
    36     # Server-side option.
       
    37     # Path to the binary that will save bundle to the bundlestore
       
    38     # Formatted cmd line will be passed to it (see `put_args`)
       
    39     put_binary = put
       
    40 
       
    41     # Server-side option. Used only if storetype=external.
       
    42     # Format cmd-line string for put binary. Placeholder: {filename}
       
    43     put_args = {filename}
       
    44 
       
    45     # Server-side option.
       
    46     # Path to the binary that get bundle from the bundlestore.
       
    47     # Formatted cmd line will be passed to it (see `get_args`)
       
    48     get_binary = get
       
    49 
       
    50     # Server-side option. Used only if storetype=external.
       
    51     # Format cmd-line string for get binary. Placeholders: {filename} {handle}
       
    52     get_args = {filename} {handle}
       
    53 
       
    54     # Server-side option
       
    55     logfile = FILE
       
    56 
       
    57     # Server-side option
       
    58     loglevel = DEBUG
       
    59 
       
    60     # Server-side option. Used only if indextype=sql.
       
    61     # Sets mysql wait_timeout option.
       
    62     waittimeout = 300
       
    63 
       
    64     # Server-side option. Used only if indextype=sql.
       
    65     # Sets mysql innodb_lock_wait_timeout option.
       
    66     locktimeout = 120
       
    67 
       
    68     # Server-side option. Used only if indextype=sql.
       
    69     # Name of the repository
       
    70     reponame = ''
       
    71 
       
    72     # Client-side option. Used by --list-remote option. List of remote scratch
       
    73     # patterns to list if no patterns are specified.
       
    74     defaultremotepatterns = ['*']
       
    75 
       
    76     # Instructs infinitepush to forward all received bundle2 parts to the
       
    77     # bundle for storage. Defaults to False.
       
    78     storeallparts = True
       
    79 
       
    80     # routes each incoming push to the bundlestore. defaults to False
       
    81     pushtobundlestore = True
       
    82 
       
    83     [remotenames]
       
    84     # Client-side option
       
    85     # This option should be set only if remotenames extension is enabled.
       
    86     # Whether remote bookmarks are tracked by remotenames extension.
       
    87     bookmarks = True
       
    88 """
       
    89 
       
    90 
       
    91 import collections
       
    92 import contextlib
       
    93 import functools
       
    94 import logging
       
    95 import os
       
    96 import random
       
    97 import re
       
    98 import socket
       
    99 import subprocess
       
   100 import time
       
   101 
       
   102 from mercurial.node import (
       
   103     bin,
       
   104     hex,
       
   105 )
       
   106 
       
   107 from mercurial.i18n import _
       
   108 
       
   109 from mercurial.pycompat import (
       
   110     getattr,
       
   111     open,
       
   112 )
       
   113 
       
   114 from mercurial.utils import (
       
   115     procutil,
       
   116     stringutil,
       
   117     urlutil,
       
   118 )
       
   119 
       
   120 from mercurial import (
       
   121     bundle2,
       
   122     changegroup,
       
   123     commands,
       
   124     discovery,
       
   125     encoding,
       
   126     error,
       
   127     exchange,
       
   128     extensions,
       
   129     hg,
       
   130     localrepo,
       
   131     phases,
       
   132     pushkey,
       
   133     pycompat,
       
   134     registrar,
       
   135     util,
       
   136     wireprototypes,
       
   137     wireprotov1peer,
       
   138     wireprotov1server,
       
   139 )
       
   140 
       
   141 from . import (
       
   142     bundleparts,
       
   143     common,
       
   144 )
       
   145 
       
# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
# be specifying the version(s) of Mercurial they are tested with, or
# leave the attribute unspecified.
testedwith = b'ships-with-hg-core'

configtable = {}
configitem = registrar.configitem(configtable)

# Whether to print the deprecation banner in reposetup().
configitem(
    b'infinitepush',
    b'deprecation-message',
    default=True,
)

# When True (the default), reposetup() aborts outright — the extension is
# pending removal.
configitem(
    b'infinitepush',
    b'deprecation-abort',
    default=True,
)

# True on the server side, False (default) on clients.
configitem(
    b'infinitepush',
    b'server',
    default=False,
)
# Server-side blob store backend: 'disk' or 'external' (see bundlestore).
configitem(
    b'infinitepush',
    b'storetype',
    default=b'',
)
# Server-side index backend: 'disk' or 'sql' (see bundlestore).
configitem(
    b'infinitepush',
    b'indextype',
    default=b'',
)
# Filesystem path of the disk index.
configitem(
    b'infinitepush',
    b'indexpath',
    default=b'',
)
# Forward all received bundle2 parts to the bundle for storage.
configitem(
    b'infinitepush',
    b'storeallparts',
    default=False,
)
# Repository name; required when indextype=sql (see _buildsqlindex).
configitem(
    b'infinitepush',
    b'reponame',
    default=b'',
)
# Filesystem path of the scratch-branch bundle store.
configitem(
    b'scratchbranch',
    b'storepath',
    default=b'',
)
# Pattern selecting which bookmarks are treated as scratch (infinitepush)
# bookmarks; compiled into _scratchbranchmatcher in commonsetup().
configitem(
    b'infinitepush',
    b'branchpattern',
    default=b'',
)
# Route every incoming push to the bundlestore.
configitem(
    b'infinitepush',
    b'pushtobundlestore',
    default=False,
)
# Experimental client-side knobs; see the shorthand names below.
configitem(
    b'experimental',
    b'server-bundlestore-bookmark',
    default=b'',
)
configitem(
    b'experimental',
    b'infinitepush-scratchpush',
    default=False,
)

# Shorthands for the experimental config section/keys used in this file.
experimental = b'experimental'
configbookmark = b'server-bundlestore-bookmark'
configscratchpush = b'infinitepush-scratchpush'

scratchbranchparttype = bundleparts.scratchbranchparttype
revsetpredicate = registrar.revsetpredicate()
templatekeyword = registrar.templatekeyword()
# Default matcher rejects everything; replaced in commonsetup() when
# infinitepush.branchpattern is configured.
_scratchbranchmatcher = lambda x: False
# Heuristic predicate: does a key look like a (partial) hex node?
_maybehash = re.compile('^[a-f0-9]+$').search
       
   232 
       
   233 
       
   234 def _buildexternalbundlestore(ui):
       
   235     put_args = ui.configlist(b'infinitepush', b'put_args', [])
       
   236     put_binary = ui.config(b'infinitepush', b'put_binary')
       
   237     if not put_binary:
       
   238         raise error.Abort(b'put binary is not specified')
       
   239     get_args = ui.configlist(b'infinitepush', b'get_args', [])
       
   240     get_binary = ui.config(b'infinitepush', b'get_binary')
       
   241     if not get_binary:
       
   242         raise error.Abort(b'get binary is not specified')
       
   243     from . import store
       
   244 
       
   245     return store.externalbundlestore(put_binary, put_args, get_binary, get_args)
       
   246 
       
   247 
       
   248 def _buildsqlindex(ui):
       
   249     sqlhost = ui.config(b'infinitepush', b'sqlhost')
       
   250     if not sqlhost:
       
   251         raise error.Abort(_(b'please set infinitepush.sqlhost'))
       
   252     host, port, db, user, password = sqlhost.split(b':')
       
   253     reponame = ui.config(b'infinitepush', b'reponame')
       
   254     if not reponame:
       
   255         raise error.Abort(_(b'please set infinitepush.reponame'))
       
   256 
       
   257     logfile = ui.config(b'infinitepush', b'logfile', b'')
       
   258     waittimeout = ui.configint(b'infinitepush', b'waittimeout', 300)
       
   259     locktimeout = ui.configint(b'infinitepush', b'locktimeout', 120)
       
   260     from . import sqlindexapi
       
   261 
       
   262     return sqlindexapi.sqlindexapi(
       
   263         reponame,
       
   264         host,
       
   265         port,
       
   266         db,
       
   267         user,
       
   268         password,
       
   269         logfile,
       
   270         _getloglevel(ui),
       
   271         waittimeout=waittimeout,
       
   272         locktimeout=locktimeout,
       
   273     )
       
   274 
       
   275 
       
   276 def _getloglevel(ui):
       
   277     loglevel = ui.config(b'infinitepush', b'loglevel', b'DEBUG')
       
   278     numeric_loglevel = getattr(logging, loglevel.upper(), None)
       
   279     if not isinstance(numeric_loglevel, int):
       
   280         raise error.Abort(_(b'invalid log level %s') % loglevel)
       
   281     return numeric_loglevel
       
   282 
       
   283 
       
   284 def _tryhoist(ui, remotebookmark):
       
   285     """returns a bookmarks with hoisted part removed
       
   286 
       
   287     Remotenames extension has a 'hoist' config that allows to use remote
       
   288     bookmarks without specifying remote path. For example, 'hg update master'
       
   289     works as well as 'hg update remote/master'. We want to allow the same in
       
   290     infinitepush.
       
   291     """
       
   292 
       
   293     if common.isremotebooksenabled(ui):
       
   294         hoist = ui.config(b'remotenames', b'hoistedpeer') + b'/'
       
   295         if remotebookmark.startswith(hoist):
       
   296             return remotebookmark[len(hoist) :]
       
   297     return remotebookmark
       
   298 
       
   299 
       
   300 class bundlestore:
       
   301     def __init__(self, repo):
       
   302         self._repo = repo
       
   303         storetype = self._repo.ui.config(b'infinitepush', b'storetype')
       
   304         if storetype == b'disk':
       
   305             from . import store
       
   306 
       
   307             self.store = store.filebundlestore(self._repo.ui, self._repo)
       
   308         elif storetype == b'external':
       
   309             self.store = _buildexternalbundlestore(self._repo.ui)
       
   310         else:
       
   311             raise error.Abort(
       
   312                 _(b'unknown infinitepush store type specified %s') % storetype
       
   313             )
       
   314 
       
   315         indextype = self._repo.ui.config(b'infinitepush', b'indextype')
       
   316         if indextype == b'disk':
       
   317             from . import fileindexapi
       
   318 
       
   319             self.index = fileindexapi.fileindexapi(self._repo)
       
   320         elif indextype == b'sql':
       
   321             self.index = _buildsqlindex(self._repo.ui)
       
   322         else:
       
   323             raise error.Abort(
       
   324                 _(b'unknown infinitepush index type specified %s') % indextype
       
   325             )
       
   326 
       
   327 
       
   328 def _isserver(ui):
       
   329     return ui.configbool(b'infinitepush', b'server')
       
   330 
       
   331 
       
# Deprecation banner written to stderr by reposetup(); by default the
# extension then aborts (see infinitepush.deprecation-message and
# infinitepush.deprecation-abort).
WARNING_MSG = b"""IMPORTANT: if you use this extension, please contact
mercurial-devel@mercurial-scm.org IMMEDIATELY. This extension is believed to be
unused and barring learning of users of this functionality, we drop this
extension in Mercurial 6.6.
"""
       
   337 
       
   338 
       
   339 def reposetup(ui, repo):
       
   340     if ui.configbool(b'infinitepush', b'deprecation-message'):
       
   341         ui.write_err(WARNING_MSG)
       
   342     if ui.configbool(b'infinitepush', b'deprecation-abort'):
       
   343         msg = b"USING EXTENSION INFINITE PUSH DESPITE PENDING DROP"
       
   344         hint = b"contact mercurial-devel@mercurial-scm.org"
       
   345         raise error.Abort(msg, hint=hint)
       
   346     if _isserver(ui) and repo.local():
       
   347         repo.bundlestore = bundlestore(repo)
       
   348 
       
   349 
       
   350 def extsetup(ui):
       
   351     commonsetup(ui)
       
   352     if _isserver(ui):
       
   353         serverextsetup(ui)
       
   354     else:
       
   355         clientextsetup(ui)
       
   356 
       
   357 
       
   358 def uipopulate(ui):
       
   359     if not ui.hasconfig(b"experimental", b"changegroup3"):
       
   360         ui.setconfig(b"experimental", b"changegroup3", False, b"infinitepush")
       
   361 
       
   362 
       
   363 def commonsetup(ui):
       
   364     wireprotov1server.commands[b'listkeyspatterns'] = (
       
   365         wireprotolistkeyspatterns,
       
   366         b'namespace patterns',
       
   367     )
       
   368     scratchbranchpat = ui.config(b'infinitepush', b'branchpattern')
       
   369     if scratchbranchpat:
       
   370         global _scratchbranchmatcher
       
   371         kind, pat, _scratchbranchmatcher = stringutil.stringmatcher(
       
   372             scratchbranchpat
       
   373         )
       
   374 
       
   375 
       
   376 def serverextsetup(ui):
       
   377     origpushkeyhandler = bundle2.parthandlermapping[b'pushkey']
       
   378 
       
   379     def newpushkeyhandler(*args, **kwargs):
       
   380         bundle2pushkey(origpushkeyhandler, *args, **kwargs)
       
   381 
       
   382     newpushkeyhandler.params = origpushkeyhandler.params
       
   383     bundle2.parthandlermapping[b'pushkey'] = newpushkeyhandler
       
   384 
       
   385     orighandlephasehandler = bundle2.parthandlermapping[b'phase-heads']
       
   386     newphaseheadshandler = lambda *args, **kwargs: bundle2handlephases(
       
   387         orighandlephasehandler, *args, **kwargs
       
   388     )
       
   389     newphaseheadshandler.params = orighandlephasehandler.params
       
   390     bundle2.parthandlermapping[b'phase-heads'] = newphaseheadshandler
       
   391 
       
   392     extensions.wrapfunction(
       
   393         localrepo.localrepository, 'listkeys', localrepolistkeys
       
   394     )
       
   395     wireprotov1server.commands[b'lookup'] = (
       
   396         _lookupwrap(wireprotov1server.commands[b'lookup'][0]),
       
   397         b'key',
       
   398     )
       
   399     extensions.wrapfunction(exchange, 'getbundlechunks', getbundlechunks)
       
   400 
       
   401     extensions.wrapfunction(bundle2, 'processparts', processparts)
       
   402 
       
   403 
       
   404 def clientextsetup(ui):
       
   405     entry = extensions.wrapcommand(commands.table, b'push', _push)
       
   406 
       
   407     entry[1].append(
       
   408         (
       
   409             b'',
       
   410             b'bundle-store',
       
   411             None,
       
   412             _(b'force push to go to bundle store (EXPERIMENTAL)'),
       
   413         )
       
   414     )
       
   415 
       
   416     extensions.wrapcommand(commands.table, b'pull', _pull)
       
   417 
       
   418     extensions.wrapfunction(discovery, 'checkheads', _checkheads)
       
   419 
       
   420     wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
       
   421 
       
   422     partorder = exchange.b2partsgenorder
       
   423     index = partorder.index(b'changeset')
       
   424     partorder.insert(
       
   425         index, partorder.pop(partorder.index(scratchbranchparttype))
       
   426     )
       
   427 
       
   428 
       
   429 def _checkheads(orig, pushop):
       
   430     if pushop.ui.configbool(experimental, configscratchpush, False):
       
   431         return
       
   432     return orig(pushop)
       
   433 
       
   434 
       
   435 def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
       
   436     patterns = wireprototypes.decodelist(patterns)
       
   437     d = repo.listkeys(encoding.tolocal(namespace), patterns).items()
       
   438     return pushkey.encodekeys(d)
       
   439 
       
   440 
       
   441 def localrepolistkeys(orig, self, namespace, patterns=None):
       
   442     if namespace == b'bookmarks' and patterns:
       
   443         index = self.bundlestore.index
       
   444         results = {}
       
   445         bookmarks = orig(self, namespace)
       
   446         for pattern in patterns:
       
   447             results.update(index.getbookmarks(pattern))
       
   448             if pattern.endswith(b'*'):
       
   449                 pattern = b're:^' + pattern[:-1] + b'.*'
       
   450             kind, pat, matcher = stringutil.stringmatcher(pattern)
       
   451             for bookmark, node in bookmarks.items():
       
   452                 if matcher(bookmark):
       
   453                     results[bookmark] = node
       
   454         return results
       
   455     else:
       
   456         return orig(self, namespace)
       
   457 
       
   458 
       
   459 @wireprotov1peer.batchable
       
   460 def listkeyspatterns(self, namespace, patterns):
       
   461     if not self.capable(b'pushkey'):
       
   462         return {}, None
       
   463     self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
       
   464 
       
   465     def decode(d):
       
   466         self.ui.debug(
       
   467             b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
       
   468         )
       
   469         return pushkey.decodekeys(d)
       
   470 
       
   471     return {
       
   472         b'namespace': encoding.fromlocal(namespace),
       
   473         b'patterns': wireprototypes.encodelist(patterns),
       
   474     }, decode
       
   475 
       
   476 
       
   477 def _readbundlerevs(bundlerepo):
       
   478     return list(bundlerepo.revs(b'bundle()'))
       
   479 
       
   480 
       
   481 def _includefilelogstobundle(bundlecaps, bundlerepo, bundlerevs, ui):
       
   482     """Tells remotefilelog to include all changed files to the changegroup
       
   483 
       
   484     By default remotefilelog doesn't include file content to the changegroup.
       
   485     But we need to include it if we are fetching from bundlestore.
       
   486     """
       
   487     changedfiles = set()
       
   488     cl = bundlerepo.changelog
       
   489     for r in bundlerevs:
       
   490         # [3] means changed files
       
   491         changedfiles.update(cl.read(r)[3])
       
   492     if not changedfiles:
       
   493         return bundlecaps
       
   494 
       
   495     changedfiles = b'\0'.join(changedfiles)
       
   496     newcaps = []
       
   497     appended = False
       
   498     for cap in bundlecaps or []:
       
   499         if cap.startswith(b'excludepattern='):
       
   500             newcaps.append(b'\0'.join((cap, changedfiles)))
       
   501             appended = True
       
   502         else:
       
   503             newcaps.append(cap)
       
   504     if not appended:
       
   505         # Not found excludepattern cap. Just append it
       
   506         newcaps.append(b'excludepattern=' + changedfiles)
       
   507 
       
   508     return newcaps
       
   509 
       
   510 
       
   511 def _rebundle(bundlerepo, bundleroots, unknownhead):
       
   512     """
       
   513     Bundle may include more revision then user requested. For example,
       
   514     if user asks for revision but bundle also consists its descendants.
       
   515     This function will filter out all revision that user is not requested.
       
   516     """
       
   517     parts = []
       
   518 
       
   519     version = b'02'
       
   520     outgoing = discovery.outgoing(
       
   521         bundlerepo, commonheads=bundleroots, ancestorsof=[unknownhead]
       
   522     )
       
   523     cgstream = changegroup.makestream(bundlerepo, outgoing, version, b'pull')
       
   524     cgstream = util.chunkbuffer(cgstream).read()
       
   525     cgpart = bundle2.bundlepart(b'changegroup', data=cgstream)
       
   526     cgpart.addparam(b'version', version)
       
   527     parts.append(cgpart)
       
   528 
       
   529     return parts
       
   530 
       
   531 
       
   532 def _getbundleroots(oldrepo, bundlerepo, bundlerevs):
       
   533     cl = bundlerepo.changelog
       
   534     bundleroots = []
       
   535     for rev in bundlerevs:
       
   536         node = cl.node(rev)
       
   537         parents = cl.parents(node)
       
   538         for parent in parents:
       
   539             # include all revs that exist in the main repo
       
   540             # to make sure that bundle may apply client-side
       
   541             if parent in oldrepo:
       
   542                 bundleroots.append(parent)
       
   543     return bundleroots
       
   544 
       
   545 
       
   546 def _needsrebundling(head, bundlerepo):
       
   547     bundleheads = list(bundlerepo.revs(b'heads(bundle())'))
       
   548     return not (
       
   549         len(bundleheads) == 1 and bundlerepo[bundleheads[0]].node() == head
       
   550     )
       
   551 
       
   552 
       
def _generateoutputparts(head, bundlerepo, bundleroots, bundlefile):
    """generates the bundle parts that will be sent to the user

    Returns a list of bundle2 parts covering *head*.  When the stored
    bundle already has exactly *head* as its single head, its contents
    are replayed verbatim from *bundlefile*; otherwise the bundle is
    regenerated via _rebundle().
    """
    parts = []
    if not _needsrebundling(head, bundlerepo):
        with util.posixfile(bundlefile, b"rb") as f:
            unbundler = exchange.readbundle(bundlerepo.ui, f, bundlefile)
            if isinstance(unbundler, changegroup.cg1unpacker):
                # legacy cg1 bundle: wrap the raw stream as a single
                # version-01 changegroup part
                part = bundle2.bundlepart(
                    b'changegroup', data=unbundler._stream.read()
                )
                part.addparam(b'version', b'01')
                parts.append(part)
            elif isinstance(unbundler, bundle2.unbundle20):
                # bundle2: copy every part (with its params) through, and
                # require at least one changegroup part among them
                haschangegroup = False
                for part in unbundler.iterparts():
                    if part.type == b'changegroup':
                        haschangegroup = True
                    newpart = bundle2.bundlepart(part.type, data=part.read())
                    for key, value in part.params.items():
                        newpart.addparam(key, value)
                    parts.append(newpart)

                if not haschangegroup:
                    raise error.Abort(
                        b'unexpected bundle without changegroup part, '
                        + b'head: %s' % hex(head),
                        hint=b'report to administrator',
                    )
            else:
                raise error.Abort(b'unknown bundle type')
    else:
        parts = _rebundle(bundlerepo, bundleroots, head)

    return parts
       
   590 
       
   591 
       
def getbundlechunks(orig, repo, source, heads=None, bundlecaps=None, **kwargs):
    """getbundle wrapper that can serve scratch (bundlestore) heads.

    Any requested head missing from the repo's changelog is downloaded
    from the bundlestore, converted into bundle2 parts appended after
    the regular changegroup part, and reported to the client as draft
    phase.  The regular getbundle call then runs against the remaining
    (non-scratch) heads plus the scratch bundles' known roots.
    """
    heads = heads or []
    # newheads are parents of roots of scratch bundles that were requested
    newphases = {}
    scratchbundles = []
    newheads = []
    scratchheads = []
    nodestobundle = {}
    allbundlestocleanup = []
    try:
        for head in heads:
            if not repo.changelog.index.has_node(head):
                if head not in nodestobundle:
                    # fetch the bundle containing this head and overlay it
                    # on the local repo so its revisions can be read
                    newbundlefile = common.downloadbundle(repo, head)
                    bundlepath = b"bundle:%s+%s" % (repo.root, newbundlefile)
                    bundlerepo = hg.repository(repo.ui, bundlepath)

                    allbundlestocleanup.append((bundlerepo, newbundlefile))
                    bundlerevs = set(_readbundlerevs(bundlerepo))
                    bundlecaps = _includefilelogstobundle(
                        bundlecaps, bundlerepo, bundlerevs, repo.ui
                    )
                    cl = bundlerepo.changelog
                    bundleroots = _getbundleroots(repo, bundlerepo, bundlerevs)
                    for rev in bundlerevs:
                        node = cl.node(rev)
                        newphases[hex(node)] = str(phases.draft)
                        nodestobundle[node] = (
                            bundlerepo,
                            bundleroots,
                            newbundlefile,
                        )

                scratchbundles.append(
                    _generateoutputparts(head, *nodestobundle[head])
                )
                # NOTE(review): 'bundleroots' is the loop-local from the most
                # recently downloaded bundle; when this head was already in
                # nodestobundle it may belong to a different bundle, and
                # nodestobundle[head][1] looks like the intended value —
                # verify before changing.
                newheads.extend(bundleroots)
                scratchheads.append(head)
    finally:
        # always dispose of the temporary bundle overlays and files
        for bundlerepo, bundlefile in allbundlestocleanup:
            bundlerepo.close()
            try:
                os.unlink(bundlefile)
            except (IOError, OSError):
                # if we can't cleanup the file then just ignore the error,
                # no need to fail
                pass

    pullfrombundlestore = bool(scratchbundles)
    wrappedchangegrouppart = False
    wrappedlistkeys = False
    oldchangegrouppart = exchange.getbundle2partsmapping[b'changegroup']
    try:

        def _changegrouppart(bundler, *args, **kwargs):
            # Order is important here. First add non-scratch part
            # and only then add parts with scratch bundles because
            # non-scratch part contains parents of roots of scratch bundles.
            result = oldchangegrouppart(bundler, *args, **kwargs)
            for bundle in scratchbundles:
                for part in bundle:
                    bundler.addpart(part)
            return result

        exchange.getbundle2partsmapping[b'changegroup'] = _changegrouppart
        wrappedchangegrouppart = True

        def _listkeys(orig, self, namespace):
            # report scratch nodes as draft and hide the 'publishing'
            # flag so the client keeps them in draft phase
            origvalues = orig(self, namespace)
            if namespace == b'phases' and pullfrombundlestore:
                if origvalues.get(b'publishing') == b'True':
                    # Make repo non-publishing to preserve draft phase
                    del origvalues[b'publishing']
                origvalues.update(newphases)
            return origvalues

        extensions.wrapfunction(
            localrepo.localrepository, 'listkeys', _listkeys
        )
        wrappedlistkeys = True
        # swap scratch heads for their known roots in the regular call
        heads = list((set(newheads) | set(heads)) - set(scratchheads))
        result = orig(
            repo, source, heads=heads, bundlecaps=bundlecaps, **kwargs
        )
    finally:
        # always restore the wrapped entry points
        if wrappedchangegrouppart:
            exchange.getbundle2partsmapping[b'changegroup'] = oldchangegrouppart
        if wrappedlistkeys:
            extensions.unwrapfunction(
                localrepo.localrepository, 'listkeys', _listkeys
            )
    return result
       
   684 
       
   685 
       
   686 def _lookupwrap(orig):
       
   687     def _lookup(repo, proto, key):
       
   688         localkey = encoding.tolocal(key)
       
   689 
       
   690         if isinstance(localkey, str) and _scratchbranchmatcher(localkey):
       
   691             scratchnode = repo.bundlestore.index.getnode(localkey)
       
   692             if scratchnode:
       
   693                 return b"%d %s\n" % (1, scratchnode)
       
   694             else:
       
   695                 return b"%d %s\n" % (
       
   696                     0,
       
   697                     b'scratch branch %s not found' % localkey,
       
   698                 )
       
   699         else:
       
   700             try:
       
   701                 r = hex(repo.lookup(localkey))
       
   702                 return b"%d %s\n" % (1, r)
       
   703             except Exception as inst:
       
   704                 if repo.bundlestore.index.getbundle(localkey):
       
   705                     return b"%d %s\n" % (1, localkey)
       
   706                 else:
       
   707                     r = stringutil.forcebytestr(inst)
       
   708                     return b"%d %s\n" % (0, r)
       
   709 
       
   710     return _lookup
       
   711 
       
   712 
       
   713 def _pull(orig, ui, repo, source=b"default", **opts):
       
   714     opts = pycompat.byteskwargs(opts)
       
   715     # Copy paste from `pull` command
       
   716     path = urlutil.get_unique_pull_path_obj(
       
   717         b"infinite-push's pull",
       
   718         ui,
       
   719         source,
       
   720     )
       
   721 
       
   722     scratchbookmarks = {}
       
   723     unfi = repo.unfiltered()
       
   724     unknownnodes = []
       
   725     for rev in opts.get(b'rev', []):
       
   726         if rev not in unfi:
       
   727             unknownnodes.append(rev)
       
   728     if opts.get(b'bookmark'):
       
   729         bookmarks = []
       
   730         revs = opts.get(b'rev') or []
       
   731         for bookmark in opts.get(b'bookmark'):
       
   732             if _scratchbranchmatcher(bookmark):
       
   733                 # rev is not known yet
       
   734                 # it will be fetched with listkeyspatterns next
       
   735                 scratchbookmarks[bookmark] = b'REVTOFETCH'
       
   736             else:
       
   737                 bookmarks.append(bookmark)
       
   738 
       
   739         if scratchbookmarks:
       
   740             other = hg.peer(repo, opts, path)
       
   741             try:
       
   742                 fetchedbookmarks = other.listkeyspatterns(
       
   743                     b'bookmarks', patterns=scratchbookmarks
       
   744                 )
       
   745                 for bookmark in scratchbookmarks:
       
   746                     if bookmark not in fetchedbookmarks:
       
   747                         raise error.Abort(
       
   748                             b'remote bookmark %s not found!' % bookmark
       
   749                         )
       
   750                     scratchbookmarks[bookmark] = fetchedbookmarks[bookmark]
       
   751                     revs.append(fetchedbookmarks[bookmark])
       
   752             finally:
       
   753                 other.close()
       
   754         opts[b'bookmark'] = bookmarks
       
   755         opts[b'rev'] = revs
       
   756 
       
   757     if scratchbookmarks or unknownnodes:
       
   758         # Set anyincoming to True
       
   759         extensions.wrapfunction(
       
   760             discovery, 'findcommonincoming', _findcommonincoming
       
   761         )
       
   762     try:
       
   763         # Remote scratch bookmarks will be deleted because remotenames doesn't
       
   764         # know about them. Let's save it before pull and restore after
       
   765         remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, path.loc)
       
   766         result = orig(ui, repo, path.loc, **pycompat.strkwargs(opts))
       
   767         # TODO(stash): race condition is possible
       
   768         # if scratch bookmarks was updated right after orig.
       
   769         # But that's unlikely and shouldn't be harmful.
       
   770         if common.isremotebooksenabled(ui):
       
   771             remotescratchbookmarks.update(scratchbookmarks)
       
   772             _saveremotebookmarks(repo, remotescratchbookmarks, path.loc)
       
   773         else:
       
   774             _savelocalbookmarks(repo, scratchbookmarks)
       
   775         return result
       
   776     finally:
       
   777         if scratchbookmarks:
       
   778             extensions.unwrapfunction(discovery, 'findcommonincoming')
       
   779 
       
   780 
       
   781 def _readscratchremotebookmarks(ui, repo, other):
       
   782     if common.isremotebooksenabled(ui):
       
   783         remotenamesext = extensions.find(b'remotenames')
       
   784         remotepath = remotenamesext.activepath(repo.ui, other)
       
   785         result = {}
       
   786         # Let's refresh remotenames to make sure we have it up to date
       
   787         # Seems that `repo.names['remotebookmarks']` may return stale bookmarks
       
   788         # and it results in deleting scratch bookmarks. Our best guess how to
       
   789         # fix it is to use `clearnames()`
       
   790         repo._remotenames.clearnames()
       
   791         for remotebookmark in repo.names[b'remotebookmarks'].listnames(repo):
       
   792             path, bookname = remotenamesext.splitremotename(remotebookmark)
       
   793             if path == remotepath and _scratchbranchmatcher(bookname):
       
   794                 nodes = repo.names[b'remotebookmarks'].nodes(
       
   795                     repo, remotebookmark
       
   796                 )
       
   797                 if nodes:
       
   798                     result[bookname] = hex(nodes[0])
       
   799         return result
       
   800     else:
       
   801         return {}
       
   802 
       
   803 
       
   804 def _saveremotebookmarks(repo, newbookmarks, remote):
       
   805     remotenamesext = extensions.find(b'remotenames')
       
   806     remotepath = remotenamesext.activepath(repo.ui, remote)
       
   807     branches = collections.defaultdict(list)
       
   808     bookmarks = {}
       
   809     remotenames = remotenamesext.readremotenames(repo)
       
   810     for hexnode, nametype, remote, rname in remotenames:
       
   811         if remote != remotepath:
       
   812             continue
       
   813         if nametype == b'bookmarks':
       
   814             if rname in newbookmarks:
       
   815                 # It's possible if we have a normal bookmark that matches
       
   816                 # scratch branch pattern. In this case just use the current
       
   817                 # bookmark node
       
   818                 del newbookmarks[rname]
       
   819             bookmarks[rname] = hexnode
       
   820         elif nametype == b'branches':
       
   821             # saveremotenames expects 20 byte binary nodes for branches
       
   822             branches[rname].append(bin(hexnode))
       
   823 
       
   824     for bookmark, hexnode in newbookmarks.items():
       
   825         bookmarks[bookmark] = hexnode
       
   826     remotenamesext.saveremotenames(repo, remotepath, branches, bookmarks)
       
   827 
       
   828 
       
   829 def _savelocalbookmarks(repo, bookmarks):
       
   830     if not bookmarks:
       
   831         return
       
   832     with repo.wlock(), repo.lock(), repo.transaction(b'bookmark') as tr:
       
   833         changes = []
       
   834         for scratchbook, node in bookmarks.items():
       
   835             changectx = repo[node]
       
   836             changes.append((scratchbook, changectx.node()))
       
   837         repo._bookmarks.applychanges(repo, tr, changes)
       
   838 
       
   839 
       
   840 def _findcommonincoming(orig, *args, **kwargs):
       
   841     common, inc, remoteheads = orig(*args, **kwargs)
       
   842     return common, True, remoteheads
       
   843 
       
   844 
       
def _push(orig, ui, repo, *dests, **opts):
    """Wrap the push command to route scratch-branch pushes to the
    bundlestore.

    A push is treated as an infinitepush when ``--bundle-store`` was
    given or when the (single) ``--to`` bookmark matches the scratch
    branch pattern.
    """
    opts = pycompat.byteskwargs(opts)
    bookmark = opts.get(b'bookmark')
    # we only support pushing one infinitepush bookmark at once
    if len(bookmark) == 1:
        bookmark = bookmark[0]
    else:
        bookmark = b''

    oldphasemove = None
    # Expose the bookmark to partgen via the config override.
    overrides = {(experimental, configbookmark): bookmark}

    with ui.configoverride(overrides, b'infinitepush'):
        scratchpush = opts.get(b'bundle_store')
        if _scratchbranchmatcher(bookmark):
            scratchpush = True
            # bundle2 can be sent back after push (for example, bundle2
            # containing `pushkey` part to update bookmarks)
            ui.setconfig(experimental, b'bundle2.pushback', True)

        if scratchpush:
            # this is an infinitepush, we don't want the bookmark to be applied
            # rather that should be stored in the bundlestore
            opts[b'bookmark'] = []
            ui.setconfig(experimental, configscratchpush, True)
            # Prevent scratch commits from being marked public locally.
            oldphasemove = extensions.wrapfunction(
                exchange, '_localphasemove', _phasemove
            )

        paths = list(urlutil.get_push_paths(repo, ui, dests))
        if len(paths) > 1:
            msg = _(b'cannot push to multiple path with infinitepush')
            raise error.Abort(msg)

        path = paths[0]
        destpath = path.loc
        # Remote scratch bookmarks will be deleted because remotenames doesn't
        # know about them. Let's save it before push and restore after
        remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, destpath)
        result = orig(ui, repo, *dests, **pycompat.strkwargs(opts))
        if common.isremotebooksenabled(ui):
            if bookmark and scratchpush:
                other = hg.peer(repo, opts, path)
                try:
                    # Ask the server where the bookmark now points so
                    # remotenames can record it.
                    fetchedbookmarks = other.listkeyspatterns(
                        b'bookmarks', patterns=[bookmark]
                    )
                    remotescratchbookmarks.update(fetchedbookmarks)
                finally:
                    other.close()
            _saveremotebookmarks(repo, remotescratchbookmarks, destpath)
    # NOTE(review): if orig() raises, _localphasemove is not restored
    # (no try/finally around the push) -- confirm whether that matters.
    if oldphasemove:
        exchange._localphasemove = oldphasemove
    return result
       
   899 
       
   900 
       
   901 def _deleteinfinitepushbookmarks(ui, repo, path, names):
       
   902     """Prune remote names by removing the bookmarks we don't want anymore,
       
   903     then writing the result back to disk
       
   904     """
       
   905     remotenamesext = extensions.find(b'remotenames')
       
   906 
       
   907     # remotename format is:
       
   908     # (node, nametype ("branches" or "bookmarks"), remote, name)
       
   909     nametype_idx = 1
       
   910     remote_idx = 2
       
   911     name_idx = 3
       
   912     remotenames = [
       
   913         remotename
       
   914         for remotename in remotenamesext.readremotenames(repo)
       
   915         if remotename[remote_idx] == path
       
   916     ]
       
   917     remote_bm_names = [
       
   918         remotename[name_idx]
       
   919         for remotename in remotenames
       
   920         if remotename[nametype_idx] == b"bookmarks"
       
   921     ]
       
   922 
       
   923     for name in names:
       
   924         if name not in remote_bm_names:
       
   925             raise error.Abort(
       
   926                 _(
       
   927                     b"infinitepush bookmark '{}' does not exist "
       
   928                     b"in path '{}'"
       
   929                 ).format(name, path)
       
   930             )
       
   931 
       
   932     bookmarks = {}
       
   933     branches = collections.defaultdict(list)
       
   934     for node, nametype, remote, name in remotenames:
       
   935         if nametype == b"bookmarks" and name not in names:
       
   936             bookmarks[name] = node
       
   937         elif nametype == b"branches":
       
   938             # saveremotenames wants binary nodes for branches
       
   939             branches[name].append(bin(node))
       
   940 
       
   941     remotenamesext.saveremotenames(repo, path, branches, bookmarks)
       
   942 
       
   943 
       
   944 def _phasemove(orig, pushop, nodes, phase=phases.public):
       
   945     """prevent commits from being marked public
       
   946 
       
   947     Since these are going to a scratch branch, they aren't really being
       
   948     published."""
       
   949 
       
   950     if phase != phases.public:
       
   951         orig(pushop, nodes, phase)
       
   952 
       
   953 
       
@exchange.b2partsgenerator(scratchbranchparttype)
def partgen(pushop, bundler):
    """bundle2 parts generator replacing the normal changegroup part
    with scratchbranch part(s) when this push is an infinitepush."""
    bookmark = pushop.ui.config(experimental, configbookmark)
    scratchpush = pushop.ui.configbool(experimental, configscratchpush)
    # Only take over for infinitepush pushes whose changegroup step has
    # not been handled yet.
    if b'changesets' in pushop.stepsdone or not scratchpush:
        return

    # The server must advertise the scratchbranch capability.
    if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
        return

    # Mark the changegroup step done so the default generator is skipped.
    pushop.stepsdone.add(b'changesets')
    if not pushop.outgoing.missing:
        pushop.ui.status(_(b'no changes found\n'))
        pushop.cgresult = 0
        return

    # This parameter tells the server that the following bundle is an
    # infinitepush. This lets it switch the part processing to our
    # infinitepush code path.
    bundler.addparam(b"infinitepush", b"True")

    scratchparts = bundleparts.getscratchbranchparts(
        pushop.repo, pushop.remote, pushop.outgoing, pushop.ui, bookmark
    )

    for scratchpart in scratchparts:
        bundler.addpart(scratchpart)

    def handlereply(op):
        # server either succeeds or aborts; no code to read
        pushop.cgresult = 1

    return handlereply
       
   987 
       
   988 
       
# Advertise to clients that this server understands scratchbranch parts.
bundle2.capabilities[bundleparts.scratchbranchparttype] = ()
       
   990 
       
   991 
       
   992 def _getrevs(bundle, oldnode, force, bookmark):
       
   993     b'extracts and validates the revs to be imported'
       
   994     revs = [bundle[r] for r in bundle.revs(b'sort(bundle())')]
       
   995 
       
   996     # new bookmark
       
   997     if oldnode is None:
       
   998         return revs
       
   999 
       
  1000     # Fast forward update
       
  1001     if oldnode in bundle and list(bundle.set(b'bundle() & %s::', oldnode)):
       
  1002         return revs
       
  1003 
       
  1004     return revs
       
  1005 
       
  1006 
       
  1007 @contextlib.contextmanager
       
  1008 def logservicecall(logger, service, **kwargs):
       
  1009     start = time.time()
       
  1010     logger(service, eventtype=b'start', **kwargs)
       
  1011     try:
       
  1012         yield
       
  1013         logger(
       
  1014             service,
       
  1015             eventtype=b'success',
       
  1016             elapsedms=(time.time() - start) * 1000,
       
  1017             **kwargs
       
  1018         )
       
  1019     except Exception as e:
       
  1020         logger(
       
  1021             service,
       
  1022             eventtype=b'failure',
       
  1023             elapsedms=(time.time() - start) * 1000,
       
  1024             errormsg=stringutil.forcebytestr(e),
       
  1025             **kwargs
       
  1026         )
       
  1027         raise
       
  1028 
       
  1029 
       
def _getorcreateinfinitepushlogger(op):
    """Return the bundle2 operation's infinitepush logger, creating and
    caching it in ``op.records`` on first use."""
    logger = op.records[b'infinitepushlogger']
    if not logger:
        ui = op.repo.ui
        try:
            username = procutil.getuser()
        except Exception:
            # Fall back rather than fail logging setup (e.g. no identity
            # available in the server environment).
            username = b'unknown'
        # Generate random request id to be able to find all logged entries
        # for the same request. Since requestid is pseudo-generated it may
        # not be unique, but we assume that (hostname, username, requestid)
        # is unique.
        random.seed()
        requestid = random.randint(0, 2000000000)
        hostname = socket.gethostname()
        logger = functools.partial(
            ui.log,
            b'infinitepush',
            user=username,
            requestid=requestid,
            hostname=hostname,
            reponame=ui.config(b'infinitepush', b'reponame'),
        )
        op.records.add(b'infinitepushlogger', logger)
    else:
        # op.records[...] yields a list of recorded values; reuse the
        # first (and only) cached logger.
        logger = logger[0]
    return logger
       
  1057 
       
  1058 
       
def storetobundlestore(orig, repo, op, unbundler):
    """stores the incoming bundle coming from push command to the bundlestore
    instead of applying on the revlogs"""

    repo.ui.status(_(b"storing changesets on the bundlestore\n"))
    bundler = bundle2.bundle20(repo.ui)

    # processing each part and storing it in bundler
    with bundle2.partiterator(repo, op, unbundler) as parts:
        for part in parts:
            bundlepart = None
            if part.type == b'replycaps':
                # This configures the current operation to allow reply parts.
                bundle2._processpart(op, part)
            else:
                # Re-create the part so it can be written into the stored
                # bundle; part.read() consumes the incoming payload.
                bundlepart = bundle2.bundlepart(part.type, data=part.read())
                for key, value in part.params.items():
                    bundlepart.addparam(key, value)

                # Certain parts require a response
                if part.type in (b'pushkey', b'changegroup'):
                    if op.reply is not None:
                        rpart = op.reply.newpart(b'reply:%s' % part.type)
                        rpart.addparam(
                            b'in-reply-to', b'%d' % part.id, mandatory=False
                        )
                        rpart.addparam(b'return', b'1', mandatory=False)

            # Record a success result so the push reports as handled.
            op.records.add(
                part.type,
                {
                    b'return': 1,
                },
            )
            if bundlepart:
                bundler.addpart(bundlepart)

    # storing the bundle in the bundlestore
    buf = util.chunkbuffer(bundler.getchunks())
    fd, bundlefile = pycompat.mkstemp()
    try:
        try:
            # NOTE(review): if os.fdopen() itself raised, fp would be
            # unbound in the finally clause below.
            fp = os.fdopen(fd, 'wb')
            fp.write(buf.read())
        finally:
            fp.close()
        storebundle(op, {}, bundlefile)
    finally:
        try:
            os.unlink(bundlefile)
        except Exception:
            # we would rather see the original exception
            pass
       
  1112 
       
  1113 
       
  1114 def processparts(orig, repo, op, unbundler):
       
  1115 
       
  1116     # make sure we don't wrap processparts in case of `hg unbundle`
       
  1117     if op.source == b'unbundle':
       
  1118         return orig(repo, op, unbundler)
       
  1119 
       
  1120     # this server routes each push to bundle store
       
  1121     if repo.ui.configbool(b'infinitepush', b'pushtobundlestore'):
       
  1122         return storetobundlestore(orig, repo, op, unbundler)
       
  1123 
       
  1124     if unbundler.params.get(b'infinitepush') != b'True':
       
  1125         return orig(repo, op, unbundler)
       
  1126 
       
  1127     handleallparts = repo.ui.configbool(b'infinitepush', b'storeallparts')
       
  1128 
       
  1129     bundler = bundle2.bundle20(repo.ui)
       
  1130     cgparams = None
       
  1131     with bundle2.partiterator(repo, op, unbundler) as parts:
       
  1132         for part in parts:
       
  1133             bundlepart = None
       
  1134             if part.type == b'replycaps':
       
  1135                 # This configures the current operation to allow reply parts.
       
  1136                 bundle2._processpart(op, part)
       
  1137             elif part.type == bundleparts.scratchbranchparttype:
       
  1138                 # Scratch branch parts need to be converted to normal
       
  1139                 # changegroup parts, and the extra parameters stored for later
       
  1140                 # when we upload to the store. Eventually those parameters will
       
  1141                 # be put on the actual bundle instead of this part, then we can
       
  1142                 # send a vanilla changegroup instead of the scratchbranch part.
       
  1143                 cgversion = part.params.get(b'cgversion', b'01')
       
  1144                 bundlepart = bundle2.bundlepart(
       
  1145                     b'changegroup', data=part.read()
       
  1146                 )
       
  1147                 bundlepart.addparam(b'version', cgversion)
       
  1148                 cgparams = part.params
       
  1149 
       
  1150                 # If we're not dumping all parts into the new bundle, we need to
       
  1151                 # alert the future pushkey and phase-heads handler to skip
       
  1152                 # the part.
       
  1153                 if not handleallparts:
       
  1154                     op.records.add(
       
  1155                         scratchbranchparttype + b'_skippushkey', True
       
  1156                     )
       
  1157                     op.records.add(
       
  1158                         scratchbranchparttype + b'_skipphaseheads', True
       
  1159                     )
       
  1160             else:
       
  1161                 if handleallparts:
       
  1162                     # Ideally we would not process any parts, and instead just
       
  1163                     # forward them to the bundle for storage, but since this
       
  1164                     # differs from previous behavior, we need to put it behind a
       
  1165                     # config flag for incremental rollout.
       
  1166                     bundlepart = bundle2.bundlepart(part.type, data=part.read())
       
  1167                     for key, value in part.params.items():
       
  1168                         bundlepart.addparam(key, value)
       
  1169 
       
  1170                     # Certain parts require a response
       
  1171                     if part.type == b'pushkey':
       
  1172                         if op.reply is not None:
       
  1173                             rpart = op.reply.newpart(b'reply:pushkey')
       
  1174                             rpart.addparam(
       
  1175                                 b'in-reply-to', str(part.id), mandatory=False
       
  1176                             )
       
  1177                             rpart.addparam(b'return', b'1', mandatory=False)
       
  1178                 else:
       
  1179                     bundle2._processpart(op, part)
       
  1180 
       
  1181             if handleallparts:
       
  1182                 op.records.add(
       
  1183                     part.type,
       
  1184                     {
       
  1185                         b'return': 1,
       
  1186                     },
       
  1187                 )
       
  1188             if bundlepart:
       
  1189                 bundler.addpart(bundlepart)
       
  1190 
       
  1191     # If commits were sent, store them
       
  1192     if cgparams:
       
  1193         buf = util.chunkbuffer(bundler.getchunks())
       
  1194         fd, bundlefile = pycompat.mkstemp()
       
  1195         try:
       
  1196             try:
       
  1197                 fp = os.fdopen(fd, 'wb')
       
  1198                 fp.write(buf.read())
       
  1199             finally:
       
  1200                 fp.close()
       
  1201             storebundle(op, cgparams, bundlefile)
       
  1202         finally:
       
  1203             try:
       
  1204                 os.unlink(bundlefile)
       
  1205             except Exception:
       
  1206                 # we would rather see the original exception
       
  1207                 pass
       
  1208 
       
  1209 
       
  1210 def storebundle(op, params, bundlefile):
       
  1211     log = _getorcreateinfinitepushlogger(op)
       
  1212     parthandlerstart = time.time()
       
  1213     log(scratchbranchparttype, eventtype=b'start')
       
  1214     index = op.repo.bundlestore.index
       
  1215     store = op.repo.bundlestore.store
       
  1216     op.records.add(scratchbranchparttype + b'_skippushkey', True)
       
  1217 
       
  1218     bundle = None
       
  1219     try:  # guards bundle
       
  1220         bundlepath = b"bundle:%s+%s" % (op.repo.root, bundlefile)
       
  1221         bundle = hg.repository(op.repo.ui, bundlepath)
       
  1222 
       
  1223         bookmark = params.get(b'bookmark')
       
  1224         bookprevnode = params.get(b'bookprevnode', b'')
       
  1225         force = params.get(b'force')
       
  1226 
       
  1227         if bookmark:
       
  1228             oldnode = index.getnode(bookmark)
       
  1229         else:
       
  1230             oldnode = None
       
  1231         bundleheads = bundle.revs(b'heads(bundle())')
       
  1232         if bookmark and len(bundleheads) > 1:
       
  1233             raise error.Abort(
       
  1234                 _(b'cannot push more than one head to a scratch branch')
       
  1235             )
       
  1236 
       
  1237         revs = _getrevs(bundle, oldnode, force, bookmark)
       
  1238 
       
  1239         # Notify the user of what is being pushed
       
  1240         plural = b's' if len(revs) > 1 else b''
       
  1241         op.repo.ui.warn(_(b"pushing %d commit%s:\n") % (len(revs), plural))
       
  1242         maxoutput = 10
       
  1243         for i in range(0, min(len(revs), maxoutput)):
       
  1244             firstline = bundle[revs[i]].description().split(b'\n')[0][:50]
       
  1245             op.repo.ui.warn(b"    %s  %s\n" % (revs[i], firstline))
       
  1246 
       
  1247         if len(revs) > maxoutput + 1:
       
  1248             op.repo.ui.warn(b"    ...\n")
       
  1249             firstline = bundle[revs[-1]].description().split(b'\n')[0][:50]
       
  1250             op.repo.ui.warn(b"    %s  %s\n" % (revs[-1], firstline))
       
  1251 
       
  1252         nodesctx = [bundle[rev] for rev in revs]
       
  1253         inindex = lambda rev: bool(index.getbundle(bundle[rev].hex()))
       
  1254         if bundleheads:
       
  1255             newheadscount = sum(not inindex(rev) for rev in bundleheads)
       
  1256         else:
       
  1257             newheadscount = 0
       
  1258         # If there's a bookmark specified, there should be only one head,
       
  1259         # so we choose the last node, which will be that head.
       
  1260         # If a bug or malicious client allows there to be a bookmark
       
  1261         # with multiple heads, we will place the bookmark on the last head.
       
  1262         bookmarknode = nodesctx[-1].hex() if nodesctx else None
       
  1263         key = None
       
  1264         if newheadscount:
       
  1265             with open(bundlefile, b'rb') as f:
       
  1266                 bundledata = f.read()
       
  1267                 with logservicecall(
       
  1268                     log, b'bundlestore', bundlesize=len(bundledata)
       
  1269                 ):
       
  1270                     bundlesizelimit = 100 * 1024 * 1024  # 100 MB
       
  1271                     if len(bundledata) > bundlesizelimit:
       
  1272                         error_msg = (
       
  1273                             b'bundle is too big: %d bytes. '
       
  1274                             + b'max allowed size is 100 MB'
       
  1275                         )
       
  1276                         raise error.Abort(error_msg % (len(bundledata),))
       
  1277                     key = store.write(bundledata)
       
  1278 
       
  1279         with logservicecall(log, b'index', newheadscount=newheadscount), index:
       
  1280             if key:
       
  1281                 index.addbundle(key, nodesctx)
       
  1282             if bookmark:
       
  1283                 index.addbookmark(bookmark, bookmarknode)
       
  1284                 _maybeaddpushbackpart(
       
  1285                     op, bookmark, bookmarknode, bookprevnode, params
       
  1286                 )
       
  1287         log(
       
  1288             scratchbranchparttype,
       
  1289             eventtype=b'success',
       
  1290             elapsedms=(time.time() - parthandlerstart) * 1000,
       
  1291         )
       
  1292 
       
  1293     except Exception as e:
       
  1294         log(
       
  1295             scratchbranchparttype,
       
  1296             eventtype=b'failure',
       
  1297             elapsedms=(time.time() - parthandlerstart) * 1000,
       
  1298             errormsg=stringutil.forcebytestr(e),
       
  1299         )
       
  1300         raise
       
  1301     finally:
       
  1302         if bundle:
       
  1303             bundle.close()
       
  1304 
       
  1305 
       
@bundle2.parthandler(
    scratchbranchparttype,
    (
        b'bookmark',
        b'bookprevnode',
        b'force',
        b'pushbackbookmarks',
        b'cgversion',
    ),
)
def bundle2scratchbranch(op, part):
    '''unbundle a bundle2 part containing a changegroup to store'''

    # Wrap the incoming changegroup payload in a fresh bundle2 bundle so
    # it can be written to a temporary file for the bundlestore.
    bundler = bundle2.bundle20(op.repo.ui)
    cgversion = part.params.get(b'cgversion', b'01')
    cgpart = bundle2.bundlepart(b'changegroup', data=part.read())
    cgpart.addparam(b'version', cgversion)
    bundler.addpart(cgpart)
    buf = util.chunkbuffer(bundler.getchunks())

    fd, bundlefile = pycompat.mkstemp()
    try:
        try:
            # NOTE(review): if os.fdopen() itself raised, fp would be
            # unbound in the finally clause below.
            fp = os.fdopen(fd, 'wb')
            fp.write(buf.read())
        finally:
            fp.close()
        storebundle(op, part.params, bundlefile)
    finally:
        try:
            os.unlink(bundlefile)
        except FileNotFoundError:
            pass

    return 1
       
  1341 
       
  1342 
       
  1343 def _maybeaddpushbackpart(op, bookmark, newnode, oldnode, params):
       
  1344     if params.get(b'pushbackbookmarks'):
       
  1345         if op.reply and b'pushback' in op.reply.capabilities:
       
  1346             params = {
       
  1347                 b'namespace': b'bookmarks',
       
  1348                 b'key': bookmark,
       
  1349                 b'new': newnode,
       
  1350                 b'old': oldnode,
       
  1351             }
       
  1352             op.reply.newpart(b'pushkey', mandatoryparams=params.items())
       
  1353 
       
  1354 
       
  1355 def bundle2pushkey(orig, op, part):
       
  1356     """Wrapper of bundle2.handlepushkey()
       
  1357 
       
  1358     The only goal is to skip calling the original function if flag is set.
       
  1359     It's set if infinitepush push is happening.
       
  1360     """
       
  1361     if op.records[scratchbranchparttype + b'_skippushkey']:
       
  1362         if op.reply is not None:
       
  1363             rpart = op.reply.newpart(b'reply:pushkey')
       
  1364             rpart.addparam(b'in-reply-to', str(part.id), mandatory=False)
       
  1365             rpart.addparam(b'return', b'1', mandatory=False)
       
  1366         return 1
       
  1367 
       
  1368     return orig(op, part)
       
  1369 
       
  1370 
       
  1371 def bundle2handlephases(orig, op, part):
       
  1372     """Wrapper of bundle2.handlephases()
       
  1373 
       
  1374     The only goal is to skip calling the original function if flag is set.
       
  1375     It's set if infinitepush push is happening.
       
  1376     """
       
  1377 
       
  1378     if op.records[scratchbranchparttype + b'_skipphaseheads']:
       
  1379         return
       
  1380 
       
  1381     return orig(op, part)
       
  1382 
       
  1383 
       
  1384 def _asyncsavemetadata(root, nodes):
       
  1385     """starts a separate process that fills metadata for the nodes
       
  1386 
       
  1387     This function creates a separate process and doesn't wait for it's
       
  1388     completion. This was done to avoid slowing down pushes
       
  1389     """
       
  1390 
       
  1391     maxnodes = 50
       
  1392     if len(nodes) > maxnodes:
       
  1393         return
       
  1394     nodesargs = []
       
  1395     for node in nodes:
       
  1396         nodesargs.append(b'--node')
       
  1397         nodesargs.append(node)
       
  1398     with open(os.devnull, b'w+b') as devnull:
       
  1399         cmdline = [
       
  1400             util.hgexecutable(),
       
  1401             b'debugfillinfinitepushmetadata',
       
  1402             b'-R',
       
  1403             root,
       
  1404         ] + nodesargs
       
  1405         # Process will run in background. We don't care about the return code
       
  1406         subprocess.Popen(
       
  1407             pycompat.rapply(procutil.tonativestr, cmdline),
       
  1408             close_fds=True,
       
  1409             shell=False,
       
  1410             stdin=devnull,
       
  1411             stdout=devnull,
       
  1412             stderr=devnull,
       
  1413         )