Go-知识-定时器
Go-知识-定时器
- 1. 介绍
- 2. Timer使用场景
- 2.1 设定超时时间
- 2.2 延迟执行某个方法
- 3. Timer 对外接口
- 3.1 创建定时器
- 3.2 停止定时器
- 3.3 重置定时器
- 3.4 After
- 3.5 AfterFunc
- 4. Timer 的实现原理
- 4.1 Timer数据结构
- 4.1.1 Timer
- 4.1.2 runtimeTimer
- 4.2 Timer 实现原理
- 4.2.1 创建Timer
- 4.2.2 停止Timer
- 4.2.3 重置Timer
- 5. Ticker 使用场景
- 5.1 简单定时任务
- 5.2 定时刷新缓存
- 6. Ticker 对外接口
- 6.1 创建定时器
- 6.2 停止定时器
- 6.3 Tick
- 7. Ticker 实现原理
- 7.1 数据结构
- 7.1.1 Ticker
- 7.1.2 runtimeTimer
- 7.2 Ticker实现原理
- 7.2.1 创建Ticker
- 7.2.2 停止Ticker
- 8. runtimeTimer 原理(go 1.11)
- 8.1 定时器存储
- 8.1.1 timer 的数据结构
- 8.1.2 timersBucket 的数据结构
- 8.1.3 Ticker & timer & timersBucket 关系
- 8.1.4 timersBucket 数组
- 8.2 定时器运行机制
- 8.2.1 创建定时器
- 8.2.2 删除定时器
- 8.3 资源泄露
- 9. 性能优化 (go 1.14+)
- 9.1 消除了timersBucket
- 9.2 消除了timeproc
- 9.3 更少的锁竞争
- 9.4 更少的上下文切换
- 9.5 优化效果
Go语言提供了两种定时器,分别为一次性定时器和周期性定时器。
- 一次性定时器(Timer):定时器只计时一次,计时结束变停止运行。
- 周期性定时器(Ticker):定时器周期性进行计时,除非主动停止,否则将永久运行。
以下都是 go 1.10~1.13的逻辑,在go 1.14 有非常大的优化。
1. 介绍
Timer是一种单一事件的定时器,即经过指定的时间后触发一个事件,这个事件通过其本身提供的
channel进行通知。Timer只执行一次就结束。
通过timer.NewTimer(d Duration)可以创建一个Timer,参数即等待时间,时间到来后立即触发一个事件。
在src/time/sleep.go
中定义了Timer的数据结构
Timer对外仅暴露一个channel,指定的时间到来时就往该channel种写入系统时间,即一个事件。
Ticker是周期性定时器,即周期性地触发一个事件,通过Ticker本身提供的channel将事件传递出去。
Ticker的数据结构与Timer非常类似:
Ticker对外仅暴露一个channel,当指定的时间到来时就往该channel种写入系统时间,即一个事件。
在创建Ticker时会指定一个时间,作为事件触发的周期,这是Ticker和Timer最主要的区别。
2. Timer使用场景
2.1 设定超时时间
有时候在等待资源的时候,又不希望永久等待,而是希望加个超时时间,如果在指定的时间还未获取到,那么就超时,不在等待了。
func TestWait(t *testing.T) {timer := time.NewTimer(1 * time.Second)c := make(<-chan string)select {case <-c:t.Log("chan string ")case <-timer.C:t.Log("time out")}
}
因为case <-c
不会触发,所以在1秒后,超时结束
通过select语句轮询timer.C 和 c 两个channel,如果1s内,c还没有数据写入,那就认为超时了。
2.2 延迟执行某个方法
在应用启动中,一般会初始化很多组件,如果在应用启动后,马上就使用,可能出现组件还未初始化成功,拿到的组件对象是还未准备好的对象。
这个时候,如果能延迟使用,那么就不会出现使用未准备好的组件的情况。
func TestDelay(t *testing.T) {timer := time.NewTimer(1 * time.Second)select {case <-timer.C:t.Log("wait 1 s")}t.Log("do something")
}
select 只有一个case,就是当时间到了,该case满足,如果时间没到,那么select会阻塞。
当触发select 后,可以什么都不做,也可以打印日志,然后跳出select的语句,顺序执行后续逻辑,以此实现延迟等待后执行。
3. Timer 对外接口
3.1 创建定时器
使用func NewTimer(d Duration) *Timer
方法指定一个时间即可创建一个Timer,Timer一经创建便开始计时,不需要额外的启动命令。
实际上,创建Timer意味着把一个计时任务交给系统守护协程,该协程管理着所有的Timer,当Timer的时间到达后,Timer向Channel中发送当前的时间作为事件。
3.2 停止定时器
Timer创建后可以随时停止,停止计时器的方法如下:
func (t *Timer) Stop() bool
其返回值代表定时器有没有超时。
- true : 定时器超时前停止,后续不会在发送事件
- false : 定时器超时后停止。
实际上,停止计时器意味着通知系统守护协程移除该定时器。
3.3 重置定时器
已过期的定时器或已停止的定时器可以通过重置动作重新激活,重置方法如下:
func (t *Timer) Reset(d Duration) bool
重置的动作实质上是先停止定时器,在启动,其返回值即停止计时器(Stop()) 的返回值。
需要注意的是,重置定时器虽然可以用于修改还未超时的定时器,但正确的使用方式还是针对已过期的定时器或已被停止的定时器,同时其返回值也不可靠,返回值存在的价值仅仅是与前面的版本兼容。
实际上,重置定时器意味着通知系统守护协程移除该定时器,重新设定时间后,再把定时器交给守护协程。
3.4 After
如果仅仅是向等待指定的时间,没有提前停止定时器的需求,也没有复用该定时器的需求,那么可以使用匿名的定时器。
使用func After(d Duration) <-chan Time
方法创建一个定时器,并返回定时器的管道。
func TestAfter(t *testing.T) {select {case <-time.After(1 * time.Second):t.Log("after 1 s")}
}
执行后和之前的延迟执行一模一样:
实际上还是一个定时器,但是代码更加简洁。
3.5 AfterFunc
除了 After调用,返回 channel,进行同步处理,还可以使用 AfterFunc,将需要延迟的操作交给系统协程异步执行。
AfterFunc的定义: func AfterFunc(d Duration, f Func()) *Timer
比如:
func TestAfterFunc(t *testing.T) {time.AfterFunc(1*time.Second, func() {t.Log("after 1 s")})t.Log("waiting")time.Sleep(2 * time.Second)t.Log("done")
}
执行结果如下:
很明显执行和等待已经不是一个协程了.
4. Timer 的实现原理
4.1 Timer数据结构
4.1.1 Timer
在源码包src/time/sleep.go
中定义了其数据结构:
Timer只有两个成员:
- C:channel ,上层应用根据此channel接收事件
- r:runtime定时器,该定时器即系统管理的定时器,对上层应用不可见。
这里按照层次来理解Timer的数据结构,Timer.C是面向Timer用户的,Timer.r是面向底层的定时器实现。
4.1.2 runtimeTimer
创建一个Timer实质上是把一个定时任务交给专门的写成进行监控,这个任务的载体便是runtimeTimer。
每创建一个Timer意味着创建了一个runtimeTimer变量,然后把它交给系统进行监控。通过设置runtimeTimer过期后的行为来达到定时的目的。
type runtimeTimer struct {tb uintptr // 存储当前定时器的数组地址i int // 存储当前定时器的数组下标when int64 // 当前定时器触发事件period int64 // 当前定时器周期性触发间隔f func(interface{}, uintptr) // 定时器触发时执行的回调函数arg interface{} // 定时器触发时执行回调函数传递的参数一seq uintptr // 定时器触发时执行回调函数传递的参数二
}
- tb: 系统底层存储runtimeTimer的数组地址
- i: 当前runtimeTime在tb数组中的下标
- when: 定时器触发事件的时间
- period: 定时器周期性触发间隔(对于Timer来说,此值为0)
- f: 定时器触发时执行的回调函数,回调函数接收两个参数。
- arg: 定时器触发时执行回调函数的参数一
- seq: 定时器触发时执行回调函数的参数二(Timer并不使用该参数)
4.2 Timer 实现原理
一个进程中的多个Timer都由底层的写成来管理,这个协程称为系统协程。
runtimeTimer存放在数组中,并看招when字段对所有的runtimeTimer进行堆排序,定时器触发时执行runtimeTimer中的预定义函数f,即完成了一次定时任务。
4.2.1 创建Timer
创建Timer的实现,非常简单:
func NewTimer(d Duration) *Timer {c := make(chan Time, 1) // 创建一个channelt := &Timer{ // Timer的数据结构C: c,r: runtimeTimer{when: when(d), // 触发事件f: sendTime, // 触发后执行 sendTime 函数arg: c, // 触发后执行sendTime函数时附带的参数},}startTimer(&t.r) // 此处启动定时器,只是把 runtimeTimer放到系统协程的堆中,由系统协程维护return t
}
NewTimer函数只是构造了一个Timer,然后把Timer.r 通过startTimer()交给系统协程维护。其中when()方法是计算下一次
定时器触发的绝对时间,即当前时间+NewTimer()的参数d。sendTimer()方法是定时器触发时的动作。
- when函数
// when是一个辅助函数,用于设置runtimeTimer的“when”字段。
// 它返回未来的持续时间d,单位为纳秒。
// 如果d为负,则忽略。如果返回值小于
// 由于溢出,返回MaxInt64。
func when(d Duration) int64 {if d <= 0 {return runtimeNano()}t := runtimeNano() + int64(d)if t < 0 {t = 1<<63 - 1 // math.MaxInt64}return t
}
- sendTimer函数
func sendTime(c interface{}, seq uintptr) {//c上的非阻塞时间发送。//在NewTimer中使用,它无论如何都不能阻塞(缓冲区)。//在NewTicker中使用,在地板上放下发送是//当读者落后时的期望行为,//因为发送是周期性的。select {case c.(chan Time) <- Now():default:}
}
sendTime接收一个channel作为参数,其主要任务是向channel中写入当前时间。
创建Timer时生成的管道含有一个缓冲区(make(chan Time,1)),所以Timer触发时向channel写入事件永远不会阻塞,sendTime写完即退出。
sendTime使用select搭配一个空的default分支,是因为Ticker也复用了sendTime,Ticker触发时也会向channel中写入事件,但无法保证之前的数据已经
被取走,所以使用select并搭配一个空的default分支,确保sendTime不会阻塞,Ticker触发时,如果管道中还有值,则本次不在向管道中写入时间,将本次触发的事件直接丢弃。
- startTime函数
startTime函数的具体实现在runtime包中,其主要作用是把runtimeTimer写入系统写成的数组中,并启动系统协程(如果系统协程还未开始运行)
其主要是addTimer(t *timer)函数,timer是runtime包中用于表示time包中runtimeTimer结构的struct
addTimer(t *timer)函数
func addtimer(t *timer) {tb := t.assignBucket() // 获取time桶数组lock(&tb.lock) // 加锁ok := tb.addtimerLocked(t) // 将 timer加入数据组unlock(&tb.lock) // 解锁if !ok { // 如果timer加入数组失败,那么触发 panic badTimer()}
}
4.2.2 停止Timer
停止Timer,只是简单地把Timer从系统协程中移除。
stopTimer即通知系统协程把该Timer移除,即不在监控。
stopTimer也是runtime中的函数
系统协程只是移除Timer,并不会关闭channel,以避免用户协程读取错误。
Stop 的返回值取决于定时器的状态:
- 如果Timer已经触发,则Stop返回false
- 如果Timer还未触发,则Stop返回true
4.2.3 重置Timer
重置Timer时会先把Timer从系统协程中删除,修改新的时间后重新添加到系统协程中。
重置的实现如下:
其返回值与Stop保持一致,如果Timer成功停止,则返回true,如果Timer已经触发,则返回false.
由于新加的Timer时间很可能变化,所以其在系统协程中的位置也会相应地发生变化。
需要注意的是,按照源码注释,Reset应该作用于已经停止的Timer或已经触发的Timer。
按照这个约定,Reset的返回值总是false,仍然保留是为了保持向前兼容,使用老版本Go编写的应用不需要因为Go升级而修改代码。
如果不按照此约定使用Reset,则有可能遇到Reset和Timer触发后同时执行的情况,此时有可能会收到两个事件,从而对应用程序造成一些负面影响。
5. Ticker 使用场景
5.1 简单定时任务
假设需求是每隔1秒就报时一次:
func TestTicker(t *testing.T) {// 创建一个一秒的Tickerticker := time.NewTicker(1 * time.Second)// 使用defer关闭Tickerdefer ticker.Stop()// 收到事件就打印时间 (根据之前Timer的实现,res := <-ticker.C , res 就是 Time 类型的时间戳 )for range ticker.C {t.Log("tick 1 s : " + time.Now().String())}
}
如果不主动停止,那么永远不会结束。
5.2 定时刷新缓存
假设需求是定时将内存中的数据写到磁盘中,或者内存中数据满了,需要落盘:
func TestFlush(t *testing.T) {// 创建一个每1秒刷新一次的Tickerticker := time.NewTicker(1 * time.Second)// 使用 defer 关闭defer ticker.Stop()// 模拟内存数据mem := 0for {select {// 时间到,不管如何,必须刷新case <-ticker.C:t.Log("flush 1 s : " + time.Now().String())default:// 内存满了,需要刷新if mem > 15 {t.Logf("flush mem : %d", mem)mem = 0}}// 每次for循环设置随机值mem += rand.Intn(10)// 每次循环等待300毫秒time.Sleep(300 * time.Millisecond)}
}
6. Ticker 对外接口
6.1 创建定时器
使用NewTicker函数就能创建一个Ticker
// NewTicker返回一个新的Ticker,其中包含一个将发送
// 由duration参数指定的时间段。
// 它调整间隔或降低滴答声,以弥补接收器速度慢的问题。
// 持续时间d必须大于零;否则,NewTicker将会恐慌。
// 停止自动收报机以释放相关资源。
func NewTicker(d Duration) *Ticker {if d <= 0 {panic(errors.New("non-positive interval for NewTicker"))}c := make(chan Time, 1)t := &Ticker{C: c,r: runtimeTimer{when: when(d),// Timer 的 period 是 0period: int64(d),f: sendTime,arg: c,},}startTimer(&t.r)return t
}
NewTimer和NewTicker非常类似,在初始化runtimeTimer的时候,Timer没有设置period,Ticker则设置period等于d.
6.2 停止定时器
使用定时器对外暴露的Stop方法就可以停止Ticker
其实不管是 Timer还是Ticker的startTimer和stopTimer,最终都是由runtime/time.go 中的timer的startTimer和stopTimer实现的
需要注意的是,该方法会停止计时,意味着不会向定时器的channel中写入事件,但是channel并不会给关闭,channel在使用完后,声明周期结束后会自动释放。
Ticker 在使用完后务必要释放,否则会产生资源泄露,进而会持续消耗CPU资源,最后会把CPU资源消耗完。
6.3 Tick
在有些长江下,启动一个Ticker后,该Ticker永远不会停止,比如定时轮询任务,此时可以使用一个简单的Tick函数来获取定时器的channel。
注释上面说的很明白,因为该函数内部实际上还是创建了一个Ticker,但是并没有返回,只是返回了channel,因为没有Ticker对象,所以没法调用Stop。
在for循环中,使用Ticker的时候,一定三思
在Timer中,很容易写出如下代码:
for {select {case <-time.After(1 * time.Second):t.Log("flush 1 s : " + time.Now().String())}
} ```
使用Timer这样写当然没错,因为Timer在触发事件后,就会从数组中移除。
但是当把Timer换成Ticker,那么就出现了资源泄露
for {select {case <-time.Tick(1 * time.Second):t.Log("flush 1 s : " + time.Now().String())}
}```
上面的代码出现了资源泄露,因为 Tick 会创建Ticker,并且因为使用Tick直接获取的Ticker.C,所以没有手段去Stop。
随着for的执行,最终会导致越来越多的Ticker耗尽CPU资源。
7. Ticker 实现原理
实际上Ticker与Timer几乎完全相同,数据结构和内部实现机制都相同,唯一不同的是创建方式。
在创建Timer时,不指定时间触发周期,时间触发后Timer自动销毁。而在创建Ticker时会指定一个事件触发周期,事件按照这个周期触发,如果不显式停止,则定时器永不停止。
7.1 数据结构
7.1.1 Ticker
Ticker的数据结构与Timer的数据结构除名字不同外,其他完全一样。
源码包src/time/tick.go
中定义了数据结构
Ticker只有两个成员:
- C: channel,上层应用根据此channel接收事件。
- r: runtimeTimer定时器,该定时器即系统管理的定时器,对上层应用不可见。
按照层次来理解Ticker的数据结构,Ticker.C 是面向Ticker 用户的,Ticker.r 是面向底层的定时器的。
7.1.2 runtimeTimer
runtimeTimer和Timer的一样,创建一个Timer实质上是把一个定时任务交给专门的写成进行监控,这个任务的载体便是runtimeTimer。
每创建一个Timer意味着创建了一个runtimeTimer变量,然后把它交给系统进行监控。通过设置runtimeTimer过期后的行为来达到定时的目的。
type runtimeTimer struct {tb uintptr // 存储当前定时器的数组地址i int // 存储当前定时器的数组下标when int64 // 当前定时器触发事件period int64 // 当前定时器周期性触发间隔f func(interface{}, uintptr) // 定时器触发时执行的回调函数arg interface{} // 定时器触发时执行回调函数传递的参数一seq uintptr // 定时器触发时执行回调函数传递的参数二
}
- tb: 系统底层存储runtimeTimer的数组地址
- i: 当前runtimeTime在tb数组中的下标
- when: 定时器触发事件的时间
- period: 定时器周期性触发间隔(对于Timer来说,此值为0)
- f: 定时器触发时执行的回调函数,回调函数接收两个参数。
- arg: 定时器触发时执行回调函数的参数一
- seq: 定时器触发时执行回调函数的参数二(Timer并不使用该参数)
7.2 Ticker实现原理
7.2.1 创建Ticker
创建Ticker的实现非常简单:
func NewTicker(d Duration) *Ticker {if d <= 0 {panic(errors.New("non-positive interval for NewTicker"))}c := make(chan Time, 1)t := &Ticker{C: c,r: runtimeTimer{when: when(d),period: int64(d), // 这个在Timer中是没有的f: sendTime,arg: c,},}startTimer(&t.r)return t
}
Ticker 和Timer的重要区别就是提供了period参数,据此决定Timer是一次性的,还是周期性的。
NewTicker只是构造了一个Ticker,然后把Ticker通过startTimer交给系统协程维护。
其中period为事件触发的周期,sendTime函数是定时器触发时的动作。
- sendTimer函数
func sendTime(c interface{}, seq uintptr) {//c上的非阻塞时间发送。//在NewTimer中使用,它无论如何都不能阻塞(缓冲区)。//在NewTicker中使用,在地板上放下发送是//当读者落后时的期望行为,//因为发送是周期性的。select {case c.(chan Time) <- Now():default:}
}
sendTime接收一个channel作为参数,其主要任务是向channel中写入当前时间。
创建Timer时生成的管道含有一个缓冲区(make(chan Time,1)),所以Timer触发时向channel写入事件永远不会阻塞,sendTime写完即退出。
sendTime使用select搭配一个空的default分支,是因为Ticker也复用了sendTime,Ticker触发时也会向channel中写入事件,但无法保证之前的数据已经
被取走,所以使用select并搭配一个空的default分支,确保sendTime不会阻塞,Ticker触发时,如果管道中还有值,则本次不在向管道中写入时间,将本次触发的事件直接丢弃。
- startTime函数
startTime函数的具体实现在runtime包中,其主要作用是把runtimeTimer写入系统写成的数组中,并启动系统协程(如果系统协程还未开始运行)
其主要是addTimer(t *timer)函数,timer是runtime包中用于表示time包中runtimeTimer结构的struct
addTimer(t *timer)函数
func addtimer(t *timer) {tb := t.assignBucket() // 获取time桶数组lock(&tb.lock) // 加锁ok := tb.addtimerLocked(t) // 将 timer加入数据组unlock(&tb.lock) // 解锁if !ok { // 如果timer加入数组失败,那么触发 panic badTimer()}
}
7.2.2 停止Ticker
停止Ticker时,只是简单地把Ticker从系统协程中移除。
stopTimer即通知系统协程把该Ticker移除,即不在监控。
stopTimer也是runtime中的函数
系统协程只是移除Ticker,并不会关闭channel,以避免用户协程读取错误。
与Timer不同的是,Ticker停止时没有返回值,即不需要关注返回值,实际上返回值也没有什么用途。
Ticker 没有重置接口,即Ticker创建后不能通过重置修改周期。
需要格外注意的是,Ticker用完后必须主动停止,否则会产生资源泄露,持续消耗CPU资源。
8. runtimeTimer 原理(go 1.11)
NewTimer和NewTicker都会在底层创建一个runtimeTimer,runtime包负责管理runtimeTimer,保证定时器按照约定的时间触发。
- Go 1.10 之前: 所有的runtimeTimer 保存在一个全局的堆中
- G0 1.10 ~ 1.13 : runtimeTimer被拆分到多个全局的堆中,减少了多个系统协程的锁等待时间
- Go 1.14+: runtimeTimer保存在每个处理器P中,消除了专门的系统协程,减少了系统协程上下文切换。
8.1 定时器存储
8.1.1 timer 的数据结构
Timer和Ticker的数据结构除名字外,其他完全一样,二者都含有一个runtimeTimer类型的成员,这就是系统协程所维护的对象。
runtimeTimer类型是time包的名字,在runtime包中,这个类型叫做timer.
// 包装时间知道这个结构的布局。
// 如果此结构更改,请调整/time/sleep.go:/runtimeTimer。
// 对于GOOS=nacl,包syscall知道这个结构的布局。
// 如果此结构更改,请调整/syscall/net_nacl.go:/runtimeTimer。
type timer struct {tb *timersBucket // the bucket the timer lives in // 当前定时器寄存于系统timer堆的地址i int // heap index // 当前定时器寄存于系统timer堆的下标//定时器在何时唤醒,然后在何时+时段唤醒。。。(仅限大于0的时段)//每次在计时器goroutine中调用f(arg,now)时,f必须//一个行为良好的函数,而不是块。when int64 // 当前定时器下次触发时间period int64 // 当前定时器周期性触发间隔(如果是Timer,间隔为0,表示不重复触发)f func(interface{}, uintptr) // 定时器触发时执行的函数arg interface{} // 定时器触发时执行的参数一seq uintptr // 定时器触发时执行的参数二(该参数只在网络收发场景下使用)
}
其中timersBucket便是系统协程存储timer的容器,里面有一个切片来存储timer,而i便是timer所在切片的下标。
8.1.2 timersBucket 的数据结构
//go:notinheap
type timersBucket struct {lock mutexgp *g // 处理堆中事件的协程created bool // 时间处理协程是否已创建,默认为false,添加收个定时器是置为truesleeping bool // 事件处理协程(gp)是否在睡眠,(如果t中有定时器,那么还未到达触发的时间,gp会睡眠)rescheduling bool // 事件处理协程(gp)是否已暂停,(如果t中定时器均已删除,那么gp会暂停)sleepUntil int64 // 时间处理协程睡眠事件waitnote note // 时间处理协程睡眠事件(据此唤醒协程)t []*timer // 定时器切片
}
Bucket是存储timer的桶。
- lock: 互斥锁,在timer增加和删除时需要加锁,防止并发
- gp: 事件处理协程,就是系统协程,这个协程在首次创建Timer或Ticker时生成
- created: 状态值,表示系统协程是否创建
- sleeping: 系统协程是否已暂停
- sleepUntil: 系统协程睡眠到指定的时间(如果有新的定时任务则可能会提前唤醒)
- waitnote: 提前唤醒时使用的通知
- t: 保存timer的切片,当调用NewTimer或NewTicker时,便会有新的timer存储到切片中
系统协程在首次创建定时器时创建,定时器存储在切片中,系统写成负责计时并维护这个切片。
8.1.3 Ticker & timer & timersBucket 关系
假设创建了1个Timer,2个Ticker,关系如下
用户创建Timer或者Ticker时会生成一个timer,这个timer指向timersBucket,timersBucket记录timer的指针。
8.1.4 timersBucket 数组
通过timersBucket的数据结构可以看到,系统写成负责计时并维护其中的多个timer,一个timersBucket由一个特定的系统协程来维护。
当系统重的定时器非常多时,一个系统协程的处理能力可能跟不上,所以Go在实现时实际上提供了多个timerBucket,也就是有多个系统协程来处理定时器。
最理想的情况是应该预留GOMAXPROCS个timersBucket,以便充分使用CPU资源,但需要根据实际环境动态分配。为了实现简单,Go在实现时预留了64个timersBucket,可以满足绝大部分场景。
在addTimer时调用的
当协程创建定时器时,使用协程所属的ProcessId%64来计算定时器存入的timersBucket。
在上面的关系中,当三个协程创建定时器时,定时器的分布可能如下:
一般情况下,同一个Process的协程创建的定时器分布于同一个timersBucket中,只有当GOMAXPROCS大于64时才会出现多个Process分布于同一个timersBucket中的情况。
8.2 定时器运行机制
8.2.1 创建定时器
创建Timer或Ticker实际上分为两步:
- 创建一个channel
- 创建一个timer并启动(这里的timer是指runtime包中的timer,不是Timer)
不管是 Timer还是Ticker都是先创建channel,channel都是带有一个缓冲区的。
接着创建timer,调用startTimer启动。
startTimer在runtime包中实现,通过go:link关联
addTimer的实现如下:
func addtimer(t *timer) {tb := t.assignBucket() // 分配 timersBucket ,从64个中选择一个lock(&tb.lock) // 加锁ok := tb.addtimerLocked(t) // 加入切片unlock(&tb.lock) // 解锁if !ok { // 加入失败badTimer() // 错误处理}
}
首先,每个timer都必须归属于某个timersBucket,所以第一步是先选择一个timersBucket,选择的算法很简单,将当前协程所属的Process ID 与 timersBucket数据长度求模,结果就是timersBucket数组的下标。
其次,每个timer都必须加入timersBucket,timersBucket数据结构中的切片t保存着timer的指针,新创建的timer也需要加入这个切片。
保存timer的切片是一个按timer触发事件排序的小顶堆,所以新timer插入的过程中会触发堆调整,堆顶的timer最快被触发。
// 在堆中添加一个计时器,并在新计时器为时启动或启动timerproc
// 比其他任何人都早。
// 计时器已锁定。
// 返回是否一切正常:如果数据结构损坏,则返回false
// 由于用户级别的竞争。
func (tb *timersBucket) addtimerLocked(t *timer) bool {//何时决不能为负数;否则timerproc将溢出//在其增量计算期间,并且永远不会使其他运行时计时器过期。if t.when < 0 {t.when = 1<<63 - 1}t.i = len(tb.t) // 先把定时器插入堆尾tb.t = append(tb.t, t) // 保存定时器if !siftupTimer(tb.t, t.i) { // 在堆中插入数据,触发重新排序return false}if t.i == 0 { // 堆排序后,如果新插入的定时器跑到了堆顶,需要唤醒系统协程来处理// siftup moved to top: new earliest deadline.if tb.sleeping && tb.sleepUntil > t.when { // 系统协程在睡眠(切片中有数据,未到时间),唤醒系统协程来处理新加入的定时器tb.sleeping = falsenotewakeup(&tb.waitnote)}if tb.rescheduling { // 系统协程已暂停(切片中没有数据),唤醒系统协程来处理新加入的定时器tb.rescheduling = falsegoready(tb.gp, 0)}if !tb.created { // 如果是系统协程收个定时器,则启动协程处理堆中的定时器tb.created = truego timerproc(tb) // 系统协程就是这里创建的}}return true
}
- 如果timer的时间是负值,那么就会被修改为很大的值来保证后续定时算法的正确性
- 系统协程是在首次添加timer时创建的,并不是一直存在
- 新加入timer后,如果新的timer跑到了堆顶,则意味着新的timer需要立即处理,那么会唤醒系统协程
小顶堆排序
func siftupTimer(t []*timer, i int) bool { // 入参是数组和数组尾部if i >= len(t) {return false} // 数组越界when := t[i].when // 获取数组尾部元素的触发事件tmp := t[i] // 存储到临时变量中for i > 0 { // 只要没到堆顶,那么就一直运行p := (i - 1) / 4 // parent // 使用 4 叉堆,所以 (i-1)/4 得到父节点if when >= t[p].when { // 如果 子节点的触发事件大于等于 父节点,那么结束,满足小顶堆的规则break // 满足小顶堆的规则,结束}t[i] = t[p] // 否则先将父节点 写到 子节点位置上,(子节点在临时变量中有,不会丢失)t[i].i = i // 将 timersBucket中的index 设置为子节点的indexi = p // 将指针移动到父节点,进行下一轮循环(因为使用数组保存树,所以当index=0时,就到达了根节点)}if tmp != t[i] { // 判断是否发生了并发,理论上 tmp 就是 t[i]t[i] = tmp // 如果并发导致数据不一致,那么强行设置t[i].i = i}return true
}
这是用数组存储的一个小顶堆的维护逻辑。
创建系统协程
当第一次addTimer的时候,会触发创建系统协程:
// Timerproc运行时间驱动的事件。
// 它一直休眠到tb堆中的下一个事件。
// 如果addtimer插入一个新的较早事件,它会提前唤醒timerproc。
func timerproc(tb *timersBucket) {tb.gp = getg() // 获取协程for {lock(&tb.lock) // 加锁tb.sleeping = false // 设置标志now := nanotime() // 获取当前时间delta := int64(-1)for {if len(tb.t) == 0 { // 切片数组为空,堆中没有等待的元素delta = -1 // 不创建系统协程,结束break}t := tb.t[0] // 拿到小顶堆堆顶的元素delta = t.when - now // 计算需要等待的时间if delta > 0 { // 如果还未到时间,结束,delta 大于0表示时间未到,小于0表示已过时执行break}ok := trueif t.period > 0 { // Ticker 的 period 才会大于0, Timer 的 period 等于0// leave in heap but adjust next time to firet.when += t.period * (1 + -delta/t.period)// delta小于0,表示本次是过时执行,那么下次执行时间需要加上delta,比如delta=1s,那么下次执行时间 when(next) = when(this) + period + delta// 将上述式子进行变形 when(next) = when(this) + period (1 + delta/period)// 因为已知delta <= 0 ,所以 when(next)=when(this) + period * (1 + -delta/period) => when += period * (1 + -delta/period)if !siftdownTimer(tb.t, 0) { // 将根节点值进行了增加,那么从 小堆顶 进行平衡ok = false}} else { // period = 0 表示是 Timer,触发一次后,需要从小顶堆中删除// remove from heaplast := len(tb.t) - 1 // 切片长度缩小if last > 0 { // 小顶堆不为空tb.t[0] = tb.t[last] // 将最大值设置到根节点,然后执行 根节点值增加,然后从 小堆顶进行平衡tb.t[0].i = 0}tb.t[last] = nil // 将最后一个元素置空,删除,显示的置空,用于 gctb.t = tb.t[:last] // 将最后一个元素删除if last > 0 { // 如果 小顶堆不为空,那么从根节点进行平衡if !siftdownTimer(tb.t, 0) {ok = false}}t.i = -1 // mark as removed // 标记该 timer 已经被删除了,无法通过 index 在 timersBucket中索引到了}f := t.f // 拿到 触发后的 funcarg := t.arg // 获取第一个参数,channel , Timer.C or Ticker.Cseq := t.seq // 获取第二个参数,只有网络才会用到unlock(&tb.lock) // 解锁if !ok { // 如果小顶堆平衡失败,那么返回错误信息badTimer()}if raceenabled {raceacquire(unsafe.Pointer(t))}f(arg, seq) // 执行预设的 func ,也就是 sendTime , 也就是将 now 写入 Timer.C 或 Ticker.C 触发事件lock(&tb.lock) // 加锁,防止多次触发,分两次加锁解锁,是为了防止执行 sendTime 的时候,太费时间,导致协程无法进行其他操作}if delta < 0 || faketime > 0 { // 小顶堆中没有元素了,系统协程需要暂停// No timers left - put goroutine to sleep.tb.rescheduling = true // 设置暂停标志goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)continue}// At least one timer pending. Sleep until then.tb.sleeping = true // 没有待触发的事件,设置睡眠标志tb.sleepUntil = now + delta // 计算系统协程睡眠时间noteclear(&tb.waitnote) // 清除系统协程的唤醒通知unlock(&tb.lock) // 解锁notetsleepg(&tb.waitnote, delta) // 系统协程睡眠,没有待触发的事件}
}
唤醒系统协程(睡眠)
当有定时器需要触发的时候,会唤醒系统协程,触发事件:
func notewakeup(n *note) {var v uintptrfor { // 加锁,乐观锁,自旋v = atomic.Loaduintptr(&n.key)if atomic.Casuintptr(&n.key, v, locked) {break}}// Successfully set waitm to locked.// What was it before?switch {case v == 0:// Nothing was waiting. Done.case v == locked:// Two notewakeups! Not allowed.throw("notewakeup - double wakeup")default:// Must be the waiting m. Wake it up.semawakeup((*m)(unsafe.Pointer(v))) // 唤醒,唤醒后,进入 timerproc 的 for 循环,再次触发事件, 当小顶堆没有触发的定时器时,协程会在for最后一步sleep,唤醒后继续执行}
}
唤醒,唤醒后,进入 timerproc 的 for 循环,再次触发事件, 当小顶堆没有触发的定时器时,协程会在for最后一步sleep,唤醒后继续执行.
唤醒系统协程(暂停)
当小顶堆中没有元素,系统协程会进入暂停状态,等待addTimer
func goready(gp *g, traceskip int) {systemstack(func() {ready(gp, traceskip, true)})
}
睡眠(Sleep):time.Sleep(duration) 函数会使当前的 goroutine 暂停指定的时间。
在这段时间内,goroutine 不会执行任何操作,也不会消耗 CPU 资源。
这个函数通常用于模拟 I/O 操作,或者在测试中插入人为的延迟。
暂停(Yield):runtime.Gosched() 函数会使当前的 goroutine 让出 CPU,让其他 goroutine 有机会执行。
这个函数通常用于在一个长时间运行的 goroutine 中,插入一些 “断点”,以避免阻塞其他 goroutine 的执行。
需要注意的是,runtime.Gosched() 并不保证当前 goroutine 会立即停止执行,也不保证其他 goroutine 会立即开始执行。
8.2.2 删除定时器
当Timer执行结束或Ticker调用Stop时会触发定时器的删除操作。从timersBucket中删除定时器是添加定时器的你过程,即堆中元素删除后,触发小顶堆平衡。
不管是Timer还是Ticker的删除操作,最终都会执行runtime中的 stopTimer
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {return deltimer(t)
}
接下来看看deltimer的逻辑:
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {if t.tb == nil { // 用户自己创建的 Timer 或者 Ticker 是不可用的// t.tb can be nil if the user created a timer// directly, without invoking startTimer e.g// time.Ticker{C: c}// In this case, return early without any deletion.// See Issue 21874.return false}tb := t.tblock(&tb.lock) // 加锁removed, ok := tb.deltimerLocked(t) // 移除元素unlock(&tb.lock) // 解锁if !ok {badTimer()}return removed
}
得益于在 timerproc中加了两次锁,删除小顶堆元素,不需要马上通知timerproc。
因为在计算时间的时候,是加锁的,中间执行是不加锁的,后面设置系统协程状态也是加锁的。
所以删除元素,要么发生在系统协程睡眠或暂停的时候,要么发生在 sendTime 的时候,不管那种,都不会影响触发的正确性。
因为可能删除小顶堆中任意位置的元素,所以需要从该节点出发,向上和向下平衡
8.3 资源泄露
对于不使用的Ticker需要显示的Stop,否则会产生资源泄露问题。
- 首先,创建Ticker 的协程并不负责计时,只负责从Ticker的管道中获取事件
- 其次,系统写成只负责定时器计时,向管道中发送事件,并不关心上层协程如何处理事件。
如果创建了Ticker,则系统协程将持续监控该Ticker的timer,定期触发事件。如果Ticker不在使用且没有Stop,那么系统协程的负担会越来越重,持续消耗CPU资源。
9. 性能优化 (go 1.14+)
上面的runtimeTimer的原理仅适用于Go 1.10 ~ 1.13 ,尽管定时器的性能已经能满足绝大多数场景,但在一些高度依赖定时器的业务场景中,
往往需要创建海量的定时器,这些场景中需要定时器能更精确、占用系统资源更少。
Go 1.14 中对定时器又做了一次大的性能优化,主要围绕如何管理runtimeTimer进行,包含如何存储runtimeTimer,如何检测以确保定时器能准时触发。
9.1 消除了timersBucket
在前面的版本中,NewTimer和NewTicker创建的runtimeTimer会存储到全局的timersBucket桶中,最多拥有64个timersBucket桶,如果GOMAXPROCS的值不超过64,timersBucket桶的数量等于GOMAXPROCS。
每个timersBucket桶中均包含一个堆用于保存runtimerTimer,此外每个timersBucket桶对应一个专门的协程(timerproc)来监控runtimeTimer
在Go 1.14中取消了timersBucket桶,直接把保存runtimeTimer的堆放到了处理器P中。
在处理器P的数据结构中,除了包含协程的队列,还直接包含了runtimeTimer。
消除timersBucket桶的同时,也不再需要timerproc来监控定时器了。
9.2 消除了timeproc
在Go 1.14之前的版本中,timerproc实际上专门监控定时器的协程执行体。
在Go 1.14的设计中取消了timerproc,因为runtimeTimer不在存储在timerBucket桶中,而是转移到每个处理器P中。
Go 1.14 中做的优化主要是为了取消timerproc,不在依赖timerproc来监控定时器,而是希望提供一种更搞笑的监控方式。
9.3 更少的锁竞争
在Go 1.14 之前,runtimeTimer存储在timersBucket桶中,runtimeTimer的添加、删除均需要加锁。
在处理器P的数据结构中,仍然有一把锁(timersLock)来限制timers的并发访问。实际上在Go 1.14 中,runtimer的添加、删除也需要加锁。
当协程发生系统调用时,当前的工作线程将释放持有的处理器P,当前工作线程专注于处理系统调用(被阻塞),然后启动一个新的工作线程来继续消费当前处理器P中的协程。当新的工作线程启动时,需要寻找空闲的处理器P,这是需要加锁的。
当程序中拥有大量的定时器时,在Go 1.14之前,每个timerproc处理完一个定时器都会休眠,即触发系统调用,从而释放处理器P,启动新的工作线程,多个新的工作线程在获取空闲处理器P时会争抢互斥锁。
从Go 1.14开始,处理器不在由timerproc处理,而是在每次协程调度时检查定时器是否需要触发,在协程调度时捎带检查定时器。
相较于之前的timerproc,定时器被关注的更加频繁,而且不会因为协程触发系统调用而产生新的工作线程,所以定时器触发的会更准时。
9.4 更少的上下文切换
在Go 1.14 之前,timerproc也是夹杂在系统其他的协程中被调度的,假设将timerproc标记为GT
由于timerproc夹杂在其他的协程中,当协程较多时,难以保证timerproc能被及时调度。假如程序中每1微妙就需要触发一个定时器,而timerproc每2微妙才被调度一次,那么定时器将产生1微妙的误差,从而不准时了。
在Go 1.14 中,由于每次调度协程时都会检查处理器,所以当有定时器需要触发时,先处理定时器,在调度协程,相当于每个协程都兼任了之前的timerproc的工作,但不会触发系统调用。
由于在设计上取消了timerproc,也避免了频繁的调度timerproc时产生的上下文切换,从而在一定程度上节省了系统资源。
9.5 优化效果
在github上有优化代码的性能测试结果
https://github.com/golang/go/commit/6becb033341602f2df9d7c55cc23e64b925bbee2
可以看到在多个涉及定时器的场景中,性能均有了较大程度的优化。
相关的好文:
https://studygolang.com/articles/26529
https://github.com/golang/go/commit/76f4fd8a5251b4f63ea14a3c1e2fe2e78eb74f81
https://xiaorui.cc/archives/6483
https://www.pengrl.com/p/20021/