procutil: ensure that procutil.std{out,err}.write() writes all bytes
author Manuel Jacob <me@manueljacob.de>
Fri, 10 Jul 2020 12:27:58 +0200
changeset 45095 8e04607023e5
parent 45094 b4c35e439ea5
child 45096 e9e452eafbfb
procutil: ensure that procutil.std{out,err}.write() writes all bytes

Python 3 offers different kinds of streams, and it’s not guaranteed for all of them that calling write() writes all bytes.

When Python is started in unbuffered mode, sys.std{out,err}.buffer are instances of io.FileIO, whose write() can write fewer bytes than requested for platform-specific reasons (e.g. Linux has a 0x7ffff000 bytes maximum and could write less if interrupted by a signal; when writing to Windows consoles, it’s limited to 32767 bytes to avoid the "not enough space" error). This can lead to silent loss of data, both when using sys.std{out,err}.buffer (which may in fact not be a buffered stream) and when using the text streams sys.std{out,err} (I’ve created a CPython bug report for that: https://bugs.python.org/issue41221).

Python may fix the problem at some point. For now, we implement our own wrapper for procutil.std{out,err} that calls the raw stream’s write() method until all bytes have been written. We don’t use sys.std{out,err} for larger writes, so I think it’s not worth the effort to patch them.
mercurial/utils/procutil.py
tests/test-stdio.py
--- a/mercurial/utils/procutil.py	Sat Jul 11 07:47:04 2020 +0200
+++ b/mercurial/utils/procutil.py	Fri Jul 10 12:27:58 2020 +0200
@@ -80,16 +80,49 @@
     return LineBufferedWrapper(stream)
 
 
+class WriteAllWrapper(object):
+    def __init__(self, orig):
+        self.orig = orig
+
+    def __getattr__(self, attr):
+        return getattr(self.orig, attr)
+
+    def write(self, s):
+        write1 = self.orig.write
+        m = memoryview(s)
+        total_to_write = len(s)
+        total_written = 0
+        while total_written < total_to_write:
+            total_written += write1(m[total_written:])
+        return total_written
+
+
+io.IOBase.register(WriteAllWrapper)
+
+
+def make_write_all(stream):
+    assert pycompat.ispy3
+    if isinstance(stream, WriteAllWrapper):
+        return stream
+    if isinstance(stream, io.BufferedIOBase):
+        # The io.BufferedIOBase.write() contract guarantees that all data is
+        # written.
+        return stream
+    # In general, the write() method of streams is free to write only part of
+    # the data.
+    return WriteAllWrapper(stream)
+
+
 if pycompat.ispy3:
     # Python 3 implements its own I/O streams.
     # TODO: .buffer might not exist if std streams were replaced; we'll need
     # a silly wrapper to make a bytes stream backed by a unicode one.
     stdin = sys.stdin.buffer
-    stdout = sys.stdout.buffer
+    stdout = make_write_all(sys.stdout.buffer)
     if isatty(stdout):
         # The standard library doesn't offer line-buffered binary streams.
         stdout = make_line_buffered(stdout)
-    stderr = sys.stderr.buffer
+    stderr = make_write_all(sys.stderr.buffer)
 else:
     # Python 2 uses the I/O streams provided by the C library.
     stdin = sys.stdin
--- a/tests/test-stdio.py	Sat Jul 11 07:47:04 2020 +0200
+++ b/tests/test-stdio.py	Fri Jul 10 12:27:58 2020 +0200
@@ -7,6 +7,7 @@
 import contextlib
 import errno
 import os
+import signal
 import subprocess
 import sys
 import unittest
@@ -31,6 +32,19 @@
 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
 
 
+TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
+import signal
+import sys
+
+from mercurial import dispatch
+from mercurial.utils import procutil
+
+signal.signal(signal.SIGINT, lambda *x: None)
+dispatch.initstdio()
+procutil.{stream}.write(b'x' * 1048576)
+'''
+
+
 @contextlib.contextmanager
 def _closing(fds):
     try:
@@ -63,8 +77,8 @@
         yield rwpair
 
 
-def _readall(fd, buffer_size):
-    buf = []
+def _readall(fd, buffer_size, initial_buf=None):
+    buf = initial_buf or []
     while True:
         try:
             s = os.read(fd, buffer_size)
@@ -101,7 +115,7 @@
             )
             try:
                 os.close(child_stream)
-                check_output(stream_receiver)
+                check_output(stream_receiver, proc)
             except:  # re-raises
                 proc.terminate()
                 raise
@@ -112,7 +126,7 @@
     def _test_buffering(
         self, stream, rwpair_generator, expected_output, python_args=[]
     ):
-        def check_output(stream_receiver):
+        def check_output(stream_receiver, proc):
             self.assertEqual(_readall(stream_receiver, 1024), expected_output)
 
         self._test(
@@ -143,6 +157,61 @@
             test_buffering_stdout_ptys_unbuffered
         )
 
+    def _test_large_write(self, stream, rwpair_generator, python_args=[]):
+        if not pycompat.ispy3 and pycompat.isdarwin:
+            # Python 2 doesn't always retry on EINTR, but the libc might retry.
+            # So far, it was observed only on macOS that EINTR is raised at the
+            # Python level. As Python 2 support will be dropped soon-ish, we
+            # won't attempt to fix it.
+            raise unittest.SkipTest("raises EINTR on macOS")
+
+        def check_output(stream_receiver, proc):
+            if not pycompat.iswindows:
+                # On Unix, we can provoke a partial write() by interrupting it
+                # by a signal handler as soon as a bit of data was written.
+                # We test that write() is called until all data is written.
+                buf = [os.read(stream_receiver, 1)]
+                proc.send_signal(signal.SIGINT)
+            else:
+                # On Windows, there doesn't seem to be a way to cause partial
+                # writes.
+                buf = []
+            self.assertEqual(
+                _readall(stream_receiver, 131072, buf), b'x' * 1048576
+            )
+
+        self._test(
+            TEST_LARGE_WRITE_CHILD_SCRIPT.format(stream=stream),
+            stream,
+            rwpair_generator,
+            check_output,
+            python_args,
+        )
+
+    def test_large_write_stdout_pipes(self):
+        self._test_large_write('stdout', _pipes)
+
+    def test_large_write_stdout_ptys(self):
+        self._test_large_write('stdout', _ptys)
+
+    def test_large_write_stdout_pipes_unbuffered(self):
+        self._test_large_write('stdout', _pipes, python_args=['-u'])
+
+    def test_large_write_stdout_ptys_unbuffered(self):
+        self._test_large_write('stdout', _ptys, python_args=['-u'])
+
+    def test_large_write_stderr_pipes(self):
+        self._test_large_write('stderr', _pipes)
+
+    def test_large_write_stderr_ptys(self):
+        self._test_large_write('stderr', _ptys)
+
+    def test_large_write_stderr_pipes_unbuffered(self):
+        self._test_large_write('stderr', _pipes, python_args=['-u'])
+
+    def test_large_write_stderr_ptys_unbuffered(self):
+        self._test_large_write('stderr', _ptys, python_args=['-u'])
+
 
 if __name__ == '__main__':
     import silenttestrunner