tests/testlib/sigpipe-remote.py
changeset 47618 27ff81547d35
parent 47617 d5fc1b59a2df
child 47636 b2ed9480b34a
--- a/tests/testlib/sigpipe-remote.py	Mon Jul 12 03:29:21 2021 +0200
+++ b/tests/testlib/sigpipe-remote.py	Mon Jul 12 03:30:04 2021 +0200
@@ -5,7 +5,6 @@
 import os
 import subprocess
 import sys
-import threading
 import time
 
 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
@@ -68,14 +67,6 @@
     return s.decode('latin-1')
 
 
-piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
-piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
-
-stdout_writer = os.fdopen(piped_stdout[1], "rb")
-stdout_reader = os.fdopen(piped_stdout[0], "rb")
-stderr_writer = os.fdopen(piped_stderr[1], "rb")
-stderr_reader = os.fdopen(piped_stderr[0], "rb")
-
 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
 
 TESTLIB_DIR = os.path.dirname(sys.argv[0])
@@ -88,67 +79,44 @@
     SYNCFILE1,
 )
 
-cmd = ['hg']
-cmd += sys.argv[1:]
-sub = subprocess.Popen(
-    cmd,
-    bufsize=0,
-    close_fds=True,
-    stdin=sys.stdin,
-    stdout=stdout_writer,
-    stderr=stderr_writer,
-)
-
-debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')
+try:
+    cmd = ['hg']
+    cmd += sys.argv[1:]
+    sub = subprocess.Popen(
+        cmd,
+        bufsize=0,
+        close_fds=True,
+        stdin=sys.stdin,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+    )
 
-
-shut_down = threading.Event()
-
-close_lock = threading.Lock()
-
+    basedir = os.path.dirname(sys.argv[0])
+    worker = os.path.join(basedir, 'sigpipe-worker.py')
 
-def _read(stream):
-    try:
-        return stream.read()
-    except ValueError:
-        # read on closed file
-        return None
-
+    cmd = [sys.executable, worker]
 
-def forward_stdout():
-    while not shut_down.is_set():
-        c = _read(stdout_reader)
-        while c is not None:
-            sys.stdout.buffer.write(c)
-            c = _read(stdout_reader)
-        time.sleep(0.001)
-    with close_lock:
-        if not stdout_reader.closed:
-            stdout_reader.close()
-            debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
-
+    stdout_worker = subprocess.Popen(
+        cmd,
+        bufsize=0,
+        close_fds=True,
+        stdin=sub.stdout,
+        stdout=sys.stdout,
+        stderr=sys.stderr,
+    )
 
-def forward_stderr():
-    while not shut_down.is_set():
-        c = _read(stderr_reader)
-        if c is not None:
-            sys.stderr.buffer.write(c)
-            c = _read(stderr_reader)
-        time.sleep(0.001)
-    with close_lock:
-        if not stderr_reader.closed:
-            stderr_reader.close()
-            debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
-
-
-stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
-stderr_thread = threading.Thread(target=forward_stderr, daemon=True)
-
-try:
-    stdout_thread.start()
-    stderr_thread.start()
-
+    stderr_worker = subprocess.Popen(
+        cmd,
+        bufsize=0,
+        close_fds=True,
+        stdin=sub.stderr,
+        stdout=sys.stderr,
+        stderr=sys.stderr,
+    )
     debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
+    os.close(sub.stdout.fileno())
+    os.close(sub.stderr.fileno())
+    debug_stream.write(b'SIGPIPE-HELPER: pipes closed in main\n')
 
     try:
         wait_file(sysbytes(SYNCFILE1))
@@ -157,18 +125,16 @@
         debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
     else:
         debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
-    with close_lock:
-        if not stdout_reader.closed:
-            stdout_reader.close()
-        if not stderr_reader.closed:
-            stderr_reader.close()
-        sys.stdin.close()
-        debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
+    stdout_worker.kill()
+    stderr_worker.kill()
+    stdout_worker.wait(10)
+    stderr_worker.wait(10)
+    debug_stream.write(b'SIGPIPE-HELPER: worker killed\n')
+
     debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
     write_file(sysbytes(SYNCFILE2))
 finally:
     debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
-    shut_down.set()
     if not sys.stdin.closed:
         sys.stdin.close()
     try:
@@ -176,6 +142,11 @@
     except subprocess.TimeoutExpired:
         msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
         debug_stream.write(msg)
+        sub.kill()
+        sub.wait()
+        msg = b'SIGPIPE-HELPER: Server process killed\n'
     else:
-        debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
+        msg = b'SIGPIPE-HELPER: Server process terminated with status %d\n'
+        msg %= sub.returncode
+        debug_stream.write(msg)
     debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')