当前位置: 首页 > news >正文

Python中multiprocessing的使用详解

1.实现多进程

代码实现

from multiprocessing import Process
import datetime
import timedef task01(name):current_time=datetime.datetime.now()start_time=current_time.strftime('%Y-%m-%d %H:%M:%S')+'.'+ "{:03d}".format(current_time.microsecond // 1000)print("{}开始执行时间: {}".format(name,start_time))time.sleep(5)current_time = datetime.datetime.now()end_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)print("{}执行结束时间: {}".format(name,end_time))def task02(name):current_time=datetime.datetime.now()start_time = current_time.strftime('%Y-%m-%d %H:%M:%S')+'.'+ "{:03d}".format(current_time.microsecond // 1000)print("{}开始执行时间: {}".format(name, start_time))time.sleep(5)current_time = datetime.datetime.now()end_time = current_time.strftime('%Y-%m-%d %H:%M:%S')+'.'+ "{:03d}".format(current_time.microsecond // 1000)print("{}执行结束时间: {}".format(name, end_time))if __name__ == '__main__':##创建进程:target是进程要执行的方法, args是进程里要执行的方法的参数proc1 = Process(target=task01, args=('进程01',))  # 创建进程proc1proc2 = Process(target=task02, args=('进程02',))  # 创建进程proc2##启动进程proc1.start()  # 启动进程proc1proc2.start()  # 启动进程proc2print("进程内的方法未执行完,并行执行后面的代码")

执行结果

如果有些操作需要等进程内的方法执行完才能执行:使用join()

2.多进程池

上面我们是一个一个手动创建进程,我们还可以用Pool模块来替我们创建

Pool的常见方法

apply():同步执行

apply_async():异步执行

close():等待所有进程结束后,才关闭进程池

terminate(): 立刻关闭进程池

join():主进程等待子进程执行完毕,必须在close()或 terminate()后调用

代码实现

from multiprocessing import Pool
import datetime
import timedef task01(name):current_time=datetime.datetime.now()start_time=current_time.strftime('%Y-%m-%d %H:%M:%S')+'.'+ "{:03d}".format(current_time.microsecond // 1000)print("{}开始执行时间: {}".format(name,start_time))time.sleep(5)current_time = datetime.datetime.now()end_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)print("{}执行结束时间: {}".format(name,end_time))def task02(name):current_time=datetime.datetime.now()start_time = current_time.strftime('%Y-%m-%d %H:%M:%S')+'.'+ "{:03d}".format(current_time.microsecond // 1000)print("{}开始执行时间: {}".format(name, start_time))time.sleep(5)current_time = datetime.datetime.now()end_time = current_time.strftime('%Y-%m-%d %H:%M:%S')+'.'+ "{:03d}".format(current_time.microsecond // 1000)print("{}执行结束时间: {}".format(name, end_time))if __name__ == '__main__':pool = Pool(processes=3)  # 创建进程池,最大进程数为3func_dict = {'task01': '进程01', 'task02': '进程02'}for func_name, func_args in func_dict.items():pool.apply_async(eval(func_name), args=(func_args,))  # 异步执行进程print("进程内的方法未执行完,并行执行后面的代码")pool.close()  # 关闭进程池pool.join()  # 等待进程池中的所有进程执行完毕,必须在close()之后调用,必须调用

执行结果

3.多进程之间的通信

多进程之间通过安全队列管道进行数据传递

Queue

Queu自身提供了函数put()和get()来完成数据的传递,put()往队列末尾添加数据,get()从队列的头部取出并删除一个数据。

代码实现

from multiprocessing import Process,Queue'''定义函数01向队列里添加数据'''
def queue_add_data01(queue):for i in range(3):queue.put(i)print("向队列中添加数据:{}".format(i))'''定义函数02向队列里添加数据'''
def queue_add_data02(queue):for i in range(10,13):queue.put(i)print("向队列中添加数据:{}".format(i))print("========================")'''定义函数从队列里获取数据'''
def queue_get_data(queue):while not queue.empty():data=queue.get()print("从队列中获取数据:{}".format(data))if __name__ == '__main__':#创建队列queue = Queue()#创建进程producer_process01 = Process(target=queue_add_data01, args=(queue,))producer_process02 = Process(target=queue_add_data02, args=(queue,))consumer_process = Process(target=queue_get_data, args=(queue,))# 启动进程producer_process01.start()producer_process02.start()consumer_process.start()# 等待两个进程执行结束producer_process01.join()producer_process02.join()consumer_process.join()

执行结果

Pipe

Pipe实列化后返回两个对象,分别表示管道的发生端和接收端

代码实现

from multiprocessing import Process,Pipe'''定义函数向管道中发送数据'''
def pipi_send_data(conn):for i in range(5):conn.send(i)print("向管道中发送数据:{}".format(i))print("===============================")'''定义函数从管道中接收数据'''
def pipi_receive_data(conn):while conn.poll():data=conn.recv()print("从管道中接收数据:{}".format(data))if __name__ == '__main__':# 创建管道send_conn, receive_conn = Pipe()# 创建两个进程,一个用于发送数据,一个用于接收数据sender_process = Process(target=pipi_send_data, args=(send_conn,))receiver_process = Process(target=pipi_receive_data, args=(receive_conn,))# 启动进程sender_process.start()receiver_process.start()# 等待两个进程执行结束sender_process.join()receiver_process.join()

执行结果

4.多进程数据共享

4.1 内存对象共享

进程间共享内存是一种高效的通信方式,它允许多个进程共享同一块内存区域。multiprocessing模块中提供了Value和Array类用来创建共享内存

Value

作用:创建一个可以在多个进程之间共享并可以进行修改的变量,这个共享的变量可以是整数、浮点数等基本数据类型 
创建共享变量时有两个参数:数据类型、数据初始值,其中数据类型有
i:整数

d:双精度浮点数

f:单精度浮点数

代码实现

from multiprocessing import Process,Value##定义操作共享变量的方法
def shared_value(shared_val,name):print("初始值是:{}".format(shared_val.value))shared_val.value += 1print("{}操作后的值是:{}".format(name,shared_val.value))if __name__ == '__main__':##创建共享变量shared_val = Value('i', 0)##新建两个进程对共享变量进行操作process1 = Process(target=shared_value, args=(shared_val,"进程1"))process2 = Process(target=shared_value, args=(shared_val,"进程2"))##启动进程process1.start()process2.start()##等待进程执行结束process1.join()process2.join()

执行结果

Array

作用:用于创建允许多个进程共享和访问的共享数组
创建共享数组时有两个参数:数组元素类型、数组长度

代码实现:

from multiprocessing import Process,Array
import timedef shared_array_produce(shared_arr):for i in range(5):shared_arr[i] = i*2print("shared_array_produce操作后的数组是:{}".format(shared_arr[:]))def shared_array_consum(shared_arr):time.sleep(3)  # 等待shared_array_produce操作print("shared_array_consum操作前数组的值是:{}".format(shared_arr[:]))for i in range(5):shared_arr[i] = shared_arr[i]*2print("shared_array_consum操作后的数组是:{}".format(shared_arr[:]))if __name__ == '__main__':##创建共享数组shared_arr = Array('i', 5)  # i表示数组元素类型为整形,5表示数组长度##新建两个进程对共享变量进行操作process1 = Process(target=shared_array_produce, args=(shared_arr,))process2 = Process(target=shared_array_consum, args=(shared_arr,))##启动进程process1.start()process2.start()##等待进程执行结束process1.join()process2.join()

执行结果

4.2 全局变量共享

在 Python 的多进程环境中,全局变量在不同进程中并不会共享。为了在多进程中共享数据,需要使用 multiprocessing.Manager 中的共享数据结构,如 list、dict 等

没有使用Manage时

import time
from multiprocessing import Processdef add_data(shared_list):for i in range(5):shared_list.append(i)print("add_data执行完后shared_list的值是:{}".format(shared_list))def use_data(shared_list):time.sleep(5)print("use_data使用前shared_list的值是:{}".format(shared_list))if __name__ == '__main__':# 定义一个全局变量shared_list =[]add_data_process = Process(target=add_data, args=(shared_list,))use_data_process = Process(target=use_data, args=(shared_list,))# 启动进程add_data_process.start()use_data_process.start()# 等待进程执行完毕add_data_process.join()use_data_process.join()

执行结果

可以看到进程add_data_process执行完成后输出shared_list里面是有值的,use_data_process进程去使用shared_list时且是空的,原因:在 Python 的多进程环境中,全局变量在不同进程中并不会共享。解决办法:使用 multiprocessing.Manager 中的共享数据结构,如 list、dict 等

5.Python多进程信息同步

5.1 方式一:锁 Lock

作用:在多进程环境中创建一个锁对象,用于控制多个进程对共享资源的访问。当一个进程获得了锁,其他进程就无法获得该锁,直到锁被释放。
通过acquire()方法可以获取锁,通过release()方法可以释放锁。这样可以确保在同一时刻只有一个进程能够访问受锁保护的共享资源

代码实现

import datetime
from multiprocessing import Process,Lockdef lock_func_file(lock,name):current_time = datetime.datetime.now()formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)#获取锁lock.acquire()print("{}开始执行时间: {}".format(name,formatted_time))text = formatted_time + " " + "测试内容\n"with open('test_lock.txt','a') as f:f.write(text)#释放锁lock.release()if __name__ == '__main__':# 创建锁对象lock = Lock()proc1 = Process(target=lock_func_file, args=(lock,"进程1"))  # 创建进程proc1proc2 = Process(target=lock_func_file, args=(lock,"进程2"))  # 创建进程proc2proc1.start()  #启动进程proc1proc2.start()  # 启动进程proc2proc1.join()  #等待进程proc1执行结束在执行下面的代码proc2.join()  #等待进程proc2执行结束执行下面的代码

执行结果

用进程池的方式调用

用进程池调用的方式,如果还是直接定义锁对象lock = Lock(),会出现当执行
pool.apply_async(lock_func_file,args=(lock,))时,不会执行方法lock_func_file,原因是:在使用apply_async时,它使用pickle模块将任务发送给子进程,所以需要确保被传递的参数是可以被pickle序列化的。multiprocessing.Lock 对象默认是不可序列化的,因此可能会导致该问题 ,解决方法:使用Manager().Lock()来创建一个可序列化的锁对象:lock = Manager().Lock()

5.2 方式二:信号量 Semaphore

作用:控制对共享资源的访问 ,它有一个内部计数器,该计数器限制了可以同时访问共享资源的进程数量。当进程要访问共享资源时,它必须先获取信号量。如果信号量的计数器大于零,进程将减少计数器并继续执行。如果计数器为零,进程将被阻塞直到有其他进程释放了信号量。
有两个重要方法
acquire([blocking]): 获取信号量,如果计数器的值大于0,则将其减1并立即返回
release(): 释放信号量。将计数器的值加1

代码实现

import datetime
from multiprocessing import Process, Semaphoredef  semaphore_func_file(semaphore,name):current_time = datetime.datetime.now()formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)#获取信号量semaphore.acquire()print("semaphore:{}开始执行时间: {}".format(name,formatted_time))text = formatted_time + " " + "semaphore测试内容\n"with open('test_semaphore.txt','a') as f:f.write(text)#释放信号量semaphore.release()if __name__ == '__main__':#创建信号量对象,允许一个进程同时访问资源semaphore = Semaphore(1)proc1 = Process(target=semaphore_func_file, args=(semaphore, "进程1"))  # 创建进程proc1proc2 = Process(target=semaphore_func_file, args=(semaphore, "进程2"))  # 创建进程proc2proc1.start()  # 启动进程proc1proc2.start()  # 启动进程proc2proc1.join()  # 等待进程proc1执行结束在执行下面的代码proc2.join()  # 等待进程proc2执行结束执行下面的代码

执行结果

5.3 方式三:条件变量 Condition

作用:它允许多个进程在特定条件下等待或者唤醒其他进程 ,通常需要与Lock()结合使用,以确保多个进程之间对共享资源的安全访问,条件变量的作用包括以下几个方面:

  • 线程间的通信:条件变量允许一个或多个进程等待另一个进程满足特定条件时进行通知。
  • 线程间的协调:条件变量可以协调多个进程,确保它们在同一时间一起执行或者按照特定的顺序执行。
  • 避免忙等待:使用条件变量可以避免进程在等待时持续占用 CPU 资源,因为进程可以在条件不满足时进入休眠状态。

代码实现

import datetime
import time
from multiprocessing import Process,Condition,Managerdef condition_producer(condition,shared_list):current_time = datetime.datetime.now()formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)print("condition_producer执行时间:{}".format(formatted_time))# 获取信号量print("生产数据开始")time.sleep(3) #加一个等待时间,让消费进程等待for i in range(3):with condition:shared_list.append(i)condition.notify_all()  # 通知消费者进程print("生产一次后的数据:{}".format(shared_list))def condition_consumer(condition,shared_list):current_time = datetime.datetime.now()formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)print("condition_consumer执行时间:{}".format(formatted_time))print("消费进程开始前shared_list里的数据:{}".format(shared_list))for i in range(3):with condition:while len(shared_list) == 0:print("consumer waiting.....")condition.wait(timeout=3) # 等待生产者进程通知、唤醒,等待时间3sprint("Consumed:", shared_list.pop(0))if __name__ == '__main__':# 实列化Condition对象condition = Condition()#创建共享数据列表shared_list = Manager().list() # 使用Manager来创建共享的list#创建进程producer_process =Process(target=condition_producer, args=(condition,shared_list))consumer_process = Process(target=condition_consumer, args=(condition,shared_list))producer_process.start()consumer_process.start()producer_process.join()consumer_process.join()

执行结果

with condition 的解释
在Python中,with语句用于创建一个运行时上下文,它在进入和退出该上下文时执行特定的操作。对于Condition对象,with condition的表示在进入和退出该上下文时,将自动获取和释放底层的锁。在示例中,with condition用于获取Condition对象的锁,这意味着当进程进入with块时,它会获得锁,当离开with块时,锁会被释放。这确保了在with块中的临界区代码在同一时间只能被一个进程执行,从而避免了多个进程同时修改共享数据可能引发的问题。

5.4 方式四:Event

作用:可以在进程间传递状态信息,Event有两种状态:已设置和未设置。它的运行机制是:全局定义了一个Flag,默认值为false。 
如果Flag值为 false,当程序执行event.wait()方法时就会阻塞
如果Flag值为true时,程序执行event.wait()方法时不会阻塞继续执行,执行wait()后,Flag值又变为false,直达再次执行set()后,Flag的值才会变为true。

除了wait()和set()外,Event还提供了其他两个方法:
clear()方法:将Flag的值改成False
is_set()方法:判断当前的Flag的值

代码实现:

import datetime
import time
from multiprocessing import Process,Event,Managerdef event_producer(event,shared_list):print("生产者先等待3s")time.sleep(3)current_time = datetime.datetime.now()formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)print("生产者开始执行的时间是:{}".format(formatted_time))for i in range(4):shared_list.append(i)print("数据生产完了.......")print("生产完的数据如下:{}".format(shared_list))print("通过Set()通知消费者")event.set() #Flag为Truedef event_consumer(event,shared_list):current_time = datetime.datetime.now()formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') + '.' + "{:03d}".format(current_time.microsecond // 1000)print("进入消费进程的时间:{}".format(formatted_time))print("消费者需要使用数据shared_list,但生产者还没有生产完,此时Flag状态还是false,通过wait阻塞等待")event.wait()print("Flag状态变为True")print("消费者使用数据前的数据为:{}".format(shared_list))for i in range(4):print("Consumed:", shared_list.pop(0))if __name__ == '__main__':#创建Event对象event=Event()#创建共享数据列表shared_list = Manager().list() # 使用Manager来创建共享的list#创建进程producer_process =Process(target=event_producer, args=(event,shared_list))consumer_process = Process(target=event_consumer, args=(event,shared_list))#启动进程producer_process.start()consumer_process.start()#等待进程执行完毕producer_process.join()consumer_process.join()

执行结果


http://www.mrgr.cn/news/96343.html

相关文章:

  • (一)初始化窗口
  • [AI绘图] ComfyUI 中自定义节点插件安装方法
  • leetcode102 二叉树的层次遍历 递归
  • Android设计模式之单例模式
  • 【学Rust写CAD】16 0、1、-1代数单位元(algebraic_units.rs)
  • 基于Spring Boot + Vue的银行管理系统设计与实现
  • Android设计模式之工厂方法模式
  • Chrome 开发环境快速屏蔽 CORS 跨域限制!
  • Elasticsearch 搜索高级
  • 【qt】文件类(QFile)
  • 【AI插件开发】Notepad++插件开发实践:从基础交互到ScintillaCall集成
  • 【数据结构】栈 与【LeetCode】20.有效的括号详解
  • Linux修改默认shell为zsh
  • Android 设备实现 adb connect 连接的步骤
  • Pycharm(七):几个简单案例
  • udp通信(一)
  • Oracle 23ai Vector Search 系列之2 ONNX(Open Neural Network Exchange)
  • 项目-苍穹外卖(十六) Apache ECharts+数据统计
  • 在 React 中,组件之间传递变量的常见方法
  • apache连接池机制讨论