Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

本文涉及的产品
RDS Agent(兼容OpenClaw),2核4GB
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

一、redis 网络层

redis只有一个网络io,其他都是内存操作,所以在单线程下性能较高

对于所有连接的数据处理,redis 并发执行的;

对于单条连接的数据处理,redis 串行执行的;

每个连接,可以当作一个队列。对于一个连接而言,是串行执行的(A1A2A3),对于整体而言,是并发执行(比如:A1B1B2A2)

如果一定要按照A1A2A3执行,不受其他命令的影响(不想把B1、B2插入中间),就要把A1A2A3构成一个事务

并发 : 活跃队列的个数大于处理器的个数;

mysql中是以B+树为存储结构,redis(kv数据库)中整体是通过hashtable来组织的

二、redis pipeline

redis pipeline 是一个客户端提供的,而不是服务端提供的;

下图第三个是使用pipleline的方式,更为高效,但是不具备事务性

对于request操作,只是将数据写到fd对应的写缓冲区,时间非常快,真正耗时操作在读取

response;

因此在使用pipiline的时候,可以加上事务

三、redis事务

MULTI 开启事务,事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行;

MULTI

在redis中通过multi开启事务

相当于mysql中的 begin/start transaction

EXEC

在redis中通过exec来提交事务

相当于mysql中的 commit

redis中通过multi和exec来界定事务

DISCARD

redis中通过discard来取消事务

相当于mysql中的rollback

WATCH

检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现

(cas);

若被取消则事务返回 nil

用来检测事务中,key的变化

应用

事务实现 zpop

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

事务实现 加倍操作

WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC

实践

例子一:

multi表示开启事务

(TX)表示transaction事务中的意思

输出QUEUD表示,已经将命令加入队列,由于在事务中,并未执行

然后exec会一起执行

例子二:

在执行到一半的时候,另一个连接发送命令给redisset key mark,这时候下面这个事务(要提前开启watch指定的key)会检测到key的变化,从而输出nil,也就是说事务被取消了

在实际应用时,不会使用这种方式。如果一个事务经常被取消掉,那么使用的必要性很低。

四、lua 脚本

lua 脚本实现原子性;

redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当

某个脚本正在执行的时候,不会有其他命令或者脚本被执行;

lua 脚本当中的命令会直接修改数据状态;

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;

lua脚本只是一个命令,redis是单线程,lua脚本是单独运行的,其他命令不会影响他,具有原子性

通过对 xx.lua,生成一个40位的哈希(sha1),后需要使用这个脚本的时候,直接使用哈希值,就可以指向这个lua脚本,从而减少网络传输

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.

EVAL

# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]

EVALSHA

# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

应用

1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load) ,(会生成一系列哈希值 key-value对,key是哈希值,value就是lua脚本);

2: 项目中若需要热更新,通过redis-cli script flush(相当于把哈希映射lua脚本的映射关系全部清空);然后可以通过订阅发布功能通知所有服

务器重新加载lua脚本;

3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;

lua脚本使用案例

上传lua脚本,然后使用得到的哈希值res,调用脚本命令

lua脚本与mysql存储过程区别:mysql存储过程不具备事务性(除非手动加事务)(存储过程只有一个作用,减少网络传输),也不具备原子性

五、ACID特性分析

  • A 原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis
    不支持回滚;即使事务队列中的某个命令在执行期间出现了错误(比如对string类型的key进行zadd),整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。(可以通过lua脚本来实现原子性,也可以编写脚本来实现回滚)
  • C 一致性;事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一
    致性而不是异常后的一致性;所以redis也不满足;这个争议很大:redis 能确保事务执行前后的数
    据的完整约束;但是并不满足传统意义上的一致性;比如转账功能,一个扣钱一个加钱;可能出现
    扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;
  • I 隔离性;事务的操作不被其他用户操作所打断;redis 是单线程执行,天然具备隔离性;
  • D 持久性;redis只有在 aof (通过命令协议刷到磁盘中)持久化策略的时候,并且需要在 redis.conf 中
    appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;

六、redis 发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块;

消息不一定可达。可以使用分布式消息队列 或者 stream的方式 确保一定可达

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

例子:

订阅频道,如果有人在该频道里发布消息,就能收到

只要是news.开头的频道都接受,这就是订阅模式频道

应用

发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而

pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接 用于处理发布订阅;

缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费

者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到

消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失

了;

另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;

应用

subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'king kiss darren'

七、redis异步连接

redis协议图

协议实现的第一步需要知道如何界定数据包:

  1. 长度 + 二进制流
  2. 二进制流 + 特殊分隔符

异步连接

同步连接方案采用阻塞io来实现;优点是代码书写是同步的,业务逻辑没有割裂;缺点是阻塞当前

线程,直至redis返回结果;通常用多个线程来实现线程池来解决效率问题;

异步连接方案采用非阻塞io来实现;优点是没有阻塞当前线程,redis 没有返回,依然可以往redis

发送命令;缺点是代码书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决

(openresty,skynet);配合redis6.0以后的io多线程(前提是有大量并发请求),异步连接

池,能更好解决应用层的数据访问性能;

redis6.0 io多线程

edis6.0版本后添加的 io多线程主要解决redis协议的压缩以及解压缩的耗时问题;一般项目中不

需要开启;如果有大量并发请求,且返回数据包一般比较大的场景才有它的用武之地;

原理

int n = read(fd, buff, size);// redis io-threads
msg = decode(buff, size);
data = do_command(msg);
bin = encode(data, sz);// io-threads
send(fd, bin, sz1);

开启

# 在 redis.conf 中
# if you have a four cores boxes, try to use 2 or 3 I/O threads, if you have
a 8 cores, try to use 6 threads.
io-threads 4
# 默认只开启 encode 也就是redis发送给客户端的协议压缩工作;也可开启io-threads-do-reads
yes来实现 decode;
# 一般发送给redis的命令数据包都比较少,所以不需要开启 decode 功能;
# io-threads-do-reads no

例子一:使用hiredis同步连接

int current_tick() {
    int t = 0;
    struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (int)ti.tv_sec * 1000;
  t += ti.tv_nsec / 1000000;
    return t;
}
int main(int argc, char **argv) {
    unsigned int j, isunix = 0;
    redisContext *c;
    redisReply *reply;
    const char *hostname = "127.0.0.1";
    int port = 6379;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    c = redisConnectWithTimeout(hostname, port, timeout);//连接redis
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }
    int num = (argc > 1) ? atoi(argv[1]) : 1000;
    int before = current_tick();
    // redisCommand(c,"set counter 0");
    for (int i=0; i<num; i++) {
        reply = redisCommand(c,"INCR counter");//发送命令,阻塞等待结果
        printf("INCR counter: %lld\n", reply->integer);
        freeReplyObject(reply);//释放reply
    }
    int used = current_tick()-before;
    printf("after %d exec redis command, used %d ms\n", num, used);
    /* Disconnects and frees the context */
    redisFree(c);
    return 0;
}

例子二:使用hiredis异步连接

将自定义的reactor设置到redisAsyncContext中,

hiredis进行触发添加\删除事件,内部将 回调函数设置,添加epoll由reactor实现

int main(int argc, char **argv) {
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);//向redis服务器发送连接请求
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }
    R = create_reactor();//创建一个 reactor
    redisAttach(R, c);//把io检测流程 添加到 redisAsyncContext *c中
    redisAsyncSetConnectCallback(c, connectCallback);//设置连接成功redis的回调
    redisAsyncSetDisconnectCallback(c, disconnectCallback);//设置断开连接redis的回调
    before = current_tick();
    num = (argc > 1) ? atoi(argv[1]) : 1000;
    for (int i = 0; i < num; i++) {
        redisAsyncCommand(c, getCallback, "count", "INCR counter");//发送redis命令,并设置回调函数
    }
    eventloop(R);
    release_reactor(R);
    return 0;
}

第一部分

redisAttach(R, c)将reactor设置到redisAsyncContext中

这是添加事件、删除事件,等操作

redisAddRead为例,是设置回调函数 和 添加到epoll中

redisReadHandler中的redisAsyncHandleRead(re->ctx)是执行回调函数

第二部分

设置connect连接回调函数,会触发ac->ev.addWrite事件,从而对连接成功进行io监测

连接成功后会触发ac->ev.addRead,对读事件进行检测

在loop中有读事件检测到,就会触发读的回调函数

相关文章
|
编解码 异构计算
RT-DETR改进策略【Neck】| BiFPN:双向特征金字塔网络-跨尺度连接和加权特征融合
RT-DETR改进策略【Neck】| BiFPN:双向特征金字塔网络-跨尺度连接和加权特征融合
1251 10
RT-DETR改进策略【Neck】| BiFPN:双向特征金字塔网络-跨尺度连接和加权特征融合
|
9月前
|
Windows
电脑显示有问题,电脑连接不上网络,电脑没声音,电脑链接不上打印机?驱动人生就能解决所有问题
电脑显示有问题,电脑连接不上网络,电脑没声音,电脑链接不上打印机?驱动人生就能解决所有问题
186 0
|
计算机视觉
RT-DETR改进策略【Neck】| GFPN 超越BiFPN 通过跳层连接和跨尺度连接改进RT-DETR颈部网络
RT-DETR改进策略【Neck】| GFPN 超越BiFPN 通过跳层连接和跨尺度连接改进RT-DETR颈部网络
574 12
RT-DETR改进策略【Neck】| GFPN 超越BiFPN 通过跳层连接和跨尺度连接改进RT-DETR颈部网络
|
缓存 NoSQL 搜索推荐
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
788 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
|
计算机视觉
YOLOv11改进策略【Neck】| GFPN 超越BiFPN 通过跳层连接和跨尺度连接改进v11颈部网络
YOLOv11改进策略【Neck】| GFPN 超越BiFPN 通过跳层连接和跨尺度连接改进v11颈部网络
2936 10
YOLOv11改进策略【Neck】| GFPN 超越BiFPN 通过跳层连接和跨尺度连接改进v11颈部网络
|
编解码 异构计算
YOLOv11改进策略【Neck】| BiFPN:双向特征金字塔网络-跨尺度连接和加权特征融合
YOLOv11改进策略【Neck】| BiFPN:双向特征金字塔网络-跨尺度连接和加权特征融合
4312 7
YOLOv11改进策略【Neck】| BiFPN:双向特征金字塔网络-跨尺度连接和加权特征融合
|
缓存 NoSQL 测试技术
Redis压测脚本及持久化机制
Redis压测脚本及持久化机制简介: Redis性能压测通过`redis-benchmark`工具进行,可评估读写性能。持久化机制包括无持久化、RDB(定期快照)和AOF(操作日志),以及两者的结合。RDB适合快速备份与恢复,但可能丢失数据;AOF更安全,记录每次写操作,适合高数据安全性需求。两者结合能兼顾性能与安全性,建议同时开启并定期备份RDB文件以确保数据安全。
263 9
|
安全 网络协议 网络安全
当虚拟机出现网络连接问题时,应该先检查Hyper-V的网卡连接配置
当虚拟机出现网络连接问题时,应首先检查Hyper-V的网卡配置。具体步骤包括:确认虚拟机运行状态、检查虚拟交换机类型和物理网卡连接、确保虚拟机网络适配器正确连接到虚拟交换机,并验证网络配置(IP地址等)。常见问题如虚拟交换机配置错误、网络适配器未连接或防火墙阻止连接,可通过重新配置或调整设置解决。必要时重启虚拟机和宿主机,查看事件日志或联系技术支持以进一步排查问题。
|
SQL 安全 网络安全
网络安全与信息安全:知识分享####
【10月更文挑战第21天】 随着数字化时代的快速发展,网络安全和信息安全已成为个人和企业不可忽视的关键问题。本文将探讨网络安全漏洞、加密技术以及安全意识的重要性,并提供一些实用的建议,帮助读者提高自身的网络安全防护能力。 ####
437 17