PYNQ 框架 - VDMA驱动 - 帧缓存
目录
1. 简介
2. 代码分析
2.1 _FrameCache 类定义
2.1.1 xlnk.cma_array()
2.1.2 pointer=None
2.1.3 PynqBuffer
2.2 _FrameCache 例化与调用
2.3 _FrameCache 测试
2.4 _FrameList 类定义
2.5 _FrameList 例化与调用
2.6 _FrameList 测试
3. 帧的使用
3.1 读取帧
4. 总结
1. 简介
本文分享在 PYNQ 框架下,AXI VDMA 驱动的部分实现细节,重点分析帧缓存的管理和使用。
重点分析了 _FrameCache 和 _FrameList 类的实现与功能。这些类用于管理帧缓存,包括内存分配、帧获取、所有权管理等操作,确保高效的视频数据处理和传输。
代码的主要结构框架如下:
class _FrameCache:_xlnk = None...def getframe(self):"""从缓存中检索一个帧,或者在缓存为空时创建一个新帧。"""class AxiVDMA(DefaultIP):
"""Xilinx VideoDMA IP核的驱动类该驱动程序分为输入和输出通道,并通过 readchannel 和 writechannel 属性公开。
每个通道都有 start 和 stop 方法来控制数据传输。DMA 帧缓存使用单一所有权模型,即帧要么由 DMA 拥有,要么由用户代码拥有,
不能同时被两者拥有。S2MMChannel.readframe 和 MM2SChannel.newframe 都会
将帧返回给用户。用户有责任使用 freebuffer() 方法释放帧,或者使用
MM2SChannel.writeframe 将所有权交还给 DMA。一旦所有权被归还,用户不应访问帧
的内容,因为底层内存可能会在没有警告的情况下被删除。
"""class _FrameList:"""用于处理与 DMA 通道关联的帧列表的内部辅助类。除非通过 takeownership显式移除,否则假定其包含的所有帧的所有权。"""def __init__(self, parent, offset, count):self._frames = [None] * count...class S2MMChannel:...def start(self):...self._cache = _FrameCache(self._mode, cacheable=self.cacheable_frames)for i in range(len(self._frames)):self._frames[i] = self._cache.getframe()...class MM2SChannel:...def start(self):...self._cache = _FrameCache(self._mode, cacheable=self.cacheable_frames)self._frames[0] = self._cache.getframe()...def __init__(self, description, framecount=None):...super().__init__(description)if "parameters" in description:parameters = description["parameters"]has_s2mm = parameters["C_INCLUDE_S2MM"] == "1"has_mm2s = parameters["C_INCLUDE_MM2S"] == "1"...if has_s2mm: # 由IP设置确定是否包含属性self.readchannel = AxiVDMA.S2MMChannel(self, self.s2mm_introut, memory)if has_mm2s:self.writechannel = AxiVDMA.MM2SChannel(self, self.mm2s_introut, memory)...bindto = ["xilinx.com:ip:axi_vdma:6.2", "xilinx.com:ip:axi_vdma:6.3"]
2. 代码分析
2.1 _FrameCache 类定义
import asyncio
import numpy as np
from pynq.xlnk import ContiguousArray
from pynq import DefaultIP, Xlnkclass _FrameCache:_xlnk = None # 类变量(静态变量)def __init__(self, mode, capacity=5, cacheable=0):self._cache = [] # 空列表,帧缓存的指针self._mode = modeself._capacity = capacityself._cacheable = cacheabledef getframe(self):if self._cache: # 缓存有数据frame = _FrameCache._xlnk.cma_array(shape=self._mode.shape, dtype='u1', cacheable=self._cacheable,pointer=self._cache.pop(), cache=self)else: # 缓存为空if _FrameCache._xlnk is None:_FrameCache._xlnk = Xlnk() # 执行延迟初始化# 创建连续内存frame = _FrameCache._xlnk.cma_array(shape=self._mode.shape, dtype=np.uint8,cacheable=self._cacheable, cache=self)return frame# 添加到缓存列表def return_pointer(self, pointer):if len(self._cache) < self._capacity:self._cache.append(pointer)def clear(self):self._cache.clear()
1)主要功能
- 初始化(__init__):缓存列表_cache、模式_mode、缓存容量_capacity和是否可缓存的标志_cacheable。
- 获取帧(getframe):
- 如果缓存中有可用的帧,则从缓存中取出一个帧并返回。
- 如果缓存为空,则创建一个新的帧。
- 使用 Xlnk 库的 cma_array 方法来分配连续内存的数组。
- 返回的数组对象的 freebuffer 方法被重写,以便在被释放时将其返回到缓存,而不是直接释放掉。
- 返回指针(return_pointer方法):
- 将一个帧的指针返回到缓存中,如果缓存未达到容量限制,则将指针添加到缓存列表中。
- 清空缓存(clear方法):
- 清空缓存中的所有帧指针。
2)_xlnk = None
- _xlnk 是一个类变量(静态变量),为了实现 Xlnk 实例的延迟初始化和共享。
- 通过将 _xlnk 设为类变量,所有的 _FrameCache 实例都可以共享同一个 Xlnk 实例。这对于管理有限资源(如硬件设备的内存接口)是有用的,避免了重复的实例化和资源浪费。
3)cma_array() 函数的参数 cache=self
在 cma_array 创建的对象中保留对 _FrameCache 的引用,以便在对象不再使用时,可以通过 _FrameCache 实例执行特定的资源管理操作。
4)dtype='u1'
指定数据类型为无符号8位整数(等同于 np.uint8),表示每个元素占用1个字节。
2.1.1 xlnk.cma_array()
def cma_array(self, shape, dtype=np.uint32, cacheable=0,pointer=None, cache=None):if isinstance(shape, numbers.Integral):shape = [shape]dtype = np.dtype(dtype)elements = functools.reduce(lambda value, total: value * total, shape)length = elements * dtype.itemsizeif pointer is None:raw_pointer = self.cma_alloc(length, cacheable=cacheable)pointer = self.ffi.gc(raw_pointer, self.cma_free, size=length)buffer = self.cma_get_buffer(pointer, length)physical_address = self.cma_get_phy_addr(pointer)view = PynqBuffer(shape=shape, dtype=dtype, buffer=buffer,device_address=physical_address,coherent=not cacheable,bo=physical_address, device=self)view.pointer = pointerview.return_to = cachereturn view
功能:创建一个物理上连续的 numpy 数组。可以通过返回对象的 physical_address 属性找到该数组的物理地址。当不再需要数组时,应该使用 array.freebuffer() 或 array.close() 来释放数组。此外,cma_array 可以在 with 语句中使用,以便在代码块结束时自动释放内存。
参数:
- shape(int 或 int 的元组)—— 要构建的数组的维度
- dtype(numpy.dtype 或 str)—— 要构建的数据类型,默认为32位无符号整数
- cacheable(int)—— 缓冲区是否可缓存,默认值为0
返回:numpy 数组;返回类型:numpy.ndarray
2.1.2 pointer=None
pointer 默认值 None。
1)内存分配
当 pointer 是 None 时,代码会执行 self.cma_alloc(length, cacheable=cacheable) 来分配一块所需大小 (length) 的内存,并返回一个指向这块内存的原始指针 raw_pointer。
2)自定义 pointer
如果调用者提供了一个自定义的 pointer,则意味着内存已经在函数外部分配好了,函数只需使用该指针而不再需要自行分配内存。
2.1.3 PynqBuffer
view = PynqBuffer(shape=shape, dtype=dtype, buffer=buffer,device_address=physical_address,coherent=not cacheable,bo=physical_address, device=self)
PynqBuffer 是一个类,用于创建一个物理上连续的内存缓冲区,继承自 numpy.ndarray,并添加了一些额外的属性和方法来处理物理地址和缓存一致性。
此处代码是创建一个 PynqBuffer 对象,并初始化其属性:
- shape 和 dtype 定义了缓冲区的形状和数据类型。
- buffer 是实际的数据缓冲区。
- device_address 是缓冲区的物理地址。
- coherent 表示缓冲区是否是一致的(即是否需要缓存一致性)。
- bo 是缓冲区对象的标识符。
- device 是与缓冲区关联的设备。
缓存一致性(Cache Coherence)
当多个处理器核心共享同一块内存区域时,如果一个核心修改了这块内存中的数据,其他核心的缓存中也必须反映这一变化,以避免数据不一致的问题。
cacheable 参数
- cacheable=0(不可缓存):缓冲区的数据不会被缓存,直接从主内存读取和写入。这种情况下,数据的一致性由硬件保证,适用于需要频繁访问和修改的缓冲区。
- cacheable=1(可缓存):缓冲区的数据会被缓存到处理器的缓存中,以提高访问速度。这种情况下,需要手动处理缓存一致性问题,例如在数据修改后刷新缓存,以确保其他处理器核心看到的数据是最新的。
2.2 _FrameCache 例化与调用
_FrameCache 类的实例化,是在 AxiVDMA 类的嵌套两个子类 S2MMChannel 和 MM2SChannel 类中 start 方法中进行的,代码如下:
class AxiVDMA(DefaultIP):"""Driver class for the Xilinx VideoDMA IP core"""class S2MMChannel:...def start(self):"""Start the DMA. The mode must be set prior to this being called"""if not self._mode:raise RuntimeError("Video mode not set, channel not started")self.desiredframe = 0# 创建 _FrameCache 对象(初始化)self._cache = _FrameCache(self._mode, cacheable=self.cacheable_frames)# 依据_frames数量,申请若干个CMA区域for i in range(len(self._frames)):self._frames[i] = self._cache.getframe()self._writemode()self.reload()self._mmio.write(0x30, 0x108b)#0x00011083) # Start DMAself.irqframecount = 4 # Ensure all frames are written toself._mmio.write(0x34, 0x1000) # Clear any interruptswhile not self.running:passself.reload()self.desiredframe = 1class MM2SChannel:...def start(self):"""Start the DMA channel with a blank screen. The mode mustbe set prior to calling or a RuntimeError will result."""if not self._mode:raise RuntimeError("Video mode not set, channel not started")self._cache = _FrameCache(self._mode, cacheable=self.cacheable_frames)self._frames[0] = self._cache.getframe()self._writemode()self.reload()self._mmio.write(0x00, 0x008b)#0x00011089)while not self.running:passself.reload()self.desiredframe = 0pass
_FrameCache 类会根据 _mode 中的形状申请内存区域,另一个参数是 cacheable_frames,标志是否可缓存。
self._cache = _FrameCache(self._mode, cacheable=self.cacheable_frames)
2.3 _FrameCache 测试
可以从以下几个方面进行测试:
1)初始化测试:
- 确认 _FrameCache 类的实例化是否正确,检查 mode、capacity 和 cacheable 参数是否正确设置。
- 确认 _cache 列表是否初始化为空。
2)getframe 方法测试:
- 在缓存为空的情况下调用 getframe,确保能够正确创建新的 frame。
- 在缓存不为空的情况下调用 getframe,确保能够从缓存中取出 frame。
- 确认 frame 对象的 shape 和 dtype 是否与 mode 的 shape 和 dtype 一致。
- 确认 frame 对象的 cacheable 属性是否正确设置。
3)缓存管理测试:
- 调用 return_pointer 方法,确保指针正确返回到缓存中。
- 确认缓存的大小不超过 capacity。
4)清理测试:
- 调用 clear 方法,确保缓存被正确清空。
5)边界条件测试:
- 测试 capacity 为 0 或负值的情况,确保代码能够正确处理。
- 测试 mode 为不同形状和数据类型的情况,确保代码的通用性。
6)异常处理测试:
- 确认在 _xlnk 未初始化的情况下,getframe 方法能够正确初始化 _xlnk。
- 测试在 Xlnk 类不可用或初始化失败的情况下,代码的异常处理是否合理。
通过这些测试,可以全面验证 _FrameCache 类的功能和稳定性。
2.4 _FrameList 类定义
_FrameList 是一个内部辅助类,用于处理与 VDMA 通道关联的帧列表,负责管理帧的存储、访问和所有权转移。
class AxiVDMA(DefaultIP):class _FrameList:def __init__(self, parent, offset, count=3):self._frames = [None] * count # 创建列表[None, None, None]self._mmio = parent._mmioself._offset = offset # 即通道 Start Addressself._slaves = set() # 集合self.count = countself.reload = parent.reload # 写入VSize,重启 VDMA 通道def __getitem__(self, index):frame = self._frames[index]return framedef takeownership(self, index):self._frames[index] = Nonedef __len__(self):return self.countdef __setitem__(self, index, frame):self._frames[index] = frameif frame is not None:self._mmio.write(self._offset + 4 * index, frame.physical_address)else:self._mmio.write(self._offset + 4 * index, 0)self.reload()for s in self._slaves:s[index] = frames.takeownership(index)def addslave(self, slave):self._slaves.add(slave)for i in range(len(self._frames)):slave[i] = self[i]slave.takeownership(i)slave.reload()def removeslave(self, slave):self._slaves.remove(slave)
主要功能:
- 初始化
- 索引访问 __getitem__(self, index):通过索引访问帧列表中的某个帧
- 长度访问 __len__(self)
- 赋值行为:_FrameList[i] = value
- 设置或更新帧列表中指定索引位置的帧。
- 如果帧非空,将帧的物理地址写入对应 Start Address。
- 如果帧为空,则在对应的内存地址写入0。
- 调用 reload() 方法来启动 VDMA。
- 更新所有从属对象中对应的帧信息,并转移所有权。
- 添加从属对象 addslave(self, slave)
- 移除从属对象 removeslave(self, slave)
这个类的设计允许对帧进行集中管理,并通过内存映射输入输出与硬件设备进行交互,同时能够同步更新多个从属对象的帧信息。
2.5 _FrameList 例化与调用
class AxiVDMA(DefaultIP):class _FrameList:def __init__(self, parent, offset, count):self._frames = [None] * countself._mmio = parent._mmioself._offset = offsetself._slaves = set()self.count = countself.reload = parent.reload...class S2MMChannel:def __init__(self, parent, interrupt):self._mmio = parent.mmioself._frames = AxiVDMA._FrameList(self, 0xAC, parent.framecount)self._interrupt = interruptself._sinkchannel = Noneself._mode = Noneself.cacheable_frames = True...
1)def __init__(self, parent, offset, count)
_FrameList 的构造函数,包含3个参数:
- parent:想要引用父对象的方法,parent._mmio,parent.reloa
- offset:存放 S2MM 或者 MM2S 通道的 Start Address
- count:存放 Frame Buffers 变量,是在 Vivado IDE 中设定的
注意混淆,第一个参数 self 是不需要传递的。
2)self._frames = AxiVDMA._FrameList(self, 0xAC, parent.framecount)
创建一个 _FrameList 对象,并将其赋值给 self._frames。
- self 是 S2MMChannel的实例,作为 parent 参数传入
- 0xAC 是第二个参数
- parent.framecount 是第三个参数
3)__setitem__(self, index, frame) 赋值操作
调用 __setitem__ 方法在通道 start 函数中:
首先,理解 self._cache.getframe() 本身是执行了内存申请的操作,并且返回物理地址。
class AxiVDMA(DefaultIP):...class S2MMChannel:...def start(self):...self._cache = _FrameCache(self._mode, cacheable=self.cacheable_frames)for i in range(len(self._frames)):self._frames[i] = self._cache.getframe()...
2.6 _FrameList 测试
3. 帧的使用
3.1 读取帧
def readframe(self):"""从通道读取一帧并返回给用户此函数可能会阻塞,直到读取完整帧为止。单帧缓冲区会被保留,因此在长时间暂停读取后读取的第一帧可能会返回过时的帧。为了确保在开始处理视频时获取最新的帧,请在开始处理循环之前额外读取一次。返回值-------视频帧的 numpy.ndarray"""if not self.running:raise RuntimeError('DMA channel not started')while self._mmio.read(0x34) & 0x1000 == 0:loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.ensure_future(self._interrupt.wait()))passself._mmio.write(0x34, 0x1000)return self._readframe_internal()
def _readframe_internal(self):if self._mmio.read(0x34) & 0x8980:# Some spurious errors can occur at the start of transfers# let's ignore them for nowself._mmio.write(0x34, 0x8980)self.irqframecount = 1nextframe = self._cache.getframe()previous_frame = (self.activeframe + 2) % len(self._frames)captured = self._frames[previous_frame]self._frames.takeownership(previous_frame)self._frames[previous_frame] = nextframepost_frame = (self.activeframe + 2) % len(self._frames)captured.invalidate()return captured
4. 总结
本文详细解析了在 PYNQ 框架下,AXI VDMA 驱动中帧缓存管理的实现细节,重点介绍了 _FrameCache 和 _FrameList 类。
- _FrameCache 类通过管理帧缓存的内存分配和回收,确保了视频数据处理的高效性。其实现中,通过共享的 Xlnk 实例进行连续内存分配,并采用单一所有权模型来管理帧的使用和释放,避免了资源浪费和竞争。
- _FrameList 类则负责管理帧的存储、访问与所有权转移,支持多从属对象的同步更新。