深入理解对象池 sync.Pool
文章目录
- 前言
- 应用
- 使用
- 源码走读
- 数据结构
- Get获取对象
- Put归还对象
- poolDeque分析
- GC时
- 总结
前言
当多个 goroutine 都需要创建同⼀种对象的时候,如果 goroutine 数量过多,导致对象的创建剧增,进⽽导致 GC 压⼒增大。形成下面的恶性循环:
并发⼤ -->
占⽤内存⼤ -->
增加GC频率 -->
限制程序执行效率 -->
处理并发能⼒降低 -->
并发更⼤
此时就需要有⼀个对象池,每个 goroutine 不⾃⼰调mallocgc从runtime分配对象,⽽是从对象池中获取⼀个对象,用完后再把对象放回池里。sync.Pool
就是这样一个对象池,可以减少GC频率和内存分配
- 减少GC频率:内存分配少了,出发GC的频率就会降低
- 减少内存分配:都不需要调malloc分配空间了,直接从池中拿对象
应用
在工程中 sync.Pool有广泛的应用,例如在 gin
中会给每个请求分配一个 Context
用于承载这次请求的上下文信息,从 pool 中获取 Context
对象,用完后还回去。每个请求都需要一个Context,因此在并发量高时,很适合用pool复用该对象
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {// 从pool冲获取ctxc := engine.pool.Get().(*Context)c.reset(w)c.Request = reqc.reset()// 处理请求engine.handleHTTPRequest(c)// 归还ctx到pool中engine.pool.Put(c)
}
标准库fmt
也用了sync.Pool,用来复用pp对象,该对象承载执行一次print需要的上下文
// fmt.Println方法
func Println(a ...any) (n int, err error) {return Fprintln(os.Stdout, a...)
}func Fprintln(w io.Writer, a ...any) (n int, err error) {p := newPrinter()p.doPrintln(a)n, err = w.Write(p.buf)p.free()return
}func newPrinter() *pp {// 从pool获取一个ppp := ppFree.Get().(*pp)// ...return p
}func (p *pp) free() {// 如果buf过大,不归还buf,只归还pp本身if cap(p.buf) > 64*1024 {p.buf = nil} else {p.buf = p.buf[:0]}// ...// 归还ppppFree.Put(p)
}
需要特别注意free
实现中的一个细节,如果pp的buffer经过了扩容,其容量大于某个阈值,则不把该pp放回pool,防止出现大内存不会被释放的情况
为啥大内存不会被释放?可以看看这个issue,如果buf被扩容得很大,不断经过放回pool,从pool中取出的循环,那么其占用的空间一直不会被释放
虽然可能后面使用该对象的请求,只使用buf中很少一部分,但由于对象pp持有大容量buf的引用,这片大内存就不会被回收,造成的后果就是内存泄露
怎么解决?对象大小超过一定阈值就不放回pool
,交给GC进行内存回收
使用
-
New
:声明对象的构造方法,当池中没有对象时,会调改方法创建对象 -
Get
:从池中获取一个对象,返回any类型,需要调用方自行类型断言成需要的类型- 最佳实践:一个pool只放一种类型的对象
-
Put
:对象使用完毕后,将对象归还到池中
需要注意:
-
sync.Pool不适用于线程池、DB连接池之类的存储,因为 GC 时会清除
sync.Pool
的数据,存储在 pool 中的对象在不被通知的情况下随时可能被清理掉- 为啥要在GC时清除?
避免数据长时间占用内存,造成内存泄露
- 为啥要在GC时清除?
-
不应该对pool池中的对象做任何假定,不保证上次Put和下次Get的对象一定是同一个。因为sync.Pool的目的就是缓解GC压力,不应该牵扯到任何业务
- 推荐做法:在调用
Pool.Put
前进行对象 Reset 操作,或在Pool.Get
操作后进行 Reset 操作,防止读出脏数据
- 推荐做法:在调用
源码走读
sync.Pool的底层存储设计和GMP十分契合:
-
为每个处理器 P 分配了
private
和shared
两部分存储空间 -
private 是 P 私有的对象
,当前 P 可以完全无锁化访问 private
,性能极高- 为啥能无锁化?因为一个P同一时刻只能运行一个G,没有并发问题
- 且Get 和 Put 时都优先用private,也就是优先用高性能的方式
-
shared 是 P 下的共享象列表
,当前 P 访问 shared 时也需要通过CAS 操作保证并发安全
,因为其他 P 也会来此尝试窃取对象,存在并发行为
数据结构
- sync.Pool中包含
local
和victim
,都指向poolLocal数组,其长度和P的数量一致 - 每个poolLocal包含一个私有空间private和公共链表shared
- 公共链表shared的每个节点poolChainElt包含
环形数组poolDequeue
,对象实际存储在环形数组中
type Pool struct {noCopy noCopy// 真实类型为[P]poolLocallocal unsafe.Pointer // local的长度,与P个数相同localSize uintptr // 上一轮的localvictim unsafe.Pointer // 上一轮的localSizevictimSize uintptr // 构造方法New func() any
}
poolLocal可以看作和poolLocalInternal等价
type poolLocal struct {poolLocalInternalpad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}type poolLocalInternal struct {// 每个P的私有对象存储空间private any// 和其他P共享的链表shared poolChain
}
poolChain是个双向链表,链表中每个节点的类型为poolChainElt,poolChain持有头尾节点的指针
type poolChain struct {head *poolChainElttail *poolChainElt
}
每个节点poolChainElt存储了在链表中的前后指针,以及真正存储数据的地方poolDequeue:
type poolChainElt struct {poolDequeue// 前后指针next, prev *poolChainElt
}
poolDequeue包含一个环形数组val,以及数组的头尾索引:
type poolDequeue struct {headTail atomic.Uint64vals []eface
}
poolDequeue是一个基于CAS访问的无锁化环形队列
- 当前P的g通过pushHead添加对象,通过popHead获取对象
- 其他P的g通过popTail获取对象
Get获取对象
func (p *Pool) Get() any {// ...// 将当前G和P绑定到一起,禁止抢占,并返回P对应的local容器l, pid := p.pin()// 尝试获取private对象x := l.privatel.private = nilif x == nil {// 尝试从自己的shared获取,从头部开始尝试x, _ = l.shared.popHead()if x == nil {x = p.getSlow(pid)}}runtime_procUnpin()// ...// pool中没获取到,调New构造一个新对象if x == nil && p.New != nil {x = p.New()}return x
}
popHead从shared的头节点开始尝试获取对象,调每个节点的popHead方法获取,一直尝试到尾节点
为啥要将当前G和P绑定到一起?
保证同一时刻只能有一个G在操作一个P对应的poolLocal
。假设不绑定,当前G获取到P的id后,通过该id找到对应的poolLocal,然后P被抢占,运行新的G,此时新的G也根据该P的id找到相同的poolLocal,就会出现并发操作poolLocal.private,和并发调pushHead的情况
为啥从头节点pop呢?
- 基于
局部性原理
:当前P可能刚往头节点的环形数组push了数据,该数据还在cpu cache中,因此从头部获取效率更高 - 当前P操作该poolLocal从头节点开始pop,而其他P操作该poolLocal从尾节点pop,这样
减少因操作同一个节点产生并发冲突的的概率
func (c *poolChain) popHead() (any, bool) {d := c.headfor d != nil {// 从当前节点的环形数组中popHeadif val, ok := d.popHead(); ok {return val, ok}// 检查下一个节点d = loadPoolChainElt(&d.prev)}return nil, false
}
如果本地shared没获取到,再调 getSlow
方法获取,具体流程为:
- 尝试从各个 P 对应 local 的 shared 中窃取对象,从当前 P 的下一个 P 开始
- 尝试获取当前 P 对应 victim 的 private 对象
- 尝试从各个 P 对应 victim 的 shared 中窃取对象对象,从当前 P 开始
- 如果至此仍未获得对象,说明为 victim 是空的,将 victimSize 置为 0,这样后续流程无需遍历victim
func (p *Pool) getSlow(pid int) any {// See the comment in pin regarding ordering of the loads.size := runtime_LoadAcquintptr(&p.localSize) // load-acquire// 获取pool的locallocals := p.local // 尝试从其他P的shared中偷对象for i := 0; i < int(size); i++ {// 从当前p的下一个开始l := indexLocal(locals, (pid+i+1)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}size = atomic.LoadUintptr(&p.victimSize)if uintptr(pid) >= size {return nil}locals = p.victiml := indexLocal(locals, pid)// 尝试当前P的victim.privateif x := l.private; x != nil {l.private = nilreturn x}// 尝试窃取victim中各个shared中的对象,从当前P开始for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}// victim没有了,将其标记为空,这样下次Get就不用搜索victim了atomic.StoreUintptr(&p.victimSize, 0)return nil
}
从每个shared偷取时,调用popTail方法:
注意这里
next
代表前一个节点,prev
代表后一个,和一般的用法不一样,否则这一段源码很难理解
func (c *poolChain) popTail() (any, bool) {d := loadPoolChainElt(&c.tail)if d == nil {return nil, false}for {// 获取前一个节点d2 := loadPoolChainElt(&d.next)// 尝试从当前节点获取对象if val, ok := d.popTail(); ok {return val, ok}// 没有前一个了,当前节点也没有,返回空if d2 == nil {// 最后一个节点为空时,不删除,未来Push时可以直接用return nil, false}// d没有对象了,将其从链表中删除,这样下次请求再来时没必要再检查一次这个节点// 怎么删除d呢?将链表的tail设为d2,也就是上一个if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {storePoolChainElt(&d2.prev, nil)}d = d2}
}
Put归还对象
Put流程为:
- 尝试将对象x放到当前P对应poolLocal的private
- 将对象从头部推到当前P对应poolLocal的shared中
func (p *Pool) Put(x any) {// ...l, _ := p.pin()// 放到当前P的privateif l.private == nil {l.private = x} else {// pushHead到当前P的sharedl.shared.pushHead(x)}runtime_procUnpin()// ...
}
pushHead将val推到shared中
- 如果第一次Put,初始化头结点head,初始容量为8
- 如果head没满,往其插入val
- 否则,构造一个双倍容量的新头结点newHead
- 将newHead插到链表头部
- 往newHead插入val
func (c *poolChain) pushHead(val any) {d := c.headif d == nil {// 如果头结点为空,初始化const initSize = 8d = new(poolChainElt)d.vals = make([]eface, initSize)c.head = dstorePoolChainElt(&c.tail, d)}// 放入头结点if d.pushHead(val) {return}// 当前节点满了,构造一个双倍容量的新头结点newSize := len(d.vals) * 2if newSize >= dequeueLimit {newSize = dequeueLimit}d2 := &poolChainElt{prev: d}d2.vals = make([]eface, newSize)c.head = d2// 之前节点d的前向指针设为新的头结点d2storePoolChainElt(&d.next, d2)// 往新的头结点pushHeadd2.pushHead(val)
}
poolDeque分析
上面Get和Put最终会poolDeque,这是一个单生产者,多消费者的环形数组,生产者可以从head插入,head弹出,而消费者仅可从tail弹出
其除了有装数据的 底层数组vals
,在逻辑上还有两个指针:
tail
:最老对象的索引head
:指向下一个要插入数组的位置
head和tail在实现上被包装成了一个字段 headTail
,低32位为tail,高32位为head。为啥要包装呢?方便用一个变量的CAS操作判断有没有其他请求修改过poolDeque
接下来看pushHead,popHead,popTail这3个方法怎么实现的
pushHead往头部插入val,流程为:
-
判断队列是否满了,如果满了返回
- 怎么判断?如果
head - tail == 队列容量
,就表示满了
- 怎么判断?如果
-
如果没满,但是head指向的slot.typ不为nil,那一定是popTail中刚刚修改了headTail,但还没来得及清空slot。此时也当做队列满了
- 这里和popTail的实现有关,popTail中先修改了headTail的值,将tail++,再清空原来tail位置的数据
- 依据CAS操作的happens before原则:一旦slot.typ为空,那么该位置的数据就被清除干净了,也就可以push数据了
-
然后就放心地将val放到head指向的slot中。为什么可以放心,因为不会有其他g往head位置放数据或拿数据
- 对于head右边来说,只会有别的g把对象从从tail弹出,只会离head越来越远,因此不会和head发生冲突
- 对于head左边来说,会从tail不断弹出,一直到head-1位置结束,也不会和head发生冲突
const dequeueBits = 32func (d *poolDequeue) pushHead(val any) bool {ptrs := d.headTail.Load()head, tail := d.unpack(ptrs)// 队列满了,通过 head - tail == 队列容量 判断if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {return false}// 准备往head指向的位置放valslot := &d.vals[head&uint32(len(d.vals)-1)]typ := atomic.LoadPointer(&slot.typ)if typ != nil {// 如果没满,且这里head指向的slot.typ不为nil,那一定是popTail中刚刚修改了headTail,但还没来得及清空slot// 这种情况也当作队列满了return false}if val == nil {val = dequeueNil(nil)}// val放到slot中// 这里可以直接放,因为不会有别的g调pushHead(只有持有当前P的g能调)// 其他g只会从tail弹出元素,也就是说此时head位置此时一定是空的,以后也一定是空的*(*any)(unsafe.Pointer(slot)) = val// head++d.headTail.Add(1 << dequeueBits)return true
}
popHead从头部弹出一个对象:
- 如果队列为空,返回
- 否则将
head--
,通过CAS更新headTail - 将head位置清空
func (d *poolDequeue) popHead() (any, bool) {var slot *efacefor {ptrs := d.headTail.Load()head, tail := d.unpack(ptrs)// 队列为空if tail == head {return nil, false}head--ptrs2 := d.pack(head, tail)if d.headTail.CompareAndSwap(ptrs, ptrs2) {// CAS成功,表示成功从头部弹出slot = &d.vals[head&uint32(len(d.vals)-1)]break}}// 此时slot为原来head指向的位置// 读取slot指针指向的值val := *(*any)(unsafe.Pointer(slot))if val == dequeueNil(nil) {val = nil}// 将弹出位置置空*slot = eface{}return val, true
}
popTail从尾部弹出一个对象:
- 如果队列为空,返回
- 否则将
tail++
,并CAS更新headTail - 清空原来tail位置的slot
func (d *poolDequeue) popTail() (any, bool) {var slot *efacefor {ptrs := d.headTail.Load()head, tail := d.unpack(ptrs)// 队列为空if tail == head {return nil, false}ptrs2 := d.pack(head, tail+1)if d.headTail.CompareAndSwap(ptrs, ptrs2) {// CAS成功,代表成功从尾部弹出一个slot = &d.vals[tail&uint32(len(d.vals)-1)]break}}// 此时slot为原来tail指向的位置// 读取slot指针指向的值val := *(*any)(unsafe.Pointer(slot))if val == dequeueNil(nil) {val = nil}// 将val置为nil,帮助GC slot.val = nil// 将typ置为nil,用原子指令发布,这样pushHead中一旦检测到typ是nil,那么这个slot就可用了atomic.StorePointer(&slot.typ, nil)// 到这一步,pushHead才能拥有这个slotreturn val, true
}
为啥popTail和popHead需要用CAS?因为这两个方法可能被多个g并发执行。假设此时只有1个元素,用CAS能保证只有一个请求能弹出这个唯一的元素,避免并发问题
一旦CAS修改headTail成功,就说明自己占有了这个位置
那为啥pushHead不先用CAS占有head位置呢?因为 一旦判定数组有空间后,就不会有别的g操作head了
:
- 首先不会有别的g调pushHead,因为只有持有当前P的g能调,而那个g现在就是自己
- 其他g调popTail时,只会从尾部弹出对象,不会和head发生冲突,因此没有并发问题
GC时
在sync包的init方法中,将 poolCleanup
注册到GC的回调中,会在STW阶段执行
func init() {runtime_registerPoolCleanup(poolCleanup)
}
func poolCleanup() {// 清空所有的victimfor _, p := range oldPools {p.victim = nilp.victimSize = 0}// 将local移动到victim,并清空localfor _, p := range allPools {p.victim = p.localp.victimSize = p.localSizep.local = nilp.localSize = 0}oldPools, allPools = allPools, nil
}
这样如果pool中的对象没被用到,第一轮GC后会从local转移到victim中,第二轮GC后会清除掉,交给GC回收
为啥需要2轮才回收所有的对象?
- 这样能
提高缓存对象的命中率
。如果每轮GC都清空所有对象,那势必会造成接下来的大量请求需要调New生成,给内存分配和GC造成压力 - 同时
减少了GC的开销
,原来1次GC需要清理所有对象,现在分摊到2次GC中
总结
- 为啥需要sync.Pool?解决高并发时大量内存分配引起的恶性循环
- 使用时需要注意:
- 大内存(例如:被扩容了的buf)不要往pool里放
- Get后或Put前需要reset对象
- sync.Pool的巧妙设计:
- 通过和GMP巧妙结合,实现了无锁化访问private
- 在操作shared时,自己往头节点读写,其他g从尾节点读,从操作不同对象的方式减少锁冲突
- 访问poolDequeue时,通过原子操作替代锁,提升并发性能
- 使用victim提升缓存对象的命中率,以及减少GC的开销