32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]' |
32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]' |
33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n' |
33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n' |
34 |
34 |
35 |
35 |
36 TEST_LARGE_WRITE_CHILD_SCRIPT = r''' |
36 TEST_LARGE_WRITE_CHILD_SCRIPT = r''' |
|
37 import os |
37 import signal |
38 import signal |
38 import sys |
39 import sys |
39 |
40 |
40 from mercurial import dispatch |
41 from mercurial import dispatch |
41 from mercurial.utils import procutil |
42 from mercurial.utils import procutil |
42 |
43 |
43 signal.signal(signal.SIGINT, lambda *x: None) |
44 signal.signal(signal.SIGINT, lambda *x: None) |
44 dispatch.initstdio() |
45 dispatch.initstdio() |
45 write_result = procutil.{stream}.write(b'x' * 1048576) |
46 write_result = procutil.{stream}.write(b'x' * 1048576) |
46 with open({write_result_fn}, 'w') as write_result_f: |
47 with os.fdopen( |
|
48 os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)), |
|
49 'w', |
|
50 ) as write_result_f: |
47 write_result_f.write(str(write_result)) |
51 write_result_f.write(str(write_result)) |
48 ''' |
52 ''' |
49 |
53 |
50 |
54 |
51 @contextlib.contextmanager |
55 @contextlib.contextmanager |
199 self.assertEqual( |
203 self.assertEqual( |
200 _readall(stream_receiver, 131072, buf), b'x' * 1048576 |
204 _readall(stream_receiver, 131072, buf), b'x' * 1048576 |
201 ) |
205 ) |
202 |
206 |
203 def post_child_check(): |
207 def post_child_check(): |
204 with open(write_result_fn, 'r') as write_result_f: |
208 write_result_str = write_result_f.read() |
205 write_result_str = write_result_f.read() |
|
206 if pycompat.ispy3: |
209 if pycompat.ispy3: |
207 # On Python 3, we test that the correct number of bytes is |
210 # On Python 3, we test that the correct number of bytes is |
208 # claimed to have been written. |
211 # claimed to have been written. |
209 expected_write_result_str = '1048576' |
212 expected_write_result_str = '1048576' |
210 else: |
213 else: |
211 # On Python 2, we only check that the large write does not |
214 # On Python 2, we only check that the large write does not |
212 # crash. |
215 # crash. |
213 expected_write_result_str = 'None' |
216 expected_write_result_str = 'None' |
214 self.assertEqual(write_result_str, expected_write_result_str) |
217 self.assertEqual(write_result_str, expected_write_result_str) |
215 |
218 |
216 try: |
219 with tempfile.NamedTemporaryFile('r') as write_result_f: |
217 # tempfile.mktemp() is unsafe in general, as a malicious process |
|
218 # could create the file before we do. But in tests, we're running |
|
219 # in a controlled environment. |
|
220 write_result_fn = tempfile.mktemp() |
|
221 self._test( |
220 self._test( |
222 TEST_LARGE_WRITE_CHILD_SCRIPT.format( |
221 TEST_LARGE_WRITE_CHILD_SCRIPT.format( |
223 stream=stream, write_result_fn=repr(write_result_fn) |
222 stream=stream, write_result_fn=write_result_f.name |
224 ), |
223 ), |
225 stream, |
224 stream, |
226 rwpair_generator, |
225 rwpair_generator, |
227 check_output, |
226 check_output, |
228 python_args, |
227 python_args, |
229 post_child_check=post_child_check, |
228 post_child_check=post_child_check, |
230 ) |
229 ) |
231 finally: |
|
232 try: |
|
233 os.unlink(write_result_fn) |
|
234 except OSError: |
|
235 pass |
|
236 |
230 |
237 def test_large_write_stdout_devnull(self): |
231 def test_large_write_stdout_devnull(self): |
238 self._test_large_write('stdout', _devnull) |
232 self._test_large_write('stdout', _devnull) |
239 |
233 |
240 def test_large_write_stdout_pipes(self): |
234 def test_large_write_stdout_pipes(self): |