一、redis 网络层
redis只有一个网络io,其他都是内存操作,所以在单线程下性能较高
对于所有连接的数据处理,redis 并发执行的;
对于单条连接的数据处理,redis 串行执行的;
每个连接,可以当作一个队列。对于一个连接而言,是串行执行的(A1A2A3),对于整体而言,是并发执行(比如:A1B1B2A2)
如果一定要按照A1A2A3执行,不受其他命令的影响(不想把B1、B2插入中间),就要把A1A2A3构成一个事务
并发 : 活跃队列的个数大于处理器的个数;
mysql中是以B+树为存储结构,redis(kv数据库)中整体是通过hashtable来组织的
二、redis pipeline
redis pipeline 是一个客户端提供的,而不是服务端提供的;
下图第三个是使用pipleline的方式,更为高效,但是不具备事务性
对于request操作,只是将数据写到fd对应的写缓冲区,时间非常快,真正耗时操作在读取
response;
因此在使用pipiline的时候,可以加上事务
三、redis事务
MULTI
开启事务,事务执行过程中,单个命令是入队列操作,直到调用EXEC
才会一起执行;
MULTI
在redis中通过multi
开启事务
相当于mysql中的 begin/start transaction
EXEC
在redis中通过exec
来提交事务
相当于mysql中的 commit
redis中通过multi和exec来界定事务
DISCARD
redis中通过discard
来取消事务
相当于mysql中的rollback
WATCH
检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现
(cas);
若被取消则事务返回 nil
用来检测事务中,key的变化
应用
事务实现 zpop
WATCH zset element = ZRANGE zset 0 0 MULTI ZREM zset element EXEC
事务实现 加倍操作
WATCH score:10001 val = GET score:10001 MULTI SET score:10001 val*2 EXEC
实践
例子一:
multi表示开启事务
(TX)表示transaction事务中的意思
输出QUEUD表示,已经将命令加入队列,由于在事务中,并未执行
然后exec会一起执行
例子二:
在执行到一半的时候,另一个连接发送命令给redisset key mark
,这时候下面这个事务(要提前开启watch指定的key)会检测到key的变化,从而输出nil,也就是说事务被取消了
在实际应用时,不会使用这种方式。如果一个事务经常被取消掉,那么使用的必要性很低。
四、lua 脚本
lua 脚本实现原子性;
redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当
某个脚本正在执行的时候,不会有其他命令或者脚本被执行;
lua 脚本当中的命令会直接修改数据状态;
注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;
lua脚本只是一个命令,redis是单线程,lua脚本是单独运行的,其他命令不会影响他,具有原子性
通过对 xx.lua,生成一个40位的哈希(sha1),后需要使用这个脚本的时候,直接使用哈希值,就可以指向这个lua脚本,从而减少网络传输
# 从文件中读取 lua脚本内容 cat test1.lua | redis-cli script load --pipe # 加载 lua脚本字符串 生成 sha1 > script load 'local val = KEYS[1]; return val' "b8059ba43af6ffe8bed3db65bac35d452f8115d8" # 检查脚本缓存中,是否有该 sha1 散列值的lua脚本 > script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8" 1) (integer) 1 # 清除所有脚本缓存 > script flush OK # 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本 > script kill (error) NOTBUSY No scripts in execution right now.
EVAL
# 测试使用 EVAL script numkeys key [key ...] arg [arg ...]
EVALSHA
# 线上使用 EVALSHA sha1 numkeys key [key ...] arg [arg ...]
应用
1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load) ,(会生成一系列哈希值 key-value对,key是哈希值,value就是lua脚本);
2: 项目中若需要热更新,通过redis-cli script flush(相当于把哈希映射lua脚本的映射关系全部清空);然后可以通过订阅发布功能通知所有服
务器重新加载lua脚本;
3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;
lua脚本使用案例
上传lua脚本,然后使用得到的哈希值res,调用脚本命令
lua脚本与mysql存储过程区别:mysql存储过程不具备事务性(除非手动加事务)(存储过程只有一个作用,减少网络传输),也不具备原子性
五、ACID特性分析
A
原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis
不支持回滚;即使事务队列中的某个命令在执行期间出现了错误(比如对string类型的key进行zadd),整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。(可以通过lua脚本来实现原子性,也可以编写脚本来实现回滚)C
一致性;事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一
致性而不是异常后的一致性;所以redis也不满足;这个争议很大:redis 能确保事务执行前后的数
据的完整约束;但是并不满足传统意义上的一致性;比如转账功能,一个扣钱一个加钱;可能出现
扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;I
隔离性;事务的操作不被其他用户操作所打断;redis 是单线程执行,天然具备隔离性;D
持久性;redis只有在 aof (通过命令协议刷到磁盘中)持久化策略的时候,并且需要在 redis.conf 中
appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;
六、redis 发布订阅
为了支持消息的多播机制,redis 引入了发布订阅模块;
消息不一定可达。可以使用分布式消息队列 或者 stream的方式 确保一定可达
# 订阅频道 subscribe 频道 # 订阅模式频道 psubscribe 频道 # 取消订阅频道 unsubscribe 频道 # 取消订阅模式频道 punsubscribe 频道 # 发布具体频道或模式频道的内容 publish 频道 内容 # 客户端收到具体频道内容 message 具体频道 内容 # 客户端收到模式频道内容 pmessage 模式频道 具体频道 内容
例子:
订阅频道,如果有人在该频道里发布消息,就能收到
只要是news.开头的频道都接受,这就是订阅模式频道
应用
发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而
pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接 用于处理发布订阅;
缺点
发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费
者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到
消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失
了;
另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;
应用
subscribe news.it news.showbiz news.car psubscribe news.* publish new.showbiz 'king kiss darren'
七、redis异步连接
redis协议图
协议实现的第一步需要知道如何界定数据包:
- 长度 + 二进制流
- 二进制流 + 特殊分隔符
异步连接
同步连接方案采用阻塞io来实现;优点是代码书写是同步的,业务逻辑没有割裂;缺点是阻塞当前
线程,直至redis返回结果;通常用多个线程来实现线程池来解决效率问题;
异步连接方案采用非阻塞io来实现;优点是没有阻塞当前线程,redis 没有返回,依然可以往redis
发送命令;缺点是代码书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决
(openresty,skynet);配合redis6.0以后的io多线程(前提是有大量并发请求),异步连接
池,能更好解决应用层的数据访问性能;
redis6.0 io多线程
edis6.0版本后添加的 io多线程主要解决redis协议的压缩以及解压缩的耗时问题;一般项目中不
需要开启;如果有大量并发请求,且返回数据包一般比较大的场景才有它的用武之地;
原理
int n = read(fd, buff, size);// redis io-threads msg = decode(buff, size); data = do_command(msg); bin = encode(data, sz);// io-threads send(fd, bin, sz1);
开启
# 在 redis.conf 中 # if you have a four cores boxes, try to use 2 or 3 I/O threads, if you have a 8 cores, try to use 6 threads. io-threads 4 # 默认只开启 encode 也就是redis发送给客户端的协议压缩工作;也可开启io-threads-do-reads yes来实现 decode; # 一般发送给redis的命令数据包都比较少,所以不需要开启 decode 功能; # io-threads-do-reads no
例子一:使用hiredis同步连接
int current_tick() { int t = 0; struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti); t = (int)ti.tv_sec * 1000; t += ti.tv_nsec / 1000000; return t; } int main(int argc, char **argv) { unsigned int j, isunix = 0; redisContext *c; redisReply *reply; const char *hostname = "127.0.0.1"; int port = 6379; struct timeval timeout = { 1, 500000 }; // 1.5 seconds c = redisConnectWithTimeout(hostname, port, timeout);//连接redis if (c == NULL || c->err) { if (c) { printf("Connection error: %s\n", c->errstr); redisFree(c); } else { printf("Connection error: can't allocate redis context\n"); } exit(1); } int num = (argc > 1) ? atoi(argv[1]) : 1000; int before = current_tick(); // redisCommand(c,"set counter 0"); for (int i=0; i<num; i++) { reply = redisCommand(c,"INCR counter");//发送命令,阻塞等待结果 printf("INCR counter: %lld\n", reply->integer); freeReplyObject(reply);//释放reply } int used = current_tick()-before; printf("after %d exec redis command, used %d ms\n", num, used); /* Disconnects and frees the context */ redisFree(c); return 0; }
例子二:使用hiredis异步连接
将自定义的reactor设置到redisAsyncContext中,
hiredis进行触发添加\删除事件,内部将 回调函数设置,添加epoll由reactor实现
int main(int argc, char **argv) { redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);//向redis服务器发送连接请求 if (c->err) { /* Let *c leak for now... */ printf("Error: %s\n", c->errstr); return 1; } R = create_reactor();//创建一个 reactor redisAttach(R, c);//把io检测流程 添加到 redisAsyncContext *c中 redisAsyncSetConnectCallback(c, connectCallback);//设置连接成功redis的回调 redisAsyncSetDisconnectCallback(c, disconnectCallback);//设置断开连接redis的回调 before = current_tick(); num = (argc > 1) ? atoi(argv[1]) : 1000; for (int i = 0; i < num; i++) { redisAsyncCommand(c, getCallback, "count", "INCR counter");//发送redis命令,并设置回调函数 } eventloop(R); release_reactor(R); return 0; }
第一部分
redisAttach(R, c)
将reactor设置到redisAsyncContext中
这是添加事件、删除事件,等操作
以redisAddRead
为例,是设置回调函数 和 添加到epoll中
redisReadHandler
中的redisAsyncHandleRead(re->ctx)
是执行回调函数
第二部分
设置connect
连接回调函数,会触发ac->ev.addWrite
事件,从而对连接成功进行io监测
连接成功后会触发ac->ev.addRead
,对读事件进行检测
在loop中有读事件检测到,就会触发读的回调函数