Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)

一、redis 网络层

redis只有一个网络io,其他都是内存操作,所以在单线程下性能较高

对于所有连接的数据处理,redis 并发执行的;

对于单条连接的数据处理,redis 串行执行的;

每个连接,可以当作一个队列。对于一个连接而言,是串行执行的(A1A2A3),对于整体而言,是并发执行(比如:A1B1B2A2)

如果一定要按照A1A2A3执行,不受其他命令的影响(不想把B1、B2插入中间),就要把A1A2A3构成一个事务

并发 : 活跃队列的个数大于处理器的个数;

mysql中是以B+树为存储结构,redis(kv数据库)中整体是通过hashtable来组织的

二、redis pipeline

redis pipeline 是一个客户端提供的,而不是服务端提供的;

下图第三个是使用pipleline的方式,更为高效,但是不具备事务性

对于request操作,只是将数据写到fd对应的写缓冲区,时间非常快,真正耗时操作在读取

response;

因此在使用pipiline的时候,可以加上事务

三、redis事务

MULTI 开启事务,事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行;

MULTI

在redis中通过multi开启事务

相当于mysql中的 begin/start transaction

EXEC

在redis中通过exec来提交事务

相当于mysql中的 commit

redis中通过multi和exec来界定事务

DISCARD

redis中通过discard来取消事务

相当于mysql中的rollback

WATCH

检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现

(cas);

若被取消则事务返回 nil

用来检测事务中,key的变化

应用

事务实现 zpop

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

事务实现 加倍操作

WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC

实践

例子一:

multi表示开启事务

(TX)表示transaction事务中的意思

输出QUEUD表示,已经将命令加入队列,由于在事务中,并未执行

然后exec会一起执行

例子二:

在执行到一半的时候,另一个连接发送命令给redisset key mark,这时候下面这个事务(要提前开启watch指定的key)会检测到key的变化,从而输出nil,也就是说事务被取消了

在实际应用时,不会使用这种方式。如果一个事务经常被取消掉,那么使用的必要性很低。

四、lua 脚本

lua 脚本实现原子性;

redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当

某个脚本正在执行的时候,不会有其他命令或者脚本被执行;

lua 脚本当中的命令会直接修改数据状态;

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;

lua脚本只是一个命令,redis是单线程,lua脚本是单独运行的,其他命令不会影响他,具有原子性

通过对 xx.lua,生成一个40位的哈希(sha1),后需要使用这个脚本的时候,直接使用哈希值,就可以指向这个lua脚本,从而减少网络传输

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.

EVAL

# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]

EVALSHA

# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

应用

1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load) ,(会生成一系列哈希值 key-value对,key是哈希值,value就是lua脚本);

2: 项目中若需要热更新,通过redis-cli script flush(相当于把哈希映射lua脚本的映射关系全部清空);然后可以通过订阅发布功能通知所有服

务器重新加载lua脚本;

3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;

lua脚本使用案例

上传lua脚本,然后使用得到的哈希值res,调用脚本命令

lua脚本与mysql存储过程区别:mysql存储过程不具备事务性(除非手动加事务)(存储过程只有一个作用,减少网络传输),也不具备原子性

五、ACID特性分析

  • A 原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis
    不支持回滚;即使事务队列中的某个命令在执行期间出现了错误(比如对string类型的key进行zadd),整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。(可以通过lua脚本来实现原子性,也可以编写脚本来实现回滚)
  • C 一致性;事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一
    致性而不是异常后的一致性;所以redis也不满足;这个争议很大:redis 能确保事务执行前后的数
    据的完整约束;但是并不满足传统意义上的一致性;比如转账功能,一个扣钱一个加钱;可能出现
    扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;
  • I 隔离性;事务的操作不被其他用户操作所打断;redis 是单线程执行,天然具备隔离性;
  • D 持久性;redis只有在 aof (通过命令协议刷到磁盘中)持久化策略的时候,并且需要在 redis.conf 中
    appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;

六、redis 发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块;

消息不一定可达。可以使用分布式消息队列 或者 stream的方式 确保一定可达

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

例子:

订阅频道,如果有人在该频道里发布消息,就能收到

只要是news.开头的频道都接受,这就是订阅模式频道

应用

发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而

pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接 用于处理发布订阅;

缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费

者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到

消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失

了;

另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;

应用

subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'king kiss darren'

七、redis异步连接

redis协议图

协议实现的第一步需要知道如何界定数据包:

  1. 长度 + 二进制流
  2. 二进制流 + 特殊分隔符

异步连接

同步连接方案采用阻塞io来实现;优点是代码书写是同步的,业务逻辑没有割裂;缺点是阻塞当前

线程,直至redis返回结果;通常用多个线程来实现线程池来解决效率问题;

异步连接方案采用非阻塞io来实现;优点是没有阻塞当前线程,redis 没有返回,依然可以往redis

发送命令;缺点是代码书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决

(openresty,skynet);配合redis6.0以后的io多线程(前提是有大量并发请求),异步连接

池,能更好解决应用层的数据访问性能;

redis6.0 io多线程

edis6.0版本后添加的 io多线程主要解决redis协议的压缩以及解压缩的耗时问题;一般项目中不

需要开启;如果有大量并发请求,且返回数据包一般比较大的场景才有它的用武之地;

原理

int n = read(fd, buff, size);// redis io-threads
msg = decode(buff, size);
data = do_command(msg);
bin = encode(data, sz);// io-threads
send(fd, bin, sz1);

开启

# 在 redis.conf 中
# if you have a four cores boxes, try to use 2 or 3 I/O threads, if you have
a 8 cores, try to use 6 threads.
io-threads 4
# 默认只开启 encode 也就是redis发送给客户端的协议压缩工作;也可开启io-threads-do-reads
yes来实现 decode;
# 一般发送给redis的命令数据包都比较少,所以不需要开启 decode 功能;
# io-threads-do-reads no

例子一:使用hiredis同步连接

int current_tick() {
    int t = 0;
    struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (int)ti.tv_sec * 1000;
  t += ti.tv_nsec / 1000000;
    return t;
}
int main(int argc, char **argv) {
    unsigned int j, isunix = 0;
    redisContext *c;
    redisReply *reply;
    const char *hostname = "127.0.0.1";
    int port = 6379;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    c = redisConnectWithTimeout(hostname, port, timeout);//连接redis
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }
    int num = (argc > 1) ? atoi(argv[1]) : 1000;
    int before = current_tick();
    // redisCommand(c,"set counter 0");
    for (int i=0; i<num; i++) {
        reply = redisCommand(c,"INCR counter");//发送命令,阻塞等待结果
        printf("INCR counter: %lld\n", reply->integer);
        freeReplyObject(reply);//释放reply
    }
    int used = current_tick()-before;
    printf("after %d exec redis command, used %d ms\n", num, used);
    /* Disconnects and frees the context */
    redisFree(c);
    return 0;
}

例子二:使用hiredis异步连接

将自定义的reactor设置到redisAsyncContext中,

hiredis进行触发添加\删除事件,内部将 回调函数设置,添加epoll由reactor实现

int main(int argc, char **argv) {
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);//向redis服务器发送连接请求
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }
    R = create_reactor();//创建一个 reactor
    redisAttach(R, c);//把io检测流程 添加到 redisAsyncContext *c中
    redisAsyncSetConnectCallback(c, connectCallback);//设置连接成功redis的回调
    redisAsyncSetDisconnectCallback(c, disconnectCallback);//设置断开连接redis的回调
    before = current_tick();
    num = (argc > 1) ? atoi(argv[1]) : 1000;
    for (int i = 0; i < num; i++) {
        redisAsyncCommand(c, getCallback, "count", "INCR counter");//发送redis命令,并设置回调函数
    }
    eventloop(R);
    release_reactor(R);
    return 0;
}

第一部分

redisAttach(R, c)将reactor设置到redisAsyncContext中

这是添加事件、删除事件,等操作

redisAddRead为例,是设置回调函数 和 添加到epoll中

redisReadHandler中的redisAsyncHandleRead(re->ctx)是执行回调函数

第二部分

设置connect连接回调函数,会触发ac->ev.addWrite事件,从而对连接成功进行io监测

连接成功后会触发ac->ev.addRead,对读事件进行检测

在loop中有读事件检测到,就会触发读的回调函数

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
19天前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
2月前
|
NoSQL Redis
Redis 执行 Lua保证原子性原理
Redis 执行 Lua 保证原子性原理
129 1
|
8天前
|
NoSQL Linux Redis
linux安装单机版redis详细步骤,及python连接redis案例
这篇文章提供了在Linux系统中安装单机版Redis的详细步骤,并展示了如何配置Redis为systemctl启动,以及使用Python连接Redis进行数据操作的案例。
19 2
|
2月前
|
NoSQL 网络协议 Redis
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
|
29天前
|
NoSQL 安全 容灾
阿里云DTS踩坑经验分享系列|Redis迁移、同步
阿里云数据传输服务DTS在帮助用户迁移Redis数据、同步数据时,在某些复杂场景下会出现报错,或者源库与目标库数据不一致的问题,给用户带来困扰。本文介绍了DTS Redis到Redis迁移、同步过程中的典型问题,以帮助用户更好地使用DTS。
80 2
|
2月前
|
NoSQL 算法 Java
诡异!Redis Proxy RT上升后连接倾斜
本文细致地描述了关于Redis Proxy RT上升后连接倾斜问题的排查过程和根本原因,最后给出了优化方案。
|
2月前
|
Kubernetes NoSQL Redis
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
|
2月前
|
NoSQL 网络协议 Linux
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
|
NoSQL Java 关系型数据库
Redis_事务_锁机制_秒杀
Redis_事务_锁机制_秒杀
|
SQL NoSQL 关系型数据库
Redis(二十)-Redis的事务和锁机制
何为事务呢?我的理解是事务是一种机制,是一个不可分割的工作单元,要么都执行,要么都不执行。
143 0
Redis(二十)-Redis的事务和锁机制
下一篇
无影云桌面