python多进程程序设计 之四
python多进程程序设计 之四
- 进程间通信
- Pipes
- 产生管道
- 实列代码
- Queues
- 构造器
- put
- get
- 实列代码
- Connection
- Client
- Listener类
- 构造器
- accept
- close
- 实列代码
- 代码
进程间通信
当使用多个进程时,通常使用消息传递来进行进程之间的通信,并避免使用任何同步原语。
为了传递消息,可以使用 Pipe() ,仅仅用于两个进程之间的连接;或队列,(允许多个生产者和消费者)。
Queue、SimpleQueue 和 JoinableQueue 类型是多生产者、多消费者 FIFO 队列,以标准库中的queue.Queue 类为模型。
如果您使用 JoinableQueue,则必须为从队列中删除的每个任务调用 JoinableQueue.task_done() ,否则用于计算未完成任务数量的信号量最终可能会溢出,引发异常。
与其他 Python 队列实现的一个区别是,multiprocessing队列使用pickle,序列化放入其中的所有对象。 get方法返回的对象是重新创建的对象,不与原始对象共享内存。
Pipes
产生管道
词法:multiprocessing.Pipe([duplex])
Pipe返回连接对象的 (conn1, conn2),它是管道的端点。
如果参数duplex为True(默认值),则管道是双向的。如果 duplex 为 False,则管道是单向的。
- conn1 只能用于接收消息
- conn2 只能用于发送消息
send() 方法使用 pickle,序列化对象,而 receive() 重新创建该对象。
实列代码
下面代码中,Consumer和Producer进程使用Pipe进行数据传送
import time, random
from multiprocessing import *def Consumer(r):for i in range(10):msg = r.recv()print("Process: {1} msg: {0}".format(msg, current_process().name))r.close()def Producer(w):for i in range(10):w.send((i, current_process().name))time.sleep(10)w.close()if __name__ == '__main__':r, w = Pipe(duplex=False)p1 = Process(target=Producer, args=(w,))p2 = Process(target=Consumer, args=(r,))p1.start()p2.start()p1.join()p2.join()
显示器输出
Process: Process-2 msg: (0, 'Process-1')
Process: Process-2 msg: (1, 'Process-1')
Process: Process-2 msg: (2, 'Process-1')
Process: Process-2 msg: (3, 'Process-1')
Process: Process-2 msg: (4, 'Process-1')
Process: Process-2 msg: (5, 'Process-1')
Process: Process-2 msg: (6, 'Process-1')
Process: Process-2 msg: (7, 'Process-1')
Process: Process-2 msg: (8, 'Process-1')
Process: Process-2 msg: (9, 'Process-1')
Queues
构造器
词法:multiprocessing.Queue([maxsize])
multiprocessing模块中的Queue返回一个进程共享队列,这个队列使用管道和一些锁/信号灯实现的。当进程第一次将一个数据放入队列时,将启动供给线程,这个供给线程将对象从缓冲区传输到管道中。
multiprocessing模块中的Queue 实现了标准库中queue.Queue的所有方法,但task_done() 和join()除外。
put
词法:put(obj[, block[, timeout]])
将 obj 放入队列中。依据可选参数block,put有下列不同的行为:
- block为True(默认值),且timeout为None(默认值),那么,如果没有空闲槽可用,则调用进程被阻塞,直到有空闲槽可用。如果timeout 是正数,则调用进程最多会阻塞timeout秒,如果在该时间内没有可用的空闲插槽,则产生queue.Full 异常。
- block为False,如果空闲槽立即可用,则将一个项目放入队列中,否则引发queue.Full 异常。
get
词法:get([block[, timeout]])
将一个项目从队列中删除,并返回这个项目。根据可选参数block的值,get有下列不同的行为
- block为True(默认值)且timeout为 None(默认值),那么,如果队列是空,则调用进程被阻塞,直到有项目可用为止。如果timeout是正数,则它最多会阻塞timeout秒,如果在该时间内没有可用的项目,则会产生queue.Empty异常。
- block为False,如果队列有一项可用,则返回该项,否则产生queue.Empty 异常。
实列代码
import time
import randomfrom multiprocessing import *def worker(inputQ, outputQ):for func, args in iter(inputQ.get, 'STOP'):result = calculate(func, args)outputQ.put(result)def calculate(func, args):result = func(*args)return '%s says that %s%s = %s' % \(current_process().name, func.__name__, args, result)def mul(a, b):time.sleep(0.5*random.random())return a * bdef test():NUMBER_OF_PROCESSES = 3TASKS1 = [(mul, (i, 7)) for i in range(5)]# Create queuestask_queue = Queue()done_queue = Queue()# Submit tasksfor task in TASKS1:task_queue.put(task)# Start worker processesfor i in range(NUMBER_OF_PROCESSES):p = Process(target=worker, args=(task_queue, done_queue))p.start()# Get and print resultsprint('Unordered results:')for i in range(len(TASKS1)):print('\t', done_queue.get())# Tell child processes to stopfor i in range(NUMBER_OF_PROCESSES):task_queue.put('STOP')if __name__ == '__main__':freeze_support()test()
Unordered results:Process-1 says that mul(0, 7) = 0Process-3 says that mul(2, 7) = 14Process-1 says that mul(3, 7) = 21Process-2 says that mul(1, 7) = 7Process-3 says that mul(4, 7) = 28
Connection
连接对象允许发送和接收可序列化的对象或字符串。它们可以被认为是面向消息的连接套接字。
连接对象通常使用 Pipe 创建 - 另请参见侦听器和客户端。
Client
词法:multiprocessing.connection.Client(address[, family[, authkey]])
尝试与使用address地址的listener建立连接,返回一个连接。
连接的类型由参数family确定,但这通常可以省略,因为它通常可以从地址的格式推断出来。
如果authkey不是 None,则它应该是一个字节串,并将用作基于HMAC的身份验证质询的密钥。如果authkey为None,则不进行身份验证。如果身份验证失败,则会引发 AuthenticationError。
Listener类
构造器
词法:multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])
绑定套接字或 Windows 命名管道的包装器,用于“监听”连接。
- address,是listener对象的绑定套接字,或命名管道要使用的地址。
- family,是要使用的套接字,或命名管道的类型。这可以是字符串,“AF_INET”,用于 TCP 套接字;“AF_UNIX”,用于 Unix 域套接字;“AF_PIPE”,用于 Windows 命名管道。其中只有第一个保证可用。如果 family 为 None,则从地址格式推断出 family。如果地址也是None,则选择默认值。此默认值是假定最快可用的系列。请参阅地址格式。如果 family 为“AF_UNIX”,且地址为 None,则将在使用 tempfile.mkstemp() 创建的私有临时目录中创建套接字。
- backlog,如果listener对象使用套接字,则在绑定后,backlog(默认为 1)将传递给套接字的 Listen() 方法。
- authkey,如果给出了authkey,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证质询的密钥。如果 authkey为None,则不进行身份验证。如果身份验证失败,则会引发 AuthenticationError。
accept
词法:accept()
接受listener对象的绑定套接字,或命名管道上的连接,并返回Connection对象。如果尝试进行身份验证,但失败,则会产生AuthenticationError。
close
词法:close()
关闭listener对象的绑定套接字,命名管道。当listener被垃圾收集时,会自动调用此函数。
建议明确地调用它。
实列代码
该实列启动两个进程,
- conn_server,产生一个Listener对象,然后,使用accept,等待来自其他进程的连接请求。连接成功后,发送数据,接收数据
- conn_client,使用Client函数,请求一个连接。连接成功后,接收数据,发送数据。
代码
from multiprocessing import *
from multiprocessing.connection import *def client_proc(conn):conn.send([123, "xyz"]) def conn_client():address = ('localhost', 6000)with Client(address, authkey=b'secret password') as conn:print(conn.recv())client_proc(conn)def service_proc(conn):rcv = conn.recv()print("client info: {0}".format(rcv))def conn_server():address = ('localhost', 6000) # family is deduced to be 'AF_INET'print("address: {0}".format(address))with Listener(address, authkey=b'secret password') as listener:with listener.accept() as conn:print('connection accepted from', listener.last_accepted)conn.send("Welcome !!!") service_proc(conn)if __name__ == '__main__':ctx = get_context('spawn')p1 = ctx.Process(target=conn_server)p2 = ctx.Process(target=conn_client)p1.start()p2.start()p1.join()p2.join()
显示器输出
address: ('localhost', 6000)
connection accepted from ('127.0.0.1', 53069)
Welcome !!!
client info: [123, 'xyz']