Redis协议与异步方式

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis协议与异步方式

redis网络层

哪条管道先构成一个完整的数据包,谁先得到处理

1 一个数据包可能由多个读事件才能组装成。

2 管道就是连接

3 人推车相当于网络线程

reactor并发处理连接,线程串行处理命令;

单reactor,一个线程同时处理命令+网路IO   (mysql则是一个连接一个线程处理)

redis pipeline  

redis pipeline 是一个客户端提供的,而不是服务端提供的;pipeline 不具备事务性

pipeline就是多个请求依次发送出去,然后依次接收回包

目的:节约网络传输时间

对于request操作,只是将数据写到fd对应的写缓冲区,时间非常快,真正耗时操作在读取 response;

send这是将数据写到写缓冲区,真正的发送数据是由协议栈自己完成的。

redis事务

什么是事务?

       1 用户定义的一系列的数据库操作,这些操作视为一个完整的逻辑处理工作单元。

       2 要么全部都执行,要么全部不执行,是不可分割的工作单元。

在什么情况下探讨事务?

       有并发连接的时候

MULTI 开启事务,事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行;

redis事务是乐观锁实现,所以失败需要重试,增加业务逻辑的复杂度。

MULTI

开启事务          mysql ---- ( begin; start transaction; )

redis在开启事务之后会把所有的命令丢到一个任务队列里面,并不是立即执行。

EXEC

提交事务          mysql ---- commit

挨个执行任务队列里面的命令

DISCARD

取消事务          mysql --- rollback

WATCH

检测key的变动,若在事务执行中,key变动则取消事务;在事务开启前调用,乐观锁实现(cas); 若被取消则事务返回 nil ;

应用          (通过事务实现原子操作)

事务实现 zpop

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

事务实现 加倍操作

WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC

lua 脚本

lua 脚本实现原子性;

redis中加载了一个lua虚拟机;用来执行redis lua脚本;redis lua 脚本的执行是原子性的;当某个 脚本正在执行的时候,不会有其他命令或者脚本被执行;

lua脚本当中的命令会直接修改数据状态;

注意:如果项目中使用了lua脚本,不需要使用上面的事务命令;

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"        //lua脚本会生成一个散列值,以后用这个散列值取代脚本
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.

EVAL

# 测试使用

EVAL script numkeys key [key ...] arg [arg ...]

redis是单线程处理数据,这个命令是将Lua脚本作为一个数据包发送过去,在执行这个数据包的过程当中不可能去执行其他的数据包,所以具备原子性。

EVALSHA

# 线上使用

EVALSHA sha1 numkeys key [key ...] arg [arg ...]

mysql存储过程不具备事务性 需要显示的调用begin commit

redis lua脚本 具备原子性

应用

# 1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load); # 2: 项目中若需要热更新,通过redis-cli script flush;然后可以通过订阅发布功能通知所有服 务器重新加载lua脚本;

# 3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;

ACID特性分析

A 原子性;事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis 不支持回滚;即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。

C 一致性;事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一致性而不是异常后的一致性;所以redis也不满足;这个争议很大: redis 能确保事务执行前后的数据的完整约束;但是并不满足业务功能上的一致性;比如转账功能,一个扣钱一个加钱;可能出现扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱;

I 隔离性;事务的操作不被其他用户操作所打断;redis命令执行是单线程的,redis事务天然具备隔离性;    lua脚本完全隔离         watch去取消事务

D 持久性;redis只有在 aof 持久化策略的时候,并且需要在 redis.conf 中 appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;

面试时候回答:

lua脚本满足原子性和隔离性,一致性和持久性不满足。

redis 发布订阅

为了支持消息的多播机制,redis引入了发布订阅模块;disque 消息队列

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

应用

发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而 pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接用于处理发布订阅;

缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费 者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到 消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失 了;

另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;

应用

subscribe news.it news.showbiz news.car

psubscribe news.*

publish new.showbiz 'king kiss darren'

redis异步连接

协议实现的第一步需要知道如何界定数据包:

1. 长度 + 二进制流

2. 二进制流 + 特殊分隔符

异步连接

同步连接方案采用阻塞io来实现;优点是代码书写是同步的,业务逻辑没有割裂;缺点是阻塞当前线程,直至redis返回结果;通常用多个线程来实现线程池来解决效率问题;

异步连接方案采用非阻塞io来实现;优点是没有阻塞当前线程,redis没有返回,依然可以往redis 发送命令;缺点是代码书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决 (openresty,skynet);配合redis6.0以后的io多线程(前提是有大量并发请求),异步连接 池,能更好解决应用层的数据访问性能;

redis6.0 io多线程

redis6.0版本后添加的 io多线程主要解决redis协议的压缩以及解压缩的耗时问题;一般项目中不需要开启;如果有大量并发请求,且返回数据包一般比较大的场景才有它的用武之地;

原理

int n = read(fd, buff, size);
msg = decode(buff, size); // redis io-threads
data = do_command(msg);
bin = encode(data, sz); // io-threads
send(fd, bin, sz1);

开启

# 在 redis.conf 中
# if you have a four cores boxes, try to use 2 or 3 I/O threads, if you have
a 8 cores, try to use 6 threads.
io-threads 4
# 默认只开启 encode 也就是redis发送给客户端的协议压缩工作;也可开启io-threads-do-reads
yes来实现 decode;
# 一般发送给redis的命令数据包都比较少,所以不需要开启 decode 功能;
# io-threads-do-reads no

实现方案

hiredis + libevent

/* Context for a connection to Redis */
typedef struct redisContext {
    const redisContextFuncs *funcs; /* Function table */
    int err; /* Error flags, 0 when there is no error */
    char errstr[128]; /* String representation of error when applicable */
    redisFD fd;
    int flags;
    char *obuf; /* Write buffer */
    redisReader *reader; /* Protocol reader */
    enum redisConnectionType connection_type;
    struct timeval *connect_timeout;
    struct timeval *command_timeout;
    struct {
        char *host;
        char *source_addr;
        int port;
    } tcp;
    struct {
        char *path;
    } unix_sock;
    /* For non-blocking connect */
    struct sockadr *saddr;
    size_t addrlen;
    /* Optional data and corresponding destructor users can use to provide
    * context to a given redisContext. Not used by hiredis. */
    void *privdata;
    void (*free_privdata)(void *);
    /* Internal context pointer presently used by hiredis to manage
    * SSL connections. */
    void *privctx;
    /* An optional RESP3 PUSH handler */
    redisPushFn *push_cb;
} redisContext;
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base
*base) {
    redisContext *c = &(ac->c);
    redisLibeventEvents *e;
    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;
    /* Create container for context and r/w events */
    e = (redisLibeventEvents*)hi_calloc(1, sizeof(*e));
    if (e == NULL)
        return REDIS_ERR;
    e->context = ac;
    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisLibeventAddRead;
    ac->ev.delRead = redisLibeventDelRead;
    ac->ev.addWrite = redisLibeventAddWrite;
    ac->ev.delWrite = redisLibeventDelWrite;
    ac->ev.cleanup = redisLibeventCleanup;
    ac->ev.scheduleTimer = redisLibeventSetTimeout;
    ac->ev.data = e;
    /* Initialize and install read/write events */
    e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler,e);
        e->base = base;
        return REDIS_OK;
}

原理

hiredis 提供异步连接方式,提供可以替换网络检测的接口;

关键替换 addRead , delRead , addWrite , delWrite , cleanup , scheduleTimer ,这几个检测接口;其他io操作,比如 connect , read , write , close 等都交由hiredis来处理;

同时需要提供连接建立成功以及断开连接的回调;

用户可以使用当前项目的网络框架来替换相应的操作;从而实现跟项目网络层兼容的异步连接方 案;

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
7月前
|
NoSQL Redis
Redis原理之网络通信协议笔记
1. RESP协议 ​2. 自定义Socket连接Redis
|
7月前
|
缓存 NoSQL 数据库
探秘Redis读写策略:CacheAside、读写穿透、异步写入
本文介绍了 Redis 的三种高可用性读写模式:CacheAside、Read/Write Through 和 Write Behind Caching。CacheAside 简单易用,但可能引发数据不一致;Read/Write Through 保证数据一致性,但性能可能受限于数据库;Write Behind Caching 提高写入性能,但有数据丢失风险。开发者应根据业务需求选择合适模式。
785 2
探秘Redis读写策略:CacheAside、读写穿透、异步写入
|
3月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
46 6
|
5月前
|
安全 NoSQL Java
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
|
7月前
|
监控 NoSQL 测试技术
python使用Flask,Redis和Celery的异步任务
python使用Flask,Redis和Celery的异步任务
|
7月前
|
消息中间件 NoSQL Kafka
Redis事务与异步方式
Redis事务与异步方式
93 0
|
7月前
|
NoSQL Java 关系型数据库
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
122 0
|
7月前
|
NoSQL 网络协议 关系型数据库
Redis(二)网络协议和异步方式(乐观锁&悲观锁、事务)
Redis(二)网络协议和异步方式(乐观锁&悲观锁、事务)
79 0
|
7月前
|
存储 NoSQL 关系型数据库
Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)
Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)
209 0