redis网络层
这里我们只讨论宏观的、直接的,即忽略其他流程,只关注数据包处理流程。
对于redis连接来说,哪一条连接先构成一个完整的数据包,哪一条连接就会先得到redis的处理
1、一个数据包可能由多个读事件才能组装成(因为一次可能不能读到一个完整的数据包)
2、管道就是连接
3、人推车相当于网络线程(redis中网络线程就一个)
redis pipeline
根据上面的理解,redis中的pipeline模式其实也会是先请求的先返回。
Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。
Hiredis 提供redisCommand()
函数来向 Redis 服务端发送命令,redisCommand()
函数的原型如下:
void *redisCommand(redisContext *c, const char *format, ...);
redisCommand()执行后,返回一个redisReply *指针,指向redisReply结构体,该结构体包含了返回的结果信息。
redisCommand()函数是阻塞的(是指使用阻塞版的redisContext对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。
redisCommand()函数的使用示例如下所示:
redisReply *reply; reply = redisCommand(conn, "SET %s %s", "foo", "bar"); freeReplyObject(reply); reply = redisCommand(conn, "GET %s", "foo"); printf("%s\n", reply->str); freeReplyObject(reply);
如果我们需要向 Redis 服务端发送多次命令,如果都是使用redisCommand()
函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了redisAppendCommand()
函数来实现流水线的命令发送方案。
int redisAppendCommand(redisContext *c, const char *format, ...);
redisAppendCommand()
函数执行成功时返回REDIS_OK
,失败时返回REDIS_ERR
。
#define REDIS_ERR -1 #define REDIS_OK 0
跟redisCommand()函数一样,redisAppendCommand()函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以redisAppendCommand()函数为例说明。
redisAppendCommand()函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到redisContext对象中。那么,redisContext对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了redisGetReply()函数来将缓存的命令发送出去的功能。redisGetReply()函数的处理过程如下:
- 查看结果缓冲区是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
- 将命令缓冲区的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回
上面我们提到的redisCommand()函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用redisAppendCommand()函数,然后再调用redisGetReply()函数实现的。
说到这里,hiredis 实现流水线的过程就很清晰了。无论redisCommand()函数还是redisAppendCommand()函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand()函数会马上发送命令然后取得返回结果,而redisAppendCommand()函数则在调用redisGetReply()函数才将所有命令一次性发送,并取得第一个命令的返回结果。
下面是使用redisAppendCommand()
函数实现流水线方案的示例。
redisReply *reply; redisAppendCommand(context,"SET foo bar"); redisAppendCommand(context,"GET foo"); redisGetReply(context,&reply); // SET命令的返回 freeReplyObject(reply); redisGetReply(context,&reply); // GET命令的返回 freeReplyObject(reply);
值得注意的是,调用redisAppendCommand()
函数的次数需要与调用redisGetReply()
的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。
// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况 redisAppendCommand(conn, "get t"); // 本来想取得 set a ddd 的返回,却获取了 get t 的返回 reply = redisCommand(conn, "set a ddd"); printf("set a res: %s\n", reply->str);
输出的结果将会是get t
命令的返回,而不是set a ddd
命令的返回。
附:示例程序 redis-pipeline.c
编译:
gcc -o redis-pipeline redis-pipeline.c -L/usr/local/lib -lhiredis
输出:
bar res: OK res: b watch res: OK res: OK, num: 0, type: 5 res: QUEUED, num: 0, type: 5 res: QUEUED, num: 0, type: 5 res: QUEUED, num: 0, type: 5 res: (null), num: 3, type: 2 set a res: tt
源程序:
#include <stdio.h> #include <hiredis/hiredis.h> int main() { // 阻塞 redisContext redisContext *conn = redisConnect("127.0.0.1", 6379); if (conn != NULL && conn->err) { printf("connection error: %s\n", conn->errstr); return 0; } // 使用 redisCommand 发送命令并获取返回 redisReply *reply; reply = redisCommand(conn, "SET %s %s", "foo", "bar"); freeReplyObject(reply); reply = redisCommand(conn, "GET %s", "foo"); printf("%s\n", reply->str); freeReplyObject(reply); // 使用 redisAppendCommand 实现流水线 redisAppendCommand(conn, "set a b"); redisAppendCommand(conn,"get a"); int r = redisGetReply(conn, (void **)&reply); if (r == REDIS_ERR) { printf("ERROR\n"); } printf("res: %s\n", reply->str); freeReplyObject(reply); r = redisGetReply(conn, (void **)&reply); if (r == REDIS_ERR) { printf("ERROR\n"); } printf("res: %s\n", reply->str); freeReplyObject(reply); // 使用 watch 命令监控键 a reply = redisCommand(conn, "watch a"); printf("watch res: %s\n", reply->str); freeReplyObject(reply); // 事务流水线,总共5个命令 redisAppendCommand(conn, "multi"); redisAppendCommand(conn, "get foo"); redisAppendCommand(conn, "set t tt"); redisAppendCommand(conn, "set a aa"); redisAppendCommand(conn, "exec"); for (int i = 0; i < 5; ++i) { r = redisGetReply(conn, (void **)&reply); if (r == REDIS_ERR) { printf("ERROR\n"); } //reply->elements返回元素个数,reply->element中存储redis返回的具体元素,可能有多个 printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type); freeReplyObject(reply); } // 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况 redisAppendCommand(conn, "get t"); // 本来想取得 set a ddd 的返回,却获取了 get t 的返回 reply = redisCommand(conn, "set a ddd"); printf("set a res: %s\n", reply->str); redisFree(conn); return 0; }
redis事务
事务:用户定义一系列数据库操作,这些操作视为一个完整的 逻辑处理工作单元,要么全部执行,要么全部不执行,是不可 分割的工作单元。
在redis中,当我们开启multi的时候,会构建一个队列,multi后面的命令会放到队列中,当接收到exec命令时,会将队列中的命令取出来执行,目的是不让其他连接的命令在这些命令之间执行,redis采用这种方式来处理原子性的问题的。
MULTI
在redis中通过multi开启事务
相当于mysql中的 begin/start transaction
EXEC
在redis中通过exec来提交事务
相当于mysql中的 commit
redis中通过multi和exec来界定事务
DISCARD
redis中通过discard来取消事务
相当于mysql中的rollback
WATCH
检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现
(cas);
若被取消则事务返回 nil
用来检测事务中,key的变化
应用
注意:
实际项目中不会使用multi这种方式,因为watch这种方式可能会取消事务不执行(比如其他连接在我们之前操作了相同的key),这就需要重试(即乐观锁实现可能会需要重试,增加业务逻辑的复杂度),这在我们写业务逻辑的时候是非常不方便的;实际项目中使用更多的是lua脚本,multi这种方式可能会在面试当中问到
lua脚本
lua 脚本实现原子性,减少网络传输;
pipeline的效果 lua多个语句
redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的(因为lua脚本方式中可以写多个命令然后作为一个数据包传给redis,不可能其他语句在它中间去执行,因为redis它是单线程的);可以在里边做一些逻辑运算
lua 脚本当中的命令会直接修改数据状态;
注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;
通过一个命令来调用lua脚本
我们来看看server与redis-server的交互
1、调用script load xx.lua
2、通过hash操作生成40位的字符串,可以减少网络的传输
# 从文件中读取 lua脚本内容 cat test1.lua | redis-cli script load --pipe # 加载 lua脚本字符串 生成 sha1 > script load 'local val = KEYS[1]; return val' "b8059ba43af6ffe8bed3db65bac35d452f8115d8" # 检查脚本缓存中,是否有该 sha1 散列值的lua脚本 > script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8" 1) (integer) 1 # 清除所有脚本缓存 > script flush OK # 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本 > script kill (error) NOTBUSY No scripts in execution right now.
EVAL
测试使用这种方式
EVAL script numkeys key [key …] arg [arg …]
EVALSHA
线上使用这种方式
EVALSHA sha1 numkeys key [key …] arg [arg …]
应用
1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load) ,(会生成一系列哈希值 key-value对,key是哈希值,value就是lua脚本);
2: 项目中若需要热更新,通过redis-cli script flush(相当于把哈希映射lua脚本的映射关系全部清空);然后可以通过订阅发布功能通知所有服
务器重新加载lua脚本;
3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;
示例:
我们来运行一下,我们会拿res这个hash值作为evalsha的参数,并且告诉有几个参数,值是什么
先把脚本传到服务器当中,生成我们的hash值,根据hash值去执行这个lua脚本命令。
lua脚本与mysql存储过程区别:mysql存储过程不具备事务性(除非手动加事务)(存储过程只有一个作用,减少网络传输),也不具备原子性
ACID特性分析
A 原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis
不支持回滚;即使事务队列中的某个命令在执行期间出现了错误(比如对string类型的key进行zadd),整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。(可以通过lua脚本来实现原子性,也可以编写脚本来实现回滚)
C 一致性;事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一
致性而不是异常后的一致性;所以redis也不满足;这个争议很大:redis 能确保事务执行前后的数
据的完整约束;但是并不满足传统意义上的一致性;比如转账功能,一个扣钱一个加钱;可能出现
扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;
I 隔离性;事务的操作不被其他用户操作所打断;redis 是单线程执行,天然具备隔离性;
D 持久性;redis只有在 aof (通过命令协议刷到磁盘中)持久化策略的时候,并且需要在 redis.conf 中appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;
redis发布订阅
为了支持消息的多播机制,redis 引入了发布订阅模块;
消息不一定可达。可以使用分布式消息队列 或者 stream的方式 确保一定可达
# 订阅频道 subscribe 频道 # 订阅模式频道 psubscribe 频道 # 取消订阅频道 unsubscribe 频道 # 取消订阅模式频道 punsubscribe 频道 # 发布具体频道或模式频道的内容 publish 频道 内容 # 客户端收到具体频道内容 message 具体频道 内容 # 客户端收到模式频道内容 pmessage 模式频道 具体频道 内容
例子
订阅频道,如果有人在该频道里发布消息,就能收到
订阅模式频道,只要是news.开头的频道都接受
应用
发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而
pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接 用于处理发布订阅;
缺点
发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费
者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到
消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失
了;
另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;
使用hiredis同步连接
int current_tick() { int t = 0; struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti); t = (int)ti.tv_sec * 1000; t += ti.tv_nsec / 1000000; return t; } int main(int argc, char **argv) { unsigned int j, isunix = 0; redisContext *c; redisReply *reply; const char *hostname = "127.0.0.1"; int port = 6379; struct timeval timeout = { 1, 500000 }; // 1.5 seconds c = redisConnectWithTimeout(hostname, port, timeout);//连接redis if (c == NULL || c->err) { if (c) { printf("Connection error: %s\n", c->errstr); redisFree(c); } else { printf("Connection error: can't allocate redis context\n"); } exit(1); } int num = (argc > 1) ? atoi(argv[1]) : 1000; int before = current_tick(); // redisCommand(c,"set counter 0"); for (int i=0; i<num; i++) { reply = redisCommand(c,"INCR counter");//发送命令,阻塞等待结果 printf("INCR counter: %lld\n", reply->integer); freeReplyObject(reply);//释放reply } int used = current_tick()-before; printf("after %d exec redis command, used %d ms\n", num, used); /* Disconnects and frees the context */ redisFree(c); return 0; }
hiredis异步方式实现
redis协议图
协议实现的第一步需要知道如何界定数据包
- 长度+二进制流
- 二进制流+特殊分隔符
hiredis是实现redis与服务器连接的协议实现,其中涉及到同步实现和部分异步实现,在异步实现中,hiredis只是提供了事件操作的接口(因为不同的平台、不同的网络库对事件操作的接口不一致),所以要想异步操作redis,需要我们自己适配这些事件操作的接口,相当于我们需要实现一个redis的驱动:
因为后端大部分框架都采用reactor方式,所以把redis连接融合到reactor中进行管理,将自定义的reactor设置到redisAsyncContext中,hiredis进行触发添加\删除事件,内部将 回调函数设置,添加epoll由reactor实现
int main(int argc, char **argv) { redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);//向redis服务器发送连接请求 if (c->err) { /* Let *c leak for now... */ printf("Error: %s\n", c->errstr); return 1; } R = create_reactor();//创建一个 reactor redisAttach(R, c);//把io检测流程 添加到 redisAsyncContext *c中 redisAsyncSetConnectCallback(c, connectCallback);//设置连接成功redis的回调 redisAsyncSetDisconnectCallback(c, disconnectCallback);//设置断开连接redis的回调 eventloop(R); release_reactor(R); return 0; }
第一部分
redisAttach(R, c)
将reactor设置到redisAsyncContext中
这是添加事件、删除事件,等操作
以redisAddWrite为例,是设置回调函数和添加事件到reactor中
redisWriteHandler中的redisAsyncHandleWrite(re->ctx)是执行回调函数
第二部分
设置connect
连接回调函数,会调用ac->ev.addWrite
(即调用上面设置的redisAddWrite函数,即添加写回调以及往reactor中注册写事件)
连接成功之后会触发可写事件,即成功之后会回调上面设置的redisWriteHandler写回调,最终在redisWriteHandler的redisAsyncHandleWrite的内部会调用redisAsyncSetConnectCallback设置的connectCallback函数。