C++11实现一个自旋锁
自旋锁也是一种互斥锁,和mutex锁相比,它的特点是不会阻塞:如果申请不到锁,就会不断地循环检测锁变量的状态,直到申请到锁。它的核心算法是一个循环检查锁变量的操作,达不到目标也不会阻塞线程,而是一直不停的循环直到目标达成,因此称为自旋锁。
底层实现有两种算法,一种是把目标值和锁状态变量的原值进行交换,这个过程要求是一个原子操作,然后检查交换出来的值是否是期望的值,称为TAS算法;另一种是先检查锁状态变量是否是期望值,如果是就设置为目标值,并返回成功,这个过程也要求是一个原子操作,称为CAS算法。
下面分别以两种算法为例,介绍一下自旋锁的实现及其特点。
底层算法
TAS算法
即Test And Set,字面意思是测试并设置,锁状态变量要么为true要么为false,就是将true值原子地存入这个变量并返回设置前的原值,也就是相当于用true交换锁状态变量的当前值。它的语义原型可以使用下面的伪代码表示:
<< atomic >>
function tas(p : pointer to bool) returns bool {bool value = *p*p ←truereturn value
}
操作完之后,指针p指向的变量被设置为true,如果原值为false,就返回false,如果为true,就返回true。这个TAS操作过程要求是一个原子操作,中间不允许被打断,也就是保证了交换过程是一个原子操作,在X86环境下,使用xchg指令来实现。
CAS算法
即Compare And Swap,字面意思是比较并交换(不过Java的原子类提供的方法名称是:compareAndSet,x86提供的指令名称是cmpxchg:compare-exchange),把当前锁状态值与一个期望值作比较,如果相等,则更新锁状态为新值,并返回比较结果。它的语义原型,可以使用下面的伪代码表示:
<< atomic >>
function cas(p : pointer to int, old : int, new : int) returns bool {if *p ≠ old {return false}*p ← newreturn true
}
它的输入参数一共有三个,分别是:
p: 指向要修改的变量,old: 旧值,new: 新值。返回值是布尔类型,表示赋值是否成功。
CAS实现逻辑也非常简单,就是先检查p所指位置的值是不是等于期望值old,如果等于,那就把该位置的值更新为new,并返回 true,否则就不做任何操作,返回false。同样要保证比较和修改是一个原子操作,在X86环境下,使用lock cmpxchg指令,lock前缀是为了保证指令cmpxchg是原子操作。
由此可见,CAS和TAS返回结果意义不一样,TAS返回的是锁状态变量的值,为true时继续自旋,而CAS返回的是锁状态原值与期望值的比较结果,为false时继续自旋。TAS没有比较的操作,比较操作是在返回交换结果之后由调用者负责的。
TAS算法中操作的变量是bool类型,而CAS算法中的变量可以是任意类型,在实现自旋锁时都是使用整数型,比如int、bool等。
atomic_flag+TAS实现
C++11原子类中atomic_flag提供了TAS算法,对应的成员函数是:
bool test_and_set( std::memory_order order = std::memory_order_seq_cst );
功能是将 std::atomic_flag 的状态原子地更改为设置(true),并返回它先前保存的值,调用时如不指定内存序参数,则缺省为memory_order_seq_cst,即顺序一致性。用它来实现的自旋锁,非常简单。如下所示:
class spin_lock_tas final {
public:void lock() {while (flag.test_and_set(memory_order_acquire));}void unlock() {flag.clear(memory_order_release);}private:atomic_flag flag = ATOMIC_FLAG_INIT;
};
使用atomic_flag类型的变量flag作为锁状态变量,在加锁函数lock()中,test_and_set()实现了TAS原语操作。当flag原值为true时,返回true,说明这个锁已经被占有,其它的线程无法获取了;在解锁函数unlock()中,使用clear()设置flag为false。
循环while (flag.test_and_set(memory_order_acquire));
就是自旋过程。while语句在循环时,调用test_and_set()把true写入flag变量,然后检查返回的值是不是false,如果是false就退出循环,如果是true则继续循环,直到返回false退出。由此可见,成员函数起的名字叫test_and_set(),但是在使用的时候,却是按照set and test的方式使用的。
释放锁非常简单,调用atomic_flag的成员函数clear(),它把flag的值设置为false,这就意味着正在自旋中的另一个线程有机会从test_and_set()返回false,退出自旋。
atomic<bool>+CAS实现
在C++11提供的原子类atomic<T>中,提供了CAS算法接口,即compare_change_weak/strong()接口,以storng版为例:
bool compare_exchange_strong( T& expected, T desired, std::memory_order success, std::memory_order failure ) ;
原子地比较 *this 和 expected 的值,如果它们相等,那么以 desired 替换前者(进行读-修改-写操作),并使用内存序success参数。否则,将 *this 中的实际值加载进 expected(进行加载操作),并使用内存序failure参数。
参数:
expected - 期待在原子对象中找到的值的引用
desired - 在符合期待时存储到原子对象的值
success - 读-修改-写操作所用的内存同步定序
failure - 加载操作所用的内存同步定序
返回值:
成功更改底层原子值时返回 true,否则为返回 false。
下面使用atomic<bool>来实现自旋锁,实现也非常简单。
class spin_lock_cas final {atomic<bool> sync_var{false};public:void lock() {bool expected;do {expected = false;} while (!sync_var.compare_exchange_strong(expected, true,memory_order_acquire, memory_order_relaxed));}void unlock() {sync_var.store(false, memory_order_release);}
};
使用atomic<bool>类型的变量sync_var为锁状态变量,在加锁函数lock()中,compare_exchange_strong()实现了CAS原语操作。当sync_var原值为false时,操作返回true,说明获得了这个锁,如果原值是true时,返回false,说明这个锁已经被占有,同时sync_var被设置为true,其它的线程无法获取了;在解锁函数unlock()中,使用store()让sync_var为false。
循环while (!sync_var.compare_exchange_strong(expected, true))
就是自旋过程。compare_exchange_strong()是一个原子操作,它在执行时,先把expected和sync_var做比较,如果相等,就把true写入sync_var然后返回true就退出循环,即获得了锁,如果是不相等,设置expected为true(因此下一次循环时要更新expected为false)并返回false,说明没有获得锁,继续自旋直到返回true退出。
释放锁非常简单,调用atomic<bool>的成员函数store(),它把sync_var的值设置为false,这就意味着正在自旋中的另一个线程有机会从compare_exchange_strong()返回false,退出自旋。
内存序
TAS算法和CAS算法只是实现了自旋锁的互斥功能,即保证不会有多个线程同时能获得锁。此外,在使用自旋锁时,还要保证临界区内共享变量的可见性和顺序性,也就是要指定内存序(memory order)来保证内存一致性,一般在解锁时使用释放(release)语义,在加锁时使用获取(acquire)语义。这样能够保证上一个线程解锁时在临界区修改的数据,在下一个线程获得锁后进入临界区能够见到最新的修改值,访问内存操作不产生乱序现象。实际上锁状态变量也是一个同步变量,它起到了释放锁和获取锁的线程之间访问共享变量时同步定序的作用。网上有很多介绍C++内存序的文章,这里不再赘述。
二者区别
上面是分别使用TAS和CAS算法实现的最简单、最朴素的自旋锁,没有进行任何的优化考虑。下面分别比较一下它们之间的优劣。
先下面分别看一下这两种算法在x86环境下产生的汇编代码(clang -O2编译环境),并把每个指令的时钟周期注释在了后面(Ivy Bridge架构的指令执行周期)。
1、atomic_flag + TAS算法
spin_lock_tas::lock():
.LBB1_1:mov al, 1 // 1xchg byte ptr [rdi], al // 25 + 内存读和内存写耗时test al, al // 1jne .LBB1_1 // 0 ret
就是一个简单的循环指令流代码,可以看到一次自旋需要27个时钟周期再加上内存读和内存写的开销。其中指令jne .LBB0_1
为0,是因为它和前面的test指令宏融合(Macro-Fusion)为一个单一微指令(single macro-op)。
2、atomic + CAS算法
spin_lock_cas::lock():mov cl, 1
.LBB1_1:xor eax, eax // 1lock cmpxchg byte ptr [rdi], cl // 22 + 内存读 + [内存写(如果相等)]耗时jne .LBB1_1 // 1 ret
也是一个简单的循环指令流代码,可以看到一次自旋需要24个时钟周期再加上内存读+ [内存写(如果相等)] 的开销。其中指令cmpxchg前面有个前缀lock,是为了保证该指令的执行是一个原子操作。
由此可见,即使假设两个算法中读写内存的时间开销一样的话,每次自旋时,使用cmpxchg指令(即CAS算法)也要比使用xchg指令(即TAS算法)要快一些,要少用3个指令周期。实际上使用cmpxchg指令比使用xchg指令的开销要小得多,因为指令cmpxchg只有在原值和期望值相等时才会进行内存写操作,即每次自旋它不一定有写内存的时间开销,只有读内存的开销,而此时数据都在cache中,开销很小。
使用xchg指令自旋时,是先无条件交换再根据交换结果进行判断,即指令xchg byte ptr [rdi], al
,先要把和寄存器al中的值和[rdi]指向的内存地址处的1字节锁状态变量交换,无论它们是否相等。也就是说写内存是无条件的,因为xchg有隐藏的lock前缀,它要求使用seq_cst内存序,因此,xchg指令不是把al写入store buffer就完事,而且保证seq-cst内存序语义,即还要把写入的数据从store bufer中flush到cache和内存中,并通知各个处理器更新缓存,等待各个CPU返回ACK之后才算写内存结束,这个过程相对普通的写入store buffer要漫长的多。同时,根据MESI协议,其它CPU在同一个自旋变量上面自旋时,会因为这个CPU的这次无条件写入内存,导致缓存失效,就得重新从内存中读取(即“乒乓缓存”现象),所有自旋中的CPU几乎同时都要读内存,加剧了内存总线的竞争。不管哪个CPU,每一次自旋操作,即使获取不到锁,都有一次这样的过程。也就是说xchg的无条件写入内存操作对全局有很大的影响,此时的写内存是一个全局原子写事务,导致所有正在自旋中的CPU缓存失效,进而要更新缓存,从而产生流量风暴,加剧了内存总线的竞争和数据量的传输。同样当调用unlock()接口释放锁时,此时是需要把false值写入锁状态变量,因为其它的线程还在自旋中,会一直地读写内存而占有系统总线。此时释放锁的线程还要和获取锁的线程一起竞争系统总线,导致了释放锁时的延迟。
而使用cmpxchg指令自旋时,是先进行判断,只有符合条件时再交换 ,即指令lock cmpxchg byte ptr [rdi], cl
,即先要把和寄存器al中的值和[rdi]指向的内存地址处的1字节锁状态变量进行比较,只有它们相等时,才会把cl中的值写入内存。因此,如果别的CPU在没有比较成功时,是不会写内存的,也就不会flush到cache和内存中,这样,CPU在自旋时,它们各自的cache仍处于MESI协议的S状态,没有失效,是不需要重新从内存读取。如果各个CPU自旋时CAS比较结果不相等,它们之间不会互相影响,也就是说,如果cmpxchg比较结果不相等时,没有对全局的各个自旋中的CPU造成影响。 只有当cmpxchg比较结果相等时,才会把寄存器cl的值写入内存,因为cmpxchg指令前面有lock前缀,同样要求使用seq_cst内存序。此时才如同xchg指令那样把al更新到cache和内存中,并通知各个处理器更新缓存,这样在同一个锁状态变量上正在自旋中的其它CPU,才需要重新从内存读取锁状态变量,也就是只有当一个线程获得了锁时才导致一次相关CPU重新读取内存的操作,显然比xchg指令要好得多。在unlock()释放锁时,要把false值写入锁状态变量,其它的线程CPU虽然在自旋中,但是因为比较条件没有满足,并没有进行写操作,不存在像xchg那样因为一直写而导致的流量风暴,因此,unlock()在写内存时受到的影响要小得多。
可见,虽然TAS和CAS算法在X86 CPU上的实现时,它们执行的指令时钟周期差别并不明显,但是xchg指令每次的原子写事务,有全局的副作用影响,不光申请锁之间的线程互相影响,而且也影响到了释放锁的线程。相比而言,CAS算法性能好一些,也对整个系统更友好一些。
既然这样,是不是就不要使用atomic_flag来实现自旋锁了?并不是,TAS算法也可以进行优化,以避免每次无条件写内存对系统的影响,在C++20版本中对atomic_flag扩展了一个成员函数:test(),它提供了查询atomic_flag原子量的功能,这样就可以使用TTAS(Test Test And Set)算法进行优化了,这些内容参考文章:自旋锁的实现及优化里面的讲解。
附
下面是在一个4核CPU处理器上,测试两个自旋锁实现的性能程序:
static inline uint64_t get_rdtsc(void) {union {uint64_t tsc64;struct {uint32_t tsc32_lo;uint32_t tsc32_hi;};} tsc;asm("rdtsc":"=a"(tsc.tsc32_lo),"=d"(tsc.tsc32_hi));return tsc.tsc64;
}#if 1
spin_lock_tas slock;
#else
spin_lock_cas slock;
#endifstatic mutex mtx;
#define COUNT 1000000
static void sum() {uint64_t ticket1 = get_rdtsc();for (int i=0; i<COUNT; i++) {slock.lock();slock.unlock();}uint64_t ticket2 = get_rdtsc();int64_t x = ticket2 - ticket1;lock_guard<mutex> guard(mtx);printf("%lu\n", x);
}int main() {thread t1(sum);thread t2(sum);thread t3(sum);thread t4(sum);t1.join();t2.join();t3.join();t4.join();
}
各运行10次,分别取耗时时钟周期最小的那组测试结果(耗时越小,说明线程在运行时越可能没有被调度切换,也没有被中断)。
atomic_flag + TAS算法实现的自旋锁:
177915346
279384821
227520077
346827330
atomic<bool> + CAS算法实现的自旋锁:
432447222
502814284
556108365
513192991
可见,正如前面分析的,使用CAS算法要比TAS性能要好一些,性能相差2-3倍,当然这只能说是在这个测试用例上面的结果,即简单的加锁解锁过程,没有任何临界区代码,显然是一个高度竞争的场景,不代表所有场景。