在Python3中可以使用asyncio库进行异步IO操作,原理是使用协程(coroutine)。在同一个线程中,可以创建多个协程,当某个协程进行IO操作时,会将代码执行权移交给另一个协程。通过在协程间的不断切换,在等待IO操作时合理利用了CPU。

但是对于多核CPU而言,一个线程在指定时刻只能在某一个CPU核心上运行。为了同时使用多个CPU核心,需要把并发任务分配到不同的线程中,然后在每个线程中通过协程来运行。

然而,CPython有GIL,同一时间只能执行一个线程,因此不能开多个线程来并发执行。所以只能用进程来代替线程,把并发任务分配到不同的进程,以达到同时使用多个CPU核心的目的。

假若需要执行的任务是静态的,也就是说可以一次性把任务分配给各个进程,那么编程模型就很简单:在创建多个子进程时直接把需要执行的任务传递过去即可。但是现实需求往往要求能动态地添加任务,并将任务合理地分配到各个子进程中。此时的编程模型是怎样的呢?如何完成进程之间的通信?如何动态添加协程?

Screen Shot 2017-10-05 at 15.41.26

上图所示的模型很好地解决了需求。使用Pipe进行进程间通讯,把任务分发给各个子进程。在子进程中需要开一个线程来监听Pipe传来的任务数据,每个子进程都有其各自对应的Pipe。任务结束后,各个子进程使用同一个Queue告知主调度进程。整个流程如下:

初始化流程

  • 对于主进程:

    1. 主进程确定需要的子进程的数量
    2. 主进程为每个子进程都创建一个Pipe
    3. 主进程创建一个共有的反馈Queue
    4. 主进程开启所有子进程,并依次传递对应的Pipe以及Queue
    5. 主进程开一个子线程,用来监听Queue传来的数据。
  • 对于子进程:

    1. 子进程启动时,开一个子线程用来监听Pipe传来的数据
    2. 子进程在主线程中开启协程的事件循环,等待任务。

任务分配流程

  • 对于主进程:

    1. 在主进程的主线程中,通过某种方式获取到了将要执行的任务。
    2. 主进程按照某种分配方式,把任务分配给各个子进程,通过子进程对应的Pipe按照某种协议把任务信息传递给子进程
  • 对于子进程:

    1. 子进程在监听线程中收到了Pipe传来的数据,经过解析之后得到了明确的任务信息,之后把任务以协程的形式提交到工作线程

任务结束后

  • 对于子线程:

    1. 任务结束时,在协程中将任务信息写入Queue中,告知主进程任务结束
  • 对于主进程:

    1. 监听线程收到了从Queue传来的数据,经过解析之后知道了任务T已由子进程P完成。

设计好架构之后,就可以建模了。可以建立以下几个模型:

  • Task: 表示任务
  • TaskStatus: 表示任务结束时的状态
  • TaskHandler: 处理从主进程分发的任务
  • JobScheduler: 主进程中的调度器
  • JobMonitor: 监听子进程的创建、任务的创建与终止

Task

Task对象通过Pipe被调度器传递到子进程,子进程通过调用其run方法来执行任务。

class Task(object):
    def __init__(self, pid, tid):
        self.pid = pid
        self.tid = tid

    def __repr__(self) -> str:
        return 'pid:{:d} tid:{}'.format(self.pid, self.tid)

    async def run(self):
        pass

TaskStatus

任务停止运行后,不论成功或失败,都要把其结束时的状态反馈给调度器

class TaskStatus(object):
    def __init__(self, pid, tid, succeeded=True, reason='finished'):
        self.pid = pid
        self.tid = tid
        self.succeeded = succeeded
        self.reason = reason

    def __repr__(self) -> str:
        return 'pid:{:d} tid:{} succeeded:{:s} for reason {:s}' \
            .format(self.pid, self.tid, str(self.succeeded), self.reason)

    def is_succeeded(self):
        return self.succeeded

TaskHandler

在子进程中控制任务的运行以及结果反馈

class TaskHandler(object):
    def __init__(self, pipe: Pipe, queue: Queue, name: str = 'TaskHandler'):
        self.pipe = pipe
        self.queue = queue
        self.loop = None
        self.logger = logging.getLogger(name)

    def on_task_result(self, result):
        pass

    def start(self):
        self.loop = asyncio.get_event_loop()
        threading.Thread(target=self._listen_pipe).start()
        self.loop.run_forever()

    def _listen_pipe(self):
        while True:
            task = self.pipe.recv()
            asyncio.run_coroutine_threadsafe(self._handle_task(task), self.loop)

    async def _handle_task(self, task: Task):
        succeeded = True
        reason = None
        try:
            result = await task.run()
            self.on_task_result(result)
        except Exception as e:
            self.logger.error('Task {:s} failed for {:s}'.format(repr(task), repr(e)))
            succeeded = False
            reason = repr(e)
        self.queue.put(TaskStatus(task.pid, task.tid, succeeded, reason))

JobMonitor

JobMonitor用来记录子进程、任务的相关信息

class JobMonitor(object):
    def on_process_creation(self, pid):
        pass

    def on_task_creation(self, task: Task):
        pass

    def on_task_stopping(self, status: TaskStatus):
        pass

JobScheduler

JobScheduler控制与子进程间的通信,以及把子进程、任务信息反馈给JobMonitor

class JobScheduler(object):
    def __init__(self, pcount: int, monitor: JobMonitor = None):

        self.pcount = pcount
        self.pipes = {}
        self.queue = Queue()
        self.monitor = monitor

    def start(self, handler_cls=TaskHandler, block=False):
        for i in range(self.pcount):
            pipe, child_pipe = Pipe()
            p = Process(target=handler_cls(child_pipe, self.queue).start)
            p.start()

            if self.monitor is not None:
                self.monitor.on_process_creation(p.pid)

            self.pipes[p.pid] = pipe

        if block:
            self._listen_queue()
        else:
            threading.Thread(target=self._listen_queue).start()

    def add_task(self, task: Task):
        pid = random.choice(list(self.pipes.keys())) if task.pid is None else task.pid

        self.pipes[pid].send(task)

        if self.monitor is not None:
            self.monitor.on_task_creation(task)

    def _listen_queue(self):
        while True:
            status = self.queue.get()
            if self.monitor is not None:
                self.monitor.on_task_stopping(status)

示例程序

有示例程序如下,每隔2秒随机产生0至10个任务,平均分配到各个子进程。

class DemoMonitor(JobMonitor):
    def __init__(self):
        self.tasks = {}
        self.logger = logging.getLogger('JobMonitor')

    def on_process_creation(self, pid):
        self.tasks[pid] = {
            'running': set(),
            'finished': set()
        }
        self.logger.info('Starting process {:d}'.format(pid))

    def on_task_creation(self, task: Task):
        p = self.tasks[task.pid]

        p['running'].add(task.tid)

        self.logger.info('Scheduled task: {:s}'.format(repr(task)))

    def on_task_stopping(self, status: TaskStatus):
        p = self.tasks[status.pid]

        p['running'].remove(status.tid)
        p['finished'].add(status.tid)

        self.logger.info('Task finished: {:s}'.format(repr(status)))


class DemoTask(Task):
    def __init__(self, pid, tid):
        super().__init__(pid, tid)

    async def run(self):
        await asyncio.sleep(1)
        return self.pid, self.tid, random.randint(0, 10)


class DemoHandler(TaskHandler):
    def on_task_result(self, result):
        self.logger.info('Task result ' + str(result))


if __name__ == '__main__':
    import sys

    logging.basicConfig(
        format='%(asctime)s [%(name)s] %(levelname)s pid-%(process)d : %(message)s',
        stream=sys.stdout,
        level=logging.INFO)

    monitor = DemoMonitor()
    scheduler = JobScheduler(4, monitor=monitor)
    scheduler.start(handler_cls=DemoHandler)

    tid = 0
    while True:
        for i in range(random.randint(0, 10)):
            pid, _ = min(monitor.tasks.items(), key=lambda x: len(x[1]['running']))
            scheduler.add_task(DemoTask(pid, tid))
            tid += 1

        time.sleep(2)

运行结果示例:

2017-10-06 19:24:29,699 [JobMonitor] INFO pid-8456 : Starting process 8457
2017-10-06 19:24:29,701 [JobMonitor] INFO pid-8456 : Starting process 8458
2017-10-06 19:24:29,703 [JobMonitor] INFO pid-8456 : Starting process 8459
2017-10-06 19:24:29,705 [JobMonitor] INFO pid-8456 : Starting process 8460
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8457 tid:0
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8458 tid:1
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8459 tid:2
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8460 tid:3
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8457 tid:4
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8458 tid:5
2017-10-06 19:24:30,709 [TaskHandler] INFO pid-8459 : Task result (8459, 2, 9)
2017-10-06 19:24:30,709 [TaskHandler] INFO pid-8458 : Task result (8458, 1, 9)
2017-10-06 19:24:30,709 [TaskHandler] INFO pid-8457 : Task result (8457, 0, 4)
2017-10-06 19:24:30,710 [TaskHandler] INFO pid-8458 : Task result (8458, 5, 0)
2017-10-06 19:24:30,710 [TaskHandler] INFO pid-8457 : Task result (8457, 4, 4)
2017-10-06 19:24:30,710 [JobMonitor] INFO pid-8456 : Task finished: pid:8459 tid:2 succeeded:True for reason finished
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8458 tid:1 succeeded:True for reason finished
2017-10-06 19:24:30,710 [TaskHandler] INFO pid-8460 : Task result (8460, 3, 4)
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8457 tid:0 succeeded:True for reason finished
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8458 tid:5 succeeded:True for reason finished
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8457 tid:4 succeeded:True for reason finished
2017-10-06 19:24:30,712 [JobMonitor] INFO pid-8456 : Task finished: pid:8460 tid:3 succeeded:True for reason finished

(完)

代码见py-coroutine