- Notifications
You must be signed in to change notification settings - Fork95
Open
Labels
Description
函数create_items、multiply_items部分无注释,看了好久才明白。
第一个费解的地方pipe_1 = multiprocessing.Pipe(True),不知道这个Pipe具体干了什么的,查看源码后发现就是弄了两个队列,然后建立连接,至于至于send,就是使用的Queue的put函数,recv就是使用的Queue的get函数,用两个队列实现双工。
# 源代码位于multiprocessing/dummy/connection.pydefPipe(duplex=True):a,b=Queue(),Queue()returnConnection(a,b),Connection(b,a)classConnection(object):def__init__(self,_in,_out):self._out=_outself._in=_inself.send=self.send_bytes=_out.putself.recv=self.recv_bytes=_in.getdefpoll(self,timeout=0.0):ifself._in.qsize()>0:returnTrueiftimeout<=0.0:returnFalsewithself._in.not_empty:self._in.not_empty.wait(timeout)returnself._in.qsize()>0defclose(self):passdef__enter__(self):returnselfdef__exit__(self,exc_type,exc_value,exc_tb):self.close()
自己写了一下文中的实现,p1_a与p1_b是一组能够通信的,p2_a与p2_b是一组互相能通信的
importmultiprocessingdefcreate_items(pipe):p1_a,p1_b=pipeforiteminrange(10):p1_a.send(item)p1_a.close()defmultiply_items(pipe_1,pipe_2):p1_a,p1_b=pipe_1p1_a.close()p2_a,p2_b=pipe_2try:whileTrue:item=p1_b.recv()p2_a.send(item*item)exceptEOFError:p2_a.close()if__name__=='__main__':# 第一个进程管道发出数字pipe_1=multiprocessing.Pipe(True)process_pipe_1=multiprocessing.Process(target=create_items,args=(pipe_1,))process_pipe_1.start()# 第二个进程管道接收数字并计算pipe_2=multiprocessing.Pipe(True)process_pipe_2=multiprocessing.Process(target=multiply_items,args=(pipe_1,pipe_2,))process_pipe_2.start()pipe_1[0].close()pipe_2[0].close()try:whileTrue:print(pipe_2[1].recv())exceptEOFError:print("End")