tests/test-stdio.py
changeset 45148 a37f290a7124
parent 45147 c2c862b9b544
child 45830 c102b704edb5
--- a/tests/test-stdio.py	Fri Jul 17 00:37:33 2020 +0200
+++ b/tests/test-stdio.py	Fri Jul 17 03:28:52 2020 +0200
@@ -13,7 +13,7 @@
 import tempfile
 import unittest
 
-from mercurial import pycompat
+from mercurial import pycompat, util
 
 
 if pycompat.ispy3:
@@ -71,6 +71,34 @@
 '''
 
 
+TEST_BROKEN_PIPE_CHILD_SCRIPT = r'''
+import os
+import pickle
+
+from mercurial import dispatch
+from mercurial.utils import procutil
+
+dispatch.initstdio()
+procutil.stdin.read(1)  # wait until parent process closed pipe
+try:
+    procutil.{stream}.write(b'test')
+    procutil.{stream}.flush()
+except EnvironmentError as e:
+    with os.fdopen(
+        os.open(
+            {err_fn!r},
+            os.O_WRONLY
+            | getattr(os, 'O_BINARY', 0)
+            | getattr(os, 'O_TEMPORARY', 0),
+        ),
+        'wb',
+    ) as err_f:
+        pickle.dump(e, err_f)
+# Exit early to suppress further broken pipe errors at interpreter shutdown.
+os._exit(0)
+'''
+
+
 @contextlib.contextmanager
 def _closing(fds):
     try:
@@ -148,11 +176,15 @@
         check_output,
         python_args=[],
         post_child_check=None,
+        stdin_generator=None,
     ):
         assert stream in ('stdout', 'stderr')
-        with rwpair_generator() as (stream_receiver, child_stream), open(
-            os.devnull, 'rb'
-        ) as child_stdin:
+        if stdin_generator is None:
+            stdin_generator = open(os.devnull, 'rb')
+        with rwpair_generator() as (
+            stream_receiver,
+            child_stream,
+        ), stdin_generator as child_stdin:
             proc = subprocess.Popen(
                 [sys.executable] + python_args + ['-c', child_script],
                 stdin=child_stdin,
@@ -295,6 +327,37 @@
     def test_large_write_stderr_ptys_unbuffered(self):
         self._test_large_write('stderr', _ptys, python_args=['-u'])
 
+    def _test_broken_pipe(self, stream):
+        assert stream in ('stdout', 'stderr')
+
+        def check_output(stream_receiver, proc):
+            os.close(stream_receiver)
+            proc.stdin.write(b'x')
+            proc.stdin.close()
+
+        def post_child_check():
+            err = util.pickle.load(err_f)
+            self.assertEqual(err.errno, errno.EPIPE)
+            self.assertEqual(err.strerror, "Broken pipe")
+
+        with tempfile.NamedTemporaryFile('rb') as err_f:
+            self._test(
+                TEST_BROKEN_PIPE_CHILD_SCRIPT.format(
+                    stream=stream, err_fn=err_f.name
+                ),
+                stream,
+                _pipes,
+                check_output,
+                post_child_check=post_child_check,
+                stdin_generator=util.nullcontextmanager(subprocess.PIPE),
+            )
+
+    def test_broken_pipe_stdout(self):
+        self._test_broken_pipe('stdout')
+
+    def test_broken_pipe_stderr(self):
+        self._test_broken_pipe('stderr')
+
 
 if __name__ == '__main__':
     import silenttestrunner