redis源码系列--(二)--eventlooop+set流程
源码流程放在本文后面
set命令是用来设置string类型的,如果是使用hash、list、zset则是用其他的命令
redis中键值和ttl是分开存放的,所以更新键值和更新ttl是两个操作
redis中ttl是相对时间,master设置ttl为x,然后2s后复制给slave,slave上的ttl也是x,所以说这样可能会导致一个key存活时间超过x
set一个值时redis是异步复制,也就是不等待slave节点写成功就返回写成功给客户端
问:set时应该会保存set命令到磁盘或者内存,用于持久化或主从同步,源码中哪里这样做了。。还没找到
redis持久化方式有rdb和aof,rdb方式不会一条一条记录操作日志
在call函数里的alsoPropagate函数中准备好aof所需的日志,然后把数据放到server.also_propagate数组中,然后在call函数后半段的afterCommand中会把准备好的aof日志放到server.aof_buf缓冲区中,然后在beforeSleep中会把aof_buf中的内容写入磁盘aof文件,命令处理函数(这里是setCommand)是无返回值的,因为不管成功与否,都需要写日志,因为重点是要记录这个操作,如果执行失败,那么重放时这条命令也同样会因为条件不满足而执行失败,所以不管成功与否都需要写入日志。(暂不确定是否正确)
在 Redis 中,如果多个客户端同时尝试设置同一个键(例如使用 SET 命令),则所有操作都会成功,因为 Redis 是一个单线程的服务器,它会顺序处理所有请求。因此,最后一个操作将覆盖之前的设置。不过,如果使用了 SETNX(设置只在键不存在时成功)或类似的条件设置(如使用 WATCH、MULTI 和 EXEC 实现的乐观锁),那么只有第一个成功设置的客户端会成功,其他尝试会失败。这种情况下,后续客户端在执行设置时,会发现键已经存在,从而返回失败。
redis是单线程处理所有客户端吗?
redis是单线程处理所有客户端,redis多线程是指网络io、异步复制这一块可以使多线程,但是处理客户端请求这一块还是单线程。
accept描述符和client描述符都存放在同一个epoll fd中,redis用一个叫做aeApiState的结构体包装了这个epoll fd
一共三个描述符,epoll fd,epoll fd里面存放了accept fd和client fd
!!!redis用一个单线程即aemain线程来处理所有描述符。即redis通过一个线程从epoll fd获取有事件发生的fd,然后在同一个线程中处理该fd的相关读写事件。如果是accept事件,那么就在同一个线程中处理accept事件,如果是client fd上的读写事件,那么仍然是在同一个线程中处理,不过!!!!!client fd上的读写事件可以委托给其他线程,即网络io可以是多线程,具体代码流程就是在处理client fd上的读写时,如果检测到开启了thread io,那么就把本次读写操作添加到一个pending_read/write_list中,然后本次操作就返回了,专门的io线程会异步从这个list中取出节点,然后执行read/write,处理完后在通知对应的模块。笔记:accept没有多线程选项,直接在同一个线程中accept,但是client fd上的读写操作是可选的,如果没有开启therad io,那么就直接在同一个线程中处理网络io
eventloop中在每次进入循环时都会调用beforeSleep回调(因为processevents通常都会挂起一定时间等待事件发生)
redis的beforeSleep: flush AOF文件
redis8.0的epoll是采用水平触发
redis中很多函数会在两种情况下调用,redis往往是通过判断参数或者变量的值来确定当前是位于哪种环境,从而执行不同的操作,比如如果是在handleClientsWithPendingWritesUsingThreads中调用writeToClient,那么就不用清除epollfd中该client fd的write事件,因为根本没有,如果是在SendReplyToClient中调用,就必须清除,redis的writeToClient通过判断参数installed的值来判断,如果installed==0表示位于handleClientsWithPendingWritesUsingThreads中,如果installed ==1就表示位于sendReplyToclient中
aof是先执行命令,后写aof日志,如果命令执行完毕但是aof刷写失败,那么就会导致这个命令丢失,
在某些情况下redis会阻塞某些client,比如一个client watch某个key,直到有事件发生,那么这个client就会被阻塞
!!!个人笔记(半猜测):redis是在一个单线程中顺序执行所有client,一次只会执行一个client,对于一个multi/exec事务命令组,因为redis是串行执行,所以一旦轮到该命令组执行了,就说明该命令组获得了执行权限,因为redis是单线程,串行的,一次只会有一个client在执行,所以就是说在这个命令组执行期间,不会有任何其他client的命令乱入,从而保证该命令组只要执行就会原子的执行完毕。再补充一点,mutil命令后的所有命令都会放到一个队列中,当执行exec的时候,在exec命令处理函数中,我们可以把整个队列会被当做一个命令执行
!!!个人笔记(半猜测):redis的持久化只能降低重启恢复的过程,但不能保证恢复到100%。举个例子,用redis作为mysql的缓存,client先访问redis再去读mysql,如果在redis中命中了,那么client就直接返回,否则就去mysql读取,假设redis中缓存了1w个key,如果没有持久化,那么redis断电重启的时候,redis就需要冷启动这1w个key即client访问redis会有1w次不命中,这样对于client来说这1w次访问就必须去读mysql,就会很慢,如果有持久化,假设持久化只保存了9900条命令,还有100条丢了,那么重启后redis就能根据持久化文件快速恢复9900个key,这样client就只有100个key不会命中,其余9900个key都能命中,就是说redis崩溃再重启,通过持久化文件可以达到快速热机的目的,所以redis的持久化主要目的就是一个快速恢复的作用,所以要用redis,就只能默认redis是不可靠的,所以redis只能在缓存等对数据安全要求很低的场景中使用,作为数据库特别是金融等对可靠性要求非常高的场景是万万不行的。补充一点:如果在redis崩溃以及重启过程中,client还是在照样读写数据库,那么这样就会导致redis通过持久化文件恢复的数据和mysql中的不一样,最简单粗暴也是最不好的就是禁止client读写操作,直到redis重启,这会导致较长时间服务不可用,说不定p1起步,还有一种解决办法是,在redis崩溃期间记录client执行的写操作,然后redis重启通过持久化文件恢复后,阻塞client一会,禁止client读redis但允许client可读mysql,禁止写redis、禁止写mysql,然后redis根据记录崩溃期间的写命令来更新redis的内存映像,更新完毕后就可以开放访问了,这种方式会导致一段时间内client认为服务不可用,不好,还有一种是一边读一边写mysql,一边同时记录写命令,一边同时按照记录的写命令更新redis,直到redis赶上client的步伐再开放redis,达到同步,但这样可能很复杂,还有一种最简单的,直接丢弃redis所有数据,直接冷启动redis,同时对该服务降级、限流,通过这样来一点一点来让redis热机,等到一定时间后,redis热机完毕后就可以重新完全打开该服务了。
!!!个人笔记(半猜测):beforeSleep中,redis是先刷盘aof后发送响应给client,如果配置的不是每执行一条就刷盘一次,那么就会产生这种情况:发送成功给client,但是aof却还在等待,还没有刷盘,如果此时崩溃了,那么数据就肯定丢失了,即client认为成功了,但实际是失败的。不过如果配置执行一条刷盘一次,然后刷盘成功后再发送响应给客户端,这种情况下redis可靠性会提高但不是100%,因为刷盘调用底层linux的fsync,但是fsync不一定完全可靠,因为redis命令的执行是纯内存操作,所以命令执行对可靠性的影响可以忽略,只需要考虑aof日志刷盘和发送响应的先后顺序就行,而对于数据库等,就必须先刷日志再执行命令。redis所有数据都放在内存,是纯内存操作,假设突然崩溃,那么丢了就丢了,对redis数据状态无影响,就是说redis数据全放在内存,如果没有崩溃,那么内存里的数据就全部都是正确的,如果崩溃,那就全丢了,所以redis数据是要么全部丢了,要么完好无缺,不会一部分对一部分错,但是数据库就不一样了,内存放不下所有数据,执行过程中会读写磁盘,改变磁盘数据,所以如果在修改磁盘数据的过程中突然崩溃,那么就会导致部分数据对部分数据错,所以这种情况下就必须同时考虑日志刷盘、命令执行、发送响应这三者的执行顺序。先刷日志再执行,那么命令百分百不会丢,重启重做就是了,如果是先执行后刷日志,那么没有执行完毕的命令就会丢失,即磁盘上只有执行完毕的命令。
!!!个人笔记:fsync不是百分百可靠的。有些系统会立即刷新到磁盘不过即使刷新到磁盘,也可能只是提交到磁盘的硬件缓存,但是磁盘可能不会立即落盘,所以这里就导致了不可靠,也可能直接写到磁盘,这就100%可靠了;有些系统是asap(as soon as possiable),调用fsync甚至也不会立即把内存刷新到磁盘缓存,这就更加不可靠了。linux fsync写硬件有两种,一种是write back,写到磁盘缓存就返回了,一种是write through,会透过缓存直接写到磁盘,这就100%可靠了。这个write back/write through是磁盘硬件提供的选项,因为数据丢给磁盘硬件后,数据的控制权就转移到了磁盘硬件,什么时候真正落盘则是由该硬件控制,所以一般的硬件都会提供write back/write through这两个选项,所以金融系统中的数据库,一定要开启write through来保证fsync时,数据100%落盘,如果redis开启fsync=always,并且系统的fsync是write-through,即执行一条写一条,而且redis是先fsync然后再发送响应,此时可以认为redis不会丢失任何一条命令,注意,是redis保证不会漏掉命令,但是数据是否有错取决于redis事务,因为redis事务是不可靠的,一条命令失败了,事务的其他命令依旧会执行,不过对于redis来说,这种事务导致的不可靠是redis的正常状态,其产生的数据也算是正常数据,这和磁盘断电丢失数据导致的数据错误是根本不同的
!!!个人笔记(半猜测):如果redis开启fsync=always,并且系统的fsync是write-through,并且所有client都只执行单条命令而不会执行multi/exec(即在redis单线程顺序执行的情况下client的每个操作都是原子的,即使执行失败也会算执行成功),那么这种情况下redis就是可靠的,不会丢失任何数据。就是说redis的不可靠除了磁盘刷写导致的不可靠之外,还有一个非常重要的原因就是redis的事务不可靠,redis的事务不可靠就导致redis彻底无缘数据库
!!!个人笔记:redis用的是fdatasync而不是fsync,fsync写数据还会写元数据,但是fdatasync只会写数据,所以快一点,但两车是一样的,都会把数据刷新到磁盘
aof重写:就是一个压缩过程,合并一些语句,从而达到压缩aof文件的目的,比如对同一个key的多次set操作只保留最后一个
aof目录下的文件分为两块:
1:manifest文件,包含了aof文件的相关信息
2:rdb文件和aof文件。rdb是快照,aof是快照之后的日志
manifest文件举例:
file appendonly.aof.1.base.rdb seq 1 type b
file appendonly.aof.1.incr.aof seq 1 type i
解释:type b表示这是base文件,seq 1 type b表示appendonly.aof.1.base.rdb这个文件是base文件的第一个文件,就是说base文件可以有多个,通过seq来区分type i表示这是aof增量日志文件,seq 1 type i表示appendonly.aof.1.incr.aof这个文件是aof文件的第一个文件,就是说aof 日志文件也可以有多个,seq从小到大aof文件举例:
*2
$6
SELECT
$1
0
*3
$3
set
$7
keyname
$8
valuedcf
解释:
第一个命令:
*2:表示有三个部分组成的命令。
$6:表示接下来字符串的长度为6个字节。
SELECT:命令名称,选择数据库。
$1:表示接下来字符串的长度为1个字节。
0:选择数据库0。
第二个命令:
*3:表示有三个部分组成的命令。
$3:表示接下来字符串的长度为3个字节。
set:命令名称,用于设置一个键值对。
$7:表示接下来字符串的长度为7个字节。
keyname:要设置的键的名称。
$8:表示接下来字符串的长度为8个字节。
valuedcf:要设置的值。rbd是二进制文件
a.b这表示是a文件里的b函数a可能是xx.h也可能是xx.c,省略了后缀名
a.b.c表示a文件里的b结构体的c方法,因为c语言里面一个结构体可以包含函数指针
!!!redis单线程+多线程io
1-简单版:redis主线程eventLoop
ae.aeMain(server.el)while(!el.stop): #就一个死循环,不断处理事件,如果没有事件就阻塞ae.aeProcessEvents #每次循环就是执行一遍ae.aeProcessEventsae.aeEventLoop.beforesleep1:处理上一轮的io-thread即多线程读取2:刷写aof日志到磁盘3:发送响应给客户端ae_epoll.aeApiPoll 1:通过epoll_wait,获取epoll fd中所有发生了事件的描述符for 每一个fd:1:如果是listen fd上的accept事件,调用socket.connSocketAcceptHandler进行处理2:如果是client fd上的read事件, 调用socket.connSocketEventHandler进行处理3:如果是client fd上的write事件, 调用socket.connSocketEventHandler进行处理ae.processTimeEvents 1:处理各种定时器事件,比如过期删除、比如检查bgsave是否完成
1-详细版:redis主线程eventLoop
!!!!虽然叫做ae,但是整个事件循环都是在同一个线程,即对accept事件、fd read和write事件都在同一个线程中处理
server.mainserver.el=ae.aeCreateEventLoop #创建事件循环ae.aeMain(server.el)while(!el.stop): #就一个死循环,不断处理事件,如果没有事件就阻塞ae.aeProcessEventsae.aeEventLoop.beforesleep #这是一个叫做beforesleep的函数指针,做了很多事,这里列举3件: #执行多线程读取、把aof_buf中的数据写入磁盘、发送响应给client1:处理上一轮的io-thread即多线程读取networking.handleClientsWithPendingReadsUsingThreads #处理thread io:我们在上一轮注意是上一轮循环中处理client对应的fd上的read事件时,#如果开启了thread-io后,就会把读取任务丢到server.clients_pending_read链表中#这里就是把clients_pending_read链表中的任务通过轮转调度丢给io线程去并发读取if (!server.io_threads_active || !server.io_threads_do_reads):return 0 #注意:如果没有开启thread io,那么这里不做任何事,#网络读取操作会在下面的FileEventHandler中处理adlist.listRewind(server.clients_pending_read,&li) #获取clients_pending_read队列item_id=0 #!!!主线程也算作一个io-thread,target_id=0就是分配给主线程#不过不用担心,redis中fd的read/write肯定是设置为非阻塞的,即使主线程io也不会阻塞住while(next!=null):int target_id = item_id % server.io_threads_num; #轮转调度listAddNodeTail(io_threads_list[target_id],c); #把读取任务丢到id为[target_id]的io线程的pending队列item_id++; #切换到下一个io线程io_threads_op = IO_THREADS_OP_READ; #标记io线程正在执行readfor (int j = 1; j < server.io_threads_num; j++) { #设置信号量,即分配给每个io线程的任务数#注意,是从1开始,即除主线程以外的所有线程,因为我们是在主线程里面等待其他io thread,#而主线程是我们自己手动for循环来执行分配的任务,所以无需设置信号量来同步int count = listLength(io_threads_list[j]);setIOPendingCount(j, count); #设置信号量}listRewind(io_threads_list[0],&li); #thead_id=0表示主线程,主线程也算一个io线程,也会分配io任务while((ln = listNext(&li))) { #这里就是执行分配给主线程的io任务client *c = listNodeValue(ln);readQueryFromClient(c->conn); #执行读取任务,这个函数在 1:client对应的fd上的read事件中详解是否延迟处理=networking.postponeClientRead #如果检测到开启了thread io并且io线程处于IDEL空闲状态,#那么就会把本次请求挂到pending_read_list,#然后本次读取操作就直接返回了,当io线程完成读取后会通知响应的模块if threadIo==true&&io_threads_op == IO_THREADS_OP_IDLE: #!!此处io_threads_op必定不为IDEL,所以不会延迟处理,而是立即处理adlist.listAddNodeHead(server.clients_pending_read,c) #丢到pending_read_list,下一轮处理if !延迟处理: #即立即处理connection.connRead #读取数据......unisted.read #调用linux read从fd读取数据networking.processInputBuffer #解析并执行命令......server.processCommand ......networking.addReplyBulk #将响应数据放到buf中,准备发给client......memcopy(client->buf,xx) #把数据放到client对象的buf字段中#!!!也就是说redis会在进入本轮循环的eventLoop之前即在beforeSleep函数中#处理完上一轮的所有剩余操作:#通过thread io执行读取->解析命令->执行命令、刷aof日志到磁盘、write响应给客户端}listEmpty(io_threads_list[0]); #清空主线程的pending队列while(1) { #!!!等待所有io线程完成read操作#!!!也就是说redis本质上还是单线程,还是同步的,#只不过原本的单线程的同步读变成了多线程的并发同步读#!!!也就是说即使开启了thread io,redis本质上还是同步的,#!!!即必须等待本次io操作全部完成才会继续往下读#!!!只不过开启了thread io后,本次io操作可以多线程同时进行io#!!!这里是read操作,write操作也是一样的流程unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++) #除主线程以外的所有线程,因为我们是在主线程里面等待其他io thread,#而主线程是我们自己手动for循环来执行分配的任务,所以无需设置信号量来同步pending += getIOPendingCount(j); #直到所有信号量都为0,即读取成功就会把信号量-1if (pending == 0) break;}io_threads_op = IO_THREADS_OP_IDLE; #重置io_threads_op,因为readQueryFromClient函数会在不同的地方调用,#readQueryFromClient这个函数会根据io_threads_op来判断#是否要把本次请求丢到pending_read_list队列2:写aof日志到磁盘 #!!!在第x轮中执行命令并把命令记录到aof_buf中,#!!!然后第x+1的beforeSleep中把第x轮的aof_buf中的内容写入磁盘,#!!!写完以后再开始第x+1轮的eventLoopaof.flushAppendOnlyFile #把aof_buf中的数据写入磁盘aof文件isInProgess=aofFsyncInProgress() #如果fsync==everysec,那么就是另一个线程处理fsync,#这里就是检测该io线程是否正处于fsync状态bio.bioPendingJobsOfType(BIO_AOF_FSYNC) #BIO_AOF_FSYNC就是表明我们要找的是处理AOF flush的线程return bio_jobs_counter[type]>0 #如果该io线程的任务队列不为0,就说明正处于fsync状态if fsync==everysec && isInProgeress: #如果fsync==everysec即每秒flush一次,#但是上一次flush因为某种原因卡住了还没完成if 距离上次flush完成的时间<2s: #如果他一直在flushing状态,而且距离上次成功flush的时间小于两秒,#那么还可以忍受,则跳过本次flushreturnelse: #如果超过2s,就报一个常见错误:即fsync时间过长。#就是说flush卡住了,可能需要排查一下磁盘或者系统问题return error("Asynchronous AOF fsync is taking too long (disk is busy?)")if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess()): #如果redis有正在活跃的子进程,注意,是子进程,不是子线程#(比如rdb/aof_rewrite等)且配置了rewrite期间跳过flush,那么就跳过本次flushreturn aof.aofwrite unistd.write #!!!调用linux.write,不会立即刷新到磁盘#他的逻辑是这样的:先write,然后看是fsync==always#还是everysec来判断是立即flush还是创一个任务丢给io线程去异步flushif fsync==always: #如果conf中配置的是写一次刷新一次,那么就调用fdatasync刷新到磁盘redis_fsync(aof_fd) #redis_fsync实际是一个宏,实际是fdatasyncfdatasync(aof_fd) #如果配置执行一次就刷盘一次,那么这里强制刷新到磁盘#!!!如果fsync==always,那么就是主线程进行同步io,如果io一直没完成,那么主线程就一直阻塞#!!!就是说在发送响应之前,可以确保fsync已经调用,#即确保日志已经刷盘(不一定刷盘,取决于linux.fsync是writeback还是writethrough)else fsync==everysec: #如果配置每秒刷新一次,就创一个任务丢到其他线程去处理aof.aof_background_fsync #创建一个fsync任务,然后丢到其他线程去处理bio.bioCreateFsyncJob bio.bioSubmitJob(BIO_AOF_FSYNC) #redis创建了一组线程用于不同io任务,用一个数组存储所有线程,#BIO_AOF_FSYNC是本次io任务的类别worker = bio_job_to_worker[type] #一种io任务只分配一个线程,这里就是根据type获取对应的io线程在io线程组中的索引pthread_mutex_lock(&bio_mutex[worker]); #一个线程用一个mutex保护,这里锁住该线程,因为要往他的任务队列丢任务了listAddNodeTail(bio_jobs[worker],job); #把io任务添加到对应io线程的io队列bio_jobs_counter[type]++; #任务数+1pthread_cond_signal(&bio_newjob_cond[worker]); #唤醒对应的io的线程,新任务已经到来pthread_mutex_unlock(&bio_mutex[worker]); #释放锁。笔记:linux条件通知这一块不太记得了,所以这里没有深究了#!!!如果fsync==everysec,那么就是由io线程去进行异步io#!!!也就是不能确保在返回响应给client时日志已经刷盘,因为是其他线程在处理3:处理pendingwrite #!!!这里的数据指的是发给slave的保存在repl_backlog中的增量同步的数据(详情见同步流程)#!!!1:同read一样,如果开启了threadio那么就是其他线程去处理io#!!!即配置文件中io-thread配置项的值大于1,否则就是主线程进行write#笔记:主线程也算一个io-thread,其id=0#!!!2:如果是主线程io,那么只有一次发送不完才会给clientfd#!!!设置write_handler以及注册AE_WRITEABLE事件#意思就是写数据(比如一次请求的响应)常见做法是给client fd在epoll fd中注册写事件,#然后fd可写时就会触发写事件,#然后主线程再调用对应的write_handler来处理写事件即发送响应给客户端,#redis做了优化,直接在beforeSleep把响应发送给client#如果不能一次发送完毕,那么beforeSleep函数才会往epoll中为client fd#设置writeHandler并注册AE_WRITEABLE事件#这样当该fd可写的时候,就会触发write事件,这样就会调用对应的write_handler来进行处理了,#如果是thread-io,那么就直接由异步io线程处理,就不会用到AE_WRITEABLE事件了,#所以thread-io模式下client fd只会触发read事件,而不会触发write事件,#因为write操作在beforeSleep中就丢给io thread去处理了#个人笔记:我猜测之所以在beforeSleep中处理write,#大概是可以节省一次epoll_ctl system call吧#笔记:主线程也算一个io-thread,也会被分配任务networking.handleClientsWithPendingWritesUsingThreadstag=networking.stopThreadedIOIfNeeded: #检测是否需要关闭thread io,#如果是,那么即使开启thread-io,此处也会被关闭if (len(server.clients_pending_write) < (server.io_threads_num*2)) : #如果write任务数<线程数*2即如果平均每个io线程分到的任务数小于2#那么就关闭threadIO,就全部由主线程来执行ionetworking.stopThreadIO: if (server.io_threads_num == 1||tag): #如果io-thread==1或者关闭了io-thread,#那么就是直接在主线程中进行write即发送响应给clientnetworking.handleClientsWithPendingWrites #在主线程中发送响应给客户端,这是一个同步操作即只有发送完毕,主线程才能继续往下执行networking.writeToClientnetworking._writeToClientconnection.connWritesocket.connSocketWrite #conn->type->write,即CT_Socket里面的write函数指针指向的实际是#socket.connSocketWriteunistd.write(client->buf) #调用linux的write函数把client_buf中的数据写到网络即发给client#!!所以beforeSleep先处理read后处理write,#因为read操作包括读取、解析、执行、把响应数据写到client->buf#这样beforeSleep执行write时就可以把本次read操作产生的响应也一并发送if networking.clientHasPendingReplies(client) #如果本次没有写完,那么就为该client fd在epoll中注册AE_WRITEABLE事件#然后再在socket.connSocketEventHandler中发送给clientnetworking.installClientWriteHandlerconnection.connSetWriteHandlerWithBarrier socket.connSocketSetWriteHandler #1:设置redis层面的write_handle;#2:向epoll fd注册client fd上的write事件以及设置对应的wfile_proc处理函数conn->write_handler=networking.sendReplyToClient #1:设置write_handler,send res to client,#注意,这里只是设置write_handler,并没有执行write_handler#write_handler是在client fd的write事件发生时#socket.connSocketEventHandler会#根据事件类型来调用fd的write_handler或者read_handler,#如果是listen fd上的accept事件,则还有个conn_handler,#即处理accept事件networking.writeToClient(c,handler_installed=1)#参数1表示write_handler installed,#就表明运行此处时epoll fd中含有EPOLLOUT事件networking._writeToClient #执行发送任务if !clientHasPendingReplies(c)&&handler_installed==1: #如果没有数据需要发送了且设置了write_handler,#那么我们就必须在epoll fd中清除该client fd的EPOLLOUT事件networking.connSetWriteHandler(conn,NULL)socket.connSocketSetWriteHandler(func=NULL)conn->write_handler=NULLif func==NULL:ae.aeDeleteFileEvent(AE_WRITABLE) #删除该描述符的write事件ae.aeApiDelEvent(AE_WRITABLE)epoll_ctl(EPOLL_CTL_MOD,EPOLLIN) #不注册EPOLLOUT事件就相当于删除了EPOLLOUT事件#EPOLLIN是每个client fd都会注册的,而EPOLLOUT只会在特定情况下注册ae.aeCreateFileEvent(AE_WRITEABLE,ae_handler) #2:向epoll fd注册client fd AE_WRITEABLE事件#即表明此client fd对write事件感兴趣#wfile_proc=ae_handler=socket.connSocketEventHandler#代码逻辑就是设置ae_handler为socket.connSocketEventHandler,#然后又设置wfile_proc=ae_handler,即ae_handler和wfile_proc都是函数指针#这里的逻辑是:#事件发生->调用wfile_proc函数->wfile_proc函数调用我们设置的write_handler# ->write_handler调用linux的write把数据写到网络fe->wfile_proc=ae_handler=connSocketEventHandler ae.aeApiAddEventepoll.epoll_ctl(EPOLLOUT) #在epoll fd中为该client fd注册write事件即如果fd可写就触发#!!!代码里是水平触发,也就是说没有设置EPOLLETelse: #如果io-thread>1,那么就是thread-io即多线程io。#流程和read一样,放到io线程的pending_write队列,由其他io线程去从中取数据进行io#io线程有多个,所有使用轮转调度,保证公平startThreadedIO #因为thread-io有时会根据需要关闭,所以这里重新开启adlist.listRewind(server.clients_pending_write,&li) #取出所有写的数据,每个节点代表一个需要写的数据item_id=0 #!!!主线程也算作一个io-thread,target_id=0就是分配给主线程while(next!=null)int target_id = item_id % server.io_threads_num #io线程有多个,使用轮转调度adlist.listAddNodeTail(io_threads_list[target_id],c); #把数据添加到该io线程的pending队列item_id++; #下一个写任务就是丢给下一个io线程了,保证公平io_threads_op = IO_THREADS_OP_WRITE; #标记io任务正在执行写操作for (int j = 1; j < server.io_threads_num; j++) {int count = listLength(io_threads_list[j]);setIOPendingCount(j, count); #设置信号量,即分配给每个io线程的任务数}adlist.listRewind(io_threads_list[0],&li); #执行分配给主线程的write任务while((ln = listNext(&li))) {client *c = listNodeValue(ln);writeToClient(c,0); #执行网络io,发送响应给client#writeToClient会在handleClientsWithPendingWritesUsingThreads和sendReplyToClient中调用#后者是client fd上write事件中设置的write_handler,参数0表示没有设置write_handler#即是在handleClientsWithPendingWritesUsingThreads中调用,#而参数为1则表示是在write事件的sendReplyToClient中调用,#此时发送完毕后需要在epoll fd中清除该client fd的EPOLLOUT事件,否则就会一直触发}while(1) { #等待所有io线程完成本次分配的所有io任务unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)pending += getIOPendingCount(j); #当其他的io线程每执行完一个io任务,就会对count进行减一处理if (pending == 0) break;}io_threads_op = IO_THREADS_OP_IDLE; #标记io线程是空闲的ae_epoll.aeApiPoll #通过epoll获取发生了事件的描述符epoll.epoll_wait #通过linux epoll_wait获取事件,并填充到eventLoop->fired数组中ae.aeEventLoop.aftersleeop #这是一个叫做aftersleep的函数指针for: #!!!遍历处理所有有事件发生的描述符#!!!redis是串行执行的,首先是一轮又一轮的迭代,然后每轮迭代中可能有很多个fd有事件发生#!!!然后这些fd又是一个for循环顺序处理#!!!所以说redis最核心的eventLoop部分本质上是一个单线程模型,即一个线程处理所有客户端事件int invert = fe->mask & AE_BARRIER #默认是先处理读后处理写,如果设置了barrier即屏障,则先处理写后处理读。屏障的作用暂不了解#一个描述符可以同时存在读写事件,比如客户端发来请求,同时server要发数据给该客户端,#这时这个描述符就是既可读又可写,这个可读可写是对于server来说的,这里我就假设先处理读后处理写ae.aeFileEvent.rfileProc #rfileProc是一个函数指针即handler,处理读事件包括accept事件、client发来的命令导致的可读事件 #不同的事件对应不同的rfileProce#accept事件: rfileProc=socket.connSocketAcceptHandler#client fd read/write:rfileProc=socket.connSocketEventHandler0:accept对应的fd上的accept事件socket.connSocketAcceptHandlerfd=anet.anetTcpAccept #accept客户端连接,并获取client对应的fdanet.anetGenericAcceptif HAVE_ACCEPT4:client_fd=socket.accept4(SOCK_NONBLOCK) #接受该连接,并同时设置该client fd为非阻塞,要高效,就只能是非阻塞,不能是阻塞else: #accept4可以在接受连接的同时设置套接字属性,而accept必须先接受后设置client_fd=socket.acceptanet.anetNonBlock(client_fd) #设置该连接为非阻塞conn=socket.connCreateAcceptedSocket(fd) #把fd包装到一个connection对象中#!!!在创建的时候会设置一系列函数包括,比如ae_handler,#前面的rfileProc实际就是fd的ae_handler#socket.c文件中初始化了一个CT_socket静态变量,#这个变量给socket的所有函数指针都设置了默认函数conn->type = &CT_Socket #conn->type包含了一系列的函数指针#!!!所有连接对象都是直接使用CT_Socket变量,#所以有什么函数指针,直接去CT_Socket变量中找就行conn->state=CONN_STATE_ACCEPTING #目前还只是linux层面accept了连接,但是redis层面的连接还没有acceptnetworking.acceptCommonHandler(conn) #1:设置read_handler,就是当fd上发生read事件时所需要调用的函数,#即readQueryFromClient#2:向epoll fd为client fd注册read事件,#即表明该fd对read事件感兴趣并设置rfile_procnetworking.createClient #!!!创建client对象,然后会把他放到conn对应的privateData里#!!!这样处理后续client fd上的读写事件时直接从privateData里取出client就行,#!!!也就是说我们可以通过这一个client对象来保存连接对应的状态#举个例子:multi命令,在exec之前的所有命令都会放到c.mstate.commands这个队列里面#这样执行exec的时候我们就能一次处理所有命令client *c = zmalloc(sizeof(client)); #创建空的client对象 connection.connSetPrivateData(conn,c) #把client对象保存到conn即连接的private_data变量中conn->private_data=c connection.connSetReadHandlersocket.connSocketSetReadHandlerconn->read_handler=networking.readQueryFromClient #这里就是设置read_handlerae.aeCreateFileEvent(AE_READABLE,ae_handler,fd,conn) #这里就是把fd/conn/epoll三者联系起来#创建clientFileEvent并把描述符添加到epoll fd中#rfile_proc=ae_handler=socket.connSocketEventHandler#代码逻辑就是设置ae_handler为socket.connSocketEventHandler,#然后又设置rfile_proc=ae_handler,即ae_handler和wfile_proc都是函数指针#调用逻辑:事件发生->调用rfile_proc即调用connSocketEventHandler# ->connSocketEventHandler会调用read_handler即readQueryFromClient来读取数据aeFileEvent *fe = &eventLoop->events[fd]fe->client_data=conn #把client保存到了conn中,然后把conn保存到了fe中#然后fe注册到epoll中#这样后续clientfd上触发事件,我们通过epoll获取fe#然后通过fe获取conn,通过conn再获取client,#然后client中保存了连接的历史状态(比如multi中queued的一系列命令)#笔记:server.eventloopevents这个变量是公用的,相当于一个对象池,#注册的时候从中取一个变量,然后注册,注册完后就可以还回去,#同理,读取事件的时候就可以把events事件存储到server.eventloop->events中,#重复使用从而无需每次都重新分配fe->rfile_proc=ae_handler=connSocketEventHandlerae_epoll.aeApiAddEvent #添加到epoll fd对应的描述符中,#即accept fd和client fd都是统一存放在同一个epoll fd中epoll.epoll_ctl(EPOLL_CTL_ADD,EPOLLIN,fe) #!!!注册fe到epoll fd,并注册读事件,fe中包含了描述符、clientdata#!!!代码里是水平触发,也就是说redis epoll是水平触发,没有设置EPOLLETconnection.connAccept #redis层面accept连接socket.connSocketAccept #CT_Socket函数里面的accept函数指针实际是socket.connSocketAcceptconn->state = CONN_STATE_CONNECTED #标志redis层面连接已经建立networking.clientAcceptHandler #检查conn状态是不是CONN_STATE_CONNNECTED、#检查如果开启保护模式但是没有账号密码就报错、#增加连接计数、通知其他模块有连接状态改变client=conn->private_data #前面createClient时已经把client放到conn中了,#后续直接从conn中取出client然后做检查即可。1:client对应的fd上的read事件fe->rfileProc(fe->clientData) #fe->clientData就是我们保存的conn,#这里rfileProc实际是connSocketEventHandlersocket.connSocketEventHandler(conn) #当客户端发来请求时描述符可读,所以调用的是rfileProc,#这里rfileProc实际是connSocketEventHandler#客户端的所有命令的解析流程都是一样,#只是server.call后面会根据命令的类型来调用不同的函数connhelpers.callHandlernetworking.readQueryFromClient(conn)client=connection.getPrivateData(conn) #从conn中取出client对象,client对象保存了该连接的所有数据,#比如multi中queued系列命令是否延迟处理=networking.postponeClientRead #!!!如果检测到开启了thread io,那么就会把本次请求挂到pending_read_list,#然后本次读取操作就直接返回了,当io线程完成读取后会通知响应的模块if threadIo==true&&io_threads_op == IO_THREADS_OP_IDLE: #!!!如果io_threads_op!= IO_THREADS_OP_IDLE,那么就是立即处理#否则就是丢到pending_read_list,下一轮去处理。#readQueryFromClient会在beforeSleep和这里调用,#在beforeSleep中io_threads_op不等于 IO_THREADS_OP_IDEL#所以在beforeSleep中处理thread-io read调用readQueryFromClient时会跳过thread-io,#即会立即处理read,即不会丢到pending_read_list,不会等到下一轮才处理#而在socket.connSocketEventHandler函数中调用readQueryFromClient时#io_threads_op必定为IO_THREADS_OP_IDEL,所以此时如果开启thread-io#此时是可以把本次读取操作丢到pending_read_list,即延迟处理,#即延迟到下一轮循环由beforeSleep函数去处理adlist.listAddNodeHead(server.clients_pending_read,c) #!!!添加到pending_read_list,#在下一轮处理中beforeSleep函数会把这些pending_reads丢到对应io线程去异步读取#即这里不会立即丢到io线程return 延迟处理if 延迟处理: #延迟处理就是异步io即thread ioreturn #若开启threadio则本次read立即返回connection.connRead #如果不是异步io,那么就是同步读取即在同一个线程中执行读取操作connection.connSocketRead #即CT_Socket 变量的read函数指针实际是connSocketRead函数unisted.read #读取client发来的数据,就是linux的read函数,只不过这里肯定是非阻塞读取networking.processInputBufferif c->reqtype == PROTO_REQ_INLINE: #即本次请求是一个简单请求,一个请求包含一条完整的命令networking.processInlineBuffer #从缓冲区读取并解析请求#!!意思就是:缓冲区是一个字节序列,是redis协议格式的数据#processInlineBuffer函数就是手动解析这个字节序列得到命令#比如检测是否遇到了\r\n,如果遇到\r\n就表示已经读取了一个完整的命令else if c->reqtype == PROTO_REQ_MULTIBULK #即本次请求可能包含多条命令,如multi和execnetworking.processMultibulkBuffer #从缓冲区读取并解析请求,一样的操作,只是具体读取解析方式有些不同而已networking.processCommandAndResetClient #前面从缓冲区读取并解析了命令,这里就是执行命令server.processCommandif (isInsideYieldingLongCommand() && !(c->cmd->flags & CMD_ALLOW_BUSY)): #!!!当lua脚本超时的时候,只允许部分命令执行#!!!这些命令都必须打上CMD_ALLOW_BUSY标记#!!!凡是没有这个标记的,此时都会被拒绝#!!!此时只有script kill/shutdown nosave可以执行#!!!一个命令有哪些标记,在commands.def文件中都提前定义好了#!!!直接ctrl+f查找 CMD_ALLOW_BUSY就可以了server.rejectCommandFormat #不允许的命令,直接拒绝...还有许多其他检测条件,凡是不符合要求就拒绝本次命令... if server.cluster_enabled: #如果是集群,则丢给对应的节点去处理cluster.getNodeByQuery #获取对应的处理节点cluster.clusterRedirectClient #直接通知客户端重定向到其他节点returnif (c->flags & CLIENT_MULTI && #如果处于multi状态并且不是exec等执行命令,那么就把命令放到client的命令队列里存起来c->cmd->proc != execCommand && #multi状态就是客户端调用了multi,然后执行了一系列命令,但是还没有执行exec命令c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand &&c->cmd->proc != watchCommand &&c->cmd->proc != quitCommand &&c->cmd->proc != resetCommand)multi.queueMultiCommand(client) #丢到该client对应的multi队列中,当该client执行exec命令时才会执行这个队列中的命令#就是把命令追加到client->mstate.commands数组末尾else:server.call #如果是单条命令就会直接调用命令对应的函数networking.commandProcessed #命令执行完以后做一些清理工作:比如重置client、通知slave更新offsetnetworking.resetClient #重置客户端replication.replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied) #还么怎么看懂怎么计算的replication.prepareReplicasToWrite #一个slave对应一个client,这里就把client添加到pending_write_queue#pending_write_queue见部分同步源码流程replication.prepareClientToWrite replication.feedReplicationBuffer #!!!这里就是把写命令填充到repl_backlog缓冲区 #!!!repl_backlog保存的准备发往slave的增量同步的数据#!!!redis repl_offset:是master和slave各自保存一份#!!!备注:repl_backlog这个缓冲区没去细究了#!!!总之就是:redis一边不停的写缓冲区,另一边不停的发送给slave#这里master不是把offset这个变量同步给slave,而是把数据同步给slave#slave收到数据后再根据数据的长度去更新自己的offset/*long long prev_offset = c->reploff; #保存reploff旧值,reploff表示在此之前已经reply到这里了if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {#更新master上的reploff#笔记:read_reploff和reploff还不能肯定,不过八九不离十了,这一块变量太多了,#代码对应的逻辑又必须在集群模式下跑才会运行,集群模式又麻烦,所以暂时跳过了#read_reploff表示最新的readOffset,可以看成是一个逻辑Index,表示master上最新读取的数据的位置#querybuf表示缓冲区首地址,c->qb_pos表示缓冲区已经读到了这里,因为0~readOffset这个区间很长,#querybuf不可能开这么大,所以querybuf只保存[x,readOffset]区间的数据,#qb_pos表示querybuf中[0,qb_pos]区间内的数据都已经同步到了slave#所以(sdslen(querybuf)-qb_pos)就表示querybuf中还有这么多字节的数据还没有同步到slave#new_reploff=read_reploff-(sdslen(querybuf)-qb_pos),#所以new_reploff表示[0,new_reploff]这个逻辑区间内的数据都已经同步到slave了,#而(read_reploff-new_reploff)就表示还有这么多数据没有同步给slavec->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; }if (c->flags & CLIENT_MASTER) {#new_reploff-old_reploff就表示本次要同步这么多数据给long long applied = c->reploff - prev_offset;if (applied) {#这里为啥是repl_applied还有点疑问,不过可以确定的是repl_applied是一个位于[0,qb_pos]的值#正常情况下此时repl_applied=0#反正他就是一个重复使用的buf,read_reploff和reploff都是一个逻辑index,具体逻辑先放弃了。。replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied);c->repl_applied += applied;}}......#在processInputBuffer中会trim querybuf to repl_applied#意思就是说querybuf中[0,repl_applied]区间内的数据已经同步给slave了,那么这部分数据就可以丢弃了#就相当于querybuf是一个不断滚动的数组if (c->repl_applied) {sdsrange(c->querybuf,c->repl_applied,-1);c->qb_pos -= c->repl_applied;c->repl_applied = 0;}*/3:client对应的fd上的write事件ae.aeFileEvent.wfileProc #wfileae.processTimeEvents #处理时间事件。就是遍历一个链表,如果某个节点对应的事件到时间了,就调用对应处理函数
2:具体的命令处理流程,这里是set命令
server.call #接上面的server.call,3个主要操作:#1:执行命令;#2:执行后处理:比如记录慢日志、延迟直方图、monitor等;#3:传播命令:1:记录命令到aof_buf缓冲区;2:发送命令给slave节点1:执行命令t_string.setCommand # c->cmd->proc(c),这个proc是一个函数指针,不同的命令对应不同的proc,set就对应setCommandt_string.setGenericCommandif OBJ_SET_GET: #OBJ_SET_GET表示在set之前获取旧值,默认是truet_string.getGenericCommand #执行一遍get命令,获取旧值db.lookupKeyWRiteWithDictEntry #1:查找key是否存在。#!!!redis是内存数据库,所有数据都在内存,所以是去内存查找而不是访问磁盘#同样:redis先写内存,而aof是一个异步过程,所以写内存成功,但是aof失败,那么就可能丢失数据db.lookupKey db.dbFinddb.setKeyWithDictEntry #2:写内存if !keyfound: #key不存在db.dbAdddb.dbAddInternal(updating_if_exist=false)else if add_or_update: #key存在(需要更新)或不存在(需要追加)db.dbAddInternal(updating_if_exist=true)if (update_if_existing && existing):db.dbSetValueelse:kvstore.kvstoreDictSetKey #内存dict设置keykvstore.kvstoreDictSetVal #内存dict设置valueblock.signalKeyAsReady #通知其他阻塞的客户端notify.notifyKeyspaceEvent(type=NOTIFY_NEW) #通知其他人有new key事件发生model.moduleNotifyKeyspaceEvent #通知其他模块。内部有一个订阅者列表,遍历链表,通知所有订阅者pubsub.pubsubPublishMessage(__keyspace@<db>__:<key> <event>) #通知其他人,键空间有事件发生,举例:__keyspace@0__:foo delpubsub.pubsubPublishMesage(__keyevent@<db>__:<event> <key>) #通知其他人,发生了键事件,举例:__keyevent@0__:del foo#笔记:默认情况下redis事件通知是关闭的else:db.dbSetValue #key存在,直接设置值object.incrRefCount #增加value对象的引用计数。redis通过引用计数来管理对象if !SETKEY_KEEPTTL: #如果更新一个key后不保留ttl就删除该key的ttldb.removeExpire #redisDB对象中key和ttl是分别存放在两个kvstore对象中,即不是放在一起的db.getKeySlot #获取key对应slot的值,相当于槽号,一个key在keys和ttl kvstore中对应的slot号是一样的db.kvstoreDictDelete #ttl也是用一个kvstore存放if !SETKEY_NO_SIGNAL: #如果设置了通知更新,就通知所有watch这个key的所有对象(包括客户端) db.signalModifiedKey multi.touchWatchedKey #凡是监视这个key的client都被标记为CLIENT_DIRTY_CAS,如果这个key是在某个事务操作中,#那么CLIENT_DIRTY_CAS这就表示该CAS操作失败了即redis的watch是一种乐观锁的方式for 所有监视这个key的客户端:c->flags |= CLIENT_DIRTY_CAS #!!!这样multi事务执行exec时就会检测这个标记,一旦为真说明这个key被修改了,那么就会取消事务tracking.trackingInvalidateKey #通知watch这个key的所有客户端失败,因为watch是一种乐观锁的方式tracking.trackingRememberKeyToBroadcast #在PrefixTable中查找这个key,如果找到了,#就设置该key的广播状态并插入到PrefixTable对应节点的keylist中#详情可自己chatgpt一下:TrackingTable和PrefixTable这两个变量#他们的作用大致就是:client会tracking一些key或者一组key#(即一个key前缀)也就是在本地保存一些key的值然后监听修改,#然后服务器修改一个key后就会去trackingtable/prefixtable中查找,如果找到了,#说明有某个客户端在本地保存了这个key的值,但是这个key值已经被修改了,#所以需要通知该客户端该key的值已经不是最新的了#所以后续就要向该客户端发送key无效消息。#TrackingTable是一个key一个key的监听,而PrefixTable则允许监听一个前缀,即一组keyif !raxFind(TrackingTable,key,keylen,&result): #如果在TrackingTable中没有找到这个key,说明没人关注,所以直接返回啥也不干returnif target==cur_client: #如果当前client是将要发送无效消息的client,#那么就先放到server.tracking_pending_keys队列稍后发送adlist.listAddNodeTail #主要是为了保证消息顺序,比如当前client正在执行get,#get结果还没放到发送缓冲区就把无效化消息放到缓冲区#那么client收到消息时就会先解析出无效,然后才是get结果,这样就错乱了,#所以如果是当前client,那么就等到命令完全执行完毕后再把无效化消息放到缓冲区,大致就是这么个理else: #否则立即添加到发送缓冲区 tracking.sendTrackingMessage #通知watch这个key的客户端,就是把消息添加到发送缓冲区准备发给该客户端networking.addReplyXXXX #添加到发送缓冲区,addReplyxxxx表示一系列的函数,会把一系列的数据添加到发送缓冲区server.dirty++ #修改的key数+1notify.notifyKeyspaceEvent #3:事后处理。redis通知server其他模块以及客户端键空间(NOTIFY_KEYSPACE)事件#以及键事件(NOTIFY_EVENTS)pubsub.pubsubPublishMessage #发送通知给所有订阅者,pubsub表示publish-subscribe的意思networking.addReply #把准备发送给client的响应数据放到发送缓冲区,待下一轮循环,在beforesleep中被发送2:执行后处理:比如记录慢日志、延迟直方图、monitor等;3:传播命令:1:记录命令到aof_buf缓冲区;2:发送命令给slave节点;3:处理trackingserver.alsoPropagate #把命令追加到server.also_propagate缓冲区if cmd_is_readonly&&cilent_open_tracking: #如果是只读命令(如get)且客户端开启了tacking,#那么我们就需要在trackingTable中记录这些(client,key)pairtracking.trackingRememberKeys #把我们刚才获取到的key全都存放到TrackingTable中server.afterCommandserver.postExecutionUnitOperations #把命令数据从server.also_propagate转移到aof_bufserver.propagatePendingCommandsfor (j = 0; j < server.also_propagate.numops; j++) { #依次处理also_propagate缓冲区中的命令数据server.propagateNow #执行传播,有两个传播目的地:aof日志缓冲区、slave服务器aof.feedAppendOnlyFile #1:写到aof日志缓冲区aof.catAppendOnlyGenericCommand #把also_progate的一条数据转化成一个字节数组bufserver.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf)) #把buf中的数据追加到server.aof_buf#到这里就完成了把命令记录到aof_buf的过程,#会在下一次迭代中的beforeSleep函数中把aof_buf数据写入磁盘replication.feedReplicationBuffer #2:写到全局的slave服务器发送缓冲区即server.repl_buffer_blocks,这是一个链表tracking.trackingHandlePendingKeyInvalidations #命令已完全执行完毕,所以把在tracking.trackingInvalidateKey中放到pengding队列的消息添加到发送缓冲区
3:aof日志处理。在每一轮aeProcessEvents开始之前都会把上一轮的aof_buf中的数据写入磁盘
server.mainae.aeMainae.aeProcessEventsserver.beforeSleepaof.flushAppendOnlyFileaof.aofwrite
4:aof加载过程。启动的时候因为不能判断数据库是否损坏,所以需要需要根据aof日志重做
server.mainserver.aofLoadManifestFromDisk #加载aof manifest文件,并根据文件内容进行检查,比如对应的目录、对应的文件是否存在server.loadDataFromDiskaof.loadAppendOnlyFiles for all cmd in aof: aof.loadSingleAppendOnlyFile #!!!加载aof文件并重放aof日志if 是exec命令: #即multi命令multi.queueMultiCommand #把命令追加到一个multiCommand 队列else:cmd->proc #!!!重做该命令#proc是一个函数指针,不同的命令对应不同的函数#比如对于set命令,那么cmd->proc对应的就是setCommand函数aof.aofOpenIfNeededOnServerStart #打开aof文件以便后续运行的时候可以追加日志
杂记:conn->set_read_handler(func)或者conn->set_write_handler(func),如果func==null,那么就会从epoll中删除对应的事件,反之,如果func!=null,那么就会向epoll中注册对应的事件
redis在进行主从复制过程中也可以接受外界的查询请求,只不过这时候从redis返回的是以前老的数据