redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

redis网络层

这里我们只讨论宏观的、直接的,即忽略其他流程,只关注数据包处理流程。

对于redis连接来说,哪一条连接先构成一个完整的数据包,哪一条连接就会先得到redis的处理

1、一个数据包可能由多个读事件才能组装成(因为一次可能不能读到一个完整的数据包)

2、管道就是连接

3、人推车相当于网络线程(redis中网络线程就一个)

redis pipeline

根据上面的理解,redis中的pipeline模式其实也会是先请求的先返回。

Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。

Hiredis 提供redisCommand()函数来向 Redis 服务端发送命令,redisCommand()函数的原型如下:

void *redisCommand(redisContext *c, const char *format, ...);

redisCommand()执行后,返回一个redisReply *指针,指向redisReply结构体,该结构体包含了返回的结果信息。

redisCommand()函数是阻塞的(是指使用阻塞版的redisContext对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。

redisCommand()函数的使用示例如下所示:

redisReply *reply;
reply = redisCommand(conn, "SET %s %s", "foo", "bar");
freeReplyObject(reply);
reply = redisCommand(conn, "GET %s", "foo");
printf("%s\n", reply->str);
freeReplyObject(reply);

如果我们需要向 Redis 服务端发送多次命令,如果都是使用redisCommand()函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了redisAppendCommand()函数来实现流水线的命令发送方案。

int redisAppendCommand(redisContext *c, const char *format, ...);

redisAppendCommand()函数执行成功时返回REDIS_OK,失败时返回REDIS_ERR

#define REDIS_ERR -1
#define REDIS_OK 0

跟redisCommand()函数一样,redisAppendCommand()函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以redisAppendCommand()函数为例说明。

redisAppendCommand()函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到redisContext对象中。那么,redisContext对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了redisGetReply()函数来将缓存的命令发送出去的功能。redisGetReply()函数的处理过程如下:

  1. 查看结果缓冲区是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
  2. 命令缓冲区的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回

上面我们提到的redisCommand()函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用redisAppendCommand()函数,然后再调用redisGetReply()函数实现的。

说到这里,hiredis 实现流水线的过程就很清晰了。无论redisCommand()函数还是redisAppendCommand()函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand()函数会马上发送命令然后取得返回结果,而redisAppendCommand()函数则在调用redisGetReply()函数才将所有命令一次性发送,并取得第一个命令的返回结果。

下面是使用redisAppendCommand()函数实现流水线方案的示例。

redisReply *reply;
redisAppendCommand(context,"SET foo bar");
redisAppendCommand(context,"GET foo");
redisGetReply(context,&reply); // SET命令的返回
freeReplyObject(reply);
redisGetReply(context,&reply); // GET命令的返回
freeReplyObject(reply);

值得注意的是,调用redisAppendCommand()函数的次数需要与调用redisGetReply()的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。

// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
redisAppendCommand(conn, "get t");
// 本来想取得 set a ddd 的返回,却获取了 get t 的返回
reply = redisCommand(conn, "set a ddd");
printf("set a res: %s\n", reply->str);

输出的结果将会是get t命令的返回,而不是set a ddd命令的返回。

附:示例程序 redis-pipeline.c

编译:

gcc -o redis-pipeline redis-pipeline.c -L/usr/local/lib -lhiredis

输出:

bar
res: OK
res: b
watch res: OK
res: OK, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: (null), num: 3, type: 2
set a res: tt

源程序:

#include <stdio.h>
#include <hiredis/hiredis.h>
int main() {
    // 阻塞 redisContext
    redisContext *conn = redisConnect("127.0.0.1", 6379);
    if (conn != NULL && conn->err) {
        printf("connection error: %s\n", conn->errstr);
        return 0;
    }
    // 使用 redisCommand 发送命令并获取返回
    redisReply *reply;
    reply = redisCommand(conn, "SET %s %s", "foo", "bar");
    freeReplyObject(reply);
    reply = redisCommand(conn, "GET %s", "foo");
    printf("%s\n", reply->str);
    freeReplyObject(reply);
    // 使用 redisAppendCommand 实现流水线
    redisAppendCommand(conn, "set a b");
    redisAppendCommand(conn,"get a");
    int r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);
    r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);
    // 使用 watch 命令监控键 a
    reply = redisCommand(conn, "watch a");
    printf("watch res: %s\n", reply->str);
    freeReplyObject(reply);
    // 事务流水线,总共5个命令
    redisAppendCommand(conn, "multi");
    redisAppendCommand(conn, "get foo");
    redisAppendCommand(conn, "set t tt");
    redisAppendCommand(conn, "set a aa");
    redisAppendCommand(conn, "exec");
    for (int i = 0; i < 5; ++i) {
        r = redisGetReply(conn, (void **)&reply);
        if (r == REDIS_ERR) {
            printf("ERROR\n");
        }
        //reply->elements返回元素个数,reply->element中存储redis返回的具体元素,可能有多个
        printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type);
        freeReplyObject(reply);
    }
    // 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
    redisAppendCommand(conn, "get t");
    // 本来想取得 set a ddd 的返回,却获取了 get t 的返回
    reply = redisCommand(conn, "set a ddd");
    printf("set a res: %s\n", reply->str);
    redisFree(conn);
    return 0;
}

redis事务

事务:用户定义一系列数据库操作,这些操作视为一个完整的 逻辑处理工作单元,要么全部执行,要么全部不执行,是不可 分割的工作单元。

在redis中,当我们开启multi的时候,会构建一个队列,multi后面的命令会放到队列中,当接收到exec命令时,会将队列中的命令取出来执行,目的是不让其他连接的命令在这些命令之间执行,redis采用这种方式来处理原子性的问题的。

MULTI

在redis中通过multi开启事务

相当于mysql中的 begin/start transaction

EXEC

在redis中通过exec来提交事务

相当于mysql中的 commit

redis中通过multi和exec来界定事务

DISCARD

redis中通过discard来取消事务

相当于mysql中的rollback

WATCH

检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现

(cas);

若被取消则事务返回 nil

用来检测事务中,key的变化

应用

注意:

实际项目中不会使用multi这种方式,因为watch这种方式可能会取消事务不执行(比如其他连接在我们之前操作了相同的key),这就需要重试(即乐观锁实现可能会需要重试,增加业务逻辑的复杂度),这在我们写业务逻辑的时候是非常不方便的;实际项目中使用更多的是lua脚本,multi这种方式可能会在面试当中问到

lua脚本

lua 脚本实现原子性,减少网络传输;

pipeline的效果 lua多个语句

redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的(因为lua脚本方式中可以写多个命令然后作为一个数据包传给redis,不可能其他语句在它中间去执行,因为redis它是单线程的);可以在里边做一些逻辑运算

lua 脚本当中的命令会直接修改数据状态;

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;

通过一个命令来调用lua脚本

我们来看看server与redis-server的交互

1、调用script load xx.lua

2、通过hash操作生成40位的字符串,可以减少网络的传输

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.

EVAL

测试使用这种方式

EVAL script numkeys key [key …] arg [arg …]

EVALSHA

线上使用这种方式

EVALSHA sha1 numkeys key [key …] arg [arg …]

应用

1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load) ,(会生成一系列哈希值 key-value对,key是哈希值,value就是lua脚本);

2: 项目中若需要热更新,通过redis-cli script flush(相当于把哈希映射lua脚本的映射关系全部清空);然后可以通过订阅发布功能通知所有服

务器重新加载lua脚本;

3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;

示例:

我们来运行一下,我们会拿res这个hash值作为evalsha的参数,并且告诉有几个参数,值是什么

先把脚本传到服务器当中,生成我们的hash值,根据hash值去执行这个lua脚本命令。

lua脚本与mysql存储过程区别:mysql存储过程不具备事务性(除非手动加事务)(存储过程只有一个作用,减少网络传输),也不具备原子性

ACID特性分析

A 原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis

不支持回滚;即使事务队列中的某个命令在执行期间出现了错误(比如对string类型的key进行zadd),整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。(可以通过lua脚本来实现原子性,也可以编写脚本来实现回滚)

C 一致性;事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一

致性而不是异常后的一致性;所以redis也不满足;这个争议很大:redis 能确保事务执行前后的数

据的完整约束;但是并不满足传统意义上的一致性;比如转账功能,一个扣钱一个加钱;可能出现

扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;

I 隔离性;事务的操作不被其他用户操作所打断;redis 是单线程执行,天然具备隔离性;

D 持久性;redis只有在 aof (通过命令协议刷到磁盘中)持久化策略的时候,并且需要在 redis.conf 中appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;

redis发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块;

消息不一定可达。可以使用分布式消息队列 或者 stream的方式 确保一定可达

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

例子

订阅频道,如果有人在该频道里发布消息,就能收到

订阅模式频道,只要是news.开头的频道都接受

应用

发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而

pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接 用于处理发布订阅;

缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费

者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到

消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失

了;

另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;

使用hiredis同步连接

int current_tick() {
    int t = 0;
    struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (int)ti.tv_sec * 1000;
  t += ti.tv_nsec / 1000000;
    return t;
}
int main(int argc, char **argv) {
    unsigned int j, isunix = 0;
    redisContext *c;
    redisReply *reply;
    const char *hostname = "127.0.0.1";
    int port = 6379;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    c = redisConnectWithTimeout(hostname, port, timeout);//连接redis
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }
    int num = (argc > 1) ? atoi(argv[1]) : 1000;
    int before = current_tick();
    // redisCommand(c,"set counter 0");
    for (int i=0; i<num; i++) {
        reply = redisCommand(c,"INCR counter");//发送命令,阻塞等待结果
        printf("INCR counter: %lld\n", reply->integer);
        freeReplyObject(reply);//释放reply
    }
    int used = current_tick()-before;
    printf("after %d exec redis command, used %d ms\n", num, used);
    /* Disconnects and frees the context */
    redisFree(c);
    return 0;
}

hiredis异步方式实现

redis协议图

协议实现的第一步需要知道如何界定数据包

  • 长度+二进制流
  • 二进制流+特殊分隔符

hiredis是实现redis与服务器连接的协议实现,其中涉及到同步实现和部分异步实现,在异步实现中,hiredis只是提供了事件操作的接口(因为不同的平台、不同的网络库对事件操作的接口不一致),所以要想异步操作redis,需要我们自己适配这些事件操作的接口,相当于我们需要实现一个redis的驱动:

因为后端大部分框架都采用reactor方式,所以把redis连接融合到reactor中进行管理,将自定义的reactor设置到redisAsyncContext中,hiredis进行触发添加\删除事件,内部将 回调函数设置,添加epoll由reactor实现

int main(int argc, char **argv) {
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);//向redis服务器发送连接请求
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }
    R = create_reactor();//创建一个 reactor
    redisAttach(R, c);//把io检测流程 添加到 redisAsyncContext *c中
    redisAsyncSetConnectCallback(c, connectCallback);//设置连接成功redis的回调
    redisAsyncSetDisconnectCallback(c, disconnectCallback);//设置断开连接redis的回调
    eventloop(R);
    release_reactor(R);
    return 0;
}

第一部分

redisAttach(R, c)将reactor设置到redisAsyncContext中

这是添加事件、删除事件,等操作

以redisAddWrite为例,是设置回调函数和添加事件到reactor中

redisWriteHandler中的redisAsyncHandleWrite(re->ctx)是执行回调函数

第二部分

设置connect连接回调函数,会调用ac->ev.addWrite(即调用上面设置的redisAddWrite函数,即添加写回调以及往reactor中注册写事件)

连接成功之后会触发可写事件,即成功之后会回调上面设置的redisWriteHandler写回调,最终在redisWriteHandler的redisAsyncHandleWrite的内部会调用redisAsyncSetConnectCallback设置的connectCallback函数。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
负载均衡 网络协议 算法
|
2天前
|
前端开发 网络协议 安全
【网络原理】——HTTP协议、fiddler抓包
HTTP超文本传输,HTML,fiddler抓包,URL,urlencode,HTTP首行方法,GET方法,POST方法
|
4天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
34 1
|
1月前
|
安全 搜索推荐 网络安全
HTTPS协议是**一种通过计算机网络进行安全通信的传输协议
HTTPS协议是**一种通过计算机网络进行安全通信的传输协议
60 11
|
29天前
|
监控 网络协议 网络性能优化
网络通信的核心选择:TCP与UDP协议深度解析
在网络通信领域,TCP(传输控制协议)和UDP(用户数据报协议)是两种基础且截然不同的传输层协议。它们各自的特点和适用场景对于网络工程师和开发者来说至关重要。本文将深入探讨TCP和UDP的核心区别,并分析它们在实际应用中的选择依据。
59 3
|
1月前
|
网络协议 网络安全 网络虚拟化
本文介绍了十个重要的网络技术术语,包括IP地址、子网掩码、域名系统(DNS)、防火墙、虚拟专用网络(VPN)、路由器、交换机、超文本传输协议(HTTP)、传输控制协议/网际协议(TCP/IP)和云计算
本文介绍了十个重要的网络技术术语,包括IP地址、子网掩码、域名系统(DNS)、防火墙、虚拟专用网络(VPN)、路由器、交换机、超文本传输协议(HTTP)、传输控制协议/网际协议(TCP/IP)和云计算。通过这些术语的详细解释,帮助读者更好地理解和应用网络技术,应对数字化时代的挑战和机遇。
107 3
|
1月前
|
网络虚拟化
生成树协议(STP)及其演进版本RSTP和MSTP,旨在解决网络中的环路问题,提高网络的可靠性和稳定性
生成树协议(STP)及其演进版本RSTP和MSTP,旨在解决网络中的环路问题,提高网络的可靠性和稳定性。本文介绍了这三种协议的原理、特点及区别,并提供了思科和华为设备的命令示例,帮助读者更好地理解和应用这些协议。
69 4
|
1月前
|
网络协议 安全 Go
Go语言进行网络编程可以通过**使用TCP/IP协议栈、并发模型、HTTP协议等**方式
【10月更文挑战第28天】Go语言进行网络编程可以通过**使用TCP/IP协议栈、并发模型、HTTP协议等**方式
68 13
|
1月前
|
存储 缓存 网络协议
计算机网络常见面试题(二):浏览器中输入URL返回页面过程、HTTP协议特点,GET、POST的区别,Cookie与Session
计算机网络常见面试题(二):浏览器中输入URL返回页面过程、HTTP协议特点、状态码、报文格式,GET、POST的区别,DNS的解析过程、数字证书、Cookie与Session,对称加密和非对称加密