3 # Copyright 2013 Facebook, Inc. |
3 # Copyright 2013 Facebook, Inc. |
4 # |
4 # |
5 # This software may be used and distributed according to the terms of the |
5 # This software may be used and distributed according to the terms of the |
6 # GNU General Public License version 2 or any later version. |
6 # GNU General Public License version 2 or any later version. |
7 |
7 |
8 from __future__ import absolute_import |
8 |
9 |
|
10 import errno |
|
11 import os |
9 import os |
|
10 import pickle |
|
11 import selectors |
12 import signal |
12 import signal |
13 import sys |
13 import sys |
14 import threading |
14 import threading |
15 import time |
15 import time |
16 |
|
17 try: |
|
18 import selectors |
|
19 |
|
20 selectors.BaseSelector |
|
21 except ImportError: |
|
22 from .thirdparty import selectors2 as selectors |
|
23 |
16 |
24 from .i18n import _ |
17 from .i18n import _ |
25 from . import ( |
18 from . import ( |
26 encoding, |
19 encoding, |
27 error, |
20 error, |
28 pycompat, |
21 pycompat, |
29 scmutil, |
22 scmutil, |
30 util, |
|
31 ) |
23 ) |
32 |
24 |
33 |
25 |
34 def countcpus(): |
26 def countcpus(): |
35 '''try to count the number of CPUs on the system''' |
27 '''try to count the number of CPUs on the system''' |
63 except ValueError: |
55 except ValueError: |
64 raise error.Abort(_(b'number of cpus must be an integer')) |
56 raise error.Abort(_(b'number of cpus must be an integer')) |
65 return min(max(countcpus(), 4), 32) |
57 return min(max(countcpus(), 4), 32) |
66 |
58 |
67 |
59 |
68 if pycompat.ispy3: |
60 def ismainthread(): |
69 |
61 return threading.current_thread() == threading.main_thread() |
70 def ismainthread(): |
62 |
71 return threading.current_thread() == threading.main_thread() |
63 |
72 |
64 class _blockingreader: |
73 class _blockingreader(object): |
65 """Wrap unbuffered stream such that pickle.load() works with it. |
74 def __init__(self, wrapped): |
66 |
75 self._wrapped = wrapped |
67 pickle.load() expects that calls to read() and readinto() read as many |
76 |
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case, |
77 # Do NOT implement readinto() by making it delegate to |
69 pickle.load() raises an EOFError. |
78 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine |
70 """ |
79 # with just read() and readline(), so we don't need to implement it. |
71 |
80 |
72 def __init__(self, wrapped): |
81 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2): |
73 self._wrapped = wrapped |
82 |
74 |
83 # This is required for python 3.8, prior to 3.8.2. See issue6444. |
75 def readline(self): |
84 def readinto(self, b): |
76 return self._wrapped.readline() |
85 pos = 0 |
77 |
86 size = len(b) |
78 def readinto(self, buf): |
87 |
79 pos = 0 |
88 while pos < size: |
80 size = len(buf) |
89 ret = self._wrapped.readinto(b[pos:]) |
81 |
90 if not ret: |
82 with memoryview(buf) as view: |
91 break |
|
92 pos += ret |
|
93 |
|
94 return pos |
|
95 |
|
96 def readline(self): |
|
97 return self._wrapped.readline() |
|
98 |
|
99 # issue multiple reads until size is fulfilled |
|
100 def read(self, size=-1): |
|
101 if size < 0: |
|
102 return self._wrapped.readall() |
|
103 |
|
104 buf = bytearray(size) |
|
105 view = memoryview(buf) |
|
106 pos = 0 |
|
107 |
|
108 while pos < size: |
83 while pos < size: |
109 ret = self._wrapped.readinto(view[pos:]) |
84 with view[pos:] as subview: |
|
85 ret = self._wrapped.readinto(subview) |
110 if not ret: |
86 if not ret: |
111 break |
87 break |
112 pos += ret |
88 pos += ret |
113 |
89 |
114 del view |
90 return pos |
115 del buf[pos:] |
91 |
116 return bytes(buf) |
92 # issue multiple reads until size is fulfilled (or EOF is encountered) |
117 |
93 def read(self, size=-1): |
118 |
94 if size < 0: |
119 else: |
95 return self._wrapped.readall() |
120 |
96 |
121 def ismainthread(): |
97 buf = bytearray(size) |
122 # pytype: disable=module-attr |
98 n_read = self.readinto(buf) |
123 return isinstance(threading.current_thread(), threading._MainThread) |
99 del buf[n_read:] |
124 # pytype: enable=module-attr |
100 return bytes(buf) |
125 |
|
126 def _blockingreader(wrapped): |
|
127 return wrapped |
|
128 |
101 |
129 |
102 |
130 if pycompat.isposix or pycompat.iswindows: |
103 if pycompat.isposix or pycompat.iswindows: |
131 _STARTUP_COST = 0.01 |
104 _STARTUP_COST = 0.01 |
132 # The Windows worker is thread based. If tasks are CPU bound, threads |
105 # The Windows worker is thread based. If tasks are CPU bound, threads |
201 signal.signal(signal.SIGCHLD, oldchldhandler) |
174 signal.signal(signal.SIGCHLD, oldchldhandler) |
202 # if one worker bails, there's no good reason to wait for the rest |
175 # if one worker bails, there's no good reason to wait for the rest |
203 for p in pids: |
176 for p in pids: |
204 try: |
177 try: |
205 os.kill(p, signal.SIGTERM) |
178 os.kill(p, signal.SIGTERM) |
206 except OSError as err: |
179 except ProcessLookupError: |
207 if err.errno != errno.ESRCH: |
180 pass |
208 raise |
|
209 |
181 |
210 def waitforworkers(blocking=True): |
182 def waitforworkers(blocking=True): |
211 for pid in pids.copy(): |
183 for pid in pids.copy(): |
212 p = st = 0 |
184 p = st = 0 |
213 while True: |
185 try: |
214 try: |
186 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG)) |
215 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG)) |
187 except ChildProcessError: |
216 break |
188 # child would already be reaped, but pids yet been |
217 except OSError as e: |
189 # updated (maybe interrupted just after waitpid) |
218 if e.errno == errno.EINTR: |
190 pids.discard(pid) |
219 continue |
|
220 elif e.errno == errno.ECHILD: |
|
221 # child would already be reaped, but pids yet been |
|
222 # updated (maybe interrupted just after waitpid) |
|
223 pids.discard(pid) |
|
224 break |
|
225 else: |
|
226 raise |
|
227 if not p: |
191 if not p: |
228 # skip subsequent steps, because child process should |
192 # skip subsequent steps, because child process should |
229 # be still running in this case |
193 # be still running in this case |
230 continue |
194 continue |
231 pids.discard(p) |
195 pids.discard(p) |
268 def workerfunc(): |
232 def workerfunc(): |
269 for r, w in pipes[:-1]: |
233 for r, w in pipes[:-1]: |
270 os.close(r) |
234 os.close(r) |
271 os.close(w) |
235 os.close(w) |
272 os.close(rfd) |
236 os.close(rfd) |
273 for result in func(*(staticargs + (pargs,))): |
237 with os.fdopen(wfd, 'wb') as wf: |
274 os.write(wfd, util.pickle.dumps(result)) |
238 for result in func(*(staticargs + (pargs,))): |
|
239 pickle.dump(result, wf) |
|
240 wf.flush() |
275 return 0 |
241 return 0 |
276 |
242 |
277 ret = scmutil.callcatch(ui, workerfunc) |
243 ret = scmutil.callcatch(ui, workerfunc) |
278 except: # parent re-raises, child never returns |
244 except: # parent re-raises, child never returns |
279 if os.getpid() == parentpid: |
245 if os.getpid() == parentpid: |
291 os._exit(ret & 255) |
257 os._exit(ret & 255) |
292 pids.add(pid) |
258 pids.add(pid) |
293 selector = selectors.DefaultSelector() |
259 selector = selectors.DefaultSelector() |
294 for rfd, wfd in pipes: |
260 for rfd, wfd in pipes: |
295 os.close(wfd) |
261 os.close(wfd) |
|
262 # The stream has to be unbuffered. Otherwise, if all data is read from |
|
263 # the raw file into the buffer, the selector thinks that the FD is not |
|
264 # ready to read while pickle.load() could read from the buffer. This |
|
265 # would delay the processing of readable items. |
296 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ) |
266 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ) |
297 |
267 |
298 def cleanup(): |
268 def cleanup(): |
299 signal.signal(signal.SIGINT, oldhandler) |
269 signal.signal(signal.SIGINT, oldhandler) |
300 waitforworkers() |
270 waitforworkers() |
305 try: |
275 try: |
306 openpipes = len(pipes) |
276 openpipes = len(pipes) |
307 while openpipes > 0: |
277 while openpipes > 0: |
308 for key, events in selector.select(): |
278 for key, events in selector.select(): |
309 try: |
279 try: |
310 res = util.pickle.load(_blockingreader(key.fileobj)) |
280 # The pytype error likely goes away on a modern version of |
|
281 # pytype having a modern typeshed snapshot. |
|
282 # pytype: disable=wrong-arg-types |
|
283 res = pickle.load(_blockingreader(key.fileobj)) |
|
284 # pytype: enable=wrong-arg-types |
311 if hasretval and res[0]: |
285 if hasretval and res[0]: |
312 retval.update(res[1]) |
286 retval.update(res[1]) |
313 else: |
287 else: |
314 yield res |
288 yield res |
315 except EOFError: |
289 except EOFError: |
316 selector.unregister(key.fileobj) |
290 selector.unregister(key.fileobj) |
|
291 # pytype: disable=attribute-error |
317 key.fileobj.close() |
292 key.fileobj.close() |
|
293 # pytype: enable=attribute-error |
318 openpipes -= 1 |
294 openpipes -= 1 |
319 except IOError as e: |
|
320 if e.errno == errno.EINTR: |
|
321 continue |
|
322 raise |
|
323 except: # re-raises |
295 except: # re-raises |
324 killworkers() |
296 killworkers() |
325 cleanup() |
297 cleanup() |
326 raise |
298 raise |
327 status = cleanup() |
299 status = cleanup() |