在Python3中可以使用asyncio库进行异步IO操作,原理是使用协程(coroutine)。在同一个线程中,可以创建多个协程,当某个协程进行IO操作时,会将代码执行权移交给另一个协程。通过在协程间的不断切换,在等待IO操作时合理利用了CPU。
但是对于多核CPU而言,一个线程在指定时刻只能在某一个CPU核心上运行。为了同时使用多个CPU核心,需要把并发任务分配到不同的线程中,然后在每个线程中通过协程来运行。
然而,CPython有GIL,同一时间只能执行一个线程,因此不能开多个线程来并发执行。所以只能用进程来代替线程,把并发任务分配到不同的进程,以达到同时使用多个CPU核心的目的。
假若需要执行的任务是静态的,也就是说可以一次性把任务分配给各个进程,那么编程模型就很简单:在创建多个子进程时直接把需要执行的任务传递过去即可。但是现实需求往往要求能动态地添加任务,并将任务合理地分配到各个子进程中。此时的编程模型是怎样的呢?如何完成进程之间的通信?如何动态添加协程?
上图所示的模型很好地解决了需求。使用Pipe进行进程间通讯,把任务分发给各个子进程。在子进程中需要开一个线程来监听Pipe传来的任务数据,每个子进程都有其各自对应的Pipe。任务结束后,各个子进程使用同一个Queue告知主调度进程。整个流程如下:
初始化流程
-
对于主进程:
- 主进程确定需要的子进程的数量
- 主进程为每个子进程都创建一个Pipe
- 主进程创建一个共有的反馈Queue
- 主进程开启所有子进程,并依次传递对应的Pipe以及Queue
- 主进程开一个子线程,用来监听Queue传来的数据。
-
对于子进程:
- 子进程启动时,开一个子线程用来监听Pipe传来的数据
- 子进程在主线程中开启协程的事件循环,等待任务。
任务分配流程
-
对于主进程:
- 在主进程的主线程中,通过某种方式获取到了将要执行的任务。
- 主进程按照某种分配方式,把任务分配给各个子进程,通过子进程对应的Pipe按照某种协议把任务信息传递给子进程
-
对于子进程:
- 子进程在监听线程中收到了Pipe传来的数据,经过解析之后得到了明确的任务信息,之后把任务以协程的形式提交到工作线程
任务结束后
-
对于子线程:
- 任务结束时,在协程中将任务信息写入Queue中,告知主进程任务结束
-
对于主进程:
- 监听线程收到了从Queue传来的数据,经过解析之后知道了任务T已由子进程P完成。
设计好架构之后,就可以建模了。可以建立以下几个模型:
- Task: 表示任务
- TaskStatus: 表示任务结束时的状态
- TaskHandler: 处理从主进程分发的任务
- JobScheduler: 主进程中的调度器
- JobMonitor: 监听子进程的创建、任务的创建与终止
Task
Task对象通过Pipe被调度器传递到子进程,子进程通过调用其run
方法来执行任务。
class Task(object):
def __init__(self, pid, tid):
self.pid = pid
self.tid = tid
def __repr__(self) -> str:
return 'pid:{:d} tid:{}'.format(self.pid, self.tid)
async def run(self):
pass
TaskStatus
任务停止运行后,不论成功或失败,都要把其结束时的状态反馈给调度器
class TaskStatus(object):
def __init__(self, pid, tid, succeeded=True, reason='finished'):
self.pid = pid
self.tid = tid
self.succeeded = succeeded
self.reason = reason
def __repr__(self) -> str:
return 'pid:{:d} tid:{} succeeded:{:s} for reason {:s}' \
.format(self.pid, self.tid, str(self.succeeded), self.reason)
def is_succeeded(self):
return self.succeeded
TaskHandler
在子进程中控制任务的运行以及结果反馈
class TaskHandler(object):
def __init__(self, pipe: Pipe, queue: Queue, name: str = 'TaskHandler'):
self.pipe = pipe
self.queue = queue
self.loop = None
self.logger = logging.getLogger(name)
def on_task_result(self, result):
pass
def start(self):
self.loop = asyncio.get_event_loop()
threading.Thread(target=self._listen_pipe).start()
self.loop.run_forever()
def _listen_pipe(self):
while True:
task = self.pipe.recv()
asyncio.run_coroutine_threadsafe(self._handle_task(task), self.loop)
async def _handle_task(self, task: Task):
succeeded = True
reason = None
try:
result = await task.run()
self.on_task_result(result)
except Exception as e:
self.logger.error('Task {:s} failed for {:s}'.format(repr(task), repr(e)))
succeeded = False
reason = repr(e)
self.queue.put(TaskStatus(task.pid, task.tid, succeeded, reason))
JobMonitor
JobMonitor用来记录子进程、任务的相关信息
class JobMonitor(object):
def on_process_creation(self, pid):
pass
def on_task_creation(self, task: Task):
pass
def on_task_stopping(self, status: TaskStatus):
pass
JobScheduler
JobScheduler控制与子进程间的通信,以及把子进程、任务信息反馈给JobMonitor
class JobScheduler(object):
def __init__(self, pcount: int, monitor: JobMonitor = None):
self.pcount = pcount
self.pipes = {}
self.queue = Queue()
self.monitor = monitor
def start(self, handler_cls=TaskHandler, block=False):
for i in range(self.pcount):
pipe, child_pipe = Pipe()
p = Process(target=handler_cls(child_pipe, self.queue).start)
p.start()
if self.monitor is not None:
self.monitor.on_process_creation(p.pid)
self.pipes[p.pid] = pipe
if block:
self._listen_queue()
else:
threading.Thread(target=self._listen_queue).start()
def add_task(self, task: Task):
pid = random.choice(list(self.pipes.keys())) if task.pid is None else task.pid
self.pipes[pid].send(task)
if self.monitor is not None:
self.monitor.on_task_creation(task)
def _listen_queue(self):
while True:
status = self.queue.get()
if self.monitor is not None:
self.monitor.on_task_stopping(status)
示例程序
有示例程序如下,每隔2秒随机产生0至10个任务,平均分配到各个子进程。
class DemoMonitor(JobMonitor):
def __init__(self):
self.tasks = {}
self.logger = logging.getLogger('JobMonitor')
def on_process_creation(self, pid):
self.tasks[pid] = {
'running': set(),
'finished': set()
}
self.logger.info('Starting process {:d}'.format(pid))
def on_task_creation(self, task: Task):
p = self.tasks[task.pid]
p['running'].add(task.tid)
self.logger.info('Scheduled task: {:s}'.format(repr(task)))
def on_task_stopping(self, status: TaskStatus):
p = self.tasks[status.pid]
p['running'].remove(status.tid)
p['finished'].add(status.tid)
self.logger.info('Task finished: {:s}'.format(repr(status)))
class DemoTask(Task):
def __init__(self, pid, tid):
super().__init__(pid, tid)
async def run(self):
await asyncio.sleep(1)
return self.pid, self.tid, random.randint(0, 10)
class DemoHandler(TaskHandler):
def on_task_result(self, result):
self.logger.info('Task result ' + str(result))
if __name__ == '__main__':
import sys
logging.basicConfig(
format='%(asctime)s [%(name)s] %(levelname)s pid-%(process)d : %(message)s',
stream=sys.stdout,
level=logging.INFO)
monitor = DemoMonitor()
scheduler = JobScheduler(4, monitor=monitor)
scheduler.start(handler_cls=DemoHandler)
tid = 0
while True:
for i in range(random.randint(0, 10)):
pid, _ = min(monitor.tasks.items(), key=lambda x: len(x[1]['running']))
scheduler.add_task(DemoTask(pid, tid))
tid += 1
time.sleep(2)
运行结果示例:
2017-10-06 19:24:29,699 [JobMonitor] INFO pid-8456 : Starting process 8457
2017-10-06 19:24:29,701 [JobMonitor] INFO pid-8456 : Starting process 8458
2017-10-06 19:24:29,703 [JobMonitor] INFO pid-8456 : Starting process 8459
2017-10-06 19:24:29,705 [JobMonitor] INFO pid-8456 : Starting process 8460
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8457 tid:0
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8458 tid:1
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8459 tid:2
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8460 tid:3
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8457 tid:4
2017-10-06 19:24:29,707 [JobMonitor] INFO pid-8456 : Scheduled task: pid:8458 tid:5
2017-10-06 19:24:30,709 [TaskHandler] INFO pid-8459 : Task result (8459, 2, 9)
2017-10-06 19:24:30,709 [TaskHandler] INFO pid-8458 : Task result (8458, 1, 9)
2017-10-06 19:24:30,709 [TaskHandler] INFO pid-8457 : Task result (8457, 0, 4)
2017-10-06 19:24:30,710 [TaskHandler] INFO pid-8458 : Task result (8458, 5, 0)
2017-10-06 19:24:30,710 [TaskHandler] INFO pid-8457 : Task result (8457, 4, 4)
2017-10-06 19:24:30,710 [JobMonitor] INFO pid-8456 : Task finished: pid:8459 tid:2 succeeded:True for reason finished
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8458 tid:1 succeeded:True for reason finished
2017-10-06 19:24:30,710 [TaskHandler] INFO pid-8460 : Task result (8460, 3, 4)
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8457 tid:0 succeeded:True for reason finished
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8458 tid:5 succeeded:True for reason finished
2017-10-06 19:24:30,711 [JobMonitor] INFO pid-8456 : Task finished: pid:8457 tid:4 succeeded:True for reason finished
2017-10-06 19:24:30,712 [JobMonitor] INFO pid-8456 : Task finished: pid:8460 tid:3 succeeded:True for reason finished
(完)
代码见py-coroutine