当前位置: 首页 > news >正文

python多进程程序设计 之四

python多进程程序设计 之四

  • 进程间通信
    • Pipes
      • 产生管道
      • 实列代码
    • Queues
      • 构造器
      • put
      • get
      • 实列代码
    • Connection
      • Client
      • Listener类
        • 构造器
        • accept
        • close
      • 实列代码
        • 代码

进程间通信

当使用多个进程时,通常使用消息传递来进行进程之间的通信,并避免使用任何同步原语。

为了传递消息,可以使用 Pipe() ,仅仅用于两个进程之间的连接;或队列,(允许多个生产者和消费者)。

Queue、SimpleQueue 和 JoinableQueue 类型是多生产者、多消费者 FIFO 队列,以标准库中的queue.Queue 类为模型。

如果您使用 JoinableQueue,则必须为从队列中删除的每个任务调用 JoinableQueue.task_done() ,否则用于计算未完成任务数量的信号量最终可能会溢出,引发异常。

与其他 Python 队列实现的一个区别是,multiprocessing队列使用pickle,序列化放入其中的所有对象。 get方法返回的对象是重新创建的对象,不与原始对象共享内存。

Pipes

产生管道

词法:multiprocessing.Pipe([duplex])

Pipe返回连接对象的 (conn1, conn2),它是管道的端点。

如果参数duplex为True(默认值),则管道是双向的。如果 duplex 为 False,则管道是单向的。

  • conn1 只能用于接收消息
  • conn2 只能用于发送消息

send() 方法使用 pickle,序列化对象,而 receive() 重新创建该对象。

实列代码

下面代码中,Consumer和Producer进程使用Pipe进行数据传送

import time, random
from multiprocessing import *def Consumer(r):for i in range(10):msg = r.recv()print("Process: {1}  msg: {0}".format(msg, current_process().name))r.close()def Producer(w):for i in range(10):w.send((i, current_process().name))time.sleep(10)w.close()if __name__ == '__main__':r, w = Pipe(duplex=False)p1 = Process(target=Producer, args=(w,))p2 = Process(target=Consumer, args=(r,))p1.start()p2.start()p1.join()p2.join()

显示器输出

Process: Process-2  msg: (0, 'Process-1')
Process: Process-2  msg: (1, 'Process-1')
Process: Process-2  msg: (2, 'Process-1')
Process: Process-2  msg: (3, 'Process-1')
Process: Process-2  msg: (4, 'Process-1')
Process: Process-2  msg: (5, 'Process-1')
Process: Process-2  msg: (6, 'Process-1')
Process: Process-2  msg: (7, 'Process-1')
Process: Process-2  msg: (8, 'Process-1')
Process: Process-2  msg: (9, 'Process-1')

Queues

构造器

词法:multiprocessing.Queue([maxsize])

multiprocessing模块中的Queue返回一个进程共享队列,这个队列使用管道和一些锁/信号灯实现的。当进程第一次将一个数据放入队列时,将启动供给线程,这个供给线程将对象从缓冲区传输到管道中。

multiprocessing模块中的Queue 实现了标准库中queue.Queue的所有方法,但task_done() 和join()除外。

put

词法:put(obj[, block[, timeout]])

将 obj 放入队列中。依据可选参数block,put有下列不同的行为:

  • block为True(默认值),且timeout为None(默认值),那么,如果没有空闲槽可用,则调用进程被阻塞,直到有空闲槽可用。如果timeout 是正数,则调用进程最多会阻塞timeout秒,如果在该时间内没有可用的空闲插槽,则产生queue.Full 异常。
  • block为False,如果空闲槽立即可用,则将一个项目放入队列中,否则引发queue.Full 异常。

get

词法:get([block[, timeout]])

将一个项目从队列中删除,并返回这个项目。根据可选参数block的值,get有下列不同的行为

  • block为True(默认值)且timeout为 None(默认值),那么,如果队列是空,则调用进程被阻塞,直到有项目可用为止。如果timeout是正数,则它最多会阻塞timeout秒,如果在该时间内没有可用的项目,则会产生queue.Empty异常。
  • block为False,如果队列有一项可用,则返回该项,否则产生queue.Empty 异常。

实列代码

import time
import randomfrom multiprocessing import *def worker(inputQ, outputQ):for func, args in iter(inputQ.get, 'STOP'):result = calculate(func, args)outputQ.put(result)def calculate(func, args):result = func(*args)return '%s says that %s%s = %s' % \(current_process().name, func.__name__, args, result)def mul(a, b):time.sleep(0.5*random.random())return a * bdef test():NUMBER_OF_PROCESSES = 3TASKS1 = [(mul, (i, 7)) for i in range(5)]# Create queuestask_queue = Queue()done_queue = Queue()# Submit tasksfor task in TASKS1:task_queue.put(task)# Start worker processesfor i in range(NUMBER_OF_PROCESSES):p = Process(target=worker, args=(task_queue, done_queue))p.start()# Get and print resultsprint('Unordered results:')for i in range(len(TASKS1)):print('\t', done_queue.get())# Tell child processes to stopfor i in range(NUMBER_OF_PROCESSES):task_queue.put('STOP')if __name__ == '__main__':freeze_support()test()
Unordered results:Process-1 says that mul(0, 7) = 0Process-3 says that mul(2, 7) = 14Process-1 says that mul(3, 7) = 21Process-2 says that mul(1, 7) = 7Process-3 says that mul(4, 7) = 28

Connection

连接对象允许发送和接收可序列化的对象或字符串。它们可以被认为是面向消息的连接套接字。

连接对象通常使用 Pipe 创建 - 另请参见侦听器和客户端。

Client

词法:multiprocessing.connection.Client(address[, family[, authkey]])

尝试与使用address地址的listener建立连接,返回一个连接。

连接的类型由参数family确定,但这通常可以省略,因为它通常可以从地址的格式推断出来。

如果authkey不是 None,则它应该是一个字节串,并将用作基于HMAC的身份验证质询的密钥。如果authkey为None,则不进行身份验证。如果身份验证失败,则会引发 AuthenticationError。

Listener类

构造器

词法:multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

绑定套接字或 Windows 命名管道的包装器,用于“监听”连接。

  • address,是listener对象的绑定套接字,或命名管道要使用的地址。
  • family,是要使用的套接字,或命名管道的类型。这可以是字符串,“AF_INET”,用于 TCP 套接字;“AF_UNIX”,用于 Unix 域套接字;“AF_PIPE”,用于 Windows 命名管道。其中只有第一个保证可用。如果 family 为 None,则从地址格式推断出 family。如果地址也是None,则选择默认值。此默认值是假定最快可用的系列。请参阅地址格式。如果 family 为“AF_UNIX”,且地址为 None,则将在使用 tempfile.mkstemp() 创建的私有临时目录中创建套接字。
  • backlog,如果listener对象使用套接字,则在绑定后,backlog(默认为 1)将传递给套接字的 Listen() 方法。
  • authkey,如果给出了authkey,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证质询的密钥。如果 authkey为None,则不进行身份验证。如果身份验证失败,则会引发 AuthenticationError。
accept

词法:accept()

接受listener对象的绑定套接字,或命名管道上的连接,并返回Connection对象。如果尝试进行身份验证,但失败,则会产生AuthenticationError。

close

词法:close()
关闭listener对象的绑定套接字,命名管道。当listener被垃圾收集时,会自动调用此函数。
建议明确地调用它。

实列代码

该实列启动两个进程,

  • conn_server,产生一个Listener对象,然后,使用accept,等待来自其他进程的连接请求。连接成功后,发送数据,接收数据
  • conn_client,使用Client函数,请求一个连接。连接成功后,接收数据,发送数据。
代码
from multiprocessing import *
from multiprocessing.connection import *def client_proc(conn):conn.send([123, "xyz"])    def conn_client():address = ('localhost', 6000)with Client(address, authkey=b'secret password') as conn:print(conn.recv())client_proc(conn)def service_proc(conn):rcv = conn.recv()print("client info: {0}".format(rcv))def conn_server():address = ('localhost', 6000)     # family is deduced to be 'AF_INET'print("address: {0}".format(address))with Listener(address, authkey=b'secret password') as listener:with listener.accept() as conn:print('connection accepted from', listener.last_accepted)conn.send("Welcome !!!")               service_proc(conn)if __name__ == '__main__':ctx = get_context('spawn')p1 = ctx.Process(target=conn_server)p2 = ctx.Process(target=conn_client)p1.start()p2.start()p1.join()p2.join()

显示器输出

address: ('localhost', 6000)
connection accepted from ('127.0.0.1', 53069)
Welcome !!!
client info: [123, 'xyz']

http://www.mrgr.cn/news/28487.html

相关文章:

  • sql中in()方法查询参数过多处理小记
  • webpack案例----pdd(anti-content)
  • Spring框架之责任链模式 (Chain of Responsibility Pattern)
  • 如何快速定位并解决 Linux 系统性能瓶颈:终极全攻略
  • 软件测试必学的16个高频数据库操作及命令
  • ⚡️如何在 React 和 Next.js 项目里优雅的使用 Zustand
  • protobuf.js:Message类功能详解与实战应用
  • n位格雷码
  • C语言 | Leetcode C语言题解之第412题Fizz Buzz
  • ls -l是什么命令全称?
  • 高德地图2.0 绘制、编辑多边形覆盖物(电子围栏)
  • 408算法题leetcode--第六天
  • NISP 一级 | 5.5 账户口令安全
  • 刷题日记【160. 相交链表】
  • 2022高教社杯全国大学生数学建模竞赛C题 问题一(2) Python代码演示
  • cp 命令是用来复制文件或目录的
  • 基于springboot+vue+uniapp的驾校报名小程序
  • 代码随想录冲冲冲 Day47 单调栈Part1
  • Navicat使用 笔记04
  • R语言统计分析——散点图1(常规图)
  • 使用Qt 搭建简单雷达
  • SpringBoot 消息队列RabbitMQ使用延迟消息插件 接收延迟消息
  • Django学习实战篇五(适合略有基础的新手小白学习)(从0开发项目)
  • 基于Python的自然语言处理系列(10):使用双向LSTM进行文本分类
  • WebGL入门(048):OES_draw_buffers_indexed 简介、使用方法、示例代码
  • 制造、调试OOPS