redis事务和异步连接

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: redis事务和异步连接

一、redis网络层

微观上,redis采用的是单redis网络模型。

1)组成:io多路复用 + 非阻塞io

2)io职责:io检测和io操作

3)事件:异步事件处理流程 —— 先注册事件,在事件循环中通过回调函数(callback)处理事件。

3a0599c4440ad46c0e2b4ddf83620e2e_a782a28ec59142c2ad87726d71af35ee.png

宏观上,redis忽略其他流程,只关注数据包处理流程。即哪条管道先构成一个完整的数据包,谁先得到处理。

e333c47254e25169727224644779dfec_60061f417f8248a2a57220a1670b2d0d.png


二、redis pipeline

redis pipeline 是一个客户端提供的机制,与redis本身无关。

具体来说,redis pipeline将多个 redis 命令打包成一个网络请求发送给redis服务器,redis服务器收到后按序执行回应。

目的是节约网络传输事件,用于提高 Redis 的性能和吞吐量。


传统上,每个 Redis 命令在客户端和服务器之间都需要进行来回的网络通信。当需要执行大量的命令时,这种网络开销可能会成为性能瓶颈。


de44585c39f1e76b03804041899f93a5_74bec7b729ad4faa84dde605ae05b2e2.png

而通过使用 redis pipeline,可以将多个命令打包在一个网络请求中一次性发送给服务器,减少了网络通信的开销,并且能够更充分地利用服务器的处理能力。

6d96a15426d540963996d35f30da00dd_e42fbd7636fa4a53ad5dec74cce18be5.png

redis pipeline 的基本操作如下:

1)创建 pipeline 对象:在客户端中创建一个 pipeline 对象,用于存储要执行的多个命令。


2)向 pipeline 中添加命令:使用 pipeline 对象的方法(如 pipeline.set(key, value))向其中添加要执行的 redis 命令。可以添加任意多个命令。


3)执行 pipeline :调用 pipeline 对象的 execute() 方法,将 pipeline 中的所有命令一次性发送给 redis 服务器执行。


4)获取结果:根据需要,可以通过遍历 pipeline 中的命令结果,或者使用 execute() 方法的返回值来获取执行结果。


三、redis事务

事务是指用户定义一系列数据库操作,这些操作视为完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。


探讨事务的前提:在有并发连接的时候,不同连接异步执行命令肯会造成的不可预期的冲突。


比如如下图,我们希望执行的顺序是命令1,命令2,命令3。但是redis是请求回应模型,若命令1和命令2之间的空挡期间,命令3插入执行,那么最后结果就会出错。

6d06b5335dc5df213202ef3bc97cdc74_272b436c2d1e4a72a32b9d6b1a121c64.png


3.1 事务特征

ACID(原子性、一致性、隔离性和持久性)是指关系型数据库管理系统(RDBMS)确保事务正确执行的四个基本特性。


1)原子性(Atomicity):原子性指一个事务应该被视为单个不可分割的操作单元,要么全部执行成功,要么全部不执行。如果在事务执行期间发生错误或者失败,所有对数据的修改都将回滚,使数据库返回到事务开始之前的状态。


2)一致性(Consistency):事务的前后,所有的数据都保持一个一致的状态,不能违反数据的一致性检测;这里的一致性是指预期的一致性而不是异常后的一致性。有类型一致性、逻辑一致性、数据(主从数据库)一致性


# 类型一致性错误
127.0.0.1:6379> set count 1000
OK
127.0.0.1:6379> type count
string
127.0.0.1:6379> lpush count 2000
(error) WRONGTYPE Operation against a key holding the wrong kind of value

3)隔离性(Isolation):隔离性指多个并发事务的执行应该相互隔离,即每个事务的执行应该与其他事务的执行独立进行,互不干扰。


4)持久性(Durability):持久性确保一旦事务提交,其结果将永久保存在数据库中,即使系统发生故障也不会丢失。


下面我们看一下redis的事务分析,redis 不完全支持 ACID 特性,它更注重高性能、低延迟和数据的持久化。具体如下:

1)原子性(Atomicity):redis 中的单个命令是原子性的,但redis 不支持回滚机制。如果在一个事务中执行了多个命令,其中一个命令失败并不会导致其他命令的回滚。


2)一致性(Consistency):redis 引擎本身不提供严格的一致性保证。例如,在主从复制模式下,当主节点出现故障时,从节点可能无法立即更新,导致数据的部分丢失。


3)隔离性(Isolation):redis 使用单线程模型,一个客户端的命令在执行期间不会被其他客户端的命令中断,因此天然具备隔离性;mysql 一条连接对应一个线程。多线程环境下需要对临界资源进行加锁。


4)持久性(Durability):redis 提供了类似于日志备份的 AOF(Append-Only File)方式,redis 只有在 aof 持久化策略的时候,并且需要在redis.conf 中 appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;


3.2 WATCH

WATCH这种事务执行的方式,用的很少,了解即可。


Redis 提供了 WATCH 命令用于监视一个或多个键,并在事务执行期间检测这些键是否被修改。如果被监视的键在 WATCH 之后被修改,事务将被取消执行。


redis 客户端以 MULTI 开启一个事务,发送多个命令到服务端的队列,直到发送 EXEC 命令后redis 服务端才会执行队列中的命令,将队列作为一个整体来执行。


# 开启事务
MULTI
# 提交事务
EXEC
# 取消事务
DISCARD
# 监视 key 的变动,在事务开启前调用,乐观锁 cas 实现。若在事务执行中,key 变动则取消事务返回 nil。
# 乐观锁(Optimistic Locking)是一种并发控制机制,用于解决多个用户同时访问和修改共享数据时可能产生的数据冲突问题。
WATCH key

例子

事务实现 zpop

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

事务实现 加倍操作


WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC

3.2 lua 脚本

redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本; redis lua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会有其他命令或者脚本被执行;


3.2.1 lua 脚本的事务特性分析:

lua 脚本满足隔离性和部分的原子性,不满足一致性和持久性。


1)原子性: lua 脚本通过一个命令,将脚本中所有的语句一起执行。但是不具有回滚机制,需要自己写代码实现。


2)一致性:lua 脚本不具备一致性。如果lua 脚本执行失败,那么已经执行成功的命令依旧会作用在数据库,无法回滚。redis 能确保事务执行前后的数据的完整约束;但是并不满足业务功能上的一致性;比如转账功能,一个扣钱一个加钱;可能出现扣钱执行错误,加钱执行正确,那么最终还是会加钱成功;系统凭空多了钱。


3)隔离性:redis 使用单线程模型,且lua脚本作为单数据包运行。


4)持久性:不具备持久性。只有在 aof 并且 appendfsync = always 才具备。


3.2.2 基本命令

# 测试使用
# 执行 lua 脚本
EVAL script numkeys [key...] arg [arg...]
# 实际使用
# 只保存40位哈希字符串,减少数据传输量
# 1、缓存脚本,将用户给定的脚本缓存在服务器中,并返回脚本对应的SHA1校验和(40位字符串)作为结果
SCRIPT LOAD script
# 2、执行缓存的脚本
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
# 附:脚本管理命令
# 检查脚本缓存中是否有该 sha1 散列值的lua脚本
SCRIPT EXISTS sha1 [sha1...]
# 清除所有脚本缓存
SCRIPT FLUSH
# 强制停止正在运行的脚本,如死循环
SCRIPT KILL

3.2.3 应用


  1. 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本 script load .
  2. 项目中若需要热更新,通过redis-cli 执行 script flush。然后可以通过订阅发布功能通知所有服务器重新加载lua脚本.
  3. 若项目中lua脚本发生阻塞,可通过 script kill 暂停当前阻塞脚本的执行.

例子:执行加倍操作

测试使用


127.0.0.1:6379> set jack 100
OK
127.0.0.1:6379> eval 'local key = KEYS[1];local val = redis.call("get",key);if val then redis.call("set",key,2*val);return 2*val;end;return 0;' 1 jack
(integer) 200

实际使用


127.0.0.1:6379> set jack 100
OK
# 1、缓存脚本
127.0.0.1:6379> script load 'local key = KEYS[1];local val = redis.call("get",key);if val then redis.call("set",key,2*val);return 2*val;end;return 0;'
"f76a2571acb0452ef1a0ba3b0bbd6c46a321cbe1"
#  2、执行缓存脚本
127.0.0.1:6379> evalsha "f76a2571acb0452ef1a0ba3b0bbd6c46a321cbe1" 1 jack
(integer) 200

四、redis发布订阅

为了支持消息的多播机制,redis提供了发布订阅(Publish/Subscribe)功能,用于实现消息的发布和订阅模式。这是一种分布式消息队列。发布者(Publisher)将消息发布到指定的频道(Channel),而订阅者(Subscriber)可以订阅感兴趣的频道,以接收发布者发送的消息。该机制并不保证消息一定到达,可以采用 stream 方式确保可达。


需要注意的是,发布订阅的生产者传递过来一个消息,redis 会直接找到相应的消费者并传递过去。

1)假如此时没有消费者,消息直接丢弃;

2)假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到消息,但是如果刚挂掉的消费者重新连上1)后,在断开连接期间的消息对于该消费者来说彻底丢失了;

3)另外,redis 停机重启,pubsub 的消息是不会持久化的,所有的消息被直接丢弃;


4.1 命令

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

4.2 应用

发布订阅功能一般要重新开启一个连接,这是因为命令连接严格遵循请求回应模式,pubsub 能收到 redis 主动推送的内容。所以实际项目中如果支持 pubsub 的话,需要另开一条连接用于处理发布订阅。

173fe8c6b398aa7f4cffe8909f91d363_38f60260a5d545f9a4b7fac56f2ef5dd.png


# 一个客户端订阅频道
SUBSCRIBE news.game news.tech news.school
# 另一个客户端订阅频道,模式匹配
PSUBSCRIBE news.*
# 向频道发送信息,该频道所有订阅者收到消息
PUBLISH news.game 'EDG wins S12 championship'

e191e63d6a17a8a774979a37ce4b73fd_a5d8de2f5a3f4527ab3c22ae716da710.png


五、redis异步连接

5.1 同步连接

同步连接是指客户端与 Redis 服务器之间的一种简单的请求-响应模式。当使用同步连接时,客户端发送一个命令给 Redis 服务器,并在收到服务器响应之后才能继续执行后续的操作。在同步连接在执行命令时,客户端会阻塞当前线程等待服务器的响应,直到响应返回或超时。


例子:访问 redis,并对 counter 实现自增1000次,统计用时。


# gcc redis-test-sync.c -o redis-test-sync -lhiredis
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <hiredis/hiredis.h>
int current_tick() {
    int t = 0;
    struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (int)ti.tv_sec * 1000;
  t += ti.tv_nsec / 1000000;
    return t;
}
int main(int argc, char **argv) {
    unsigned int j, isunix = 0;
    redisContext *c;
    redisReply *reply;
    const char *hostname = "127.0.0.1";
    int port = 6379;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    c = redisConnectWithTimeout(hostname, port, timeout);
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }
    int num = (argc > 1) ? atoi(argv[1]) : 1000;
    int before = current_tick();
    for (int i=0; i<num; i++) {
        reply = redisCommand(c,"INCR counter"); // +1操作
        printf("INCR counter: %lld\n", reply->integer);
        freeReplyObject(reply);
    }
    int used = current_tick()-before;
    printf("after %d exec redis command, used %d ms\n", num, used);
    /* Disconnects and frees the context */
    redisFree(c);
    return 0;
}

5.2 异步连接

同步连接方案采用阻塞 io 来实现;优点是代码书写是同步的,业务逻辑没有割裂;

缺点是阻塞当前线程,直至 redis 返回结果;通常用多个线程来实现线程池来解决效率问题.


异步连接方案采用非阻塞 io 来实现;优点是没有阻塞当前线程,redis 没有返回,依然可以往 redis 发送命令;

缺点是代码书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决(openresty,skynet);配合 redis6.0 以后的 io 多线程(前提是有大量并发请求),异步连接池,能更好解决应用层的数据访问性能;


5.2.1 redis 驱动

1)redis 驱动:服务端使用异步连接,需要自己来实现 redis 驱动,也就是说需要把 redis 连接融合自己项目中的 reactor 进行管理。


2)设计 redis 适配器,其主要功能有:

构建 redis 事件对象,其中包括:hiredis 事件对象和 reactor 事件对象。

适配事件控制,复用项目中 reactor 的事件循环。


3)hiredis 的封装规则有:

reactor 的实现:所有的 IO 由用户实现。

适配器的实现:hiredis 提供了事件操作接口,用户只需要适配这些事件接口。特别的,不同网络库,不同平台,对事件操作的接口不一致。

// 用户需要适配的 hiredis 事件接口有
addRead   // 添加读事件      
delRead   // 删除读事件
addWrite  // 添加写事件
delWrite  // 删除写事件
cleanup   // 事件对象释放 
scheduleTimer

5.2.2 例子

第 1 步,实现 redis 驱动

reactor.h

#ifndef _MARK_REACTOR_
#define _MARK_REACTOR_
#include <sys/epoll.h>
#include <stdio.h>
#include <unistd.h> // read write
#include <fcntl.h> // fcntl
#include <sys/types.h> // listen
#include <sys/socket.h> // socket
#include <errno.h> // errno
#include <arpa/inet.h> // inet_addr htons
// #include <netinet/tcp.h>
#include <assert.h> // assert
#include <stdlib.h> // malloc
#include <string.h> // memcpy memmove
#include "chainbuffer/buffer.h"
// #include "ringbuffer/buffer.h"
#define MAX_EVENT_NUM 512       // 每次用户拷贝事件的最大数目
#define MAX_CONN ((1<<16)-1)    // 事件对象的最大数目:65535
typedef struct event_s event_t;
typedef void (*event_callback_fn)(int fd, int events, void *privdata);
typedef void (*error_callback_fn)(int fd, char * err);
// reactor对象,管理 io 全局变量 
typedef struct {
    int epfd;        // epfd 
    int listenfd;    // 监听的fd
    int stop;        // 停止循环标记
    event_t *events; // 存储监听的所有事件(event_t),存储在堆上,记得释放
    int iter;        // 用于遍历events,获取没有被使用的位置
    struct epoll_event fire[MAX_EVENT_NUM]; // 用户态数组,用于拷贝io事件到用户态
} reactor_t;
// 事件对象,sockitem,保存每个fd对应的io状态
struct event_s {
    int fd;         // 对应的事件 fd
    reactor_t *r;   // 指向 reactor 全局对象
    buffer_t in;    // 读缓冲,待读取
    buffer_t out;   // 写缓冲,待发送
    event_callback_fn read_fn;  // 读回调
    event_callback_fn write_fn; // 写回调
    error_callback_fn error_fn; // 错误回调
};
int event_buffer_read(event_t *e);
int event_buffer_write(event_t *e, void * buf, int sz);
reactor_t * create_reactor();
void release_reactor(reactor_t * r);
event_t * new_event(reactor_t *R, int fd,
    event_callback_fn rd,
    event_callback_fn wt,
    error_callback_fn err);
void free_event(event_t *e);
int set_nonblock(int fd);
int add_event(reactor_t *R, int events, event_t *e);
int del_event(reactor_t *R, event_t *e);
int enable_event(reactor_t *R, event_t *e, int readable, int writeable);
void eventloop_once(reactor_t * r, int timeout);
void stop_eventloop(reactor_t * r);
void eventloop(reactor_t * r);
int create_server(reactor_t *R, short port, event_callback_fn func);
int event_buffer_read(event_t *e);
int event_buffer_write(event_t *e, void * buf, int sz);
#endif

reactor.c

#include "reactor.h"
// 创建reactor对象
reactor_t *
create_reactor() {
    reactor_t *r = (reactor_t *)malloc(sizeof(*r));
    r->epfd = epoll_create(1);
    r->listenfd = 0;
    r->stop = 0;
    r->iter = 0;
    r->events = (event_t*)malloc(sizeof(event_t)*MAX_CONN);
    memset(r->events, 0, sizeof(event_t)*MAX_CONN);
    memset(r->fire, 0, sizeof(struct epoll_event) * MAX_EVENT_NUM);
    // init_timer();
    return r;
}
// 释放reactor对象
void
release_reactor(reactor_t * r) {
    free(r->events);
    close(r->epfd);
    free(r);
}
// 获取reactor的事件堆event上的空闲事件对象
static event_t *
_get_event_t(reactor_t *r) {
    r->iter ++;
    while (r->events[r->iter & MAX_CONN].fd > 0) {
        r->iter++;
    }
    return &r->events[r->iter];
}
// 创建事件对象
event_t *
new_event(reactor_t *R, int fd,
    event_callback_fn rd,
    event_callback_fn wt,
    error_callback_fn err) {
    assert(rd != 0 || wt != 0 || err != 0);
    // 获取空闲事件对象
    event_t *e = _get_event_t(R);
    // 初始化事件对象
    e->r = R;
    e->fd = fd;
    buffer_init(&e->in, 1024*16);
    buffer_init(&e->out, 1024*16);
    e->read_fn = rd;
    e->write_fn = wt;
    e->error_fn = err;
    return e;
}
// 释放事件对象分配的buffer空间
void
free_event(event_t *e) {
    buffer_free(&e->in);
    buffer_free(&e->out);
}
// 设置非阻塞的fd
int
set_nonblock(int fd) {
    int flag = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
// 添加事件
int
add_event(reactor_t *R, int events, event_t *e) {
    struct epoll_event ev;
    ev.events = events;
    ev.data.ptr = e;
    if (epoll_ctl(R->epfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) {
        printf("add event err fd = %d\n", e->fd);
        return 1;
    }
    return 0;
}
// 删除事件
int
del_event(reactor_t *R, event_t *e) {
    epoll_ctl(R->epfd, EPOLL_CTL_DEL, e->fd, NULL);
    free_event(e);
    return 0;
}
// 修改事件(读事件 or 写事件)
int
enable_event(reactor_t *R, event_t *e, int readable, int writeable) {
    struct epoll_event ev;
    ev.events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0);
    ev.data.ptr = e;
    if (epoll_ctl(R->epfd, EPOLL_CTL_MOD, e->fd, &ev) == -1) {
        return 1;
    }
    return 0;
}
// 一次事件循环
void
eventloop_once(reactor_t * r, int timeout) {
    int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
    for (int i = 0; i < n; i++) {
        struct epoll_event *e = &r->fire[i];
        int mask = e->events;
        if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
        if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
        event_t *et = (event_t*) e->data.ptr;
        // 读事件处理
        if (mask & EPOLLIN) {
            if (et->read_fn)
                et->read_fn(et->fd, EPOLLIN, et);
        }
        // 写事件处理
        if (mask & EPOLLOUT) {
            if (et->write_fn)
                et->write_fn(et->fd, EPOLLOUT, et);
            else {
                uint8_t * buf = buffer_write_atmost(&et->out);
                event_buffer_write(et, buf, buffer_len(&et->out));
            }
        }
    }
}
// 停止事件循环
void
stop_eventloop(reactor_t * r) {
    r->stop = 1;
}
// 事件循环
void
eventloop(reactor_t * r) {
    while (!r->stop) {
        // int timeout = find_nearest_expire_timer();
        eventloop_once(r, /*timeout*/ -1);
        // expire_timer();
    }
}
// 创建服务器
int
create_server(reactor_t *R, short port, event_callback_fn func) {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        printf("create listenfd error!\n");
        return -1;
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;
    // 设置地址重用
    // 使得在关闭服务器之后立即重新启动服务器程序时,依然能够绑定到相同的端口上
    int reuse = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int)) == -1) {
        printf("reuse address error: %s\n", strerror(errno));
        return -1;
    }
    if (bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
        printf("bind error %s\n", strerror(errno));
        return -1;
    }
    if (listen(listenfd, 5) < 0) {
        printf("listen error %s\n", strerror(errno));
        return -1;
    }
    if (set_nonblock(listenfd) < 0) {
        printf("set_nonblock error %s\n", strerror(errno));
        return -1;
    }
    R->listenfd = listenfd;
    event_t *e = new_event(R, listenfd, func, 0, 0);
    add_event(R, EPOLLIN, e);
    printf("listen port : %d\n", port);
    return 0;
}
// 读取数据
int
event_buffer_read(event_t *e) {
    int fd = e->fd;
    int num = 0;
    while (1) {
        // TODO: dont use char buf[] here
        char buf[1024] = {0};
        int n = read(fd, buf, 1024);
        if (n == 0) { // 半关闭状态 skynet
            printf("close connection fd = %d\n", fd);
            if (e->error_fn)
                e->error_fn(fd, "close socket");
            del_event(e->r, e);
            close(fd);
            return 0;
        } else if (n < 0) { // 异常
            if (errno == EINTR) // 被中断,重试
                continue;
            if (errno == EWOULDBLOCK)  // 阻塞,因为read buffer为空
                break;
            printf("read error fd = %d err = %s\n", fd, strerror(errno));
            if (e->error_fn)
                e->error_fn(fd, strerror(errno));
            del_event(e->r, e);
            close(fd);
            return 0;
        } else { // 正常
            printf("recv data from client:%s", buf);
            buffer_add(&e->in, buf, n);
        }
        // 多次读取的话,按序接在后面
        num += n;
    }
    return num;
}
// 向对端发送数据
static int
_write_socket(event_t *e, void * buf, int sz) {
    int fd = e->fd;
    while (1) {
        int n = write(fd, buf, sz);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            if (errno == EWOULDBLOCK)
                break;
            if (e->error_fn)
                e->error_fn(fd, strerror(errno));
            del_event(e->r, e);
            close(e->fd);
        }
        return n;
    }
    return 0;
}
// 写数据
int
event_buffer_write(event_t *e, void * buf, int sz) {
    buffer_t *r = &e->out;
    if (buffer_len(r) == 0) {
        int n = _write_socket(e, buf, sz);
        if (n == 0 || n < sz) {
            // 发送失败,除了将没有发送出去的数据写入缓冲区,还要注册写事件
            buffer_add(&e->out, (char *)buf+n, sz-n);
            enable_event(e->r, e, 1, 1);
            return 0;
        } else if (n < 0) 
            return 0;
        return 1;
    }
    buffer_add(&e->out, (char *)buf, sz);
    return 1;
}

第 2 步,实现 redis 适配器,主要是构建 redis 事件对象和适配 hiredis 的事件控制接口。


// adapter_async.h
#ifndef _MARK_ADAPTER_
#define _MARK_ADAPTER_
#include <hiredis/hiredis.h>
#include <hiredis/alloc.h>
#include "reactor.h"
// redis 事件对象
typedef struct {
    event_t e;              // reactor 事件对象
    int mask;               // 存储注册的事件
    redisAsyncContext *ctx; // hiredis 事件对象
} redis_event_t;
// redis 对象读事件回调
static void redisReadHandler(int fd, int events, void *privdata) {
    ((void)fd);
    ((void)events);
    event_t *e = (event_t*)privdata;
    redis_event_t *re = (redis_event_t *)(char *)e;
    redisAsyncHandleRead(re->ctx);
}
// redis 对象写事件读回调
static void redisWriteHandler(int fd, int events, void *privdata) {
    ((void)fd);
    ((void)events);
    event_t *e = (event_t*)privdata;
    redis_event_t *re = (redis_event_t *)(char *)e;
    redisAsyncHandleWrite(re->ctx);
}
/**
 * @brief 对 reactor 管理的事件对象进行更新
 * @param privdata  redis 事件对象
 * @param flag      要设置的 epoll 事件类型 
 * @param remove    1 删除该事件 0 添加该事件
 */
static void redisEventUpdate(void *privdata, int flag, int remove) {
    redis_event_t *re = (redis_event_t *)privdata;
    reactor_t *r = re->e.r;
    int prevMask = re->mask;
    int enable = 0;             
    // redis 事件对象删除该事件
    if (remove) {
        if ((re->mask & flag) == 0) {
            return;
        }
        re->mask &= ~flag;
        enable = 0;
    } 
    // redis 事件对象添加该事件
    else {
        if (re->mask & flag) {
            return;    
        }           
        re->mask |= flag;
        enable = 1;
    }
    // 对 reactor 事件对象的处理
    // 1、reactor 事件对象删除该事件
    if (re->mask == 0) {
        del_event(r, &re->e);
    } 
    // 2、reactor 事件对象添加该事件(第一次加入)
    else if (prevMask == 0) {
        add_event(r, re->mask, &re->e);
    } 
    // 3、reactor 事件对象修改该事件
    else {
        // 注册读事件
        if (flag & EPOLLIN) {
            enable_event(r, &re->e, enable, 0);
        } 
        // 注册写事件
        else if (flag & EPOLLOUT) {
            enable_event(r, &re->e, 0, enable);
        }
    }
}
// 需要适配的 hiredis 事件接口
// 1、redis 事件对象添加读事件
static void redisAddRead(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.read_fn = redisReadHandler;
    redisEventUpdate(privdata, EPOLLIN, 0);
}
// 2、redis 事件对象删除读事件
static void redisDelRead(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.read_fn = 0;
    redisEventUpdate(privdata, EPOLLIN, 1);
}
// 3、redis 事件对象添加写事件
static void redisAddWrite(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.write_fn = redisWriteHandler;
    redisEventUpdate(privdata, EPOLLOUT, 0);
}
// 4、redis 事件对象删除写事件
static void redisDelWrite(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.write_fn = 0;
    redisEventUpdate(privdata, EPOLLOUT, 1);
}
// 5、redis 事件对象释放
static void redisCleanup(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    reactor_t *r = re->e.r;
    del_event(r, &re->e);
    hi_free(re);
}
// redis 事件对象绑定:reactor 对象和 redis 异步上下文
static int redisAttach(reactor_t *r, redisAsyncContext *ac) { 
    redisContext *c = &(ac->c); // redis 同步上下文
    redis_event_t *re;          // redis 事件对象
    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;
    /* Create container for ctx and r/w events */
    re = (redis_event_t*)hi_malloc(sizeof(*re));
    if (re == NULL) {
        return REDIS_ERR;
    }  
    // redis 事件对象绑定 reactor 对象和 redis 异步上下文
    re->ctx = ac;       // 绑定 redis 异步上下文
    re->e.fd = c->fd;   // 绑定 redis 的fd
    re->e.r = r;        // 绑定 reacotr
    re->mask = 0;       // 绑定事件
    // redis 异步上下文设置,需要适配事件控制
    // hiredis 提供事件接口,用户实现事件接口
    ac->ev.addRead = redisAddRead;
    ac->ev.delRead = redisDelRead;
    ac->ev.addWrite = redisAddWrite;
    ac->ev.delWrite = redisDelWrite;
    ac->ev.cleanup = redisCleanup;
    ac->ev.data = re;
    return REDIS_OK;
}
#endif

第 3步,实现主体代码,实现功能


// gcc chainbuffer/buffer.c redis-test-async.c reactor.c -o redis-test-async -lhiredis 
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <time.h>
#include "reactor.h"
#include "adapter_async.h"
static reactor_t *R;
static int cnt, before, num;
int current_tick() {
    int t = 0;
    struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (int)ti.tv_sec * 1000;
  t += ti.tv_nsec / 1000000;
    return t;
}
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = r;
    if (reply == NULL) return;
    printf("argv[%s]: %lld\n", (char*)privdata, reply->integer);
    /* Disconnect after receiving the reply to GET */
    cnt++;
    if (cnt == num) {
        int used = current_tick()-before;
        printf("after %d exec redis command, used %d ms\n", num, used);
        redisAsyncDisconnect(c);
    }
}
void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        stop_eventloop(R);
        return;
    }
    printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        stop_eventloop(R);
        return;
    }
    printf("Disconnected...\n");
    stop_eventloop(R);
}
int main(int argc, char **argv) {
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }
    // 创建reactor对象
    R = create_reactor();
    // redis 事件对象绑定:reactor 对象和 redis 异步上下文
    redisAttach(R, c);
    redisAsyncSetConnectCallback(c, connectCallback);
    redisAsyncSetDisconnectCallback(c, disconnectCallback);
    before = current_tick();
    num = (argc > 1) ? atoi(argv[1]) : 1000;
    for (int i = 0; i < num; i++) {
        redisAsyncCommand(c, getCallback, "count", "INCR counter");
    }
    eventloop(R);
    release_reactor(R);
    return 0;
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
缓存 NoSQL Redis
Redis 事务
10月更文挑战第18天
34 1
|
2月前
|
NoSQL Redis 数据库
Redis 连接
10月更文挑战第19天
37 0
|
7天前
|
NoSQL 应用服务中间件 API
Redis是如何建立连接和处理命令的
本文主要讲述 Redis 是如何监听客户端发出的set、get等命令的。
|
4天前
|
NoSQL Redis
Redis事务长什么样?一文带你全面了解
Redis事务是一组命令的有序队列,通过MULTI、EXEC、WATCH和DISCARD等命令实现原子性操作。事务中的命令在EXEC执行前不会实际运行,而是先进入队列,确保所有命令要么全部成功,要么全部失败。此外,Redis还支持Lua脚本实现类似事务的操作,通常更简单高效。事务适用于购物车结算、秒杀活动、排行榜更新等需要保证数据一致性的场景。
19 0
|
1月前
|
监控 NoSQL 网络协议
【Azure Redis】部署在AKS中的应用,连接Redis高频率出现timeout问题
查看Redis状态,没有任何异常,服务没有更新,Service Load, CPU, Memory, Connect等指标均正常。在排除Redis端问题后,转向了AKS中。 开始调查AKS的网络状态。最终发现每次Redis客户端出现超时问题时,几乎都对应了AKS NAT Gateway的更新事件,而Redis服务端没有任何异常。因此,超时问题很可能是由于NAT Gateway更新事件导致TCP连接被重置。
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
52 6
|
2月前
|
NoSQL 网络协议 算法
Redis 客户端连接
10月更文挑战第21天
45 1
|
2月前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
30 2
|
2月前
|
NoSQL 关系型数据库 MySQL
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
本文全面阐述了Redis事务的特性、原理、具体命令操作,指出Redis事务具有原子性但不保证一致性、持久性和隔离性,并解释了Redis事务的适用场景和WATCH命令的乐观锁机制。
406 0
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
|
7天前
|
存储 缓存 NoSQL
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
131 85