hgext/convert/transport.py
author Sandu Turcan <idlsoft@gmail.com>
Tue, 03 May 2022 21:44:30 -0400
branch stable
changeset 49241 6b10151b9621
parent 47062 f38bf44e077f
child 48875 6000f5b25c9b
permissions -rw-r--r--
narrow_widen_acl: enforce narrowacl in narrow_widen (SEC) Reviewer note: this was sent by the author as a simple bugfix, but can be considered a security patch, since it allows users to access things outside of the ACL, hence the (SEC) prefix. However, this affects the `narrow` extension which is still marked as experimental and has relatively few users aside from large companies with their own security layers on top from what we can gather. We feel (Alphare: or at least, I feel) like pinging the packaging list is enough in this case.

# -*- coding: utf-8 -*-

# Copyright (C) 2007 Daniel Holth <dholth@fastmail.fm>
# This is a stripped-down version of the original bzr-svn transport.py,
# Copyright (C) 2006 Jelmer Vernooij <jelmer@samba.org>

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import

import svn.client
import svn.core
import svn.ra

# Short module-level aliases for the Subversion binding names used below:
# Pool is instantiated by SvnRaTransport, and SubversionException is both
# caught there and used as the base class of NotBranchError.
Pool = svn.core.Pool
SubversionException = svn.core.SubversionException

from mercurial.pycompat import getattr
from mercurial import util

# Some older versions of the Python bindings need to be
# explicitly initialized. But what we want to do probably
# won't work worth a darn against those libraries anyway!
svn.ra.initialize()

# Module-level cache of the Subversion configuration. It starts out unset,
# is loaded by SvnRaTransport.__init__ the first time a client context is
# created, and is then reused for every later transport.
svn_config = None


def _create_auth_baton(pool):
    """Build the Subversion authentication baton for a client context.

    A fixed set of providers is always registered; on top of that, any
    platform-specific providers offered by the installed bindings are
    added. ``pool`` is handed to every provider constructor and to
    ``svn_auth_open``, whose result is returned.
    """
    # Give the client context baton a suite of authentication providers.
    providers = [
        svn.client.get_simple_provider(pool),
        svn.client.get_username_provider(pool),
        svn.client.get_ssl_client_cert_file_provider(pool),
        svn.client.get_ssl_client_cert_pw_file_provider(pool),
        svn.client.get_ssl_server_trust_file_provider(pool),
    ]

    # Platform-dependent authentication methods
    platform_provider = getattr(
        svn.core, 'svn_auth_get_platform_specific_provider', None
    )
    if platform_provider:
        # Available in svn >= 1.6: try every store/credential-kind pairing
        # and keep only the providers the bindings actually return.
        stores = (b'gnome_keyring', b'keychain', b'kwallet', b'windows')
        kinds = (b'simple', b'ssl_client_cert_pw', b'ssl_server_trust')
        probed = (
            platform_provider(store, kind, pool)
            for store in stores
            for kind in kinds
        )
        providers.extend(found for found in probed if found)
    elif util.safehasattr(svn.client, b'get_windows_simple_provider'):
        # No generic lookup function: fall back to the dedicated Windows
        # provider when the bindings expose it.
        providers.append(svn.client.get_windows_simple_provider(pool))

    return svn.core.svn_auth_open(providers, pool)


class NotBranchError(SubversionException):
    """Signal that a URL could not be opened as a Subversion RA session.

    SvnRaTransport raises this, with the offending URL as its argument,
    when opening the session fails with an illegal-URL, bad-URL or
    local-repository-open error code.
    """


class SvnRaTransport(object):
    """
    Open an ra connection to a Subversion repository.

    A transport either opens a new RA session for ``url`` or, when an
    already open session is passed as ``ra`` and the bindings provide
    ``svn.ra.reparent``, re-points that session at ``url``. Either way the
    session ends up in ``self.ra``.
    """

    def __init__(self, url=b"", ra=None):
        """Set up the RA session for ``url``.

        ``url`` is the repository URL and ``ra`` an optional existing RA
        session to reuse instead of opening a new one.

        Raises NotBranchError when a new session cannot be opened because
        of an illegal-URL, bad-URL or local-repository-open error; every
        other SubversionException propagates unchanged.
        """
        self.pool = Pool()
        self.svn_url = url
        self.username = b''
        self.password = b''

        # Only Subversion 1.4 has reparent()
        if ra is None or not util.safehasattr(svn.ra, b'reparent'):
            self.client = svn.client.create_context(self.pool)
            ab = _create_auth_baton(self.pool)
            self.client.auth_baton = ab
            # The configuration is loaded once and shared by all transports.
            global svn_config
            if svn_config is None:
                svn_config = svn.core.svn_config_get_config(None)
            self.client.config = svn_config
            try:
                self.ra = svn.client.open_ra_session(
                    self.svn_url, self.client, self.pool
                )
            except SubversionException as err:
                # args normally holds (message, error code). Read the code
                # defensively so that an exception carrying fewer items is
                # re-raised as-is instead of being masked by an unpacking
                # ValueError.
                num = err.args[1] if len(err.args) > 1 else None
                if num in (
                    svn.core.SVN_ERR_RA_ILLEGAL_URL,
                    svn.core.SVN_ERR_RA_LOCAL_REPOS_OPEN_FAILED,
                    svn.core.SVN_ERR_BAD_URL,
                ):
                    raise NotBranchError(url)
                raise
        else:
            self.ra = ra
            # The URL may already be bytes (the default is b""); calling
            # .encode() on it unconditionally fails on Python 3.
            svn.ra.reparent(self.ra, self._url_to_bytes(self.svn_url))

    @staticmethod
    def _url_to_bytes(url):
        """Return ``url`` as bytes, UTF-8 encoding it only if it is text."""
        if isinstance(url, bytes):
            return url
        return url.encode('utf8')

    class Reporter(object):
        """Object wrapper around a reporter/baton pair.

        Each method forwards its arguments to the matching
        ``svn.ra.reporter2_invoke_*`` function together with the stored
        reporter and baton.
        """

        def __init__(self, reporter_data):
            """Store the ``(reporter, baton)`` pair from ``reporter_data``."""
            self._reporter, self._baton = reporter_data

        def set_path(self, path, revnum, start_empty, lock_token, pool=None):
            """Forward to ``svn.ra.reporter2_invoke_set_path``."""
            svn.ra.reporter2_invoke_set_path(
                self._reporter,
                self._baton,
                path,
                revnum,
                start_empty,
                lock_token,
                pool,
            )

        def delete_path(self, path, pool=None):
            """Forward to ``svn.ra.reporter2_invoke_delete_path``."""
            svn.ra.reporter2_invoke_delete_path(
                self._reporter, self._baton, path, pool
            )

        def link_path(
            self, path, url, revision, start_empty, lock_token, pool=None
        ):
            """Forward to ``svn.ra.reporter2_invoke_link_path``."""
            svn.ra.reporter2_invoke_link_path(
                self._reporter,
                self._baton,
                path,
                url,
                revision,
                start_empty,
                lock_token,
                pool,
            )

        def finish_report(self, pool=None):
            """Forward to ``svn.ra.reporter2_invoke_finish_report``."""
            svn.ra.reporter2_invoke_finish_report(
                self._reporter, self._baton, pool
            )

        def abort_report(self, pool=None):
            """Forward to ``svn.ra.reporter2_invoke_abort_report``."""
            svn.ra.reporter2_invoke_abort_report(
                self._reporter, self._baton, pool
            )

    def do_update(self, revnum, path, *args, **kwargs):
        """Call ``svn.ra.do_update`` on this session and wrap the result.

        ``revnum``, ``path`` and any extra positional or keyword arguments
        are passed straight through; the returned reporter data is wrapped
        in a Reporter.
        """
        return self.Reporter(
            svn.ra.do_update(self.ra, revnum, path, *args, **kwargs)
        )