Redis网络层源码阅读

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
云解析 DNS,旗舰版 1个月
简介: Redis 离用户最近的就是网络层了,网络层不负责命令的具体执行,只负责网络数据的收发,虽然它不负责具体的功能实现,却是Redis单线程,高性能的核心。

引言


Redis 离用户最近的就是网络层了,网络层不负责命令的具体执行,只负责网络数据的收发,虽然它不负责具体的功能实现,却是Redis单线程,高性能的核心。

本文只点出一条关键的代码路径,如果想要彻底理解这一部分代码,需要一些Reactor模型,epoll相关的知识,请自行查阅其他资料。

后台运行Redis


这是main中配置解析结束后的第一行代码。

如果你设置了--daemonize,那么Redis就会在后台启动redis.c:4027


    // 将服务器设置为守护进程
if (server.daemonize) daemonize();

服务器启动


C语言网络编程中常见的服务器启动流程就是socket->bind->listen->accept,对于epoll这样的io多路复用,则是socket->bind->listen->set_non_block->register_fd_event->epoll_wait,稍微解释一下这些流程节点的含义:

  • socket:创建监听套接字
  • bind:绑定端口
  • listen:监听端口
  • set_non_block:设置套接字为非阻塞
  • register_fd_event:将监听套接字的"新连接到达"事件注册到epoll上
  • epoll_wait:阻塞在IO多路复用上,等待事件到来

我们就按照这条路径来追踪服务器启动的代码。

服务器启动的socket->bind->listen->set_non_block->register_fd_eventinitServer方法的调用中完成,redis.c:2050

voidinitServer() {
//...// socket -> bind -> listen -> set_non_blockif (server.port!=0&&listenToPort(server.port,server.ipfd,&server.ipfd_count) ==REDIS_ERR)
exit(1);
//...// ->register_fd_eventfor (j=0; j<server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) ==AE_ERR)
            {
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
            }
    }
//...}

因为redis.conf中是允许监听多个ip地址,所以所有要监听的地址会在配置解析时被放到server结构体的bindaddr字段(字符串数组),如果bindaddrNULL则表示绑定当前机器的全部地址:

structredisServer {
//...char*bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to *///...}

listenToPort中会将所有完成了socket->bind->listen->set_non_block的监听套接字放到bindaddr数组中,之后在一个循环(redis.c:2160)中将事件全部注册到epoll上。注意一下aeCreateFileEvent的第四个参数acceptTcpHandler是一个回调函数,这个回调函数会在事件发生时被调用(即有新的客户端连接请求到达时被调用)。

Redis称这种事件为File Event(从方法名CreateFileEvent可以看出),因为在linux上一切都是文件,所以套接字本身也是文件,所以Redis中的File Event就是指套接字事件。

listenToPort会遍历bindaddr数组,绑定数组中的所有ip地址+server.port

intlistenToPort(intport, int*fds, int*count) {
intj;
/* Force binding of 0.0.0.0 if no bind address is specified, always* entering the loop if j == 0. */if (server.bindaddr_count==0) server.bindaddr[0] =NULL;
for (j=0; j<server.bindaddr_count||j==0; j++) {
//...// set_non_blockanetNonBlock(NULL,fds[*count]);
//...    }
//..}

for循环里面的代码虽然很长,但是逻辑很简单,其实就是判断是NULL,IPv6地址还是IPv4地址,如果是NULL的话,就要绑定本机的全部地址,如果是IPv6地址,则调用anetTcp6Server获取一个IPv6套接字,如果是IPv4的话,则调用anetTcpServer获取一个IPv4套接字。获得套接字后立即就调用anetNonBlock将其设置为非阻塞的(即set_non_block)。anetTcpServer将会返回一个已经完成了bindlisten操作的套接字。anetTcpServer函数其实就直接调用了一下_anetTcpServersocket->bind->listen都是在这个函数里完成的:

staticint_anetTcpServer(char*err, intport, char*bindaddr, intaf, intbacklog)
{
//...for (p=servinfo; p!=NULL; p=p->ai_next) {
// socketif ((s=socket(p->ai_family,p->ai_socktype,p->ai_protocol)) ==-1)
continue;
if (af==AF_INET6&&anetV6Only(err,s) ==ANET_ERR) gotoerror;
if (anetSetReuseAddr(err,s) ==ANET_ERR) gotoerror;
// bind->listenif (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) ==ANET_ERR) gotoerror;
gotoend;
    }
//...}

到此准备工作就完成了,接下来的epoll_wait阶段,服务器就已经正式启动了,翻到main的最后几行,aeMain就启动服务器主循环的函数,redis.c:4079

aeMain(server.el);

aeMain方法中可以看到明显的循环:

/** 事件处理器的主循环*/voidaeMain(aeEventLoop*eventLoop) {
eventLoop->stop=0;
while (!eventLoop->stop) {
// 如果有需要在事件处理前执行的函数,那么运行它if (eventLoop->beforesleep!=NULL)
eventLoop->beforesleep(eventLoop);
// 开始处理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

进入aeProcessEvents,里面代码很长,其中的aeApiPoll函数调用会阻塞在IO多路复用上(即epoll_wait),ae.c:560

intaeProcessEvents(aeEventLoop*eventLoop, intflags)
{
//...numevents=aeApiPoll(eventLoop, tvp);
//...}

aeApiPoll调用完后,在eventLoopfired字段上就本轮触发的所有事件。

文件事件


之前解释过Redis将所有的套接字事件都称为文件事件。

刚刚提到创建文件事件的函数是aeCreateFileEvent函数,它会创建一个aeFileEvent结构体,并以fd为下标,将其放置在eventLoopevents字段(是一个aeFileEvent数组),ae.c:181

// 取出文件事件结构aeFileEvent*fe=&eventLoop->events[fd];

其实这里本质上就是构建一个fdaeFileEvent的映射,之后在某个fd的事件触发时,方面通过fdaeFileEvent结构体取出来,然后调用里面的回调函数,从后面的代码中也可以看出aeFileEvent中存储着回调函数,ae.c:189

if (mask&AE_READABLE) fe->rfileProc=proc;
if (mask&AE_WRITABLE) fe->wfileProc=proc;


高级语言写得比较多的程序员可能会对fd为什么能够用来做下标感到疑惑,其实对于一个进程来说,fd是从3开始递增的,0,1,2分别代表stdin, stdout和stderr。所以虽然会损失三个元素的空间,但是对于性能却是能得到不少提升的。

当然,aeCreateFileEvent最重要的还是将事件在epoll上注册,ae.c:184:

if (aeApiAddEvent(eventLoop, fd, mask) ==-1)
returnAE_ERR;

然后回调函数会在aeApiPoll返回之后,检查eventLoop.fired中的fd和事件,通过fd取出相应的aeFileEvent结构体来调用相应的回调函数,[ae.c:561]:

intaeProcessEvents(aeEventLoop*eventLoop, intflags)
{
//...numevents=aeApiPoll(eventLoop, tvp);
for (j=0; j<numevents; j++) {
// 从已就绪数组中获取事件aeFileEvent*fe=&eventLoop->events[eventLoop->fired[j].fd];
//...// 读事件if (fe->mask&mask&AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个rfired=1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        }
// 写事件if (fe->mask&mask&AE_WRITABLE) {
if (!rfired||fe->wfileProc!=fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        }
//...    }
}

定时事件


除了文件事件以外,Redis还有一类事件是定时事件,其实定时事件总共就只有一个,就是在initServer时注册的serverCronredis.c:2151

// 为 serverCron() 创建时间事件   新建了一个aeTimeEvent并插入了evetloop中的时间事件列表// 1 表示在 1ms 后执行,这个 1ms 定义的只是 serverCron 的初始执行时间,而不是执行间隔if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) ==AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
    }

跳转到aeCreateTimeEvent,发现它做的事情就就是创建一个aeFileEvent结构体并将其插入到eventLoop的定时事件列表(即aeEventLooptimeEventHead字段)中。并没有任何epoll相关的操作,而且epoll其实也并不支持定时器的功能,那Redis是怎么实现定时任务的呢?其实秘密就在巧妙地设置epoll的超时时间上,aeApiPoll的第二个参数就是超时时间,在aeProcessEvents里调用aeApiPoll之前干的事情,就是计算距离现在最近的一次定时事件的时间,并以这个时间作为超时时间,ae.c:512

intaeProcessEvents(aeEventLoop*eventLoop, intflags)
{
if (eventLoop->maxfd!=-1||        ((flags&AE_TIME_EVENTS) &&!(flags&AE_DONT_WAIT))) {
intj;
aeTimeEvent*shortest=NULL;
structtimevaltv, *tvp;  // 和最近一次定时事件的间隔//...    }
numevents=aeApiPoll(eventLoop, tvp);
//...}

serverCron 的执行频率是由 redisServer.hz 来决定的,默认值为 10,也就是每秒执行 10 次,即每隔 100 ms 执行一次。在前面代码分析中我们看到了创建 serverCron 定时事件的代码aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) 给的定时时间似乎是 1ms,其实这是初始时间,真正的间隔时间是由时间执行函数的返回值决定的,看一看 serverCron 的返回值,你就能找到它真正的定时时间了,redis.c:1562

intserverCron(structaeEventLoop*eventLoop, longlongid, void*clientData) {
//...return1000/server.hz;
}

创建新连接


其处理逻辑就是当时在监听套接字上注册的回调函数acceptTcpHandler

每当有新的客户端连接到来时,都会分发相关事件而触发该函数。

该函数会先accept这条连接,得到新连接的fd,创建一个redisClient结构体代表和这条连接相关的状态,最后注册该套接字的File Event:

voidacceptTcpHandler(aeEventLoop*el, intfd, void*privdata, intmask) {
intcport, cfd, max=MAX_ACCEPTS_PER_CALL;//1000while(max--) {  // accept 多次防止同时过来多条连接// accept 客户端连接cfd=anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd==ANET_ERR) {
if (errno!=EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
        }
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 为客户端创建客户端状态(redisClient)acceptCommonHandler(cfd,0);
    }
}


这里需要accept这么多次,是因为即使同时有多条连接过来,监听套接字注册的事件也只会激活一次,所以要多次accept,知道出现EWOULDBLOCK,防止有连接漏掉。

acceptCommonHandler中最重要的调用createClient方法,networking.c:752

staticvoidacceptCommonHandler(intfd, intflags) {
// 创建客户端redisClient*c;
if ((c=createClient(fd)) ==NULL) {
//...    }
//...

createClient中先将fd设置为非阻塞,之后就在eventLoop注册了该套接字的File Event,注意这里注册的回调函数readQueryFromClient,它就是之后从客户端读取数据的函数,最后将其加入了redisServer的clients列表中:

redisClient*createClient(intfd) {
if (fd!=-1) {
// 设置为非阻塞anetNonBlock(NULL,fd);
// 禁用 Nagle 算法, 降低延迟anetEnableTcpNoDelay(NULL,fd);
// 设置 keep aliveif (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 注册该套接字的的File eventif (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) ==AE_ERR)
        {
//...        }
    }
//...//加入RedisServer的客户端列表中if (fd!=-1) listAddNodeTail(server.clients,c);
//...}

读取客户端数据


上一节提到,这里其实就是readQueryFromClient函数。

读过上面的内容,大概也能猜出这个函数在做什么事情了,首先将传输来的数据读到代表该条连接的redisClient的缓存中querybuf字段中(每次最多读16kB),然后从中解析出命令名称,通过命令名称从之前的server.commands字典中取出redisCommand,然后执行里面的函数。

voidreadQueryFromClient(aeEventLoop*el, intfd, void*privdata, intmask) {
redisClient*c= (redisClient*) privdata;
//...// 读入内容到缓存nread=read(fd, c->querybuf+qblen, readlen);
//...// 处理命令processInputBuffer(c);
//..}

processInputBuffer中会尽可能地读取querybuf字段,并将它解析成字符串数组放到redisClientargv数组中:

voidprocessInputBuffer(redisClient*c) {
while(sdslen(c->querybuf)) { // 尽可能读取querybuf// 将缓冲区中的内容转换成命令,以及命令参数 放到c->argv属性中// 因为通过telnet和通过客户端连接,命令格式不同,所以这里需要两种解析if (c->reqtype==REDIS_REQ_INLINE) {
if (processInlineBuffer(c) !=REDIS_OK) break;
        } elseif (c->reqtype==REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) !=REDIS_OK) break;
        } 
if (c->argc==0) {
//...        } else {
// 执行命令,并重置客户端if (processCommand(c) ==REDIS_OK)
resetClient(c);
        }
    }
}

processCommand会通过命令名称(c->argv[0])取到redisCommand结构体并执行:

intprocessCommand(redisClient*c) {
//...// lookupCommand其实就是去server.commands字典中去找command   L2550c->cmd=c->lastcmd=lookupCommand(c->argv[0]->ptr);
//...if {
//...    } else {
// 执行命令  L2766call(c,REDIS_CALL_FULL);
    }
} 

向客户端返回数据


在后面实现各个Redis命令的时候,如果需要向客户端写数据,一般都是调用的addReply方法,addReply先在创建了一个写文件事件,然后将要发送的数据写入到该条连接(其实就是redisClient结构体)的写缓存中:

voidaddReply(redisClient*c, robj*obj) {
// 创建写文件事件if (prepareClientToWrite(c) !=REDIS_OK) return;
// 将要发送的数据写入写缓存中if (sdsEncodedObject(obj)) {
//...    } elseif (obj->encoding==REDIS_ENCODING_INT) {
//...    }
}

进入prepareClientToWrite方法,你将看到熟悉的aeCreateFileEvent,创建了一个写文件事件,并且回调方法是sendReplyToClient,由这个回调方法负责最终将数据发送给客户端:

intprepareClientToWrite(redisClient*c) {
//...// 创建写文件事件if (c->bufpos==0&&listLength(c->reply) ==0&&        (c->replstate==REDIS_REPL_NONE||c->replstate==REDIS_REPL_ONLINE) &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) ==AE_ERR) returnREDIS_ERR;
returnREDIS_OK;
}

对于redisClient,它有两个字段是用来存放写缓冲的,一个buf(就是一个16KB的缓冲),还有一个reply链表,当buf满的时候,会先将回复内容链接在reply后面。

End


作者:元青

微信公众号 「技乐书香」

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
NoSQL 网络协议 Linux
Redis的实现一:c、c++的网络通信编程技术,先实现server和client的通信
本文介绍了使用C/C++进行网络通信编程的基础知识,包括创建socket、设置套接字选项、绑定地址、监听连接以及循环接受和处理客户端请求的基本步骤。
50 6
|
3月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
66 6
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
56 3
|
1月前
|
NoSQL 网络协议 应用服务中间件
redis,memcached,nginx网络组件
redis,memcached,nginx网络组件
17 0
|
1月前
|
存储 监控 NoSQL
Redis的实现二: c、c++的网络通信编程技术,让服务器处理多个client
本文讨论了在C/C++中实现服务器处理多个客户端的技术,重点介绍了事件循环和非阻塞IO的概念,以及如何在Linux上使用epoll来高效地监控和管理多个文件描述符。
28 0
|
3月前
|
Web App开发 前端开发 关系型数据库
基于SpringBoot+Vue+Redis+Mybatis的商城购物系统 【系统实现+系统源码+答辩PPT】
这篇文章介绍了一个基于SpringBoot+Vue+Redis+Mybatis技术栈开发的商城购物系统,包括系统功能、页面展示、前后端项目结构和核心代码,以及如何获取系统源码和答辩PPT的方法。
|
3月前
|
NoSQL Redis
redis 6源码解析之 ziplist
redis 6源码解析之 ziplist
33 5
|
3月前
|
缓存 负载均衡 NoSQL
【Azure Redis】Azure Redis添加了内部虚拟网络后,其他区域的主机通过虚拟网络对等互连访问失败
【Azure Redis】Azure Redis添加了内部虚拟网络后,其他区域的主机通过虚拟网络对等互连访问失败
|
3月前
|
缓存 NoSQL Java
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
|
3月前
|
存储 缓存 NoSQL
【Azure Redis 缓存】Azure Cache for Redis 专用终结点, 虚拟网络, 公网访问链路
【Azure Redis 缓存】Azure Cache for Redis 专用终结点, 虚拟网络, 公网访问链路
下一篇
无影云桌面