mercurial/worker.py
author Yuya Nishihara <yuya@tcha.org>
Thu, 17 Nov 2016 21:43:01 +0900
changeset 30422 0e6ce6313e47
parent 30416 c27614f2dec1
child 30423 237b2883cbd8
permissions -rw-r--r--
worker: fix missed break on successful waitpid() Follow-up for 5069a8a40b1b.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     1
# worker.py - master-slave parallelism support
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     2
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     3
# Copyright 2013 Facebook, Inc.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     4
#
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     5
# This software may be used and distributed according to the terms of the
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     6
# GNU General Public License version 2 or any later version.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
     7
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     8
from __future__ import absolute_import
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
     9
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    10
import errno
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    11
import os
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    12
import signal
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    13
import sys
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    14
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
    15
from .i18n import _
30396
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    16
from . import (
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    17
    error,
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    18
    util,
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
    19
)
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    20
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    21
def countcpus():
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    22
    '''try to count the number of CPUs on the system'''
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    23
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    24
    # posix
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
    25
    try:
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    26
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    27
        if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    28
            return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    29
    except (AttributeError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    30
        pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    31
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    32
    # windows
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    33
    try:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    34
        n = int(os.environ['NUMBER_OF_PROCESSORS'])
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    35
        if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    36
            return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    37
    except (KeyError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    38
        pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    39
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
    40
    return 1
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    41
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    42
def _numworkers(ui):
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    43
    s = ui.config('worker', 'numcpus')
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    44
    if s:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    45
        try:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    46
            n = int(s)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    47
            if n >= 1:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    48
                return n
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    49
        except ValueError:
26587
56b2bcea2529 error: get Abort from 'error' instead of 'util'
Pierre-Yves David <pierre-yves.david@fb.com>
parents: 26568
diff changeset
    50
            raise error.Abort(_('number of cpus must be an integer'))
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    51
    return min(max(countcpus(), 4), 32)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    52
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    53
if os.name == 'posix':
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    54
    _startupcost = 0.01
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    55
else:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    56
    _startupcost = 1e30
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    57
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    58
def worthwhile(ui, costperop, nops):
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    59
    '''try to determine whether the benefit of multiple processes can
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    60
    outweigh the cost of starting them'''
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    61
    linear = costperop * nops
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    62
    workers = _numworkers(ui)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    63
    benefit = linear - (_startupcost * workers + linear / workers)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
    64
    return benefit >= 0.15
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
    65
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    66
def worker(ui, costperarg, func, staticargs, args):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    67
    '''run a function, possibly in parallel in multiple worker
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    68
    processes.
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    69
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    70
    returns a progress iterator
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    71
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    72
    costperarg - cost of a single task
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    73
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    74
    func - function to run
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    75
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    76
    staticargs - arguments to pass to every invocation of the function
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    77
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    78
    args - arguments to split into chunks, to pass to individual
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    79
    workers
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    80
    '''
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    81
    if worthwhile(ui, costperarg, len(args)):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    82
        return _platformworker(ui, func, staticargs, args)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    83
    return func(*staticargs + (args,))
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    84
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    85
def _posixworker(ui, func, staticargs, args):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    86
    rfd, wfd = os.pipe()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
    87
    workers = _numworkers(ui)
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
    88
    oldhandler = signal.getsignal(signal.SIGINT)
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
    89
    signal.signal(signal.SIGINT, signal.SIG_IGN)
30413
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30412
diff changeset
    90
    pids, problem = set(), [0]
30410
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    91
    def killworkers():
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    92
        # if one worker bails, there's no good reason to wait for the rest
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    93
        for p in pids:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    94
            try:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    95
                os.kill(p, signal.SIGTERM)
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    96
            except OSError as err:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    97
                if err.errno != errno.ESRCH:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
    98
                    raise
30412
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30411
diff changeset
    99
    def waitforworkers(blocking=True):
30414
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   100
        for pid in pids.copy():
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   101
            p = st = 0
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   102
            while True:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   103
                try:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   104
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
30422
0e6ce6313e47 worker: fix missed break on successful waitpid()
Yuya Nishihara <yuya@tcha.org>
parents: 30416
diff changeset
   105
                    break
30414
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   106
                except OSError as e:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   107
                    if e.errno == errno.EINTR:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   108
                        continue
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   109
                    elif e.errno == errno.ECHILD:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   110
                        break # ignore ECHILD
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   111
                    else:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   112
                        raise
30412
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30411
diff changeset
   113
            if p:
30414
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30413
diff changeset
   114
                pids.remove(p)
30412
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30411
diff changeset
   115
                st = _exitstatus(st)
30410
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   116
            if st and not problem[0]:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   117
                problem[0] = st
30415
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   118
                # unregister SIGCHLD handler as all children will be killed
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   119
                signal.signal(signal.SIGCHLD, oldchldhandler)
30410
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30396
diff changeset
   120
                killworkers()
30415
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   121
    def sigchldhandler(signum, frame):
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   122
        waitforworkers(blocking=False)
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   123
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   124
    for pargs in partition(args, workers):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   125
        pid = os.fork()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   126
        if pid == 0:
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
   127
            signal.signal(signal.SIGINT, oldhandler)
30415
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   128
            signal.signal(signal.SIGCHLD, oldchldhandler)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   129
            try:
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   130
                os.close(rfd)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   131
                for i, item in func(*(staticargs + (pargs,))):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   132
                    os.write(wfd, '%d %s\n' % (i, item))
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   133
                os._exit(0)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   134
            except KeyboardInterrupt:
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   135
                os._exit(255)
19408
c7ec39c1a381 worker: properly report errors from worker processes (issue3982)
Matt Mackall <mpm@selenic.com>
parents: 19406
diff changeset
   136
                # other exceptions are allowed to propagate, we rely
c7ec39c1a381 worker: properly report errors from worker processes (issue3982)
Matt Mackall <mpm@selenic.com>
parents: 19406
diff changeset
   137
                # on lock.py's pid checks to avoid release callbacks
30413
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30412
diff changeset
   138
        pids.add(pid)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   139
    os.close(wfd)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   140
    fp = os.fdopen(rfd, 'rb', 0)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   141
    def cleanup():
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   142
        signal.signal(signal.SIGINT, oldhandler)
30416
c27614f2dec1 worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents: 30415
diff changeset
   143
        waitforworkers()
30415
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30414
diff changeset
   144
        signal.signal(signal.SIGCHLD, oldchldhandler)
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   145
        status = problem[0]
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   146
        if status:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   147
            if status < 0:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   148
                os.kill(os.getpid(), -status)
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   149
            sys.exit(status)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   150
    try:
30396
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
   151
        for line in util.iterfile(fp):
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   152
            l = line.split(' ', 1)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   153
            yield int(l[0]), l[1][:-1]
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   154
    except: # re-raises
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
   155
        killworkers()
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   156
        cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   157
        raise
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   158
    cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   159
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   160
def _posixexitstatus(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   161
    '''convert a posix exit status into the same form returned by
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   162
    os.spawnv
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   163
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   164
    returns None if the process was stopped instead of exiting'''
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   165
    if os.WIFEXITED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   166
        return os.WEXITSTATUS(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   167
    elif os.WIFSIGNALED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   168
        return -os.WTERMSIG(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   169
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   170
if os.name != 'nt':
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   171
    _platformworker = _posixworker
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
   172
    _exitstatus = _posixexitstatus
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
   173
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
   174
def partition(lst, nslices):
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   175
    '''partition a list into N slices of roughly equal size
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   176
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   177
    The current strategy takes every Nth element from the input. If
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   178
    we ever write workers that need to preserve grouping in input
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   179
    we should consider allowing callers to specify a partition strategy.
28292
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   180
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   181
    mpm is not a fan of this partitioning strategy when files are involved.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   182
    In his words:
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   183
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   184
        Single-threaded Mercurial makes a point of creating and visiting
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   185
        files in a fixed order (alphabetical). When creating files in order,
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   186
        a typical filesystem is likely to allocate them on nearby regions on
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   187
        disk. Thus, when revisiting in the same order, locality is maximized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   188
        and various forms of OS and disk-level caching and read-ahead get a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   189
        chance to work.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   190
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   191
        This effect can be quite significant on spinning disks. I discovered it
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   192
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   193
        Tarring a repo and copying it to another disk effectively randomized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   194
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   195
        performance of my kernel checkout benchmark dropped by ~10x because the
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   196
        "working set" of sectors visited no longer fit in the drive's cache and
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   197
        the workload switched from streaming to random I/O.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   198
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   199
        What we should really be doing is have workers read filenames from a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   200
        ordered queue. This preserves locality and also keeps any worker from
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
   201
        getting more than one file out of balance.
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   202
    '''
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   203
    for i in range(nslices):
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
   204
        yield lst[i::nslices]