tests/test-stdio.py
author Manuel Jacob <me@manueljacob.de>
Tue, 07 Jul 2020 11:06:37 +0200
changeset 45068 8cd18aba5e6c
parent 45045 8403cc54bc83
child 45069 9172fd511999
permissions -rwxr-xr-x
tests: proof test-stdio.py against buffer fill-up With the previous code, it could in theory happen that the pipe / PTY buffer of the child stdout / stderr fills up and the process never finishes. To prevent that, we read all of the stream before waiting for the end of the process. To ensure that the stream reaches EOF when the child finishes, we must close the parent "copy" of the child stdout / stderr.

#!/usr/bin/env python
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import

import contextlib
import errno
import os
import subprocess
import sys
import unittest

from mercurial import pycompat


CHILD_PROCESS = r'''
import os

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'


@contextlib.contextmanager
def _closing(fds):
    try:
        yield
    finally:
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError:
                pass


@contextlib.contextmanager
def _pipes():
    rwpair = os.pipe()
    with _closing(rwpair):
        yield rwpair


@contextlib.contextmanager
def _ptys():
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")
    import pty
    import tty

    rwpair = pty.openpty()
    with _closing(rwpair):
        tty.setraw(rwpair[0])
        yield rwpair


def _readall(fd, buffer_size):
    buf = []
    while True:
        try:
            s = os.read(fd, buffer_size)
        except OSError as e:
            if e.errno == errno.EIO:
                # If the child-facing PTY got closed, reading from the
                # parent-facing PTY raises EIO.
                break
            raise
        if not s:
            break
        buf.append(s)
    return b''.join(buf)


class TestStdio(unittest.TestCase):
    def _test(self, stream, rwpair_generator, expected_output, python_args=[]):
        assert stream in ('stdout', 'stderr')
        with rwpair_generator() as (stream_receiver, child_stream), open(
            os.devnull, 'rb'
        ) as child_stdin:
            proc = subprocess.Popen(
                [sys.executable]
                + python_args
                + ['-c', CHILD_PROCESS.format(stream=stream)],
                stdin=child_stdin,
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            try:
                os.close(child_stream)
                self.assertEqual(
                    _readall(stream_receiver, 1024), expected_output
                )
            finally:
                retcode = proc.wait()
            self.assertEqual(retcode, 0)

    def test_stdout_pipes(self):
        self._test('stdout', _pipes, FULLY_BUFFERED)

    def test_stdout_ptys(self):
        self._test('stdout', _ptys, LINE_BUFFERED)

    def test_stdout_pipes_unbuffered(self):
        self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    def test_stdout_ptys_unbuffered(self):
        self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_stdout_ptys_unbuffered
        )

    def test_stderr_pipes(self):
        self._test('stderr', _pipes, UNBUFFERED)

    def test_stderr_ptys(self):
        self._test('stderr', _ptys, UNBUFFERED)

    def test_stderr_pipes_unbuffered(self):
        self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])

    def test_stderr_ptys_unbuffered(self):
        self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])


if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)