mercurial/exchangev2.py
author Gregory Szorc <gregory.szorc@gmail.com>
Tue, 04 Sep 2018 10:42:24 -0700
changeset 39640 039bf1eddc2e
parent 39638 d292328e0143
child 39641 aa7e312375cf
permissions -rw-r--r--
exchangev2: fetch file revisions

Now that the server has an API for fetching file data, we can call into
it to fetch file revisions.

The implementation is relatively straightforward: we examine the
manifests that we fetched and find all new file revisions referenced by
them. We build up a mapping from file path to file nodes to manifest
node. (The mapping to the first manifest node allows us to map back to
the first changelog node/revision, which is used for the linkrev.) Once
that map is built up, we iterate over it in a deterministic manner and
fetch and store file data.

The code is very similar to manifest fetching. So similar that we could
probably extract the common bits into a generic function.

With file data retrieval implemented, `hg clone` and `hg pull` are
effectively feature complete, at least as far as the completeness of
data transfer for essential repository data (changesets, manifests,
files, phases, and bookmarks). We're still missing support for
obsolescence markers, the hgtags fnodes cache, and the branchmap cache.
But these are non-essential for the moment (and will be implemented
later).

This is a good point to assess the state of exchangev2 in terms of
performance. I ran a local `hg clone` for the mozilla-unified repository
using both version 1 and version 2 of the wire protocols and exchange
methods. This is effectively comparing the performance of the wire
protocol overhead and "getbundle" versus domain-specific commands. Wire
protocol version 2 doesn't have compression implemented yet, so I tested
version 1 with `server.compressionengines=none` to remove compression
overhead from the equation.

server
  before: user 220.420+0.000 sys 14.420+0.000
  after:  user 321.980+0.000 sys 18.990+0.000

client
  before: real 561.650 secs (user 497.670+0.000 sys 28.160+0.000)
  after:  real 1226.260 secs (user 944.240+0.000 sys 354.150+0.000)

We have substantial regressions on both client and server. This is
obviously not desirable. I'm aware of some reasons:

* Lack of hgtagsfnodes transfer (contributes significant CPU to the
  client).
* Lack of branch cache transfer (contributes significant CPU to the
  client).
* Little to no profiling / optimization performed on wire protocol
  version 2 code.
* There appears to be a memory leak on the client, which is likely
  causing swapping on my machine.
* Using multiple threads on the client may be counter-productive
  because Python.
* We're not compressing on the server.
* We're tracking file nodes on the client via manifest diffing rather
  than using linkrev shortcuts on the server.

I'm pretty confident that most of these issues are addressable. But even
if we can't get wire protocol version 2 to performance parity with
"getbundle," I still think it is important to have the set of low-level,
data-specific retrieval commands that we have implemented so far. This
is because the existence of such commands allows flexibility in how
clients access server data.

Differential Revision: https://phab.mercurial-scm.org/D4491
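A minimal runnable sketch of the mapping and linkrev resolution described
above; the path, node values, and revision number are hypothetical
placeholders, while the shape matches what _derivefilesfrommanifests()
and _fetchmanifests() in the file below produce:

import collections

# Placeholder 20-byte values standing in for real binary nodes.
fnode1, fnode2 = b'\x01' * 20, b'\x02' * 20
manifestnode1 = b'\xaa' * 20

# {file path: {file node: first manifest node referencing it}}
fnodes = collections.defaultdict(dict)
fnodes[b'dir/file.txt'].setdefault(fnode1, manifestnode1)
fnodes[b'dir/file.txt'].setdefault(fnode2, manifestnode1)

# Manifest fetching records {manifest node: changelog revision}, so the
# linkrev for any new file node is two dict lookups away.
linkrevs = {manifestnode1: 42}  # hypothetical changelog revision
assert linkrevs[fnodes[b'dir/file.txt'][fnode1]] == 42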

# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import (
    nullid,
    short,
)
from . import (
    bookmarks,
    error,
    mdiff,
    phases,
    pycompat,
    setdiscovery,
)

def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote
    tr = pullop.trmanager.transaction()

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=not pullop.force)

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres['added']:
        phases.registernew(repo, tr, phases.draft, csetres['added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
            continue

        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
                               csetres['nodesbyphase'][phase])

    # Write bookmark updates.
    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
                               remote.url(), pullop.gettransaction,
                               explicit=pullop.explicitbookmarks)

    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, manres['added'])
    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])

def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        nodemap = repo.unfiltered().changelog.nodemap

        common |= {head for head in remoteheads if head in nodemap}

        if remoteheads.issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads

def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
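    """Fetch new changesets from the remote and add them to the changelog.

    Returns the dict produced by _processchangesetdata().
    """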
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'changesetdata', {
            b'noderange': [sorted(common), sorted(remoteheads)],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)

def _processchangesetdata(repo, tr, objs):
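    """Ingest changeset data received from the wire.

    ``objs`` is an iterator of wire objects: a header describing the data
    that follows, then a metadata dict per changeset, each optionally
    followed by its raw revision data.
    """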
    repo.hook('prechangegroup', throw=True,
              **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(_('changesets'),
                                    unit=_('chunks'),
                                    total=meta.get(b'totalitems'))

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug('add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            # Some entries might be metadata-only updates.
            if b'revisionsize' not in cset:
                continue

            data = next(objs)

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
                        addrevisioncb=onchangeset)

    progress.complete()

    return {
        'added': added,
        'nodesbyphase': nodesbyphase,
        'bookmarks': remotebookmarks,
        'manifestnodes': manifestnodes,
    }

def _fetchmanifests(repo, tr, remote, manifestnodes):
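    """Fetch manifest revisions referenced by fetched changesets.

    ``manifestnodes`` maps changelog revision to manifest node. Manifests
    already present locally are skipped. Returns a dict containing the
    added nodes and the manifest node to changelog revision mapping used
    for linkrevs.
    """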
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(manifestnodes.iteritems()):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

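            # Each manifest arrives either as a delta against a base node
            # the receiver is expected to have, or as a full revision,
            # which we normalize to a delta against nullid.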
            if b'deltasize' in manifest:
                basenode = manifest[b'deltabasenode']
                delta = next(objs)
            elif b'revisionsize' in manifest:
                basenode = nullid
                revision = next(objs)
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

    # Fetch manifests in batches of 10,000 nodes per command.
    # TODO have server advertise preferences?
    # TODO make size configurable on client?
    batchsize = 10000

    # We send commands one at a time to the remote. This is not the most
    # efficient approach because we incur a round trip at the end of each
    # batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = fetchnodes[i:i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
            }).result()

            # Chomp off header object.
            next(objs)

            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
        'linkrevs': linkrevs,
    }

def _derivefilesfrommanifests(repo, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    for manifestnode in manifestnodes:
        m = ml.get(b'', manifestnode)

        # TODO this will pull in unwanted nodes because it takes the storage
        # delta into consideration. What we really want is something that takes
        # the delta between the manifest's parents. And ideally we would
        # ignore file nodes that are known locally. For now, ignore both
        # these limitations. This will result in incremental fetches requesting
        # data we already have. So this is far from ideal.
        md = m.readfast()

        for path, fnode in md.items():
            fnodes[path].setdefault(fnode, manifestnode)

    return fnodes

def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
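    """Fetch file revisions for the file nodes in ``fnodes``.

    ``fnodes`` maps file path to a dict of file node to manifest node, as
    produced by _derivefilesfrommanifests(). ``linkrevs`` maps manifest
    node to changelog revision and is used to resolve linkrevs.
    """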
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

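            # Same delta-or-full-revision normalization as manifest
            # fetching above.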
            if b'deltasize' in filerevision:
                basenode = filerevision[b'deltabasenode']
                delta = next(objs)
            elif b'revisionsize' in filerevision:
                basenode = nullid
                revision = next(objs)
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = sorted(fnodes.items())

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = fnodeslist[i:i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append((path, e.callcommand(b'filedata', {
                    b'path': path,
                    b'nodes': sorted(nodes),
                    b'fields': {b'parents', b'revision'}
                })))

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in nodes.iteritems()}

            for path, f in fs:
                objs = f.result()

                # Chomp off the header object.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr))