tests/test-stdio.py
changeset 45104 eb26a9cf7821
parent 45097 dff208398ede
child 45147 c2c862b9b544
equal deleted inserted replaced
45103:a5fa2761a6cd 45104:eb26a9cf7821
    32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
    32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
    33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
    33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
    34 
    34 
    35 
    35 
    36 TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
    36 TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
       
    37 import os
    37 import signal
    38 import signal
    38 import sys
    39 import sys
    39 
    40 
    40 from mercurial import dispatch
    41 from mercurial import dispatch
    41 from mercurial.utils import procutil
    42 from mercurial.utils import procutil
    42 
    43 
    43 signal.signal(signal.SIGINT, lambda *x: None)
    44 signal.signal(signal.SIGINT, lambda *x: None)
    44 dispatch.initstdio()
    45 dispatch.initstdio()
    45 write_result = procutil.{stream}.write(b'x' * 1048576)
    46 write_result = procutil.{stream}.write(b'x' * 1048576)
    46 with open({write_result_fn}, 'w') as write_result_f:
    47 with os.fdopen(
       
    48     os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
       
    49     'w',
       
    50 ) as write_result_f:
    47     write_result_f.write(str(write_result))
    51     write_result_f.write(str(write_result))
    48 '''
    52 '''
    49 
    53 
    50 
    54 
    51 @contextlib.contextmanager
    55 @contextlib.contextmanager
   199             self.assertEqual(
   203             self.assertEqual(
   200                 _readall(stream_receiver, 131072, buf), b'x' * 1048576
   204                 _readall(stream_receiver, 131072, buf), b'x' * 1048576
   201             )
   205             )
   202 
   206 
   203         def post_child_check():
   207         def post_child_check():
   204             with open(write_result_fn, 'r') as write_result_f:
   208             write_result_str = write_result_f.read()
   205                 write_result_str = write_result_f.read()
       
   206             if pycompat.ispy3:
   209             if pycompat.ispy3:
   207                 # On Python 3, we test that the correct number of bytes is
   210                 # On Python 3, we test that the correct number of bytes is
   208                 # claimed to have been written.
   211                 # claimed to have been written.
   209                 expected_write_result_str = '1048576'
   212                 expected_write_result_str = '1048576'
   210             else:
   213             else:
   211                 # On Python 2, we only check that the large write does not
   214                 # On Python 2, we only check that the large write does not
   212                 # crash.
   215                 # crash.
   213                 expected_write_result_str = 'None'
   216                 expected_write_result_str = 'None'
   214             self.assertEqual(write_result_str, expected_write_result_str)
   217             self.assertEqual(write_result_str, expected_write_result_str)
   215 
   218 
   216         try:
   219         with tempfile.NamedTemporaryFile('r') as write_result_f:
   217             # tempfile.mktemp() is unsafe in general, as a malicious process
       
   218             # could create the file before we do. But in tests, we're running
       
   219             # in a controlled environment.
       
   220             write_result_fn = tempfile.mktemp()
       
   221             self._test(
   220             self._test(
   222                 TEST_LARGE_WRITE_CHILD_SCRIPT.format(
   221                 TEST_LARGE_WRITE_CHILD_SCRIPT.format(
   223                     stream=stream, write_result_fn=repr(write_result_fn)
   222                     stream=stream, write_result_fn=write_result_f.name
   224                 ),
   223                 ),
   225                 stream,
   224                 stream,
   226                 rwpair_generator,
   225                 rwpair_generator,
   227                 check_output,
   226                 check_output,
   228                 python_args,
   227                 python_args,
   229                 post_child_check=post_child_check,
   228                 post_child_check=post_child_check,
   230             )
   229             )
   231         finally:
       
   232             try:
       
   233                 os.unlink(write_result_fn)
       
   234             except OSError:
       
   235                 pass
       
   236 
   230 
   237     def test_large_write_stdout_devnull(self):
   231     def test_large_write_stdout_devnull(self):
   238         self._test_large_write('stdout', _devnull)
   232         self._test_large_write('stdout', _devnull)
   239 
   233 
   240     def test_large_write_stdout_pipes(self):
   234     def test_large_write_stdout_pipes(self):