前言:
该篇内容为我对redis的学习记录,欢迎指正批评。文中只是放了部分调试内容,完整的可以根据流程图跟踪。
一.redis set命令调用流程
二.源码分析
(1)源码解析
服务端对命令的解析是绑定在server.c中的redisCommandTable全局参数中,redisCommandTable为一个结构体数组。
结构体redisCommand如下:
typedefvoidredisCommandProc(client *c);
typedefint *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
structredisCommand {
char *name; //命令名称
redisCommandProc *proc; //函数定义
int arity; //参数的数目,可以使用-N来表示>=N。比如-3就是参数数量>=3
char *sflags;
/* 标记为字符串表示,每个标记一个字符。
w:写入命令(可以修改密钥空间)。
r:read命令(永远不会修改密钥空间)。
m:一经调用可能会增加内存使用量。如果内存不足,请不要允许。
a:管理命令,比如SAVE或SHUTDOWN。
p:发布/订阅相关命令。
f:强制复制此命令,而不考虑server.dirty
s:脚本中不允许使用命令。
R:随机命令。命令是不确定的,也就是说,同一个命令相同的参数,相同的键空间,可能有不同的结果。
例如SPOP和RANDOMKEY是两个随机命令。
S:Sort命令输出数组如果从脚本调用,那么是决定性的。
l:加载数据库时允许命令。
t:当从机有过时的数据但不允许服务器这个数据。通常这种情况下不接受任何命令
但只有几个。
M:不要在监视器上自动传播命令。
k:执行一个隐式的请求这个命令,命令将是如果插槽标记为“导入”,则在群集模式下接受。
F:快速命令:不应延迟的O(1)或O(log(N))命令它的执行只要内核调度程序给我们时间。
注意,可能触发DEL作为副作用的命令(如SET)不是快速命令。
*/
int flags; /* 从“sflags”字段获取的实际标志。*/
/* 使用函数确定命令行中的键参数。用于Redis集群重定向。从命令获取关键参数的可选函数。
只有当以下三个字段不足以指定哪些参数是键时,才使用此选项。*/
redisGetKeysProc *getkeys_proc;
/* 调用此命令时,应在后台加载哪些键?*/
int firstkey; /* 第一个参数是键(0=无键) */
int lastkey; /* 最后一个关键的论点 */
int keystep; /* 从第一个键到最后一个键之间的步骤 */
longlong microseconds, calls; //此命令的调用总数。
};
server.c 服务端入口文件main方法
intmain(int argc, char **argv) {
structtimevaltv;
int j;
...省略
initServerConfig(); //初始化服务配置
moduleInitModulesSystem();
...省略
/* 检查rdb和aof,检查是否需要在redis Check rdb/aof模式下启动*/
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
elseif (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
...省略
int background = server.daemonize && !server.supervised;
if (background) daemonize(); //后台运行
initServer(); //初始化
if (background || server.pidfile) createPidFile(); //创建pid文件
...省略
//事件处理部分
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return0;
}
ae.c 事件轮询
//事件模型加载
#ifdef HAVE_EVPORT
#include"ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include"ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include"ae_kqueue.c"
#else
#include"ae_select.c"
#endif
#endif
#endif
... 省略
voidaeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); //有事件时处理
}
}
redis会根据不同的系统匹配不同的事件模型。
networking.c readQueryFromClient方法
#define PROTO_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
voidreadQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);
readlen = PROTO_IOBUF_LEN; //读取字节为1024*16,刚好16k
/* 如果这是一个多体请求,并且我们正在处理一个足够大的块应答,
那么尝试最大化查询缓冲区包含SDS字符串代表该对象的概率,甚至
冒着需要更read(2)调用,这样函数processMultiBulkBuffer()
可以避免复制缓冲区来创建表示参数的Redis对象。*/
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen); //去读客户端发送命令行到c->querybuf中
..省略
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
/*处理缓冲区的时间。如果客户是主人我们需要计算应用
偏移前后的差值处理缓冲区,以了解复制流的大小实际应
用于主状态:这个数量,以及复制流的相应部分将传播到
子从服务器和复制积压工作。*/
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
db.c setKey方法
voidsetKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val); //添加字典
} else {
dbOverwrite(db,key,val); //覆盖已有字典数据
}
incrRefCount(val);
removeExpire(db,key);
signalModifiedKey(db,key);
}
三.编译redis
(1)修改redis源码目录下src/Makefile
#sudo vim src/Makefile
OPTIMIZATION?=-O0
修改OPTIMIZATION?=-O2为OPTIMIZATION?=-O0、修改该项的主要目的是为了防止编译优化.
(2)编译redis源码信息
#make
(3)修改redis.conf
#sudo vim redis.conf
daemonize yes
修改daemonize no为daemonize yes,修改该项主要目的是后台运行.
四.调试
(1)到源码目录下运行redis服务端。(调试前先操作三)
#src/redis-server redis.conf
#lldb -p 26619
(2)调试
先下一个断点 b readQueryFromClient
然后使用src/redis-cli输入set name a命令
跟踪到1648行
该地方其实就是把命令读入到client接口体到querybuf中。
继续往下走跟踪到 processCommand方法中
可以看到 lookupCommand,其实就是通过命令名称去获取到 redisCommand结构,然后通过这个结构就可以找到对应的proc参数的调用方法。
(3)set命令完整调度堆栈如下
(lldb) bt
* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 1.1
* frame #0: 0x00000001059dfe9c redis-server`setCommand(c=0x00007fb083800000) at t_string.c:98:11
frame #1: 0x00000001059b24c7 redis-server`call(c=0x00007fb083800000, flags=15) at server.c:2199:5
frame #2: 0x00000001059b326c redis-server`processCommand(c=0x00007fb083800000) at server.c:2479:9
frame #3: 0x00000001059c6bef redis-server`processInputBuffer(c=0x00007fb083800000) at networking.c:1330:17
frame #4: 0x00000001059c3233 redis-server`readQueryFromClient(el=0x00007fb081728d10, fd=7, privdata=0x00007fb083800000, mask=1) at networking.c:1420:9
frame #5: 0x00000001059a78fa redis-server`aeProcessEvents(eventLoop=0x00007fb081728d10, flags=11) at ae.c:421:17
frame #6: 0x00000001059a7fae redis-server`aeMain(eventLoop=0x00007fb081728d10) at ae.c:464:9
frame #7: 0x00000001059b72dc redis-server`main(argc=2, argv=0x00007ffeea25db08) at server.c:3844:5
frame #8: 0x00007fff60d203d5 libdyld.dylib`start + 1
frame #9: 0x00007fff60d203d5 libdyld.dylib`start + 1