当前位置: 首页 > news >正文

python多进程程序设计 之三

python多进程程序设计 之三

  • 进程间同步
    • Condition
      • 构造器
      • acquire
      • release
      • wait/wait_for
      • notify/notify_all
      • 实列代码
    • Event
      • 构造器
      • set
      • clear
      • wait
      • 应用实例
    • Lock
      • 构造器
      • acquire
      • release
      • 实列代码

进程间同步

Condition

条件变量总是与某种类型的锁相关联;可以通过构造器传输传递一个锁,也可以默认创建一个。当多个条件变量必须共享同一锁时,构造器传输传递一个锁很有用。

条件变量遵循上下文管理协议:使用 with 语句在封闭块的持续时间内获取关联的锁。 acquire() 和release() 方法也会调用关联锁的相应方法。

必须在持有关联锁的情况下,调用其他方法。

  • wait()方法释放锁,然后阻塞,直到另一个线程通过调用notify()或notify_all()唤醒它。一旦被唤醒,wait()重新获取锁并返回。这个函数可以指定超时。
  • notify() 方法会唤醒等待条件变量的线程之一(如果有)。 notify_all() 方法唤醒所有等待条件变量的线程。

notify()和notify_all()方法不会释放锁;这意味着被唤醒的一个或多个线程,不会立即从其 wait() 调用中返回,而是仅在调用 notify() 或 notify_all() 的线程最终放弃锁的所有权时,返回。

构造器

multiprocessing.Condition(lock=None)
Condition类实现条件变量对象。条件变量允许一个或多个线程等待,直到收到另一线程的通知。

如果给出了锁参数,而不是 None,则它必须是 Lock 或 RLock 对象,并且它被用作底层锁。否则,将创建一个新的 RLock 对象,并将其用作基础锁。

acquire

词法:acquire(*args)
该函数获取Condition锁。该函数调用Condition锁的对应的函数;返回值是,Condition锁返回的任何值。

release

词法:release()
释放Condition锁。该函数调用Condition锁对应的函数;没有返回值。

wait/wait_for

词法:
wait(timeout=None)
wait_for(predicate, timeout=None)

函数wait引起调用进程等待,直至收到通知,或发生超时。如果调用此函数时,调用线程尚未获取锁,则会产生RuntimeError异常。

函数wait释放Condition锁,然后阻塞,直到

  1. 由另一个进程,对同一条件变量,调用notify() 或notify_all() ,将其唤醒,
  2. 直到可选超时timeout发生。

一旦被唤醒,或发生超时,函数wait会重新获取锁,并返回。

当超时参数timeout 存在,且非None时,它​​应该是一个浮点数,timeout的单位是秒。

若超时,函数wait返回值为True;否则,返回值为 False。

wait_for引起调用进程等待,直到可调用函数predicate计算为真。predicate的计算结果将被解释为布尔值。可以提供超时,给出最长的等待时间。

函数wait_for支持参数timeout。

  • 如果predicate计算结果是True,则wait_for返回True;
  • 如果调用函数被阻塞时间超过timeout,则wait_for返回False。

notify/notify_all

词法:
notify(n=1)
notify_all()

notify在默认情况下,唤醒一个等待该条件的进程。如果调用此函数时,调用进程尚未获取锁,则会产生RuntimeError。

该方法最多唤醒n个等待条件变量的进程;如果没有进程在等待,则是空操作。

notify_all唤醒所有等待此条件变量的进程。如果调用此函数时,调用进程未获取锁,则会产生RuntimeError异常。

实列代码

from multiprocessing import *
import numpy as np
from multiprocessing import shared_memory
import randomdim = 50def an_item_is_available(np_array):for i in range (5):if np_array[i] == 0:return Falsereturn True;def make_an_item_available(num, np_array):np_array[num] = random.randrange(50, 200)def proc_0(cv, shm):print("Process: {0}".format(current_process().name))shm = shared_memory.SharedMemory(name=shm);np_array = np.ndarray((dim, ), dtype=np.int32, buffer=shm.buf)with cv:while not an_item_is_available(np_array):cv.wait()for i in range(5):print("{0} ".format(np_array[i]), end="")def proc_1(num, cv, shm):print("Process: {1} num: {0}".format(num, current_process().name))shm = shared_memory.SharedMemory(name=shm);np_array = np.ndarray((dim, ), dtype=np.int32, buffer=shm.buf)with cv:make_an_item_available(num, np_array)cv.notify()def main():global dimlock = Lock()cv = Condition(lock)a1 = np.ones(shape=(dim, ), dtype=np.int32)shm = shared_memory.SharedMemory(create=True, size=a1.nbytes);kwkeys = {"cv":cv, "shm":shm.name}p1 = Process(target=proc_0, kwargs=kwkeys)p_list = []for i in range(5):p2 = Process(target=proc_1, args=(i,), kwargs=kwkeys)p_list.append(p2)p1.start();for p in p_list:p.start()p1.join()for p in p_list:p.join()if __name__=="__main__":freeze_support()main();

显示器输出

Process: Process-1
Process: Process-3 num: 1
Process: Process-2 num: 0
Process: Process-5 num: 3
Process: Process-6 num: 4
Process: Process-4 num: 2
124 143 138 141 159

Event

这是进程间通信最简单的机制之一:一个进程发出事件信号,其他进程等待该事件。

事件对象管理一个内部标志,可以使用 set() 方法将其设置为 true,并使用clear() 方法将其重置为 false。 wait() 方法会阻塞,直到标志为 true。

构造器

词法:threading.Event()
实现事件对象的类。事件管理一个标志,可以使用 set() 方法将其设置为 true,并使用clear() 方法,将其重置为 false。 wait() 方法会阻塞,直到标志为 true。该标志最初是false。

set

词法:set()
将内部标志设置为 true。所有等待它变为 true 的线程都会被唤醒。一旦标志为 true,调用 wait() 的线程将根本不会阻塞。

clear

词法:clear()

将内部标志重置为 false。随后,调用 wait() 的线程将阻塞,直到调用 set() 再次将内部标志设置为 true。

wait

词法:wait(timeout=None)

只要内部标志为假

  • timeout是None,或者未指定,调用进程被阻塞。
  • 并且timeout正值,但阻塞时间尚未达到timeout,调用进程被阻塞;但当阻塞时间达到timeout,超时发生,则该函数返回,结束阻塞。

返回值代表这个阻塞函数返回的原因;

  • 如果由于内部标志设置为 true,而返回,则返回 True。
  • 如果该有timeout参数,且非None,内部标志在给定等待时间内,未变为 true,则返回 False。

当timeout参数存在,且非 None 时,它​​应该是一个浮点数,指定操作的超时时间,时间单位是秒。

应用实例

这个例子模拟两个初始化进程,但进程init_1必须等待init_0完成一些初始化工作之后,才能开始。

from time import sleep
from random import randomfrom multiprocessing import *def init_0(event):print('Process: {0}(init_0) start...'.format(current_process().name))value = random()sleep(value)print('Process: {0} got {1}'.format(current_process().name, value))event.set()print('Process: {0} init_1 can start ...'.format(current_process().name))    value1 = random()sleep(value1)    def init_1(event):print('Process: {0}(init_1) start...'.format(current_process().name))event.wait()print('Process: {0} init_0 done'.format(current_process().name))value = random()sleep(value)def main():event = Event()ctx = get_context('spawn')kwargs = {"event" : event}p1 = ctx.Process(target=init_0, kwargs=kwargs)p2 = ctx.Process(target=init_1, kwargs=kwargs)p1.start();p2.start();p1.join();p2.join();if __name__=="__main__":freeze_support()main();

显示器输出

Process: SpawnProcess-1(init_0) start...
Process: SpawnProcess-2(init_1) start...
Process: SpawnProcess-1 got 0.903477834169278
Process: SpawnProcess-1 init_1 can start ...
Process: SpawnProcess-2 init_0 done

Lock

这个类实现原语Lock对象。

Lock处于两种状态之一:“锁定”或“解锁”。它有两个基本方法:acquire() 和release()。

  • 当解锁状态时,acquire()将状态更改为锁定,并立即返回。
  • 当锁定状态时,acquire()阻塞调用进程,直到另一个进程调用release(),将其更改为解锁状态,然后,acquire()调用将其重置为锁定状态,并返回。
  • release()方法只能在锁定状态下调用;它将状态更改为解锁,并立即返回。如果尝试释放未锁定的锁,则会产生运行时错误。

当多个线程阻塞在 acquire() 中等待状态转为解锁状态时,当调用 release() 将状态重置为解锁时,只有一个线程继续执行;哪一个等待线程继续进行是未定义的,并且可能因实现而异。

构造器

词法:multiprocessing.Lock()

实现原始锁对象的类。一旦线程获取了锁,后续获取它的尝试就会被阻塞,直到它被释放;任何线程都可以释放它。

请注意,Lock 实际上是一个工厂函数,它返回平台支持的具体 Lock 类的最有效版本的实例。

acquire

词法:acquire(blocking=True, timeout=-1)
获取锁,阻塞或非阻塞。

当将阻塞参数设置为 True(默认值)进行调用时,将阻塞直到锁解锁,然后将其设置为锁定并返回 True。

当在阻塞参数设置为 False 的情况下调用时,不阻塞。如果将阻塞设置为 True 的调用会阻塞,则立即返回 False;否则,将锁设置为锁定并返回 True。

当在浮点超时参数设置为正值的情况下调用时,只要无法获取锁,就会阻塞最多超时指定的秒数。超时参数 -1 指定无限等待。当blocking为False时,禁止指定超时时间。

如果成功获取锁,则返回值为 True,否则返回值为 False(例如,如果超时已到)。

release

词法:release()
释放锁。这可以从任何线程调用,而不仅仅是已获取锁的线程。

当锁被锁定时,将其重置为解锁,然后返回。如果任何其他线程在等待锁解锁时被阻塞,则只允许其中一个线程继续进行。

当在未锁定的锁上调用时,会引发 RuntimeError。

没有返回值。

实列代码

下列实列代码在两个进程之间共享存储器,使用Lock,实现存储器共享

from multiprocessing import *
from multiprocessing import shared_memorydim = 5def proc_0(lock, shmem_name, val):global dimprint('Process: {0}(init_0) start...'.format(current_process().name))existing_shm = shared_memory.SharedMemory(name=shmem_name)np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=existing_shm.buf)lock.acquire()np_array[:] = np_array[0] + val   for x in np_array:print(x)lock.release()def proc_1(lock, shmem_name, val):global dimprint('Process: {0}(init_1) start...'.format(current_process().name))existing_shm = shared_memory.SharedMemory(name=shmem_name)np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=existing_shm.buf)lock.acquire()np_array[:] = np_array[0] + valfor x in np_array:print(x)lock.release()def main():global dimlock = Lock()a = np.ones(shape=(dim, dim,), dtype=np.int32)shm_a = shared_memory.SharedMemory(create=True, size=a.nbytes)np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=shm_a.buf)for x in np_array:print(x)ctx = get_context('spawn')p1 = ctx.Process(target=proc_0, args=(lock, shm_a.name, 10))   p2 = ctx.Process(target=proc_1, args=(lock, shm_a.name, 5))p1.start();p2.start();p1.join();p2.join();if __name__=="__main__":freeze_support()main();

显示屏输出

[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
Process: SpawnProcess-1(init_0) start...
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
Process: SpawnProcess-2(init_1) start...
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]

http://www.mrgr.cn/news/30095.html

相关文章:

  • 最优化理论与自动驾驶(十一):基于iLQR的自动驾驶轨迹跟踪算法(c++和python版本)
  • 如何将MySQL卸载干净(win11)
  • C++学习, 异常处理
  • Docker常用命令总结
  • Java线程池运行流程、配置参数及线程池类型
  • MATLAB系列08:输入/输入函数
  • 江协科技STM32学习- P13 TIM定时器中断
  • 探秘Python中的链表:从零开始的奇妙之旅
  • JAVA惊喜连连无限可能沉浸式盲盒商城系统小程序源码
  • 代码随想录 -- 二叉树 -- 删除二叉搜索树中的节点
  • 工单管理软件的优势有哪些?企业如何选择?
  • 【每日一诗】【诗词创作】【诗】《雨前秋夜》
  • 大模型学习起步的经验分享
  • Agile Modbus STM32裸机移植 从机使用
  • 图解Transformer工作原理(非常详细)零基础入门到精通,收藏这一篇就够了
  • C++ 面试模拟02
  • Mac 上哪个剪切板增强工具比较好用? 好用剪切板工具推荐
  • 二叉树的遍历【C++】
  • 土壤墒情测定仪的工作原理
  • layui table中的checkbox禁用问题