首页 > 使用multiprocessing.Process调用start方法后,有较小的几率子进程中run方法未执行

使用multiprocessing.Process调用start方法后,有较小的几率子进程中run方法未执行

继承multiprocessing.Process实现了一个worker类,在父进程中,自己实现了一个最多启动N的限制(出问题的环境是30个)。
实际运行中发现,大约有万分之二(当前每天运行46000+次,大约出现11次)的概率,子进程创建后run方法未执行。
代码和日志如下,注意打印日志的语句
父进程启动子进程(父进程里还有一个控制并发进程数量的逻辑,如果需要的话我贴出来):

...
    def run_task(self, task):
        logging.info('execute monitor %s' % task['id'])
        worker = execute_worker.ExecuteWorkerProcess(task)
        logging.debug('execute process start %s' % task['id'])
        worker.start()
        logging.info('worker pid is %s (%s)' % (worker.pid, task['id']))
        logging.debug('execute process started %s' % task['id'])
        self.worker_pool.append(worker)
...

子进程run方法

class ExecuteWorkerProcess(multiprocessing.Process):
...
    def __init__(self, task):
        super(ExecuteWorkerProcess, self).__init__()
        self.stopping = False
        self.task = task
        self.worker = ExecuteWorker(task)
        if 'task' in task:
            self.routine = False
        else:
            self.routine = True
        self.zk = None
        logging.debug('process created %s' % self.task['id'])
...
    def run(self):
        logging.debug('process start %s' % self.task['id'])
        try:
            logging.debug('process run before %s' % self.task['id'])
            self._run()
            logging.debug('process run after %s' % self.task['id'])
        except:
            logging.exception('')
            title = u'监控执行进程报错'
            text = u'监控项id:%s\n错误信息:\n%s' % (self.task['id'], traceback.format_exc())
            warning.email_warning(title, text, to_dev=True)
        logging.debug('process start done %s' % self.task['id'])
...

出现问题的进程日志如下:

正常任务日志如下:

可以看到正常和异常的日志主进程中都打印除了子进程的pid,但是异常继承子进程run行数的第一行没有执行。
是否有人遇到过?这个是不是multiprocessing.Process的坑,有没有规避办法...


原因已找到,由于主进程中使用了thread+mutiprocessing(fork),导致logging出现死锁,现象就是遇到子进程里第一句logging就hang住。问题只会发生在Linux下。
看了stckoverflow这个答案找到的复现方法,另一个回答,解决方案
复现demo:

#coding=utf-8
import os
import time
import logging
import threading
import multiprocessing
import logging.handlers


def init_log(log_path, level=logging.INFO, when="midnight", backup=7,
             format="%(levelname)s:[%(asctime)s][%(filename)s:%(lineno)d][%(process)s][%(thread)d] %(message)s",
             datefmt="%Y-%m-%d %H:%M:%S"):
    formatter = logging.Formatter(format, datefmt)
    logger = logging.getLogger()
    logger.setLevel(level)

    dir = os.path.dirname(log_path)
    if not os.path.isdir(dir):
        os.makedirs(dir)

    handler = logging.handlers.TimedRotatingFileHandler(log_path + ".log",
                                                        when=when,
                                                        backupCount=backup)
    handler.setLevel(level)
    handler.setFormatter(formatter)
    logger.addHandler(handler)


stop = False

class Worker(multiprocessing.Process):
    u"""
    多进程方式执行任务,使用CPU密集型
    """
    def __init__(self):
        super(Worker, self).__init__()
        self.reset_ts = time.time() + 3
        self.stopping = False

    def run(self):
        logging.info('Process pid is %s' % os.getpid())

    def check_timeout(self, now):
        u"""
        若当前时间已超过复位时间,则结束进程
        :param now:
        :return:
        """
        global stop
        if now > self.reset_ts and not self.stopping:
            self.stopping = True
            logging.info('error pid is %s' % os.getpid())
            stop = True


def main():
    global stop
    worker_pool = []
    while 1:
        if worker_pool:
            now = time.time()
            for worker in worker_pool:
                worker.check_timeout(now)
                if stop:
                    logging.error('Process not run, exit!')
                    exit(-1)

        alive_workers = [worker for worker in worker_pool if worker.is_alive()]
        over_workers = list(set(alive_workers) ^ set(worker_pool))
        for over_worker in over_workers:
            over_worker.join()
        worker_pool = alive_workers
        if len(worker_pool) < 1000:
            logging.info('create worker')
            worker = Worker()
            worker.start()
            logging.info('worker pid is %s' % worker.pid)
            worker_pool.append(worker)
        time.sleep(0.001)


class ExecuteThread(threading.Thread):
    def run(self):
        main()

class ExecuteThread2(threading.Thread):
    def run(self):
        global stop
        while 1:
            logging.info('main thread')
            time.sleep(0.001)
            if stop:
                exit(-1)

if __name__ == '__main__':
    init_log('/yourpath/timeout')
    #main()

    thread = ExecuteThread()
    thread2 = ExecuteThread2()
    thread.start()
    thread2.start()
    thread.join()
    thread2.join()

复现完了记得清掉hang住的进程....

【热门文章】
【热门文章】