tests/sshprotoext.py
author Gregory Szorc <gregory.szorc@gmail.com>
Wed, 07 Feb 2018 20:17:05 -0800
changeset 36065 bf676267f64f
parent 36064 5767664d39a5
child 36074 2f7290555c96
permissions -rw-r--r--
wireprotoserver: split ssh protocol handler and server We want to formalize the interface for protocol handlers. Today, server functionality (which is domain specific) is interleaved with protocol handling functionality (which conforms to a generic interface) in the sshserver class. This commit splits the ssh protocol handling code out of the sshserver class. Differential Revision: https://phab.mercurial-scm.org/D2080

# sshprotoext.py - Extension to test behavior of SSH protocol
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# This extension replaces the SSH server started via `hg serve --stdio`.
# The server behaves differently depending on environment variables.

from __future__ import absolute_import

from mercurial import (
    error,
    extensions,
    registrar,
    sshpeer,
    wireproto,
    wireprotoserver,
)

configtable = {}
configitem = registrar.configitem(configtable)

configitem('sshpeer', 'mode', default=None)
configitem('sshpeer', 'handshake-mode', default=None)

class bannerserver(wireprotoserver.sshserver):
    """Server that sends a banner to stdout."""
    def serve_forever(self):
        for i in range(10):
            self._fout.write(b'banner: line %d\n' % i)

        super(bannerserver, self).serve_forever()

class prehelloserver(wireprotoserver.sshserver):
    """Tests behavior when connecting to <0.9.1 servers.

    The ``hello`` wire protocol command was introduced in Mercurial
    0.9.1. Modern clients send the ``hello`` command when connecting
    to SSH servers. This mock server tests behavior of the handshake
    when ``hello`` is not supported.
    """
    def serve_forever(self):
        l = self._fin.readline()
        assert l == b'hello\n'
        # Respond to unknown commands with an empty reply.
        wireprotoserver._sshv1respondbytes(self._fout, b'')
        l = self._fin.readline()
        assert l == b'between\n'
        rsp = wireproto.dispatch(self._repo, self._proto, b'between')
        wireprotoserver._sshv1respondbytes(self._fout, rsp)

        super(prehelloserver, self).serve_forever()

class upgradev2server(wireprotoserver.sshserver):
    """Tests behavior for clients that issue upgrade to version 2."""
    def serve_forever(self):
        name = wireprotoserver.SSHV2
        l = self._fin.readline()
        assert l.startswith(b'upgrade ')
        token, caps = l[:-1].split(b' ')[1:]
        assert caps == b'proto=%s' % name

        # Filter hello and between requests.
        l = self._fin.readline()
        assert l == b'hello\n'
        l = self._fin.readline()
        assert l == b'between\n'
        l = self._fin.readline()
        assert l == 'pairs 81\n'
        self._fin.read(81)

        # Send the upgrade response.
        self._fout.write(b'upgraded %s %s\n' % (token, name))
        servercaps = wireproto.capabilities(self._repo, self._proto)
        rsp = b'capabilities: %s' % servercaps
        self._fout.write(b'%d\n' % len(rsp))
        self._fout.write(rsp)
        self._fout.write(b'\n')
        self._fout.flush()

        super(upgradev2server, self).serve_forever()

def performhandshake(orig, ui, stdin, stdout, stderr):
    """Wrapped version of sshpeer._performhandshake to send extra commands."""
    mode = ui.config(b'sshpeer', b'handshake-mode')
    if mode == b'pre-no-args':
        ui.debug(b'sending no-args command\n')
        stdin.write(b'no-args\n')
        stdin.flush()
        return orig(ui, stdin, stdout, stderr)
    elif mode == b'pre-multiple-no-args':
        ui.debug(b'sending unknown1 command\n')
        stdin.write(b'unknown1\n')
        ui.debug(b'sending unknown2 command\n')
        stdin.write(b'unknown2\n')
        ui.debug(b'sending unknown3 command\n')
        stdin.write(b'unknown3\n')
        stdin.flush()
        return orig(ui, stdin, stdout, stderr)
    else:
        raise error.ProgrammingError(b'unknown HANDSHAKECOMMANDMODE: %s' %
                                     mode)

def extsetup(ui):
    # It's easier for tests to define the server behavior via environment
    # variables than config options. This is because `hg serve --stdio`
    # has to be invoked with a certain form for security reasons and
    # `dummyssh` can't just add `--config` flags to the command line.
    servermode = ui.environ.get(b'SSHSERVERMODE')

    if servermode == b'banner':
        wireprotoserver.sshserver = bannerserver
    elif servermode == b'no-hello':
        wireprotoserver.sshserver = prehelloserver
    elif servermode == b'upgradev2':
        wireprotoserver.sshserver = upgradev2server
    elif servermode:
        raise error.ProgrammingError(b'unknown server mode: %s' % servermode)

    peermode = ui.config(b'sshpeer', b'mode')

    if peermode == b'extra-handshake-commands':
        extensions.wrapfunction(sshpeer, '_performhandshake', performhandshake)
    elif peermode:
        raise error.ProgrammingError(b'unknown peer mode: %s' % peermode)