etcd-v3.5release-(2)-STM
a.b.c表示a文件里的b类的方法c,注意a不一定是包名,因为文件名不一定等于包名
!!!!etcd在put的过程中使用的batchTxBuffered,这个事务是写bbolt数据库使用的事务,是对bbolt.Tx的一个封装,就是攒一批事务作为一个大事务,一次性提交到bbolt。etcd对外服务得像一个数据库,但是外部看到的不是bbolt,也就是说外部对etcd提交的事务和etcd用来写bbolt时使用的事务完全是两个东西,etcd使用的事务是STM(软件事务内存)
!!!etcd smt在单个事务中对同一key的修改在提交的时候只有最终的修改有效,因为他用了一个rset/wset分别表示客户端在事务中要读取哪些key和修改哪些key,在一条path中对某个key只允许有一个put操作,如果同一条path里有对同一个key的两个put操作,那么clientsdk就会报错
STM、ReadView
!!!!总的来说,etcd的stm事务模型是在客户端实现的,但事务的执行是在etcd集群中的成员节点上完成的。
apply的时候是一条一条的应用的,但是如果要原子的同时apply一批日志,那么就必须使用STM
STM内部包含一个wset和一个rset,保存了本次事务访问到的数据和版本,然后在提交的时候如果发现事务变了,那么就会报错
commited时日志的索引顺序和apply时revision顺序是一致的
!!!wal虽然是顺序写入的,apply的时候也是顺序提交到bbolt的,apply的时候他是会把从chan收到的数据包装成一个job,然后丢到一个fifo队列,这个fifo队列会等当前任务运行完毕才会运行下一个job,也就是apply的时候也必定是按顺序apply的,也就是说只有上一个事务提交到bbolt才会开始提交下一个事务(看了源码),这样在开始x事务的时候x以前的日志都已经写完了,也就是说如果x事务读取的数据被x-k版本的事务修改了,那么x事务是可以检测出来的,如果检测到冲突,那么就不会把该数据写入数据库,也即是说即使日志已经写入了磁盘,但是数据是不一定会写入数据库的。举个例子:一个事务t1,先读取key a,此时a的版本(即revision)为v1,然后事务t1在本地执行了很久才提交事务,然后t1提交到etcd的时候raft日志索引为index1,然后apply的时候对应的revision为 rev1,然后执行的时候检测到key a此时的版本是v1+x,但是他读的时候a的版本为v1,所以此时就检测到了冲突(从treeIndex检测,因为treeIndex包含所有key和key对应的revision,比较一下revision就行),那么事务就会执行失败,虽然事务执行失败,但是日志是apply成功的,也就是说事务执行的结果就是数据不写入数据库。再举个例子,假设事务t1对应的raft日志已经写入磁盘,然后apply但是还没apply完成的时候系统崩溃了,这个没关系,重做就是了,当再次重做到事务t1对应的日志的时候,因为t1以前的日志必定已经重做完了,所以此时还是可以再次检测到冲突的,所以只要wal日志已经持久化了,那么崩溃就对这些已经持久化的日志无影响。
!!!区分revision和applyIndex:etcd的修改操作比如txn或者put,底层都是提交一个事务到bbolt,但是底层是batchTxBuffered即批量提交的,所以就是说上层执行完一个事务后,事务缓存在batchTxBuffered中并没有提交,但是只要把事务提交到了batchTxBuffered,s.currentRev就会+1,也就是说下一次put或者writetxn事务分配的revision就是s.currentRev了,但是!!!在底层batchTxBuffered提交事务前,新增的数据对外部是不可见的,假设batchTxBuffered提交时,本批次最大的revision为x,一旦提交以后,那么rev<=x的素有key都对外可见了,但是x<rev<=s.currentRev之间的而所有key此时对外是不可见的,也就是说可见的数据是落后于已做的数据(因为底层没有提交,所以这些数据对外不可见),这与applyIndex落后于commiteIndex原理是一致的,已经apply的数据对外可见,但是已经commite但是还没有apply的数据不可见
STM内对同一个值多次修改,那么只有最后一次写会成功,因为他是先在本地执行函数,然后得到一个最终的结果,再在commited时候直接提交最终的结果
!!mvcc写,也就是说多个版本
Server上Txn请求流程,分Read和Write
!!!etcd直接提供的if-then-else不支持嵌套事务,但是不知道提供的NewSTM支不支持嵌套事务,有待确定,但是etcd server是肯定支持嵌套事务的
batchtxn内部有一个batchtx表示最后一个事务
!!!stm原理:
stm if xx then yy else zz
stm执行时先判断if里面的条件是否成立,一旦成立就表示true就执行then 路径,一旦失败就表示false就执行else路径,then和else都叫做txnpath即事务路径。stm是允许嵌套stm的比如if里面有一个子if-then-else或者then里面还有if-then-else或者else里也有子if-then-else,当然,也允许子stm里还有子stm。then-else里面的叫做操作即op,因为可能有很多,所以叫做ops。所以stm对于嵌套事务的执行是这样的:先执行最顶层if,然后根据if结果true或者false选择对应的路径,然后执行路径下的op,每个op可以是子事务也可以是简单的put/get/range等操作,然后就根据op执行对应的操作就行,如果是子事务,就再执行一遍if-then-else流程就行
etcd stm代码使用样例(抄自网络):
func getEtcdClient() *v3.Client {// 配置 etcd 客户端cfg := v3.Config{Endpoints: []string{"30.128.101.96:2379"}, // 集群中的任意一个节点DialTimeout: 5 * time.Second,}// 创建 etcd 客户端cli, err := v3.New(cfg)if err != nil {log.Fatal(err)}return cli}func stm() {client := getEtcdClient()txnTransfer(client, "k1", "k2")
}
func txnTransfer(etcd *v3.Client, sender, receiver string) error {// 失败重试for {if ok, err := doTxn(etcd, sender, receiver); err != nil {fmt.Println("dotxn failed:", err.Error())} else if ok {fmt.Println("dotxn success")}}
}
func doTxn(etcd *v3.Client, sender, receiver string) (bool, error) {fmt.Println("\n开始事务前的读取操作:")getresp, err := etcd.Txn(context.TODO()).Then(v3.OpGet(sender), v3.OpGet(receiver)).Commit()if err != nil {return false, err}senderKV := getresp.Responses[0].GetResponseRange().Kvs[0]receiverKV := getresp.Responses[0].GetResponseRange().Kvs[0]// 发起转账事务,冲突判断 ModRevision 是否发生变化var s stringfmt.Println("senderKv: revision=" + strconv.FormatInt(getresp.Header.Revision, 10) +" key=" + string(senderKV.Key) + " value=" + string(senderKV.Value))fmt.Println("senderKv: revision=" + strconv.FormatInt(getresp.Header.Revision, 10) +" key=" + string(receiverKV.Key) + " value=" + string(receiverKV.Value))fmt.Println("请输入newValue:")_, err = fmt.Scan(&s)if err != nil {fmt.Println("scan error:", err.Error())return false, err}fmt.Println("开始执行事务:修改key的值为newValue......")txn := etcd.Txn(context.TODO()).If( #etcd stm是通过if来判断是否可以执行#一旦if里面设置的条件通过了#那么就说明该事物没有发生冲突可以执行#这是if的两个compare即两个比较器v3.Compare(v3.ModRevision(sender), "=", senderKV.ModRevision), #compare1: key=k1,比较字段=ModRevision,result="="#result表示我们用什么比较方式#即比较看k1这个key的ModRevision这个字段#在事务执行时是否和旧的ModRevision相等#如果不相等则说明在这期间被其他人编辑了,#就说明if失败即事务失败v3.Compare(v3.ModRevision(receiver), "=", receiverKV.ModRevision)) #compare2: key=k2,比较字段=ModRevision,result="="#即比较看k2这个key的ModRevision这个字段#在事务执行时是否和旧的ModRevision相等#如果不相等则说明在这期间被其他人编辑了#!!!比较哪些key是任意的,是自己选的 #!!!也支持一次比较多个key #!!!当然还支持比较key的其他字段比如xx#!!!当然也支持其他result即>、<、=、!=等#!!!我们这里只比较ModRevision且用的是=txn = txn.Then( #path then里面又有两个op,每个op又是一个put操作v3.OpPut(sender, s), v3.OpPut(receiver, s),).Else( #path else里面又有两个op,每个op又是一个put操作v3.OpPut(sender, "faild to set"),v3.OpPut(receiver, "faild to set"),) resp, err := txn.Commit() fmt.Println("事务执行完成")if err != nil {return false, err}return resp.Succeeded, nil
}
//txn事务有两种,一种是读事务,一种是写事务,通通都是通过这个函数来处理,读事务比较简单,重点在于后半段的写事务
//!!!!etcd只读事务不走raft流程,直接本地读
//!!!!个人疑惑:只读事务里面读不需要readIndex吗?
key.quotaKVServer.Txnkey.kvServer.Txnif isTxnReadOnly #如果是只读事务,那么不需要走raft请求,直接事务读就行if !isTxnSerializable(r): #判断是不是线性读,!isTxnSerializable表示线性读s.linearizableReadNotify(ctx) #如果需要线性读则阻塞等待直到读的条件满足#就是记录当前的commitedIndex为cx,然后等待,直到applyIndex>=cxs.doSerialize #此时applyIndex>=cx,所以可以执行序列化读apply.applierV3backend.Txn #事务读也是一个事务操作,只不过这个事务执行的是range读操作#所以这里的具体操作就是读取操作...txn函数在下面详解else: etcdserver.EtcdServer.raftRequest #写事务因为对数据库有修改,所以必须先走一遍raft日志流程#也就是先写一条raft日志,然后再apply这条日志#是的,所有对数据库有修改的操作都叫apply,#只不过apply可以进一步细分为put/txn等很多操作...raft写日志流程略... #1:raft写日志流程,略etcdserver.apply #2:apply日志,这里就是执行写事务......apply.applierV3backend.Apply #所有修改数据库的操作都可以叫做applycase r.Txn != nil: #apply可以进一步细分,这里是实务操作,即txnop = "Txn" apply_auth.authApplierV3.Txn #鉴权准入apply.quotaApplierV3.Txn #配额准入apply.applierV3backend.Txn #执行事务isWrite := !isTxnReadonly(rt) #判断是不是写事务if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer:readTx=kvstore_txn.store.Read(mvcc.SharedBufReadTxMode) #共享buf读事务,可以避免复制buffer#因为是共享buf,所以会对buf加读锁s.mu.RLock() #kvstore代表数据库,kvstore是一个watchablestore对象#watchablestore对象内部又含有一个store对象#watchablestore和store对象都有一个mu对象#但是watchablestore对象没有Read函数而store对象有#所以go这里kvstore.Read代表的就是store的Read函数#所以store.Read函数里使用的肯定就是store.mu了#操蛋的go语法,刚开始没了解数据结构,迷惑了好久#!!!这里是给store.mu加读锁即给数据库加读锁#!!!加读锁后会阻止数据库写操作#!!!也就是说虽然读写会并发进行#!!!但是读的时候会阻止写#!!!注意:这里是store.mu后面还有把watchable.mu锁#!!!简单粗暴,s.mu.RLock会在txn.End中释放s.revMu.RLock() #临时给revMu加读锁,#因为我们要读取curretnRev和compactMainRevvar tx backend.ReadTx #backend.ReadTx是对bolt readtx的一个封装if mode == ConcurrentReadTxMode: #根据mode参数创建对应的txntx = s.b.ConcurrentReadTx() #conncurrentReadTx创建一个新的readTx,并且复制buf#readBuf中保存的是最新commit的数据backend.readTx.RLock() #给readBuf加读锁,因为事务提交的时候会把writebuf写回到readbufbackend.readTx.txWg.Add(1) #???不太懂,bbolt回滚和事务机制curBuf := b.readTx.buf.unsafeCopy() #复制readBuf,readTx.buf就是readBuf#提交事务的时候会把batchTxBuffered的writeBuffer#中的内容写到readTx.buf#查询操作先查readBuf,然后再查bbolt数据库backend.readTx.RUnlock() #!!!操作完后解锁#!!!也就是说复制完readBuf后就会释放事务读锁了#!!!后续读操作就是读readBuf的副本了#!!!所以不用担心commite的时候会用writebuf覆盖readBuf#!!!虽然concurrentRead创建后就会释放readTx的读锁#!!!但是创建concurrentReadTx的时候需要对readTx加读锁#!!!所以如果有地方已经加了写锁,比如提交事务的时候#!!!那么这里也会等到写事务完成后才能成功加读锁else:tx = backend.backend.ReadTx() #如果是sharedBuff模式return b.readTx #则直接返回backend内部的readTx而不是新创一个readTxtx.RLock() #因为如果是sharedbuff模式则是共用一个readTxtx.mu.RLock #所以tx.RLock是一个mutex.RLock操作#如果是concurrentReadTx则tx.RLock是一个空操作#!!!etcd自己包装了bbolt.tx,即包装出一个readTx#!!!这个readTx主要同步读事务和写事务的#!!!etcd读流程会先从readbuf读,读不到再去bbolt读#!!!sharedBuff模式下所有请求都使用同一个readTx#!!!即都会访问同一个readTx.buf,所以读写需要加锁#!!!对这个readTx加读写锁来表示是否有读写事务正在运行#!!!如果是读操作那么就对这个readTx加读锁#!!!比如这里compare操作需要加读锁阻止修改#!!!如果是写操作则对这个readTx加写锁#!!!比如commit的时候就要对readTx加写锁阻止后续读#!!!直到写操作完成#总结一下两种模式:#concurrent模式:加读锁-复制-解读锁-读,加写锁-写-解写锁,#因为锁住事件断,这样就可以读、写交替,但是有复制的开销#shared模式:加锁-读-读-读-读-解锁-写#无复制,但是这样就必须全部读事务都读完写事务才能写,#这样就会导致写阻塞的时间可能偏长firstRev, rev := s.compactMainRev, s.currentRev #读取共享变量 compactMainRev/currentRev#因为会有其他goroute并发修改,所以读之前需要加锁#currentRev就是当前的revisions.revMu.RUnlock() #释放revMu读锁return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace}) #创建事务对象txn = mvcc.NewReadOnlyTxnWrite(readTx) else: readTx=kvstore_txn.store.Read(mvcc.ConcurrentReadTxMode)#并发读事务,创建时会复制buffer#所以读取的时候不会加锁txn = mvcc.NewReadOnlyTxnWrite(readTx) #并发读#!!!stm事务在执行比较前会创建一个只读视图用来确保视图一致性#!!!即compare期间看到的数据是不变的#!!!虽然apply是串行apply的即写是串行的但是读与写却不是串行的而是并发的#!!!即可能同时进行读写操作,因为compare操作需要读数据库#!!!所有compare操作时必须通过一个只读视图来compare#!!!(个人猜测):换句话说bbolt只能读或者写不能同时读写因为会加读写锁return &txnReadWrite{txn} txnPath := make([]bool, 1)txnPath = apply.compareToPath(txn, rt) #!!!执行比较,stm事务的核心就是执行比较#!!!rt就代表一个事务if-then-else#!!!这里会递归遍历所有事务及子事务#!!!所有事务的比较结果都会顺序存放到txnPath数组里面#!!!也就是说compareToPath这个函数#!!!就是用来确定所有事务该走哪条path true还是false#!!!也就是说不管有多少个嵌套事务,最终的路径一定是确定的#!!!所以txnPath[i]是事务i,#!!!那么txnPath[i+1]必定是执行路径上的下一个事务的compare结果#!!!也就是说不管有多少个嵌套事务,最终的路径一定是确定的txnPath[0] = apply.applyCompares(rv, rt.Compare) #rt.Compare即if块,一个if可以包含很多个比较器for _, c := range rt.Compare: #c代表compare中的一个比较器,这里遍历所有比较器#我们的例子中含有两个比较器#compare1: key=k1,比较字段=ModRevision,result="="#compare2: key=k2,比较字段=ModRevision,result="="ok2=apply.applyCompare(rv, c): #这里以compare1为例metrics_txn.metricsTxnWrite.Range #读取比较器中的key for _, kv := range rr.KVs: #遍历compare1中的所有的key,我们这里只有一个key,即k1ok3=apply.compareKV(c, kv): #对该key应用比较器,这里是对key k1应用比较器compare1switch c.Target: #target表示要比较哪个字段,它支持很多字段#比如value/createVersion/ModVersion......#我们这里是用的ModVersion这个字段case pb.Compare_VALUE:tv=c.TargetUnion.(*pb.Compare_Value)v=tv.Valueresult = bytes.Compare(ckv.Value, v)case pb.Compare_CREATE:tv=c.TargetUnion.(*pb.Compare_CreateRevision)rev = tv.CreateRevisionresult = compareInt64(ckv.CreateRevision, rev)case pb.Compare_MOD: #我们用的是ModVersion这个字段tv= c.TargetUnion.(*pb.Compare_ModRevision) #TargetUnion是一个联合体,rev = tv.ModRevision #表示此时请求中的字段表示的含义是ModRevisionresult = compareInt64(ckv.ModRevision, rev) #执行比较,然后得到一个结果-1(小于),0(等于),1(大于)case pb.Compare_VERSION: #version是每个key独有的,tv= c.TargetUnion.(*pb.Compare_Version)rev = tv.Versionresult = compareInt64(ckv.Version, rev)case pb.Compare_LEASE:tv= c.TargetUnion.(*pb.Compare_Lease)rev = tv.Leaseresult = compareInt64(ckv.Lease, rev)switch c.Result: #Result表示我们选择的比较方式是什么#支持很多,比如=、!=、>、<等case pb.Compare_EQUAL: #我们这里compare1的k1选择的是=比较return result == 0 #所以我们返回result是否等于0#true表示if成功,那么稍后执行的就是then路径#false表示if失败,那么稍后执行的就行else路径case pb.Compare_NOT_EQUAL:return result != 0case pb.Compare_GREATER:return result > 0case pb.Compare_LESS:return result < 0if !ok3: #只要有任意一个key比较失败,就返回falsereturn false #就返回falsereturn trueif !ok2 #只要有任意一个比较器返回false即比较失败return false #那么就立即返回falsereturn trueops := rt.Success if !txnPath[0]: #如果比较失败就执行false路径,否则执行success路径ops = rt.Failure #success对应then,failure对应elsefor _, op := range ops { #选择好执行路径后就遍历该路径下的所有optv, ok := op.Request.(*pb.RequestOp_RequestTxn)if !ok || tv.RequestTxn == nil: #compareToPath只是为了确定事务及其子事务该走哪条pathcontinue #所以如果该op里不包含事务,就跳过txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...) #递归调用compareToPath来确定所有事务的路径if isWrite:apply.checkRequests(txn, rt, txnPath, a.checkPut) #检查写请求正确性apply.checkRequests(txn, rt, txnPath, a.checkRange) #检查range请求正确性if isWrite: metrics_txn.metricsTxnWrite.End #结束我们为了compare而创建的只读事务kvstore_txn.storeTxnRead.Endread_tx.readTx.RUnlock #释放backend对应的事务的读锁#一个backend对应一个readTxstore.mu.RUnlock #释放store.mu的读锁txn = watchable_store_txn.watchableStore.Write #compare完毕,可以写了,所以这里创建写事务txn=kvstore_txn.store.Write s.mu.RLock() #store代表kvstore,写事务会先加读锁#等所有读事务完成后释放读锁并加写锁 tx := s.b.BatchTx() #获取底层的batchtx#底层数据结构:store含有一个backend#backend内部有一个batchTxBuffered#batchTxBuffered内嵌一个batchTx#所以调用batchTxBuffered.LockInsideApply#实际就是调用内嵌的batchTx的LockInsideApply#!!!batchTx对应批量提交,内部含有一个bbolt.tx对象#!!!所有事务都是直接写这个bblot.tx#!!!写完以后这个bbolt.tx不会提交#!!!对于上层来说事务只要写到bbolt.tx就会返回#!!!上层会等到bbolt.tx提交以后才会返回结果给client#!!!也就是异步+batch#!!!这个bbolt.tx可以写很多次都不提交#!!!对于上层来说,这个bbolt.tx里包含了一批事务#!!!但是对于底层bbolt来说自始至终都是一个事务#!!!即上层看到的批量事务在底层实际就是一个事务#!!!所以只需要提交一个事务就行,即batchTx.bbolt.txtx.LockInsideApply() #batchtx加写锁,tw := &storeTxnWrite{storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},tx: tx,beginRev: s.currentRev, #!!!beginRev记录的是当前最后一次操作后的revision#!!!执行事务里面的操作比如put的时候#!!!用的revision是beginRev+1changes: make([]mvccpb.KeyValue, 0, 4),return newMetricsTxnWrite(tw)return &watchableStoreTxnWrite{s.store.Write(trace), s}apply.applierV3backend.applyTxn #执行事务reqs := rt.Success #这里判断是走success路径if !txnPath[0]:reqs = rt.Failure #还是走failure路径#!!!stm中不存在事务冲突#!!!stm中只有path #!!!如果没有发生冲突即比较成功#!!!那么就走success路径#!!!如果发生冲突,那就走failure路径#!!!即不管冲突还是没有冲突,都有path可走#!!!当然,如果走success路径,#!!!但我们没有设置对应的语句,#!!!那么就相当于一个空操作,什么也不做for i, req := range reqs: #遍历该path中的所有opswitch tv := req.Request.(type): #该op有很多类型case *pb.RequestOp_RequestRange: #如果是range读取操作,那么就执行range读取操作apply.applierV3backend.Range(txn, tv.RequestRange)case *pb.RequestOp_RequestPut: #如果是put操作,那么就执行put操作,这里以put为例apply.applierV3backend.Put(txn, tv.RequestPut) #!!!那么就直接执行普通的put流程#!!!即使是stm事务,本质上也算一个操作#!!!也就是说每个stm事务都对应一个版本号#!!!也就是说客户端一次stm事务操作和客户端一个put操作#!!!本质上没有任何区别,都是一个修改数据库的操作#!!!都是先写一条日志,然后apply执行日志完成修改#!!!注意,这里的一次stm事务是指一次完整的事务操作#!!!stm事务以及他嵌套的所有子事务用的是同一个版本号#!!!换句话说,不管是子事务还是put等操作#!!!同一个事务中的所有操作都用同一个版本号case *pb.RequestOp_RequestDeleteRange: #如果是deleteRange操作,那么就执行deleteRange操作apply.applierV3backend.DeleteRange(txn, tv.RequestDeleteRange) case *pb.RequestOp_RequestTxn: #如果是子事务,就递归执行applyTxn#!!!子事务也是使用txn这个事务对象#!!!也就是说子事务的版本号和父事务是同一个apply.applierV3backend.applyTxn(txn, tv.RequestTxn, txnPath[1:], resp)txns += applyTxns + 1txnPath = txnPath[applyTxns+1:] #txnpath保存了执行路径上所有事物的path#所以txnPath[applyTxns+1]就表示下一个事务的path索引default: #如果是非预期的操作类型,则啥也不干no-oprev := txn.Rev() #获取txn开始时最新的revisionif len(txn.Changes()) != 0: #每次修改操作都会改变revision,读操作不会rev++watchable_store_txn.watchableStoreTxnWrite.End #结束事务,主要是释放锁和提交事务tw.s.mu.Lock() #!!!我TM真找不到在哪释放的读锁,难道不是同一个store?#!!!破案了:tw.s代表watchable,tw.s.mu是另一把锁#!!!tw.s.store才表示kvstore,tw.s.store.mu才是数据库锁#!!!kvstore是一个watchablestore对象,就表示会有watch#!!!也就是说这边在end的时候那边可能会有一个watch在不断处理#!!!(不知道compaction会不会也需要访问这把锁)#!!!所以就需要临时锁一下kvstore,注意,这是watchablestore.mu#!!!etcdserver是一条一条apply日志的#!!!也就是说apply是串行执行的#!!!执行一次写事务就执行一次提交操作#!!!但是并不是每次提交都会真的提交 #!!!而是每次提交都会把事务放到batchTx buf#!!!当达到一定条数后才会执行commit操作,#!!!由BatchLimit参数指定tw.s.notify(rev, evs) #通知watch相关metrics_txn.metricsTxnWrite.End #更新一些metrics后继续调用内部事务的endkvstore_txn.storeTxnWrite.Endtw.s.revMu.Lock() tw.s.currentRev++ #更新server.currentRev,#因为一次writeTxn代表一次修改操作 batch_tx.batchTxBuffered.Unlock() #解锁batchtx#注意:batchTxBufered加锁是直接调用内部batchTx.lock#但是解锁的时候不能只调用batchTx.Unlock#因为batchTxBufered还需要做其他事,所以得用自己的Unlock函数#batchTxBufered.unlock做三件事:1:把数据从writebuf写入readBuf#2:如果未提交事务数达到batchLimit就提交事务#3:做完上面两条后才能释放batchTx.lock#所以加锁用batchTx.lock即可#但是解锁则batchTxBufered需实现自己的Unlock函数if t.pending != 0:t.backend.readTx.Lock() #batchTxn的所有读请求都会对这个readTx加读锁,#会等到所有读请求完成并释放锁后才对buf加写锁 txWriteBuffer.writeback #把writebuf中的内容写到readbufft.backend.readTx.Unlock() #释放buf锁if t.pending >= t.backend.batchLimit ||t.pendingDeleteOperations > 0 #batchTxn表示批量事务,如果pending的事务数超过了阈值#那么就提交,默认是1w或者挂起的delete操作数大于0也会提交{t.commit(false) #提交事务t.backend.readTx.Lock() #要提交事务必须先阻止所有读batch_tx.batchTxBuffered.unsafeCommit #提交事务backend.hooks.OnPreCommitUnsafe(t) #执行hookif t.backend.readTx.tx != nil: #tx是bbolt.tx #如果bbolt.readTx不为null#则需要等待这些读事务完成才能commit go func(tx *bolt.Tx, wg *sync.WaitGroup):wg.Wait()bbolt.tx.Rollback()t.backend.readTx.reset()batch_tx.batchTx.commitbbolt.tx.Commit() #bbolt的事务提交sdkt.pending = 0t.pendingDeleteOperations = 0t.backend.readTx.Unlock() #提交完毕,释放锁}t.batchTx.Unlock() #解锁batchtxtw.s.revMu.Unlock()tw.s.mu.RUnlock() #???释放读锁。我TM真找不到在哪加的读锁#下班 20241202 18:35#我靠,根本不是同一把锁,watchableStore有一把mu锁#watchablestore.store也有一把叫做mu的锁#这里是释放store上的读锁,#我们在创建txnWrite的时候会加读锁,在End里释放读锁#!!!store.mu是一把读写锁,因为采用的是mvcc。#!!!任何put/txn/delete等操作对于store来说#!!!都是一个读操作即只会追加数据不会修改旧数据#!!!所以此时加读锁就行了#!!!但是compact会压缩数据也就是修改旧数据#!!!所以此时就必须对store.mu加读锁#!!!捋一下:store.mu是保护compact和mvcc操作#!!!而watchablestore.mu则是保护watch与notify 操作#!!!所以txnwrite的时候只需要对store加读锁就行了#!!!不影响watch线程tw.s.mu.Unlock() #这里是watchablestore.mu.unlock#此后watch goroute就可以继续访问kvstore了txnResp.Header.Revision = rev #返回给客户端的revision
相关文章链接:
STM客户端api相关文章
https://juejin.cn/post/7134326187064557575
https://juejin.cn/post/7134326187064557575