引言
Redis 离用户最近的就是网络层了,网络层不负责命令的具体执行,只负责网络数据的收发,虽然它不负责具体的功能实现,却是Redis单线程,高性能的核心。
本文只点出一条关键的代码路径,如果想要彻底理解这一部分代码,需要一些Reactor模型,epoll相关的知识,请自行查阅其他资料。
后台运行Redis
这是main中配置解析结束后的第一行代码。
如果你设置了--daemonize
,那么Redis就会在后台启动redis.c:4027:
// 将服务器设置为守护进程 if (server.daemonize) daemonize();
服务器启动
C语言网络编程中常见的服务器启动流程就是socket->bind->listen->accept
,对于epoll这样的io多路复用,则是socket->bind->listen->set_non_block->register_fd_event->epoll_wait
,稍微解释一下这些流程节点的含义:
socket
:创建监听套接字bind
:绑定端口listen
:监听端口set_non_block
:设置套接字为非阻塞register_fd_event
:将监听套接字的"新连接到达"事件注册到epoll上epoll_wait
:阻塞在IO多路复用上,等待事件到来
我们就按照这条路径来追踪服务器启动的代码。
服务器启动的socket->bind->listen->set_non_block->register_fd_event
在initServer方法的调用中完成,redis.c:2050:
voidinitServer() { //...// socket -> bind -> listen -> set_non_blockif (server.port!=0&&listenToPort(server.port,server.ipfd,&server.ipfd_count) ==REDIS_ERR) exit(1); //...// ->register_fd_eventfor (j=0; j<server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) ==AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } //...}
因为redis.conf
中是允许监听多个ip地址,所以所有要监听的地址会在配置解析时被放到server结构体的bindaddr字段(字符串数组),如果bindaddr
为NULL
则表示绑定当前机器的全部地址:
structredisServer { //...char*bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to *///...}
在listenToPort
中会将所有完成了socket->bind->listen->set_non_block
的监听套接字放到bindaddr
数组中,之后在一个循环(redis.c:2160)中将事件全部注册到epoll
上。注意一下aeCreateFileEvent
的第四个参数acceptTcpHandler是一个回调函数,这个回调函数会在事件发生时被调用(即有新的客户端连接请求到达时被调用)。
Redis称这种事件为File Event(从方法名CreateFileEvent可以看出),因为在linux上一切都是文件,所以套接字本身也是文件,所以Redis中的File Event就是指套接字事件。
listenToPort会遍历bindaddr
数组,绑定数组中的所有ip地址+server.port
:
intlistenToPort(intport, int*fds, int*count) { intj; /* Force binding of 0.0.0.0 if no bind address is specified, always* entering the loop if j == 0. */if (server.bindaddr_count==0) server.bindaddr[0] =NULL; for (j=0; j<server.bindaddr_count||j==0; j++) { //...// set_non_blockanetNonBlock(NULL,fds[*count]); //... } //..}
for
循环里面的代码虽然很长,但是逻辑很简单,其实就是判断是NULL,IPv6地址还是IPv4地址,如果是NULL的话,就要绑定本机的全部地址,如果是IPv6地址,则调用anetTcp6Server获取一个IPv6套接字,如果是IPv4的话,则调用anetTcpServer获取一个IPv4套接字。获得套接字后立即就调用anetNonBlock将其设置为非阻塞的(即set_non_block
)。anetTcpServer
将会返回一个已经完成了bind
和listen
操作的套接字。anetTcpServer
函数其实就直接调用了一下_anetTcpServer,socket->bind->listen
都是在这个函数里完成的:
staticint_anetTcpServer(char*err, intport, char*bindaddr, intaf, intbacklog) { //...for (p=servinfo; p!=NULL; p=p->ai_next) { // socketif ((s=socket(p->ai_family,p->ai_socktype,p->ai_protocol)) ==-1) continue; if (af==AF_INET6&&anetV6Only(err,s) ==ANET_ERR) gotoerror; if (anetSetReuseAddr(err,s) ==ANET_ERR) gotoerror; // bind->listenif (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) ==ANET_ERR) gotoerror; gotoend; } //...}
到此准备工作就完成了,接下来的epoll_wait
阶段,服务器就已经正式启动了,翻到main
的最后几行,aeMain
就启动服务器主循环的函数,redis.c:4079:
aeMain(server.el);
在aeMain方法中可以看到明显的循环:
/** 事件处理器的主循环*/voidaeMain(aeEventLoop*eventLoop) { eventLoop->stop=0; while (!eventLoop->stop) { // 如果有需要在事件处理前执行的函数,那么运行它if (eventLoop->beforesleep!=NULL) eventLoop->beforesleep(eventLoop); // 开始处理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
进入aeProcessEvents
,里面代码很长,其中的aeApiPoll
函数调用会阻塞在IO多路复用上(即epoll_wait
),ae.c:560:
intaeProcessEvents(aeEventLoop*eventLoop, intflags) { //...numevents=aeApiPoll(eventLoop, tvp); //...}
aeApiPoll
调用完后,在eventLoop
的fired
字段上就本轮触发的所有事件。
文件事件
之前解释过Redis将所有的套接字事件都称为文件事件。
刚刚提到创建文件事件的函数是aeCreateFileEvent
函数,它会创建一个aeFileEvent结构体,并以fd
为下标,将其放置在eventLoop
的events
字段(是一个aeFileEvent
数组),ae.c:181:
// 取出文件事件结构aeFileEvent*fe=&eventLoop->events[fd];
其实这里本质上就是构建一个fd
到aeFileEvent
的映射,之后在某个fd
的事件触发时,方面通过fd
将aeFileEvent
结构体取出来,然后调用里面的回调函数,从后面的代码中也可以看出aeFileEvent
中存储着回调函数,ae.c:189:
if (mask&AE_READABLE) fe->rfileProc=proc; if (mask&AE_WRITABLE) fe->wfileProc=proc;
高级语言写得比较多的程序员可能会对fd为什么能够用来做下标感到疑惑,其实对于一个进程来说,fd是从3开始递增的,0,1,2分别代表stdin, stdout和stderr。所以虽然会损失三个元素的空间,但是对于性能却是能得到不少提升的。
当然,aeCreateFileEvent
最重要的还是将事件在epoll上注册,ae.c:184:
if (aeApiAddEvent(eventLoop, fd, mask) ==-1) returnAE_ERR;
然后回调函数会在aeApiPoll
返回之后,检查eventLoop.fired
中的fd
和事件,通过fd
取出相应的aeFileEvent
结构体来调用相应的回调函数,[ae.c:561]:
intaeProcessEvents(aeEventLoop*eventLoop, intflags) { //...numevents=aeApiPoll(eventLoop, tvp); for (j=0; j<numevents; j++) { // 从已就绪数组中获取事件aeFileEvent*fe=&eventLoop->events[eventLoop->fired[j].fd]; //...// 读事件if (fe->mask&mask&AE_READABLE) { // rfired 确保读/写事件只能执行其中一个rfired=1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件if (fe->mask&mask&AE_WRITABLE) { if (!rfired||fe->wfileProc!=fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } //... } }
定时事件
除了文件事件以外,Redis还有一类事件是定时事件,其实定时事件总共就只有一个,就是在initServer
时注册的serverCron
,redis.c:2151:
// 为 serverCron() 创建时间事件 新建了一个aeTimeEvent并插入了evetloop中的时间事件列表// 1 表示在 1ms 后执行,这个 1ms 定义的只是 serverCron 的初始执行时间,而不是执行间隔if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) ==AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); }
跳转到aeCreateTimeEvent,发现它做的事情就就是创建一个aeFileEvent
结构体并将其插入到eventLoop
的定时事件列表(即aeEventLoop
的timeEventHead字段)中。并没有任何epoll相关的操作,而且epoll其实也并不支持定时器的功能,那Redis是怎么实现定时任务的呢?其实秘密就在巧妙地设置epoll
的超时时间上,aeApiPoll
的第二个参数就是超时时间,在aeProcessEvents
里调用aeApiPoll
之前干的事情,就是计算距离现在最近的一次定时事件的时间,并以这个时间作为超时时间,ae.c:512:
intaeProcessEvents(aeEventLoop*eventLoop, intflags) { if (eventLoop->maxfd!=-1|| ((flags&AE_TIME_EVENTS) &&!(flags&AE_DONT_WAIT))) { intj; aeTimeEvent*shortest=NULL; structtimevaltv, *tvp; // 和最近一次定时事件的间隔//... } numevents=aeApiPoll(eventLoop, tvp); //...}
serverCron
的执行频率是由 redisServer.hz
来决定的,默认值为 10,也就是每秒执行 10 次,即每隔 100 ms 执行一次。在前面代码分析中我们看到了创建 serverCron
定时事件的代码aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
给的定时时间似乎是 1ms,其实这是初始时间,真正的间隔时间是由时间执行函数的返回值决定的,看一看 serverCron
的返回值,你就能找到它真正的定时时间了,redis.c:1562:
intserverCron(structaeEventLoop*eventLoop, longlongid, void*clientData) { //...return1000/server.hz; }
创建新连接
其处理逻辑就是当时在监听套接字上注册的回调函数acceptTcpHandler。
每当有新的客户端连接到来时,都会分发相关事件而触发该函数。
该函数会先accept这条连接,得到新连接的fd
,创建一个redisClient
结构体代表和这条连接相关的状态,最后注册该套接字的File Event:
voidacceptTcpHandler(aeEventLoop*el, intfd, void*privdata, intmask) { intcport, cfd, max=MAX_ACCEPTS_PER_CALL;//1000while(max--) { // accept 多次防止同时过来多条连接// accept 客户端连接cfd=anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd==ANET_ERR) { if (errno!=EWOULDBLOCK) redisLog(REDIS_WARNING, "Accepting client connection: %s", server.neterr); return; } redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); // 为客户端创建客户端状态(redisClient)acceptCommonHandler(cfd,0); } }
这里需要accept这么多次,是因为即使同时有多条连接过来,监听套接字注册的事件也只会激活一次,所以要多次accept,知道出现EWOULDBLOCK,防止有连接漏掉。
acceptCommonHandler中最重要的调用createClient
方法,networking.c:752:
staticvoidacceptCommonHandler(intfd, intflags) { // 创建客户端redisClient*c; if ((c=createClient(fd)) ==NULL) { //... } //...
createClient中先将fd设置为非阻塞,之后就在eventLoop注册了该套接字的File Event,注意这里注册的回调函数readQueryFromClient
,它就是之后从客户端读取数据的函数,最后将其加入了redisServer的clients列表中:
redisClient*createClient(intfd) { if (fd!=-1) { // 设置为非阻塞anetNonBlock(NULL,fd); // 禁用 Nagle 算法, 降低延迟anetEnableTcpNoDelay(NULL,fd); // 设置 keep aliveif (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 注册该套接字的的File eventif (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) ==AE_ERR) { //... } } //...//加入RedisServer的客户端列表中if (fd!=-1) listAddNodeTail(server.clients,c); //...}
读取客户端数据
上一节提到,这里其实就是readQueryFromClient函数。
读过上面的内容,大概也能猜出这个函数在做什么事情了,首先将传输来的数据读到代表该条连接的redisClient
的缓存中querybuf
字段中(每次最多读16kB),然后从中解析出命令名称,通过命令名称从之前的server.commands
字典中取出redisCommand
,然后执行里面的函数。
voidreadQueryFromClient(aeEventLoop*el, intfd, void*privdata, intmask) { redisClient*c= (redisClient*) privdata; //...// 读入内容到缓存nread=read(fd, c->querybuf+qblen, readlen); //...// 处理命令processInputBuffer(c); //..}
processInputBuffer中会尽可能地读取querybuf
字段,并将它解析成字符串数组放到redisClient
的argv
数组中:
voidprocessInputBuffer(redisClient*c) { while(sdslen(c->querybuf)) { // 尽可能读取querybuf// 将缓冲区中的内容转换成命令,以及命令参数 放到c->argv属性中// 因为通过telnet和通过客户端连接,命令格式不同,所以这里需要两种解析if (c->reqtype==REDIS_REQ_INLINE) { if (processInlineBuffer(c) !=REDIS_OK) break; } elseif (c->reqtype==REDIS_REQ_MULTIBULK) { if (processMultibulkBuffer(c) !=REDIS_OK) break; } if (c->argc==0) { //... } else { // 执行命令,并重置客户端if (processCommand(c) ==REDIS_OK) resetClient(c); } } }
processCommand会通过命令名称(c->argv[0])取到redisCommand
结构体并执行:
intprocessCommand(redisClient*c) { //...// lookupCommand其实就是去server.commands字典中去找command L2550c->cmd=c->lastcmd=lookupCommand(c->argv[0]->ptr); //...if { //... } else { // 执行命令 L2766call(c,REDIS_CALL_FULL); } }
向客户端返回数据
在后面实现各个Redis命令的时候,如果需要向客户端写数据,一般都是调用的addReply方法,addReply
先在创建了一个写文件事件,然后将要发送的数据写入到该条连接(其实就是redisClient
结构体)的写缓存中:
voidaddReply(redisClient*c, robj*obj) { // 创建写文件事件if (prepareClientToWrite(c) !=REDIS_OK) return; // 将要发送的数据写入写缓存中if (sdsEncodedObject(obj)) { //... } elseif (obj->encoding==REDIS_ENCODING_INT) { //... } }
进入prepareClientToWrite方法,你将看到熟悉的aeCreateFileEvent
,创建了一个写文件事件,并且回调方法是sendReplyToClient
,由这个回调方法负责最终将数据发送给客户端:
intprepareClientToWrite(redisClient*c) { //...// 创建写文件事件if (c->bufpos==0&&listLength(c->reply) ==0&& (c->replstate==REDIS_REPL_NONE||c->replstate==REDIS_REPL_ONLINE) &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) ==AE_ERR) returnREDIS_ERR; returnREDIS_OK; }
对于redisClient
,它有两个字段是用来存放写缓冲的,一个buf(就是一个16KB的缓冲),还有一个reply链表,当buf
满的时候,会先将回复内容链接在reply
后面。
End
作者:元青
微信公众号 「技乐书香」