当前位置: 首页 > news >正文

Python 中的高并发 I/O

Python 中的高并发 I/O

并发(concurrency)指计算机似乎能在同一时刻做许多件不同的事情。例如,在只配有一个 CPU 核心的计算机上面,操作系统可以迅速切换这个处理器所运行的程序,因此尽管同一时刻最多只有一个程序在运行,但这些程序能够交替地使用这个核心,从而造成一种假象,让人觉得它们好像真的在同时运行。

在同一个程序之中,可以利用并发轻松地解决某些类型的问题。例如,并发可以让程序里面出现多条独立的执行路径,每条路径都可以处理它自己的 I/O 流,这就让人觉得这些 I/O 任务好像真的是在各自的路径里面同时向前推进的。

Python 很容易就能写出各种风格的并发程序。在并发量较小的场合可以使用线程(thread)​,如果要运行大量的并发函数,那么可以使用协程(coroutine)​。

并发前瞻

程序范围变大、需求变复杂之后,经常要用多条路径平行地处理任务。假如,要实现康威生命游戏(Conway’s Game of Life)​,这是个经典的有限状态自动机。

它的规则很简单:在任意长宽的二维网格中,每个单元格都必须处于 ALIVE 或 EMPTY 状态,前者表示这个单元格里有生命存在,后者表示这里没有生物(或者原有生物已经死亡)​。

ALIVE = '*'
EMPTY = '-'

时钟每走一格,游戏就要前进一步。这个时候,需要考虑每个单元格的周围总共有多少个处于存活状态的单元格,并根据这个数量来决定本单元格的新状态:如果当前有生命体存在(ALIVE)​,那么该生命体有可能继续存活,也有可能死亡;如果单元格当前是空白的(EMPTY)​,那么下一步有可能继续保持空白,也有可能诞生新的生命体​。先试着画一张 5×5 的网格,然后把这个游戏推进四步,看看每一代的生存情况。

   0   |   1   |   2   |   3   |   4----- | ----- | ----- | ----- | ------*--- | --*-- | --**- | --*-- | -------**- | --**- | -*--- | -*--- | -**-----*- | --**- | --**- | --*-- | ---------- | ----- | ----- | ----- | -----

可以定义一个简单的容器类管理这些单元格的状态。这个类必须提供 get 与 set 方法,以获取并设置任何一个坐标点(或者说任何一个单元格)的值。如果坐标越界,那么应该自动绕回,产生一种无限折返的效果。

class Grid:def __init__(self, height, width):self.height = heightself.width = widthself.rows = []for _ in range(self.height):self.rows.append([EMPTY] * self.width)def get(self, y, x, state):self.rows[y % self.height][x % self.width] = statedef __str__(self):result = ''for row in self.rows:result += ''.join(row) + '\n'return result

为了观察这个类的实际效果,我们创建 Grid 实例,并采用经典的滑翔机(glider)形状来开局:

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)# >>>
# ---*-----
# ----*----
# --***----
# ---------
# ---------

现在,需要想个办法获取相邻单元格的状态。这里决定编写一个辅助函数来实现,该辅助函数可以查询本单元格周边的八个单元格,并统计其中有几个处于存活(ALIVE)状态。给函数设计参数时,不应该让它明确接受 Grid 实例,因为那样会导致这个函数与 Grid 类耦合。只需要把一个能根据坐标来查询单元格状态的函数传给 get 参数即可。

def count_neighbors(y, x, get):n_ = get(y - 1, x + 0)  # Northne = get(y - 1, x + 1)  # Northeaste_ = get(y + 0, x + 1)  # Eastse = get(y + 1, x + 1)  # Southeasts_ = get(y + 1, x + 0)  # Southsw = get(y + 1, x - 1)  # Southwestw_ = get(y + 0, x - 1)  # Westnw = get(y - 1, x - 1)  # Northwestneighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]count = 0for state in neighbor_states:if state == ALIVE:count += 1return count

现在来定义康威生命游戏的逻辑。这套逻辑共有三条规则。第一,如果单元格里有生命体,而且周边的存活单元格少于两个,那么本单元格里的生命体死亡;第二,如果单元格里有生命体,而且周边的存活单元格多于三个,那么本单元格里的生命体死亡;第三,如果单元格为空(或者说,单元格里面的生命体已经死亡)​,而且周边的存活单元格恰好是三个,那么本单元格里的生命体复活。

def game_logic(state, neighbors):if state == ALIVE:if neighbors < 2:return EMPTY     # Die: Too fewelif neighbors > 3:return EMPTY     # Die: Too manyelse:if neighbors == 3:return ALIVE     # Regeneratereturn state

接下来编写一个函数,用以更改单元格的状态。它可以把刚写的 count_neighbors 与 game_logic 利用起来。这个函数根据坐标查出单元格当前的状态,然后统计周边总共有多少个存活的单元格,接下来根据当前状态与存活的邻居数判断本单元格在下一轮的状态,最后,更新单元格的状态。在设计这个接口时,与 count_neighbors 一样,也不允许传入 Grid 实例,而是传入一个能根据坐标来设置新状态的函数,以降低耦合度。

def step_cell(y, x, get, set):state = get(y, x)neighbors = count_neighbors(y, x, get)next_state = game_logic(state, neighbors)set(y, x, next_state)

最后,定义一个函数,把整张网格之中的每一个单元格都向前推进一步,并返回一张新的网格,用来表示下一代的状态。在实现这个函数时,要调用刚才写的 step_cell 函数,这时必须注意把 get 与 set 参数写对。get 指的是当前这代网格(grid)之中的 get 方法,而 set 指的则是下一代网格(next_grid)的 set 方法,只有这样,才能让每一个单元格都按照现在的情况分别演化到下一轮,而不会让先演化的单元格影响其他单元格的迁移结果,这对于游戏正常运行是很重要的。假如设计 step_cell 时,让它只接受一个 Grid 实例,而不是分别通过两个参数来接受获取与设置单元格状态所用的那两个方法,那么这里的 simulate 就不好写了,若是把当前的 grid 传过去,那么它里面的单元格状态就会被 step_cell 函数破坏掉。

def simulate(grid):next_grid = Grid(grid.height, grid.width)for y in range(grid.height):for x in range(grid.width):step_cell(y, x, grid.get, next_grid.set)return next_grid

现在,通过 for 循环来推进这张网格(或者说棋盘)​,推进到第四代时,大家就会发现,原来那个滑翔机的形状已经整体向右下方移动了一个位置。当然这个效果,最终还是通过 game_logic 函数里面那三条简单的规则而得以落实的。

class ColumnPrinter:columns = []def append(self, index, grid):grid_len = len(grid.strip('\n').split('\n'))if not self.columns: for _ in range(grid_len + 1):self.columns.append([])self.columns[0].append(index)grids = grid.strip('\n').split('\n')for i, g in enumerate(grids, 1):self.columns[i].append(g)def __str__(self):for column in self.columns:for i, c in enumerate(column, 1):print(f'{c}'.center(10), end='')if i != len(column):print('|', end='')print('\n')return ''columns = ColumnPrinter()
for i in range(5):columns.append(str(grid))grid = simulate(grid)print(columns)
# >>>
#     0     |    1     |    2     |    3     |    4     
# ---*----- |--------- |--------- |--------- |--------- 
# ----*---- |--*-*---- |----*---- |---*----- |----*---- 
# --***---- |---**---- |--*-*---- |----**--- |-----*--- 
# --------- |---*----- |---**---- |---**---- |---***--- 
# --------- |--------- |--------- |--------- |--------- 

这个程序,在单机单线程的环境下,是没有问题的。但如果需求变了呢?正如刚才暗示过的那样,game_logic 函数或许要执行某些 I/O 操作(例如要通过 socket 通信)​。例如,如果这是大型多人在线游戏(massively multiplayer online game,MMOG)的一部分,那么这些单元格可能分别对应全球各地的玩家,所以在迁移每个单元格的状态时,都要联网查询其他玩家的状态,这样可能必须要执行 I/O 操作。

这种需求应该如何实现呢?最简单的办法是,把执行阻塞式的 I/O 操作直接放在 game_logic 函数里面执行。

def game_logic(state, neighbors):...# Do some blocking input/output in here: # data = my_socket.reve(100)...

这种写法的问题在于,它会拖慢整个程序的速度。如果 game_logic 函数每次执行的 I/O 操作需要 100 毫秒才能完成(与国外的玩家通信一个来回,确实有可能需要这么长时间)​,那么把整张网格向前推进一代最少需要 4.5 秒,因为 simulate 函数在推进网格时,是一个一个单元格来计算的,它需要把这 45 个单元格按顺序计算一遍。这对于网络游戏来说,实在太慢,让人没耐心玩下去。另外,这个方案也无法扩展,假如单元格的数量增加到一万,那么计算新一代网格所花的总时间就会超过 15 分钟。

若想降低延迟时间,应该平行地执行这些 I/O 操作,这样的话,无论网格有多大,都只需要 100 毫秒左右就能推进到下一代。针对每个工作单元开辟一条执行路径,这种模式叫作扇出(fan-out)​,对于本例来说,工作单元指的是网格中的单元格。然后,要等待这些并发的工作单元全部完工,才能执行下一个环节,这种模式叫作扇入(fan-in)​,对于本例来说,下一个环节指的是让整张网格进入新的一代。

Python 提供了许多内置的工具,可以实现 fan-out 与 fan-in 模式,这些工具各有利弊。要了解每种方案的优点和缺点,这样才能用最合适的工具来应对具体的需求,实现高并发 I/O。

用线程平行地处理 I/O

想在 Python 里平行地做 I/O,首先要考虑的工具当然是线程。但如果真用线程来表示 fan-out 模式中的执行路径,就会发现,这样其实有很多问题。

以前面提到的生命游戏为例,用线程来解决 game_logic 函数由于执行 I/O 而产生的延迟问题。首先,这些线程之间需要锁定功能对其进行协调,以确保它们所操纵的数据结构不会遭到破坏。下面创建 Grid 子类,并为它添加锁定功能,让多条线程能够正确地访问同一个实例。

from threading import LockALIVE = '*'
EMPTY = '-'class Grid:...class LockingGrid(Grid):def __init__(self, height, width) -> None:super().__init__(height, width)self.lock = Lock()def __str__(self) -> str:with self.lock:return super().__str__()def get(self, y, x):with self.lock:return super().get(y, x)def set(self, y, x, state):with self.lock:return super().set(y, x, state)

接下来,改用 fan-out 模式实现 simulate 函数,为每个 step_cell 操作都创建一条线程,这些线程能够平行地运行并各自执行相应的 I/O 任务。这样的话,就不需要像原来那样,非得等前一次 step_cell 执行完,才能更新下一个单元格。然后,通过 fan-in 模式等待这些线程全部完工,再把网格正式演化到下一代。

from threading import Threaddef count_neighbors(y, x, get):...def game_logic(state, neighbors):...def step_cell(y, x, get, set):state = get(y, x)neighbors = count_neighbors(y, x, get)next_state = game_logic(state, neighbors)set(y, x, next_state)def simulate_thread(grid):next_grid = LockingGrid(grid.height, grid.width)threads = []for y in range(grid.height):for x in range(grid.width):args = (y, x, grid.get, next_grid.set)thread = Thread(target=step_cell, args=args)thread.start()  # Fan outthreads.append(thread)for thread in threads:thread.join()  # Fan inreturn next_grid

负责推进单元格状态的 step_cell 函数可以保持原样,推动整个游戏流程的那些代码基本上也不用改,只有两个地方必须调整:一个是把网格类的名称改为 LockingGrid,另一个是把 simulate 函数改为多线程版本的 simulate_threaded。

grid = LockingGrid(5, 9)  # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)columns = ColumnPrinter()
for i in range(5):columns.append(i, str(grid))grid = simulate_thread(grid)  # Changedprint(columns)# >>>
#     0     |    1     |    2     |    3     |    4
# ---*----- |--------- |--------- |--------- |---------
# ----*---- |--*-*---- |----*---- |---*----- |----*----
# --***---- |---**---- |--*-*---- |----**--- |-----*---
# --------- |---*----- |---**---- |---**---- |---***---
# --------- |--------- |--------- |--------- |---------

这样写没错,而且 I/O 操作确实能够平行地在各线程中执行。但是,这种方案有三个比较大的缺点。

  • 第一,要专门采用工具来协调这些Thread实例,才能保证它们不会破坏程序中的数据。这会让多线程版本的代码理解起来比较困难,因为早前的单线程版是按照先后顺序实现的,不需要有专门的协调机制,所以读起来比现在的版本好懂。这种复杂的多线程代码,随时间的推移更加难以扩展与维护。
  • 线程占用的内存比较多,每条线程大约占8MB。当然,现在的许多电脑完全能承担起本例中45条线程所占用的内存总量。但如果游戏里有一万个单元格,那就必须创建一万条线程,这个内存量恐怕很多电脑不支持。所以,给每项操作都新开一条线程是不现实的。
  • 线程的开销比较大。系统频繁地切换线程,会降低程序的运行效率。对于本例来说,每推进一代,就要停止一批旧线程并启动一批新线程,这样开销很大,于是程序除了要花 100 毫秒等待 I/O 操作结束,还会因为停止并启动线程而耽误一些时间。

如果线程所执行的代码出现了错误,也会很难调试。例如,game_logic 函数有可能会抛出异常,因为它要执行的那种 I/O 操作本来就属于有时成功有时失败的操作。

def game_logic(state, neighbors):...raise OSError('Problem with I/O')...

如果需要一大批执行路径分头去执行某项任务,而且还要频繁地启动并停止这批执行路径,那么每次都需手工新建一批线程,这肯定不是个好办法。

Queue 类实现多线程管道

另一种方案,也就是用内置的 queue 模块里的 Queue 类实现多线程管道。

这种方案的总思路是:在推进生命游戏时,不像原来那样,每推进一代,就新建一批线程来推进相应的单元格,而是可以提前创建数量固定的一组工作线程,令这组线程平行地处理当前这批 I/O 任务,并在处理完之后,继续等待下一批任务,这样就不会消耗那么多资源了,程序也不会再因为频繁新建线程而耽误那么多时间。

虽然利用 Queue 能把 fan-out 与 fan-in 实现了,但这样做的代价很大。当然这种通过队列来衔接各个环节的方案,要比每次做 I/O 时都新建 Thread 实例要好。但是为了改用队列方案来处理 I/O,需要重构许多代码,如果管道要分成好几个环节,那么要修改的地方会更多。

利用队列并行地处理 I/O 任务,其处理 I/O 任务量有限,可以考虑用 Python 内置的某些功能与模块打造更好的方案。

通过 ThreadPoolExecutor 实现线程并发

Python 有个内置模块叫作 concurrent.futures,它提供了 ThreadPoolExecutor 类。这个类结合了线程(Thread)方案与队列(Queue)方案的优势,可以用来平行地处理 I/O 操作。

还是以之前的游戏为例。

ALIVE = '*'
EMPTY = '-'class Grid:...class LockingGrid(Grid):...def count_neighbors(y, x, get):...def game_logic(state, neighbors):...def step_cell(y, x, get, set):state = get(y, x)neighbors = count_neighbors(y, x, get)next_state = game_logic(state, neighbors)set(y, x, next_state)

这次在把游戏推进到下一代的时候,不针对每个单元格启动新的 Thread 实例,而是把推进每个单元格状态的那个函数与必要的参数提交给 ThreadPoolExecutor,让执行器自己安排线程去执行这些状态更新任务,这样就实现了 fan-out(分派)​。稍后,可以等待提交过去的所有任务都执行完毕,然后再把整张网格正式推进到下一代,这样就实现了 fan-in(归集)​。

from concurrent.futures import ThreadPoolExecutordef simulate_pool(pool, grid):next_grid = LockingGrid(grid.height, grid.width)futures = []for y in range(grid.height):for x in range(grid.width):args = (y, x, grid.get, next_grid.set)future = pool.submit(step_cell, *args)  # Fan outfutures.append(future)for future in futures:future.result()                             # Fan inreturn next_grid

用来推进游戏状态的这些线程可以提前分配,不用每次执行 simulate_pool 都分配一遍,这样能够降低启动线程的开销。另外,线程池里的最大线程数可以通过 max_workers 参数手工指定,这样能把线程数量限制在一定范围内,而不像最早的那个方案那样,每执行一项 I/O 操作,就启动一条线程,那样会导致内存用量激增。

class ColumnPrinter:...grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:for i in range(5):columns.append(i, str(grid))grid = simulate_pool(pool, grid)print(columns)# >>>
#     0     |    1     |    2     |    3     |    4
# ---*----- |--------- |--------- |--------- |---------
# ----*---- |--*-*---- |----*---- |---*----- |----*----
# --***---- |---**---- |--*-*---- |----**--- |-----*---
# --------- |---*----- |---**---- |---**---- |---***---
# --------- |--------- |--------- |--------- |---------

ThreadPoolExecutor 类的最大优点在于:如果调用者通过 submit 方法把某项任务提交给它执行,那么会获得一个与该任务相对应的 Future 实例,当调用者在这个实例上通过 result 方法获取执行结果时,ThreadPoolExecutor 会把它在执行任务的过程中所遇到的异常自动抛给调用者。

def game_logic(state, neighbors):...raise OSError('Problem with I/O')...with ThreadPoolExecutor(max_workers=10) as pool:task = pool.submit(game_logic, ALIVE, 3)task.result()# >>>
# Traceback ...
# OSError: Problem with I/O

如果除了 game_logic 之外,count_neighbors 函数也必须执行 I/O 操作,那么只需要沿用现在的代码,就可以让那些 I/O 操作得到并发,因为这两个函数都是 step_cell 任务的一部分,而 step_cell 任务会由 ThreadPoolExecutor 负责执行。即便要做的不是 I/O 操作,而是那种需要放在 CPU 上面计算的任务,也还是可以沿用这套接口。

ThreadPoolExecutor 方案仍然有个很大的缺点,就是 I/O 并行能力不高,即便把 max_workers 设成 100,也无法高效地应对那种有一万多个单元格,且每个单元格都要同时做 I/O 的情况。如果你面对的需求,没办法用异步方案解决,而是必须执行完才能往后走(例如文件 I/O )​,那么 ThreadPoolExecutor 是个不错的选择。不过,ThreadPoolExecutor 不像直接启动线程的方案那样,需要消耗大量内存。

用协程实现高并发 I/O

对于同时需要执行的 I/O 任务有成千上万个,那么上述那些方案的效率就不太理想了。

像这种在并发方面要求比较高的 I/O 需求,可以用 Python 的协程(coroutine)来解决。协程能够制造出一种效果,让觉得 Python 程序好像真的可以同时执行大量任务。这种效果需要使用 async 与 await 关键字来实现,它的基本原理与生成器(generator)类似,也就是不立刻给出所有的结果,而是等需要用到的时候再一项一项地获取。

启动协程是有代价的,就是必须做一次函数调用。协程激活之后,只占用不到 1KB 内存,所以只要内存足够,协程稍微多一些也没关系。与线程类似,协程所要执行的任务也是用一个函数来表示的,在执行这个函数的过程中,协程可以从执行环境里面获取输入值,并把输出结果放到这个执行环境之中。协程与线程的区别在于,它不会把这个函数从头到尾执行完,而是每遇到一个 await 表达式,就暂停一次,下次继续执行的时候,它会先等待 await 所针对的那项 awaitable 操作有了结果(那项操作是用 async 函数表示的)​,然后再推进到下一个 await 表达式那里(这跟生成器函数的运作方式有点像,那种函数也是一遇到 yield 就暂停)​。

Python 系统可以让数量极多的 async 函数各自向前推进,看起来像很多条 Python 线程那样,能够并发地运行。然而,这些协程并不会像线程那样占用大量内存,启动和切换的开销也比较小,而且不需要用复杂的代码来实现加锁或同步。这种强大的机制是通过事件循环(event loop)打造的,只要把相关的函数写对,这种循环就可以穿插着执行许多个这样的函数,并且执行得相当快,从而高效地完成并发式的 I/O 任务。

现在就用协程来实现生命游戏。目标是让游戏能够高效地执行 game_logic 函数里面的 I/O 操作,同时又不像前面提到的 Thread 方案与 Queue 方案那样,有那么多缺点。首先修改 game_logic 函数,这次必须在定义函数所用的那个 def 关键字前面,加上 async,表示该函数是一个协程,这样就可以在函数里面用 await 做 I/O 了(例如从套接字(socket)之中异步读取一份数据)。

ALIVE = '*'
EMPTY = '-'class Grid:...def count_neighbors(y, x, get):...async def game_logic(state, neighbors):...

同理,给 step_cell 函数也添上 async 关键字,把它变为协程,并在调用 game_logic 的那个地方使用 await 关键字。

async def step_cell(y, x, get, set):state = get(y, x)neighbors = count_neighbors(y, x, get)next_state = await game_logic(state, neighbors)set(y, x, next_state)

simulate 函数也同样需要变为协程。

import asyncioasync def simulate(grid):next_grid = Grid(grid.height, grid.width)tasks = []for y in range(grid.height):for x in range(grid.width):task = step_cell(y, x, grid.get, next_grid.set)  # Fan outtasks.append(task)await asyncio.gather(*tasks)                             # Fan inreturn next_grid

async 版本的 simulate 函数,有以下几个地方需要解释:

  • 第一,它在调用 step_cell 的时候,系统并不会立刻执行这个函数,而是会返回一个协程实例,稍后会把这个实例写在 await 表达式里面。这里的 step_cell,好比那种用 yield 写的生成器函数一样,调用时并不立刻执行它,而是返回一个生成器实例。这样就可以实现任务 fan-out(分派)模式了。
  • 第二,内置的 asyncio 模块提供了 gather 函数,可以用来实现 fan-in(归集)模式。把 gather 写在 await 表达式里面,可以让系统用事件循环去并发地执行那些 step_cell 协程,并在全部执行完之后,再往下推进 simulate 协程。
  • 第三,由于这些代码都是在同一条线程里执行的,因此不需要给 Grid(网格)实例加锁,至于怎样让这些 I/O 操作表现出平行的效果,则是由 asyncio 所提供的事件循环来负责的。

最后,要调整原范例之中用来推动游戏流程的那段代码。只需要修改一行代码,也就是把 simulate(grid) 这个协程交给 asyncio.run 去运行,从而利用事件循环机制去执行推进单元格状态所需的那些 I/O 操作。

class ColumnPrinter:...grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)columns = ColumnPrinter()
for i in range(5):columns.append(i, str(grid))grid = asyncio.run(simulate(grid))  # Run the event loopprint(columns)# >>>
#     0     |    1     |    2     |    3     |    4
# ---*----- |--------- |--------- |--------- |---------
# ----*---- |--*-*---- |----*---- |---*----- |----*----
# --***---- |---**---- |--*-*---- |----**--- |-----*---
# --------- |---*----- |---**---- |---**---- |---***---
# --------- |--------- |--------- |--------- |---------

这样做的结果跟原来相同,但是这次,不需要再花费资源去创建线程了。另外,Queue 方案与 ThreadPoolExecutor 方案虽然能够处理异常,但仅仅是把这些异常从工作线程抛到主线程而已。跟那两种方案相比,采用协程所实现的这种方案,调试起来更为容易,因为可以在调试器(debugger)的交互界面之中,以单步模式来调试这种程序,也就是一行一行地去执行代码并观察效果​。

async def game_logic(state, neighbors):...raise OSError('Problem with I/O')...asyncio.run(game_logic(ALIVE, 3))# >>>
# Traceback ...
# OSError: Problem with I/O

如果需求变了,例如 count_neighbors 函数现在也需要做 I/O 了,那么只需要在已有的代码中把这种函数声明成 async,并在调用它的那些地方添上 await 即可。不用像早前的 Thread 方案或 Queue 方案那样,重新调整代码结构。

async def count_neighbors(y, x, get):...async def step_cell(y, x, get, set):state = get(y, x)neighbors = await count_neighbors(y, x, get)next_state = await game_logic(state, neighbors)set(y, x, next_state)grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)columns = ColumnPrinter()
for i in range(5):columns.append(i, str(grid))grid = asyncio.run(simulate(grid))  # Run the event loopprint(columns)# >>>
#     0     |    1     |    2     |    3     |    4     
# ---*----- |--------- |--------- |--------- |---------
# ----*---- |--*-*---- |----*---- |---*----- |----*----
# --***---- |---**---- |--*-*---- |----**--- |-----*---
# --------- |---*----- |---**---- |---**---- |---***---
# --------- |--------- |--------- |--------- |---------

协程的优点是,能够把那些与外部环境交互的代码(例如 I/O 调用)与那些实现自身需求的代码(例如事件循环)解耦。使得可以把重点放在实现需求所用的逻辑上面,而不用专门花时间去写一些代码来确保这些需求能够并发地执行。


http://www.mrgr.cn/news/27144.html

相关文章:

  • 什么是幂等
  • Coggle数据科学 | 科大讯飞AI大赛:人岗匹配挑战赛 赛季3
  • Java多线程编程-基础篇
  • 利士策分享,细品礼仪之美:在日常中优雅相处的艺术
  • 【FATFS】FATFS简介及下载
  • ​经​纬​恒​润​二​面​​三​七​互​娱​一​面​​元​象​二​面​
  • ET 框架问题集合(请收藏,不定时更新)
  • 【例题】lanqiao1331 二进制中 1 的个数
  • MySQL聚合统计
  • Nginx:Web架构中的全能战士
  • 一分钟教你 全平台隔空投送文件 LoaclSend保姆级教程
  • Git换行符自动转换参数core.autocrlf的用法
  • 【零成本】七日杀 服务器搭建 异地联机 无需公网IP、服务器
  • 看Threejs好玩示例,学习创新与技术(二)
  • Java的发展史与前景
  • 小程序开发之我见
  • 分块总结:时髦之裤
  • openstack之cinder介绍
  • lightdm , xrandr , startx 桌面管理器,窗口管理器
  • ruby和python哪个好学