python多进程程序设计 之三
python多进程程序设计 之三
- 进程间同步
- Condition
- 构造器
- acquire
- release
- wait/wait_for
- notify/notify_all
- 实列代码
- Event
- 构造器
- set
- clear
- wait
- 应用实例
- Lock
- 构造器
- acquire
- release
- 实列代码
进程间同步
Condition
条件变量总是与某种类型的锁相关联;可以通过构造器传输传递一个锁,也可以默认创建一个。当多个条件变量必须共享同一锁时,构造器传输传递一个锁很有用。
条件变量遵循上下文管理协议:使用 with 语句在封闭块的持续时间内获取关联的锁。 acquire() 和release() 方法也会调用关联锁的相应方法。
必须在持有关联锁的情况下,调用其他方法。
- wait()方法释放锁,然后阻塞,直到另一个线程通过调用notify()或notify_all()唤醒它。一旦被唤醒,wait()重新获取锁并返回。这个函数可以指定超时。
- notify() 方法会唤醒等待条件变量的线程之一(如果有)。 notify_all() 方法唤醒所有等待条件变量的线程。
notify()和notify_all()方法不会释放锁;这意味着被唤醒的一个或多个线程,不会立即从其 wait() 调用中返回,而是仅在调用 notify() 或 notify_all() 的线程最终放弃锁的所有权时,返回。
构造器
multiprocessing.Condition(lock=None)
Condition类实现条件变量对象。条件变量允许一个或多个线程等待,直到收到另一线程的通知。
如果给出了锁参数,而不是 None,则它必须是 Lock 或 RLock 对象,并且它被用作底层锁。否则,将创建一个新的 RLock 对象,并将其用作基础锁。
acquire
词法:acquire(*args)
该函数获取Condition锁。该函数调用Condition锁的对应的函数;返回值是,Condition锁返回的任何值。
release
词法:release()
释放Condition锁。该函数调用Condition锁对应的函数;没有返回值。
wait/wait_for
词法:
wait(timeout=None)
wait_for(predicate, timeout=None)
函数wait引起调用进程等待,直至收到通知,或发生超时。如果调用此函数时,调用线程尚未获取锁,则会产生RuntimeError异常。
函数wait释放Condition锁,然后阻塞,直到
- 由另一个进程,对同一条件变量,调用notify() 或notify_all() ,将其唤醒,
- 直到可选超时timeout发生。
一旦被唤醒,或发生超时,函数wait会重新获取锁,并返回。
当超时参数timeout 存在,且非None时,它应该是一个浮点数,timeout的单位是秒。
若超时,函数wait返回值为True;否则,返回值为 False。
wait_for引起调用进程等待,直到可调用函数predicate计算为真。predicate的计算结果将被解释为布尔值。可以提供超时,给出最长的等待时间。
函数wait_for支持参数timeout。
- 如果predicate计算结果是True,则wait_for返回True;
- 如果调用函数被阻塞时间超过timeout,则wait_for返回False。
notify/notify_all
词法:
notify(n=1)
notify_all()
notify在默认情况下,唤醒一个等待该条件的进程。如果调用此函数时,调用进程尚未获取锁,则会产生RuntimeError。
该方法最多唤醒n个等待条件变量的进程;如果没有进程在等待,则是空操作。
notify_all唤醒所有等待此条件变量的进程。如果调用此函数时,调用进程未获取锁,则会产生RuntimeError异常。
实列代码
from multiprocessing import *
import numpy as np
from multiprocessing import shared_memory
import randomdim = 50def an_item_is_available(np_array):for i in range (5):if np_array[i] == 0:return Falsereturn True;def make_an_item_available(num, np_array):np_array[num] = random.randrange(50, 200)def proc_0(cv, shm):print("Process: {0}".format(current_process().name))shm = shared_memory.SharedMemory(name=shm);np_array = np.ndarray((dim, ), dtype=np.int32, buffer=shm.buf)with cv:while not an_item_is_available(np_array):cv.wait()for i in range(5):print("{0} ".format(np_array[i]), end="")def proc_1(num, cv, shm):print("Process: {1} num: {0}".format(num, current_process().name))shm = shared_memory.SharedMemory(name=shm);np_array = np.ndarray((dim, ), dtype=np.int32, buffer=shm.buf)with cv:make_an_item_available(num, np_array)cv.notify()def main():global dimlock = Lock()cv = Condition(lock)a1 = np.ones(shape=(dim, ), dtype=np.int32)shm = shared_memory.SharedMemory(create=True, size=a1.nbytes);kwkeys = {"cv":cv, "shm":shm.name}p1 = Process(target=proc_0, kwargs=kwkeys)p_list = []for i in range(5):p2 = Process(target=proc_1, args=(i,), kwargs=kwkeys)p_list.append(p2)p1.start();for p in p_list:p.start()p1.join()for p in p_list:p.join()if __name__=="__main__":freeze_support()main();
显示器输出
Process: Process-1
Process: Process-3 num: 1
Process: Process-2 num: 0
Process: Process-5 num: 3
Process: Process-6 num: 4
Process: Process-4 num: 2
124 143 138 141 159
Event
这是进程间通信最简单的机制之一:一个进程发出事件信号,其他进程等待该事件。
事件对象管理一个内部标志,可以使用 set() 方法将其设置为 true,并使用clear() 方法将其重置为 false。 wait() 方法会阻塞,直到标志为 true。
构造器
词法:threading.Event()
实现事件对象的类。事件管理一个标志,可以使用 set() 方法将其设置为 true,并使用clear() 方法,将其重置为 false。 wait() 方法会阻塞,直到标志为 true。该标志最初是false。
set
词法:set()
将内部标志设置为 true。所有等待它变为 true 的线程都会被唤醒。一旦标志为 true,调用 wait() 的线程将根本不会阻塞。
clear
词法:clear()
将内部标志重置为 false。随后,调用 wait() 的线程将阻塞,直到调用 set() 再次将内部标志设置为 true。
wait
词法:wait(timeout=None)
只要内部标志为假
- timeout是None,或者未指定,调用进程被阻塞。
- 并且timeout正值,但阻塞时间尚未达到timeout,调用进程被阻塞;但当阻塞时间达到timeout,超时发生,则该函数返回,结束阻塞。
返回值代表这个阻塞函数返回的原因;
- 如果由于内部标志设置为 true,而返回,则返回 True。
- 如果该有timeout参数,且非None,内部标志在给定等待时间内,未变为 true,则返回 False。
当timeout参数存在,且非 None 时,它应该是一个浮点数,指定操作的超时时间,时间单位是秒。
应用实例
这个例子模拟两个初始化进程,但进程init_1必须等待init_0完成一些初始化工作之后,才能开始。
from time import sleep
from random import randomfrom multiprocessing import *def init_0(event):print('Process: {0}(init_0) start...'.format(current_process().name))value = random()sleep(value)print('Process: {0} got {1}'.format(current_process().name, value))event.set()print('Process: {0} init_1 can start ...'.format(current_process().name)) value1 = random()sleep(value1) def init_1(event):print('Process: {0}(init_1) start...'.format(current_process().name))event.wait()print('Process: {0} init_0 done'.format(current_process().name))value = random()sleep(value)def main():event = Event()ctx = get_context('spawn')kwargs = {"event" : event}p1 = ctx.Process(target=init_0, kwargs=kwargs)p2 = ctx.Process(target=init_1, kwargs=kwargs)p1.start();p2.start();p1.join();p2.join();if __name__=="__main__":freeze_support()main();
显示器输出
Process: SpawnProcess-1(init_0) start...
Process: SpawnProcess-2(init_1) start...
Process: SpawnProcess-1 got 0.903477834169278
Process: SpawnProcess-1 init_1 can start ...
Process: SpawnProcess-2 init_0 done
Lock
这个类实现原语Lock对象。
Lock处于两种状态之一:“锁定”或“解锁”。它有两个基本方法:acquire() 和release()。
- 当解锁状态时,acquire()将状态更改为锁定,并立即返回。
- 当锁定状态时,acquire()阻塞调用进程,直到另一个进程调用release(),将其更改为解锁状态,然后,acquire()调用将其重置为锁定状态,并返回。
- release()方法只能在锁定状态下调用;它将状态更改为解锁,并立即返回。如果尝试释放未锁定的锁,则会产生运行时错误。
当多个线程阻塞在 acquire() 中等待状态转为解锁状态时,当调用 release() 将状态重置为解锁时,只有一个线程继续执行;哪一个等待线程继续进行是未定义的,并且可能因实现而异。
构造器
词法:multiprocessing.Lock()
实现原始锁对象的类。一旦线程获取了锁,后续获取它的尝试就会被阻塞,直到它被释放;任何线程都可以释放它。
请注意,Lock 实际上是一个工厂函数,它返回平台支持的具体 Lock 类的最有效版本的实例。
acquire
词法:acquire(blocking=True, timeout=-1)
获取锁,阻塞或非阻塞。
当将阻塞参数设置为 True(默认值)进行调用时,将阻塞直到锁解锁,然后将其设置为锁定并返回 True。
当在阻塞参数设置为 False 的情况下调用时,不阻塞。如果将阻塞设置为 True 的调用会阻塞,则立即返回 False;否则,将锁设置为锁定并返回 True。
当在浮点超时参数设置为正值的情况下调用时,只要无法获取锁,就会阻塞最多超时指定的秒数。超时参数 -1 指定无限等待。当blocking为False时,禁止指定超时时间。
如果成功获取锁,则返回值为 True,否则返回值为 False(例如,如果超时已到)。
release
词法:release()
释放锁。这可以从任何线程调用,而不仅仅是已获取锁的线程。
当锁被锁定时,将其重置为解锁,然后返回。如果任何其他线程在等待锁解锁时被阻塞,则只允许其中一个线程继续进行。
当在未锁定的锁上调用时,会引发 RuntimeError。
没有返回值。
实列代码
下列实列代码在两个进程之间共享存储器,使用Lock,实现存储器共享
from multiprocessing import *
from multiprocessing import shared_memorydim = 5def proc_0(lock, shmem_name, val):global dimprint('Process: {0}(init_0) start...'.format(current_process().name))existing_shm = shared_memory.SharedMemory(name=shmem_name)np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=existing_shm.buf)lock.acquire()np_array[:] = np_array[0] + val for x in np_array:print(x)lock.release()def proc_1(lock, shmem_name, val):global dimprint('Process: {0}(init_1) start...'.format(current_process().name))existing_shm = shared_memory.SharedMemory(name=shmem_name)np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=existing_shm.buf)lock.acquire()np_array[:] = np_array[0] + valfor x in np_array:print(x)lock.release()def main():global dimlock = Lock()a = np.ones(shape=(dim, dim,), dtype=np.int32)shm_a = shared_memory.SharedMemory(create=True, size=a.nbytes)np_array = np.ndarray((dim, dim,), dtype=np.int32, buffer=shm_a.buf)for x in np_array:print(x)ctx = get_context('spawn')p1 = ctx.Process(target=proc_0, args=(lock, shm_a.name, 10)) p2 = ctx.Process(target=proc_1, args=(lock, shm_a.name, 5))p1.start();p2.start();p1.join();p2.join();if __name__=="__main__":freeze_support()main();
显示屏输出
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
Process: SpawnProcess-1(init_0) start...
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
[10 10 10 10 10]
Process: SpawnProcess-2(init_1) start...
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]
[15 15 15 15 15]