当前位置: 首页 > news >正文

深入理解对象池 sync.Pool

文章目录

  • 前言
  • 应用
  • 使用
  • 源码走读
    • 数据结构
    • Get获取对象
    • Put归还对象
    • poolDeque分析
    • GC时
  • 总结

前言

当多个 goroutine 都需要创建同⼀种对象的时候,如果 goroutine 数量过多,导致对象的创建剧增,进⽽导致 GC 压⼒增大。形成下面的恶性循环:

并发⼤ --> 占⽤内存⼤ -->增加GC频率 -->限制程序执行效率 --> 处理并发能⼒降低 --> 并发更⼤

此时就需要有⼀个对象池,每个 goroutine 不⾃⼰调mallocgc从runtime分配对象,⽽是从对象池中获取⼀个对象,用完后再把对象放回池里。sync.Pool 就是这样一个对象池,可以减少GC频率和内存分配

  • 减少GC频率:内存分配少了,出发GC的频率就会降低
  • 减少内存分配:都不需要调malloc分配空间了,直接从池中拿对象

应用

在工程中 sync.Pool有广泛的应用,例如在 gin 中会给每个请求分配一个 Context 用于承载这次请求的上下文信息,从 pool 中获取 Context 对象,用完后还回去。每个请求都需要一个Context,因此在并发量高时,很适合用pool复用该对象

func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {// 从pool冲获取ctxc := engine.pool.Get().(*Context)c.reset(w)c.Request = reqc.reset()// 处理请求engine.handleHTTPRequest(c)// 归还ctx到pool中engine.pool.Put(c)
}

标准库fmt 也用了sync.Pool,用来复用pp对象,该对象承载执行一次print需要的上下文

// fmt.Println方法
func Println(a ...any) (n int, err error) {return Fprintln(os.Stdout, a...)
}func Fprintln(w io.Writer, a ...any) (n int, err error) {p := newPrinter()p.doPrintln(a)n, err = w.Write(p.buf)p.free()return
}func newPrinter() *pp {// 从pool获取一个ppp := ppFree.Get().(*pp)// ...return p
}func (p *pp) free() {// 如果buf过大,不归还buf,只归还pp本身if cap(p.buf) > 64*1024 {p.buf = nil} else {p.buf = p.buf[:0]}// ...// 归还ppppFree.Put(p)
}

需要特别注意free实现中的一个细节,如果pp的buffer经过了扩容,其容量大于某个阈值,则不把该pp放回pool,防止出现大内存不会被释放的情况

为啥大内存不会被释放?可以看看这个issue,如果buf被扩容得很大,不断经过放回pool,从pool中取出的循环,那么其占用的空间一直不会被释放

虽然可能后面使用该对象的请求,只使用buf中很少一部分,但由于对象pp持有大容量buf的引用,这片大内存就不会被回收,造成的后果就是内存泄露

怎么解决?对象大小超过一定阈值就不放回pool,交给GC进行内存回收


使用

  • New:声明对象的构造方法,当池中没有对象时,会调改方法创建对象

  • Get:从池中获取一个对象,返回any类型,需要调用方自行类型断言成需要的类型

    • 最佳实践:一个pool只放一种类型的对象
  • Put:对象使用完毕后,将对象归还到池中


需要注意:

  • sync.Pool不适用于线程池、DB连接池之类的存储,因为 GC 时会清除 sync.Pool 的数据,存储在 pool 中的对象在不被通知的情况下随时可能被清理掉

    • 为啥要在GC时清除?避免数据长时间占用内存,造成内存泄露
  • 不应该对pool池中的对象做任何假定,不保证上次Put和下次Get的对象一定是同一个。因为sync.Pool的目的就是缓解GC压力,不应该牵扯到任何业务

    • 推荐做法:在调用 Pool.Put 前进行对象 Reset 操作,或在 Pool.Get 操作后进行 Reset 操作,防止读出脏数据

源码走读

sync.Pool的底层存储设计和GMP十分契合:

  • 为每个处理器 P 分配了 privateshared 两部分存储空间

  • private 是 P 私有的对象,当前 P 可以完全无锁化访问 private,性能极高

    • 为啥能无锁化?因为一个P同一时刻只能运行一个G,没有并发问题
    • 且Get 和 Put 时都优先用private,也就是优先用高性能的方式
  • shared 是 P 下的共享象列表,当前 P 访问 shared 时也需要通过 CAS 操作保证并发安全,因为其他 P 也会来此尝试窃取对象,存在并发行为

数据结构

在这里插入图片描述

  • sync.Pool中包含 localvictim,都指向poolLocal数组,其长度和P的数量一致
  • 每个poolLocal包含一个私有空间private和公共链表shared
  • 公共链表shared的每个节点poolChainElt包含 环形数组poolDequeue,对象实际存储在环形数组中
type Pool struct {noCopy noCopy// 真实类型为[P]poolLocallocal     unsafe.Pointer // local的长度,与P个数相同localSize uintptr        // 上一轮的localvictim     unsafe.Pointer // 上一轮的localSizevictimSize uintptr        // 构造方法New func() any
}

poolLocal可以看作和poolLocalInternal等价

type poolLocal struct {poolLocalInternalpad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}type poolLocalInternal struct {// 每个P的私有对象存储空间private any// 和其他P共享的链表shared  poolChain 
}

poolChain是个双向链表,链表中每个节点的类型为poolChainElt,poolChain持有头尾节点的指针

type poolChain struct {head *poolChainElttail *poolChainElt
}

每个节点poolChainElt存储了在链表中的前后指针,以及真正存储数据的地方poolDequeue:
type poolChainElt struct {poolDequeue// 前后指针next, prev *poolChainElt
}

poolDequeue包含一个环形数组val,以及数组的头尾索引:

type poolDequeue struct {headTail atomic.Uint64vals []eface
}

poolDequeue是一个基于CAS访问的无锁化环形队列

  • 当前P的g通过pushHead添加对象,通过popHead获取对象
  • 其他P的g通过popTail获取对象

Get获取对象

在这里插入图片描述

func (p *Pool) Get() any {// ...// 将当前G和P绑定到一起,禁止抢占,并返回P对应的local容器l, pid := p.pin()// 尝试获取private对象x := l.privatel.private = nilif x == nil {// 尝试从自己的shared获取,从头部开始尝试x, _ = l.shared.popHead()if x == nil {x = p.getSlow(pid)}}runtime_procUnpin()// ...// pool中没获取到,调New构造一个新对象if x == nil && p.New != nil {x = p.New()}return x
}

popHead从shared的头节点开始尝试获取对象,调每个节点的popHead方法获取,一直尝试到尾节点

为啥要将当前G和P绑定到一起?

保证同一时刻只能有一个G在操作一个P对应的poolLocal。假设不绑定,当前G获取到P的id后,通过该id找到对应的poolLocal,然后P被抢占,运行新的G,此时新的G也根据该P的id找到相同的poolLocal,就会出现并发操作poolLocal.private,和并发调pushHead的情况

为啥从头节点pop呢?

  • 基于局部性原理:当前P可能刚往头节点的环形数组push了数据,该数据还在cpu cache中,因此从头部获取效率更高
  • 当前P操作该poolLocal从头节点开始pop,而其他P操作该poolLocal从尾节点pop,这样减少因操作同一个节点产生并发冲突的的概率

func (c *poolChain) popHead() (any, bool) {d := c.headfor d != nil {// 从当前节点的环形数组中popHeadif val, ok := d.popHead(); ok {return val, ok}// 检查下一个节点d = loadPoolChainElt(&d.prev)}return nil, false
}

在这里插入图片描述

如果本地shared没获取到,再调 getSlow 方法获取,具体流程为:

  1. 尝试从各个 P 对应 local 的 shared 中窃取对象,从当前 P 的下一个 P 开始
  2. 尝试获取当前 P 对应 victim 的 private 对象
  3. 尝试从各个 P 对应 victim 的 shared 中窃取对象对象,从当前 P 开始
  4. 如果至此仍未获得对象,说明为 victim 是空的,将 victimSize 置为 0,这样后续流程无需遍历victim
func (p *Pool) getSlow(pid int) any {// See the comment in pin regarding ordering of the loads.size := runtime_LoadAcquintptr(&p.localSize) // load-acquire// 获取pool的locallocals := p.local                            // 尝试从其他P的shared中偷对象for i := 0; i < int(size); i++ {// 从当前p的下一个开始l := indexLocal(locals, (pid+i+1)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}size = atomic.LoadUintptr(&p.victimSize)if uintptr(pid) >= size {return nil}locals = p.victiml := indexLocal(locals, pid)// 尝试当前P的victim.privateif x := l.private; x != nil {l.private = nilreturn x}// 尝试窃取victim中各个shared中的对象,从当前P开始for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}// victim没有了,将其标记为空,这样下次Get就不用搜索victim了atomic.StoreUintptr(&p.victimSize, 0)return nil
}

从每个shared偷取时,调用popTail方法:

注意这里 next 代表前一个节点,prev 代表后一个,和一般的用法不一样,否则这一段源码很难理解

func (c *poolChain) popTail() (any, bool) {d := loadPoolChainElt(&c.tail)if d == nil {return nil, false}for {// 获取前一个节点d2 := loadPoolChainElt(&d.next)// 尝试从当前节点获取对象if val, ok := d.popTail(); ok {return val, ok}// 没有前一个了,当前节点也没有,返回空if d2 == nil {// 最后一个节点为空时,不删除,未来Push时可以直接用return nil, false}// d没有对象了,将其从链表中删除,这样下次请求再来时没必要再检查一次这个节点// 怎么删除d呢?将链表的tail设为d2,也就是上一个if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {storePoolChainElt(&d2.prev, nil)}d = d2}
}

Put归还对象

在这里插入图片描述

Put流程为:

  1. 尝试将对象x放到当前P对应poolLocal的private
  2. 将对象从头部推到当前P对应poolLocal的shared中
func (p *Pool) Put(x any) {// ...l, _ := p.pin()// 放到当前P的privateif l.private == nil {l.private = x} else {// pushHead到当前P的sharedl.shared.pushHead(x)}runtime_procUnpin()// ...	
}

pushHead将val推到shared中

  1. 如果第一次Put,初始化头结点head,初始容量为8
  2. 如果head没满,往其插入val
  3. 否则,构造一个双倍容量的新头结点newHead
  4. 将newHead插到链表头部
  5. 往newHead插入val
func (c *poolChain) pushHead(val any) {d := c.headif d == nil {// 如果头结点为空,初始化const initSize = 8d = new(poolChainElt)d.vals = make([]eface, initSize)c.head = dstorePoolChainElt(&c.tail, d)}// 放入头结点if d.pushHead(val) {return}// 当前节点满了,构造一个双倍容量的新头结点newSize := len(d.vals) * 2if newSize >= dequeueLimit {newSize = dequeueLimit}d2 := &poolChainElt{prev: d}d2.vals = make([]eface, newSize)c.head = d2// 之前节点d的前向指针设为新的头结点d2storePoolChainElt(&d.next, d2)// 往新的头结点pushHeadd2.pushHead(val)
}

poolDeque分析

上面Get和Put最终会poolDeque,这是一个单生产者,多消费者的环形数组,生产者可以从head插入,head弹出,而消费者仅可从tail弹出

其除了有装数据的 底层数组vals,在逻辑上还有两个指针:

  • tail:最老对象的索引
  • head:指向下一个要插入数组的位置

在这里插入图片描述

head和tail在实现上被包装成了一个字段 headTail,低32位为tail,高32位为head。为啥要包装呢?方便用一个变量的CAS操作判断有没有其他请求修改过poolDeque


接下来看pushHead,popHead,popTail这3个方法怎么实现的

pushHead往头部插入val,流程为:

  1. 判断队列是否满了,如果满了返回

    1. 怎么判断?如果head - tail == 队列容量,就表示满了
  2. 如果没满,但是head指向的slot.typ不为nil,那一定是popTail中刚刚修改了headTail,但还没来得及清空slot。此时也当做队列满了

    1. 这里和popTail的实现有关,popTail中先修改了headTail的值,将tail++,再清空原来tail位置的数据
    2. 依据CAS操作的happens before原则:一旦slot.typ为空,那么该位置的数据就被清除干净了,也就可以push数据了
  3. 然后就放心地将val放到head指向的slot中。为什么可以放心,因为不会有其他g往head位置放数据或拿数据

    1. 对于head右边来说,只会有别的g把对象从从tail弹出,只会离head越来越远,因此不会和head发生冲突
    2. 对于head左边来说,会从tail不断弹出,一直到head-1位置结束,也不会和head发生冲突
const dequeueBits = 32func (d *poolDequeue) pushHead(val any) bool {ptrs := d.headTail.Load()head, tail := d.unpack(ptrs)// 队列满了,通过 head - tail == 队列容量 判断if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {return false}// 准备往head指向的位置放valslot := &d.vals[head&uint32(len(d.vals)-1)]typ := atomic.LoadPointer(&slot.typ)if typ != nil {// 如果没满,且这里head指向的slot.typ不为nil,那一定是popTail中刚刚修改了headTail,但还没来得及清空slot// 这种情况也当作队列满了return false}if val == nil {val = dequeueNil(nil)}// val放到slot中// 这里可以直接放,因为不会有别的g调pushHead(只有持有当前P的g能调)// 其他g只会从tail弹出元素,也就是说此时head位置此时一定是空的,以后也一定是空的*(*any)(unsafe.Pointer(slot)) = val// head++d.headTail.Add(1 << dequeueBits)return true
}

popHead从头部弹出一个对象:

  1. 如果队列为空,返回
  2. 否则将 head--,通过CAS更新headTail
  3. 将head位置清空
func (d *poolDequeue) popHead() (any, bool) {var slot *efacefor {ptrs := d.headTail.Load()head, tail := d.unpack(ptrs)// 队列为空if tail == head {return nil, false}head--ptrs2 := d.pack(head, tail)if d.headTail.CompareAndSwap(ptrs, ptrs2) {// CAS成功,表示成功从头部弹出slot = &d.vals[head&uint32(len(d.vals)-1)]break}}// 此时slot为原来head指向的位置// 读取slot指针指向的值val := *(*any)(unsafe.Pointer(slot))if val == dequeueNil(nil) {val = nil}// 将弹出位置置空*slot = eface{}return val, true
}

popTail从尾部弹出一个对象:

  1. 如果队列为空,返回
  2. 否则将 tail++,并CAS更新headTail
  3. 清空原来tail位置的slot
func (d *poolDequeue) popTail() (any, bool) {var slot *efacefor {ptrs := d.headTail.Load()head, tail := d.unpack(ptrs)// 队列为空if tail == head {return nil, false}ptrs2 := d.pack(head, tail+1)if d.headTail.CompareAndSwap(ptrs, ptrs2) {// CAS成功,代表成功从尾部弹出一个slot = &d.vals[tail&uint32(len(d.vals)-1)]break}}// 此时slot为原来tail指向的位置// 读取slot指针指向的值val := *(*any)(unsafe.Pointer(slot))if val == dequeueNil(nil) {val = nil}// 将val置为nil,帮助GC	 slot.val = nil// 将typ置为nil,用原子指令发布,这样pushHead中一旦检测到typ是nil,那么这个slot就可用了atomic.StorePointer(&slot.typ, nil)// 到这一步,pushHead才能拥有这个slotreturn val, true
}

为啥popTail和popHead需要用CAS?因为这两个方法可能被多个g并发执行。假设此时只有1个元素,用CAS能保证只有一个请求能弹出这个唯一的元素,避免并发问题

一旦CAS修改headTail成功,就说明自己占有了这个位置

那为啥pushHead不先用CAS占有head位置呢?因为 一旦判定数组有空间后,就不会有别的g操作head了

  • 首先不会有别的g调pushHead,因为只有持有当前P的g能调,而那个g现在就是自己
  • 其他g调popTail时,只会从尾部弹出对象,不会和head发生冲突,因此没有并发问题

GC时

在sync包的init方法中,将 poolCleanup 注册到GC的回调中,会在STW阶段执行

func init() {runtime_registerPoolCleanup(poolCleanup)
}
func poolCleanup() {// 清空所有的victimfor _, p := range oldPools {p.victim = nilp.victimSize = 0}// 将local移动到victim,并清空localfor _, p := range allPools {p.victim = p.localp.victimSize = p.localSizep.local = nilp.localSize = 0}oldPools, allPools = allPools, nil
}

这样如果pool中的对象没被用到,第一轮GC后会从local转移到victim中,第二轮GC后会清除掉,交给GC回收

为啥需要2轮才回收所有的对象?

  • 这样能 提高缓存对象的命中率。如果每轮GC都清空所有对象,那势必会造成接下来的大量请求需要调New生成,给内存分配和GC造成压力
  • 同时 减少了GC的开销,原来1次GC需要清理所有对象,现在分摊到2次GC中

总结

  • 为啥需要sync.Pool?解决高并发时大量内存分配引起的恶性循环
  • 使用时需要注意:
    • 大内存(例如:被扩容了的buf)不要往pool里放
    • Get后或Put前需要reset对象
  • sync.Pool的巧妙设计:
    • 通过和GMP巧妙结合,实现了无锁化访问private
    • 在操作shared时,自己往头节点读写,其他g从尾节点读,从操作不同对象的方式减少锁冲突
    • 访问poolDequeue时,通过原子操作替代锁,提升并发性能
    • 使用victim提升缓存对象的命中率,以及减少GC的开销

http://www.mrgr.cn/news/66433.html

相关文章:

  • Android camera2
  • yolov8涨点系列之引入CBAM注意力机制
  • 【Android】初识路由框架及ARouter基本使用方法
  • BERT预训练的MLM和NSP任务的损失函数都是什么?
  • 这个开源项目牛逼,集成了多款短信通道,让发送短信变的更简单!(带私活源码)
  • 哲学家就餐问题(Java实现信号量和PV操作)
  • css过渡用法
  • AutoCAD2024
  • CLIP-Driven Universal Model for Organ Segmentation and Tumor Detection论文解读和实验复现
  • 数据采集之超级鹰验证码识别及模拟登录
  • Go 中的 Context实现原理以及正确使用方式
  • 小白直接冲!BiTCN-BiLSTM-Attention双向时间卷积双向长短期记忆神经网络融合注意力机制多变量回归预测
  • vite+vue项目创建流程;npm error enoent Could not read package.json异常报错问题
  • 鸿蒙移动应用开发-------初始arkts
  • leetcode动态规划(二十九)-最大子数组和
  • 【JavaSE】(2) 方法
  • 基于Leaflet的自助标绘源码解析-其它对象解析
  • 有线电视 1.27.5 | 完全免费的电视直播应用,频道丰富,画质清晰
  • File和InputStream,OutputStream
  • 什么时候出现线程安全,如何实现线程安全?
  • 如何显示弹出式窗口
  • Spark的容错机制
  • WCY的比赛题解
  • java学习3---面向对象
  • 19. 架构重要需求
  • 1105--面试代码题