tests/testlib/ext-sidedata.py
author Raphaël Gomès <rgomes@octobus.net>
Mon, 19 Apr 2021 11:22:24 +0200
changeset 47084 27f1191b1305
parent 47012 d55b71393907
child 47085 3aab2330b7d3
permissions -rw-r--r--
sidedata: replace sidedata upgrade mechanism with the new one Note: this is split into a separate change (like some other patches in this series) because it's not easy to have all patches work 100% and this seemed easier for reviewers. When cloning or upgrading a repo, we may need to compute (or remove) sidedata. This is the same mechanism that is used in exchange, so we re-use the new system to simplify the code and fix the remaining issues (correctly dropping flags and handling partial removal, etc.). This also highlighted an issue with `test-copies-in-changeset.t` that kept sidedata categories that are not relevant anymore. They should probably be dropped entirely, but that would be for another patch. Differential Revision: https://phab.mercurial-scm.org/D10359

# ext-sidedata.py - small extension to test the sidedata logic
#
# Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import hashlib
import struct

from mercurial.node import nullrev
from mercurial import (
    changegroup,
    extensions,
    requirements,
    revlog,
)

from mercurial.upgrade_utils import engine as upgrade_engine

from mercurial.revlogutils import constants
from mercurial.revlogutils import sidedata


def wrapaddrevision(
    orig, self, text, transaction, link, p1, p2, *args, **kwargs
):
    if kwargs.get('sidedata') is None:
        kwargs['sidedata'] = {}
    sd = kwargs['sidedata']
    ## let's store some arbitrary data just for testing
    # text length
    sd[sidedata.SD_TEST1] = struct.pack('>I', len(text))
    # and sha2 hashes
    sha256 = hashlib.sha256(text).digest()
    sd[sidedata.SD_TEST2] = struct.pack('>32s', sha256)
    return orig(self, text, transaction, link, p1, p2, *args, **kwargs)


def wrap_revisiondata(orig, self, nodeorrev, *args, **kwargs):
    text, sd = orig(self, nodeorrev, *args, **kwargs)
    if getattr(self, 'sidedatanocheck', False):
        return text, sd
    if self.version & 0xFFFF != 2:
        return text, sd
    if nodeorrev != nullrev and nodeorrev != self.nullid:
        cat1 = sd.get(sidedata.SD_TEST1)
        if cat1 is not None and len(text) != struct.unpack('>I', cat1)[0]:
            raise RuntimeError('text size mismatch')
        expected = sd.get(sidedata.SD_TEST2)
        got = hashlib.sha256(text).digest()
        if expected is not None and got != expected:
            raise RuntimeError('sha256 mismatch')
    return text, sd


def wrapget_sidedata_helpers(orig, srcrepo, dstrepo):
    repo, computers, removers = orig(srcrepo, dstrepo)
    assert not computers and not removers  # deal with composition later
    addedreqs = dstrepo.requirements - srcrepo.requirements

    if requirements.SIDEDATA_REQUIREMENT in addedreqs:

        def computer(repo, revlog, rev, old_sidedata):
            assert not old_sidedata  # not supported yet
            update = {}
            revlog.sidedatanocheck = True
            try:
                text = revlog.revision(rev)
            finally:
                del revlog.sidedatanocheck
            ## let's store some arbitrary data just for testing
            # text length
            update[sidedata.SD_TEST1] = struct.pack('>I', len(text))
            # and sha2 hashes
            sha256 = hashlib.sha256(text).digest()
            update[sidedata.SD_TEST2] = struct.pack('>32s', sha256)
            return update, (0, 0)

        srcrepo.register_sidedata_computer(
            constants.KIND_CHANGELOG,
            b"whatever",
            (sidedata.SD_TEST1, sidedata.SD_TEST2),
            computer,
            0,
        )
        dstrepo.register_wanted_sidedata(b"whatever")

    return changegroup.get_sidedata_helpers(srcrepo, dstrepo._wanted_sidedata)


def extsetup(ui):
    extensions.wrapfunction(revlog.revlog, 'addrevision', wrapaddrevision)
    extensions.wrapfunction(revlog.revlog, '_revisiondata', wrap_revisiondata)
    extensions.wrapfunction(
        upgrade_engine, 'get_sidedata_helpers', wrapget_sidedata_helpers
    )


def reposetup(ui, repo):
    # We don't register sidedata computers because we don't care within these
    # tests
    repo.register_wanted_sidedata(sidedata.SD_TEST1)
    repo.register_wanted_sidedata(sidedata.SD_TEST2)