redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS SQL Server,独享型 2核4GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

redis网络层

这里我们只讨论宏观的、直接的,即忽略其他流程,只关注数据包处理流程。

对于redis连接来说,哪一条连接先构成一个完整的数据包,哪一条连接就会先得到redis的处理

1、一个数据包可能由多个读事件才能组装成(因为一次可能不能读到一个完整的数据包)

2、管道就是连接

3、人推车相当于网络线程(redis中网络线程就一个)

redis pipeline

根据上面的理解,redis中的pipeline模式其实也会是先请求的先返回。

Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。

Hiredis 提供redisCommand()函数来向 Redis 服务端发送命令,redisCommand()函数的原型如下:

void *redisCommand(redisContext *c, const char *format, ...);

redisCommand()执行后,返回一个redisReply *指针,指向redisReply结构体,该结构体包含了返回的结果信息。

redisCommand()函数是阻塞的(是指使用阻塞版的redisContext对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。

redisCommand()函数的使用示例如下所示:

redisReply *reply;
reply = redisCommand(conn, "SET %s %s", "foo", "bar");
freeReplyObject(reply);
reply = redisCommand(conn, "GET %s", "foo");
printf("%s\n", reply->str);
freeReplyObject(reply);

如果我们需要向 Redis 服务端发送多次命令,如果都是使用redisCommand()函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了redisAppendCommand()函数来实现流水线的命令发送方案。

int redisAppendCommand(redisContext *c, const char *format, ...);

redisAppendCommand()函数执行成功时返回REDIS_OK,失败时返回REDIS_ERR

#define REDIS_ERR -1
#define REDIS_OK 0

跟redisCommand()函数一样,redisAppendCommand()函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以redisAppendCommand()函数为例说明。

redisAppendCommand()函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到redisContext对象中。那么,redisContext对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了redisGetReply()函数来将缓存的命令发送出去的功能。redisGetReply()函数的处理过程如下:

  1. 查看结果缓冲区是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
  2. 命令缓冲区的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回

上面我们提到的redisCommand()函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用redisAppendCommand()函数,然后再调用redisGetReply()函数实现的。

说到这里,hiredis 实现流水线的过程就很清晰了。无论redisCommand()函数还是redisAppendCommand()函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand()函数会马上发送命令然后取得返回结果,而redisAppendCommand()函数则在调用redisGetReply()函数才将所有命令一次性发送,并取得第一个命令的返回结果。

下面是使用redisAppendCommand()函数实现流水线方案的示例。

redisReply *reply;
redisAppendCommand(context,"SET foo bar");
redisAppendCommand(context,"GET foo");
redisGetReply(context,&reply); // SET命令的返回
freeReplyObject(reply);
redisGetReply(context,&reply); // GET命令的返回
freeReplyObject(reply);

值得注意的是,调用redisAppendCommand()函数的次数需要与调用redisGetReply()的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。

// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
redisAppendCommand(conn, "get t");
// 本来想取得 set a ddd 的返回,却获取了 get t 的返回
reply = redisCommand(conn, "set a ddd");
printf("set a res: %s\n", reply->str);

输出的结果将会是get t命令的返回,而不是set a ddd命令的返回。

附:示例程序 redis-pipeline.c

编译:

gcc -o redis-pipeline redis-pipeline.c -L/usr/local/lib -lhiredis

输出:

bar
res: OK
res: b
watch res: OK
res: OK, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: (null), num: 3, type: 2
set a res: tt

源程序:

#include <stdio.h>
#include <hiredis/hiredis.h>
int main() {
    // 阻塞 redisContext
    redisContext *conn = redisConnect("127.0.0.1", 6379);
    if (conn != NULL && conn->err) {
        printf("connection error: %s\n", conn->errstr);
        return 0;
    }
    // 使用 redisCommand 发送命令并获取返回
    redisReply *reply;
    reply = redisCommand(conn, "SET %s %s", "foo", "bar");
    freeReplyObject(reply);
    reply = redisCommand(conn, "GET %s", "foo");
    printf("%s\n", reply->str);
    freeReplyObject(reply);
    // 使用 redisAppendCommand 实现流水线
    redisAppendCommand(conn, "set a b");
    redisAppendCommand(conn,"get a");
    int r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);
    r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);
    // 使用 watch 命令监控键 a
    reply = redisCommand(conn, "watch a");
    printf("watch res: %s\n", reply->str);
    freeReplyObject(reply);
    // 事务流水线,总共5个命令
    redisAppendCommand(conn, "multi");
    redisAppendCommand(conn, "get foo");
    redisAppendCommand(conn, "set t tt");
    redisAppendCommand(conn, "set a aa");
    redisAppendCommand(conn, "exec");
    for (int i = 0; i < 5; ++i) {
        r = redisGetReply(conn, (void **)&reply);
        if (r == REDIS_ERR) {
            printf("ERROR\n");
        }
        //reply->elements返回元素个数,reply->element中存储redis返回的具体元素,可能有多个
        printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type);
        freeReplyObject(reply);
    }
    // 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
    redisAppendCommand(conn, "get t");
    // 本来想取得 set a ddd 的返回,却获取了 get t 的返回
    reply = redisCommand(conn, "set a ddd");
    printf("set a res: %s\n", reply->str);
    redisFree(conn);
    return 0;
}

redis事务

事务:用户定义一系列数据库操作,这些操作视为一个完整的 逻辑处理工作单元,要么全部执行,要么全部不执行,是不可 分割的工作单元。

在redis中,当我们开启multi的时候,会构建一个队列,multi后面的命令会放到队列中,当接收到exec命令时,会将队列中的命令取出来执行,目的是不让其他连接的命令在这些命令之间执行,redis采用这种方式来处理原子性的问题的。

MULTI

在redis中通过multi开启事务

相当于mysql中的 begin/start transaction

EXEC

在redis中通过exec来提交事务

相当于mysql中的 commit

redis中通过multi和exec来界定事务

DISCARD

redis中通过discard来取消事务

相当于mysql中的rollback

WATCH

检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现

(cas);

若被取消则事务返回 nil

用来检测事务中,key的变化

应用

注意:

实际项目中不会使用multi这种方式,因为watch这种方式可能会取消事务不执行(比如其他连接在我们之前操作了相同的key),这就需要重试(即乐观锁实现可能会需要重试,增加业务逻辑的复杂度),这在我们写业务逻辑的时候是非常不方便的;实际项目中使用更多的是lua脚本,multi这种方式可能会在面试当中问到

lua脚本

lua 脚本实现原子性,减少网络传输;

pipeline的效果 lua多个语句

redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的(因为lua脚本方式中可以写多个命令然后作为一个数据包传给redis,不可能其他语句在它中间去执行,因为redis它是单线程的);可以在里边做一些逻辑运算

lua 脚本当中的命令会直接修改数据状态;

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;

通过一个命令来调用lua脚本

我们来看看server与redis-server的交互

1、调用script load xx.lua

2、通过hash操作生成40位的字符串,可以减少网络的传输

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.

EVAL

测试使用这种方式

EVAL script numkeys key [key …] arg [arg …]

EVALSHA

线上使用这种方式

EVALSHA sha1 numkeys key [key …] arg [arg …]

应用

1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load) ,(会生成一系列哈希值 key-value对,key是哈希值,value就是lua脚本);

2: 项目中若需要热更新,通过redis-cli script flush(相当于把哈希映射lua脚本的映射关系全部清空);然后可以通过订阅发布功能通知所有服

务器重新加载lua脚本;

3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;

示例:

我们来运行一下,我们会拿res这个hash值作为evalsha的参数,并且告诉有几个参数,值是什么

先把脚本传到服务器当中,生成我们的hash值,根据hash值去执行这个lua脚本命令。

lua脚本与mysql存储过程区别:mysql存储过程不具备事务性(除非手动加事务)(存储过程只有一个作用,减少网络传输),也不具备原子性

ACID特性分析

A 原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis

不支持回滚;即使事务队列中的某个命令在执行期间出现了错误(比如对string类型的key进行zadd),整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。(可以通过lua脚本来实现原子性,也可以编写脚本来实现回滚)

C 一致性;事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一

致性而不是异常后的一致性;所以redis也不满足;这个争议很大:redis 能确保事务执行前后的数

据的完整约束;但是并不满足传统意义上的一致性;比如转账功能,一个扣钱一个加钱;可能出现

扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;

I 隔离性;事务的操作不被其他用户操作所打断;redis 是单线程执行,天然具备隔离性;

D 持久性;redis只有在 aof (通过命令协议刷到磁盘中)持久化策略的时候,并且需要在 redis.conf 中appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;

redis发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块;

消息不一定可达。可以使用分布式消息队列 或者 stream的方式 确保一定可达

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

例子

订阅频道,如果有人在该频道里发布消息,就能收到

订阅模式频道,只要是news.开头的频道都接受

应用

发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而

pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接 用于处理发布订阅;

缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费

者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到

消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失

了;

另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;

使用hiredis同步连接

int current_tick() {
    int t = 0;
    struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (int)ti.tv_sec * 1000;
  t += ti.tv_nsec / 1000000;
    return t;
}
int main(int argc, char **argv) {
    unsigned int j, isunix = 0;
    redisContext *c;
    redisReply *reply;
    const char *hostname = "127.0.0.1";
    int port = 6379;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    c = redisConnectWithTimeout(hostname, port, timeout);//连接redis
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }
    int num = (argc > 1) ? atoi(argv[1]) : 1000;
    int before = current_tick();
    // redisCommand(c,"set counter 0");
    for (int i=0; i<num; i++) {
        reply = redisCommand(c,"INCR counter");//发送命令,阻塞等待结果
        printf("INCR counter: %lld\n", reply->integer);
        freeReplyObject(reply);//释放reply
    }
    int used = current_tick()-before;
    printf("after %d exec redis command, used %d ms\n", num, used);
    /* Disconnects and frees the context */
    redisFree(c);
    return 0;
}

hiredis异步方式实现

redis协议图

协议实现的第一步需要知道如何界定数据包

  • 长度+二进制流
  • 二进制流+特殊分隔符

hiredis是实现redis与服务器连接的协议实现,其中涉及到同步实现和部分异步实现,在异步实现中,hiredis只是提供了事件操作的接口(因为不同的平台、不同的网络库对事件操作的接口不一致),所以要想异步操作redis,需要我们自己适配这些事件操作的接口,相当于我们需要实现一个redis的驱动:

因为后端大部分框架都采用reactor方式,所以把redis连接融合到reactor中进行管理,将自定义的reactor设置到redisAsyncContext中,hiredis进行触发添加\删除事件,内部将 回调函数设置,添加epoll由reactor实现

int main(int argc, char **argv) {
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);//向redis服务器发送连接请求
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }
    R = create_reactor();//创建一个 reactor
    redisAttach(R, c);//把io检测流程 添加到 redisAsyncContext *c中
    redisAsyncSetConnectCallback(c, connectCallback);//设置连接成功redis的回调
    redisAsyncSetDisconnectCallback(c, disconnectCallback);//设置断开连接redis的回调
    eventloop(R);
    release_reactor(R);
    return 0;
}

第一部分

redisAttach(R, c)将reactor设置到redisAsyncContext中

这是添加事件、删除事件,等操作

以redisAddWrite为例,是设置回调函数和添加事件到reactor中

redisWriteHandler中的redisAsyncHandleWrite(re->ctx)是执行回调函数

第二部分

设置connect连接回调函数,会调用ac->ev.addWrite(即调用上面设置的redisAddWrite函数,即添加写回调以及往reactor中注册写事件)

连接成功之后会触发可写事件,即成功之后会回调上面设置的redisWriteHandler写回调,最终在redisWriteHandler的redisAsyncHandleWrite的内部会调用redisAsyncSetConnectCallback设置的connectCallback函数。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
缓存 NoSQL 数据处理
Redis事务悄然而至:命令的背后故事
Redis事务悄然而至:命令的背后故事
16 0
|
2月前
|
NoSQL Redis
Redis原理之网络通信协议笔记
1. RESP协议 ​2. 自定义Socket连接Redis
|
2月前
|
NoSQL Linux Redis
Redis原理之网络模型笔记
Redis采用单线程模型,这意味着一个Redis服务器在任何时刻都只会处理一个请求。Redis的网络模型涉及到阻塞I/O(Blocking I/O)、非阻塞I/O(Non-blocking I/O)、I/O多路复用(I/O Multiplexing)、信号驱动I/O(Signal-driven I/O)以及异步I/O(Asynchronous I/O)。
|
4天前
|
缓存 NoSQL Java
【Redis】5、Redis 的分布式锁、Lua 脚本保证 Redis 命令的原子性
【Redis】5、Redis 的分布式锁、Lua 脚本保证 Redis 命令的原子性
17 0
|
16天前
|
消息中间件 NoSQL Kafka
Redis事务与异步方式
Redis事务与异步方式
21 0
|
1月前
|
NoSQL Java 数据处理
Redis和Spring Boot的绝佳组合:Lua脚本的黑科技
Redis和Spring Boot的绝佳组合:Lua脚本的黑科技
26 0
|
1月前
|
算法 NoSQL Java
springboot整合redis及lua脚本实现接口限流
springboot整合redis及lua脚本实现接口限流
30 0
|
2月前
|
缓存 监控 NoSQL
腾讯二面:Redis 事务支持 ACID 么?
腾讯二面:Redis 事务支持 ACID 么?
52 0
|
2月前
|
NoSQL 算法 关系型数据库
redis与mysql的数据一致性问题( 网络分区)
redis与mysql的数据一致性问题( 网络分区)
15 0
|
2月前
|
NoSQL 关系型数据库 MySQL
redis与mysql的数据一致性问题(事务一致性)
redis与mysql的数据一致性问题(事务一致性)
16 0

热门文章

最新文章