Redis 协议 事务 发布订阅 异步连接

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis 协议 事务 发布订阅 异步连接

1、redis 网络协议

1.1、redis 网络

微观上:reactor

  • 组成:IO 多路复用 + 非阻塞 IO
  • IO 职责:IO 检测和 IO 操作
  • 事件:异步事件处理流程,先注册事件,事件循环中处理事件 callback

宏观上:可以忽略其他流程,只关注数据包处理流程。当管道(连接)构成一个完整的包,处理对应的事件。

1.2、redis 协议

redis 协议设计

  • 消息边界:字符流头部 + 分隔符
  • 消息类型:字符串的第一个字符。

redis 采用 RESP 序列化协议,协议的不同部分使用以CRLF(\r\n)结束。

RESP 支持的数据类型,通过第一个字符判断数据类型

  • + Simple Strings
    +OK\r\n
  • - Errors:
    -Error <message>\r\n
  • : Integers
    :<数值>\r\n
  • $ Bulk Strings
    $<数据长度>\r\n<数据内容>\r\n
  • * Arrays
    *<元素个数n>\r\n<元素内容>...<元素n>

RESP 在 redis 请求-响应协议中的作用方式

  • 客户端发送字符串数组 ( Array + Bulk Strings) 到 redis 服务器
    *<参数数量>\r\n$<参数1的长度>\r\n<参数1的数据>\r\n...$<参数n的长度>\r\n<参数n的数据>\r\n
  • redis 服务器根据命令实现回复一种 RESP 数据类型到客户端。

来看下面例子

在 redis-cli,发送一条命令 set key value,对应的报文为:

*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue

执行成功 OK,回应的报文为:

+OK\r\n

若执行失败,回应的报文为

-ERR unknown command `ket`, with args beginning with: `key`, `value`, \r\n

2、redis pipline

redis pipline 是 redis 客户端提供的机制,与 redis 本身无关,是为了节约网络传输时间而设计的。具体来说,客户端一次性发送多个请求,redis 服务器按序依次回复,与 http 1.1 类似。

pipeline

3、redis 事务

事务:用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。

探讨事务的前提:在并发连接的情况下,不同连接异步执行命令造成的不可预期的冲突。

3.1、事务的特征

  • 原子性 Atomicity:事务不可分割,要么全部成功,要么全部失败,如果执行失败必须提供回滚机制。原子操作的原子性,只有执行或不执行,其他线程不可能看到其他状态。
  • 一致性 Consistency:事务的前后,所有的数据都保持一个一致的状态,不能违反数据的一致性检测。这里的一致性指的是预期的一致性不是异常后的一致性。类型一致性,逻辑一致性,数据一致性(主从数据库一致)。
  • 隔离性 Isolation:并发事务间的要相互隔离。redis 单线程执行,天然具备隔离性。mysql 一条连接对应一个线程。多线程环境下需要对临界资源进行加锁。
  • 持久性 Durability:事务一旦提交,对数据的改变就是永久性的,即数据落盘。

3.2、事务命令

redis 客户端以 MULTI 开启一个事务,发送多个命令到服务端的队列,直到发送 EXEC 命令后redis 服务端才会执行队列中的命令,将队列作为一个整体来执行。

# 开启事务
 MULTI
 # 提交事务
 EXEC
 # 取消事务
 DISCARD
 # 监视 key 的变动,在事务开启前调用,乐观锁 cas 实现。若在事务执行中,key 变动则取消事务返回 nil。
 WATCH key

实际工作中不会使用,这是因为事务命令是由乐观锁实现的,失败需要重试,会增加业务逻辑的复杂程度。

3.3、* lua 脚本

redis 内置 lua 解释器来执行 lua 脚本,通过 lua 脚本实现原子性。

面试点:lua 脚本满足原子性和隔离性,不满足一致性和持久性。

  • 原子性:Lua 脚本通过一个命令执行,脚本中所有的命令一起执行,具有原子性。
  • 一致性:不具备一致性,lua 脚本执行失败,已经成功的命令作用数据库,无法回滚。
  • 隔离性:redis 单线程执行,且 lua 脚本作为单数据包运行。
  • 持久性:不具备持久性,只有在 aof 并且 appendfsync = always 才具备,实际工作不会采用该方法。

3.3.1、命令

# 测试使用
 # 执行 lua 脚本
 EVAL script numkeys [key...] arg [arg...]
 # 实际使用
 # 只保存40位哈希字符串,减少数据传输量
 # 1、缓存脚本,将用户给定的脚本缓存在服务器中,并返回脚本对应的SHA1校验和(40位字符串)作为结果
 SCRIPT LOAD script
 # 2、执行缓存的脚本
 EVALSHA sha1 numkeys key [key ...] arg [arg ...]
 # 附:脚本管理命令
 # 检查脚本是否缓存
 SCRIPT EXISTS sha1 [sha1...]
 # 清除所有脚本缓存
 SCRIPT FLUSH
 # 强制停止正在运行的脚本,如死循环
 SCRIPT KILL

3.3.2、应用

  • 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本 script load
  • 项目中若需要热更新,通过 redis-cli 执行 script flush。然后可以通过订阅发布功能通知所有服务器重新加载lua脚本
  • 若项目中lua脚本发生阻塞,可通过 script kill 暂停当前阻塞脚本的执行

例:执行加倍操作

set mark 1
 # 测试使用
 eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 mark
 (integer) 2
 127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 darren
 (integer) 0
 # 实际使用
 # 1、缓存脚本
 script load "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;"
 "9da2e1ac090f2e1df67087370de115a4291cd0bd"
 # 2、执行缓存脚本
 evalsha "9da2e1ac090f2e1df67087370de115a4291cd0bd" 1 mark
 (integer) 4

4、redia 发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块,是一种分布式消息队列机制。订阅者通过特定的频道来接收发送者发送至该频道的消息。该机制并不保证消息一定到达,可以采用 stream 方式确保可达。

存在的问题有:发送者发送一条消息,若没有订阅者,则消息直接丢弃。若发送期间,一个订阅者断开连接,那么在断开连接期间消息对于该订阅者来说彻底丢失了。此外,redis 停机重启,pubsub 的消息是不会持久化的,所有的消息被直接丢弃。

4.1、命令

# 向频道发送消息
 publish channel message
 # 订阅频道
 subscribe channel [channel...]
 # 取消订阅频道
 unsubscribe [channel...]
 # 订阅模式
 psubscribe pattern [pattern...]
 # 退订模式
 punsubscribe [pattern...]
 # 查看发布与订阅的相关信息
 PUBSUB CHANNELS [pattern]

4.2、应用

发布订阅功能一般要重新开启一个连接,这是因为命令连接严格遵循请求回应模式,pubsub 能收到 redis 主动推送的内容。所以实际项目中如果支持 pubsub 的话,需要另开一条连接用于处理发布订阅。

# 一个客户端订阅频道
 SUBSCRIBE news.shanxi news.henan news.shandong
 # 另一个客户端订阅频道,模式匹配
 PSUBSCRIBE news.*
 # 向频道发送信息,该频道所有订阅者收到消息
 publish news.shanxi 'harmony'

5、redis 异步连接

hiredis 是一个 redis 的 C 客户端库函数,服务端可以使用它来访问 redis 服务器。

5.1、同步连接

同步连接采用阻塞 io 来实现,但是会阻塞当前线程,直至 redis 返回结果。

参考文档:hiredis 的使用

例如:访问 redis,并对 counter 实现自增1000次,统计用时。

// gcc redis-test-sync.c -o sync -lhiredis
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
 #include <hiredis/hiredis.h>
 int current_tick() {
     int t = 0;
     struct timespec ti;
     clock_gettime(CLOCK_MONOTONIC, &ti);
     t = (int)ti.tv_sec * 1000;
     t += ti.tv_nsec / 1000000;
     return t;
 }
 int main(int argc, char **argv) {
     unsigned int j, isunix = 0;
     redisContext *c;
     redisReply *reply;
     const char *hostname = "127.0.0.1";
     int port = 6379;
     struct timeval timeout = { 1, 500000 }; // 1.5 seconds
     c = redisConnectWithTimeout(hostname, port, timeout);
     if (c == NULL || c->err) {
         if (c) {
             printf("Connection error: %s\n", c->errstr);
             redisFree(c);
         } else {
             printf("Connection error: can't allocate redis context\n");
         }
         exit(1);
     }
     int num = (argc > 1) ? atoi(argv[1]) : 1000;
     int before = current_tick();
     reply = redisCommand(c, "auth 123456");
     freeReplyObject(reply);
     for (int i = 0; i < num; ++i) {
         reply = redisCommand(c, "INCR counter");
         printf("INCR counter: %lld\n", reply->integer);
         freeReplyObject(reply);
     }
     int used = current_tick() - before;
     printf("after %d exec redis command, used %d ms\n", num, used);
     /* Disconnects and frees the context */
     redisFree(c);
     return 0;
 }

5.2、异步连接

异步连接采用非阻塞 io 实现,不会阻塞当前线程。缺点是代码书写异步,业务逻辑割裂,可以通过携程解决。在有大量并发请求的情况,配合 redis 6.0 以后的 io 多线程,异步连接池,能更好解决应用层的数据访问性能

5.2.1、redis 驱动

redis 驱动:服务端使用异步连接,需要自己来实现 redis 驱动,也就是说需要把 redis 连接融合自己项目中的 reactor 进行管理。

接着还需要设计 redis 适配器,其主要功能有:

  • 构建 redis 事件对象,其中包括:hiredis 事件对象和 reactor 事件对象。
  • 适配事件控制,复用项目中 reactor 的事件循环。

综上所述,hiredis 的封装规则有:

  • reactor 的实现:所有的 IO 由用户实现。
  • 适配器的实现:hiredis 提供了事件操作接口,用户需要适配这些事件接口。
// 用户需要适配的 hiredis 事件接口有
 addRead     // 添加读事件            
 delRead     // 删除读事件
 addWrite    // 添加写事件
 delWrite    // 删除写事件
 cleanup     // 事件对象释放   
 scheduleTimer

5.2.2、范例

这里对 4.1 的例子使用异步的方法来实现。

第 1 步,实现 redis 驱动

#ifndef _REACTOR_
#define _REACTOR_
#include <stdio.h>
#include <unistd.h> // read write
#include <fcntl.h> // fcntl
#include <sys/types.h> // listen
#include <sys/socket.h> // socket
#include <errno.h> // errno
#include <arpa/inet.h> // inet_addr htons
// #include <netinet/tcp.h>
#include <assert.h> // assert
#include <sys/epoll.h>
#include <stdlib.h> // malloc
#include <string.h> // memcpy memmove
#include "chainbuffer/buffer.h"
// #include "ringbuffer/buffer.h"
#define MAX_EVENT_NUM 512       // 每次用户拷贝事件的最大数目
#define MAX_CONN ((1<<16)-1)    // 事件对象的最大数目:65535
typedef struct event_s event_t;
typedef void (*event_callback_fn)(int fd, int events, void *privdata);
typedef void (*error_callback_fn)(int fd, char * err);
// reactor对象,管理 io 全局变量 
typedef struct {
    int epfd;        // epfd 
    int listenfd;    // 监听的fd
    int stop;        // 停止循环标记
    event_t *events; // 存储监听的所有事件(event_t),存储在堆上,记得释放
    int iter;        // 用于遍历events,获取没有被使用的位置
    struct epoll_event fire[MAX_EVENT_NUM]; // 用户态数组,用于拷贝io事件到用户态
} reactor_t;
// 事件对象,sockitem,保存每个fd对应的io状态
struct event_s {
    int fd;         // 对应的事件 fd
    reactor_t *r;   // 指向 reactor 全局对象
    buffer_t in;    // 读缓冲,待读取
    buffer_t out;   // 写缓冲,待发送
    event_callback_fn read_fn;  // 读回调
    event_callback_fn write_fn; // 写回调
    error_callback_fn error_fn; // 错误回调
};
int event_buffer_read(event_t *e);
int event_buffer_write(event_t *e, void * buf, int sz);
// 创建 reactor 对象
reactor_t * create_reactor() {
    // 堆上申请 reactor 对象
    reactor_t *r = (reactor_t *)malloc(sizeof(*r));
    r->epfd = epoll_create(1);
    r->listenfd = 0;
    r->stop = 0;
    r->iter = 0;
    // 堆上申请 reactor 中的events数组
    r->events = (event_t*)malloc(sizeof(event_t)*MAX_CONN);
    memset(r->events, 0, sizeof(event_t)*MAX_CONN);
    memset(r->fire, 0, sizeof(struct epoll_event) * MAX_EVENT_NUM);
    // init_timer();
    return r;
}
// 释放 reactor 对象
void release_reactor(reactor_t * r) {
    free(r->events);    // 释放reactor在堆上申请的events
    close(r->epfd);     // 关闭epoll
    free(r);            // 释放reactor
}
// 从 reactor 的事件堆上获取空闲的事件对象
event_t * _get_event_t(reactor_t *r) {
    r->iter ++;
    // 寻找没有被使用的事件对象
    while (r->events[r->iter & MAX_CONN].fd > 0) {
        r->iter++;
    }
    return &r->events[r->iter];
}
// 基于事件的操作
// 1、创建事件对象
event_t * new_event(reactor_t *R, int fd,
    event_callback_fn rd,
    event_callback_fn wt,
    error_callback_fn err) {
    assert(rd != 0 || wt != 0 || err != 0);
    // 获取空闲的事件对象
    event_t *e = _get_event_t(R);
    // 初始化事件对象
    e->r = R;
    e->fd = fd;
    buffer_init(&e->in, 1024*16);
    buffer_init(&e->out, 1024*16);
    e->read_fn = rd;
    e->write_fn = wt;
    e->error_fn = err;
    return e;
}
// 2、添加事件
int add_event(reactor_t *R, int events, event_t *e) {
    struct epoll_event ev;
  ev.events = events;
  ev.data.ptr = e;
  if (epoll_ctl(R->epfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) {
        printf("add event err fd = %d\n", e->fd);
    return 1;
  }
  return 0;
}
// 释放事件所占空间
void free_event(event_t *e) {
  buffer_free(&e->in);    
  buffer_free(&e->out);
}
// 3、删除事件
int del_event(reactor_t *R, event_t *e) {
  epoll_ctl(R->epfd, EPOLL_CTL_DEL, e->fd, NULL);
    free_event(e);
    return 0;
}
// 4、修改事件,由后面两个参数决定是读事件还是写事件
int enable_event(reactor_t *R, event_t *e, int readable, int writeable) {
  struct epoll_event ev;
  ev.events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0);
  ev.data.ptr = e;
  if (epoll_ctl(R->epfd, EPOLL_CTL_MOD, e->fd, &ev) == -1) {
    return 1;
  }
  return 0;
}
// 一次事件循环
void eventloop_once(reactor_t * r, int timeout) {
    int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
    for (int i = 0; i < n; ++i) {
        struct epoll_event *e = &r->fire[i];  // 获取事件
        int mask = e->events;                 // 获取事件类型
        // 用 io 函数捕获具体的错误信息
        if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
        // 用 io 函数捕获断开的具体信息
        if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
        event_t *et = (event_t*) e->data.ptr; // 获取事件关联的用户数据
        // 处理读事件
        if (mask & EPOLLIN) {
            if (et->read_fn) {
                et->read_fn(et->fd, EPOLLIN, et);   // 执行读回调
            }          
        }
        // 处理写事件
        if (mask & EPOLLOUT) {
            if (et->write_fn) {
                et->write_fn(et->fd, EPOLLOUT, et); // 执行写回调
            }     
            else {
                uint8_t *buf = buffer_write_atmost(&et->out);
                event_buffer_write(et, buf, buffer_len(&et->out));
            }
        }
    }
}
// 停止事件循环
void stop_eventloop(reactor_t * r) {
    r->stop = 1;
}
// 事件循环
void eventloop(reactor_t * r) {
    while (!r->stop) {
        // int timeout = find_nearest_expire_timer();
        eventloop_once(r, /*timeout*/ -1);
        // expire_timer();
    }
}
// 设置非阻塞fd
int set_nonblock(int fd) {
  int flag = fcntl(fd, F_GETFL, 0);
  return fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
// 创建服务器
int create_server(reactor_t *R, short port, event_callback_fn func) {
  // 1、socket
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
  if (listenfd < 0) {
        printf("create listenfd error!\n");
    return -1;
  }
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(struct sockaddr_in));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = INADDR_ANY;
    // 设置地址可重用
    int reuse = 1;
  if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int)) == -1) {
        printf("reuse address error: %s\n", strerror(errno));
        return -1;
    }
    // 2、bind
  if (bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
        printf("bind error %s\n", strerror(errno));
    return -1;
  }
    // 3、listen
  if (listen(listenfd, 5) < 0) {
        printf("listen error %s\n", strerror(errno));
    return -1;
  }
    // 设置 listenfd 非阻塞 
    if (set_nonblock(listenfd) < 0) {
        printf("set_nonblock error %s\n", strerror(errno));
        return -1;
    }
    R->listenfd = listenfd;
    // 注册读事件
    event_t *e = new_event(R, listenfd, func, 0, 0);
    add_event(R, EPOLLIN, e);
  printf("listen port : %d\n", port);
  return 0;
}
// 读数据
int event_buffer_read(event_t *e) {
    int fd = e->fd; 
    int num = 0;    // 读取的数据总量
    while (1) {
        // TODO: dont use char buf[] here
        char buf[1024] = {0};
        int n = read(fd, buf, 1024);
        // 1、read=0, 服务器收到FIN包,半关闭状态
        // Todo: 半关闭状态逻辑处理,参考 skynet
        if (n == 0) { // 
            printf("close connection fd = %d\n", fd);
            if (e->error_fn) {
                e->error_fn(fd, "close socket");
            }   
            del_event(e->r, e);
            close(fd);
            return 0;
        } 
        // 2、read=-1,读异常
        else if (n < 0) {
            // 2.1、EINTR:中断,重试
            if (errno == EINTR) {
                continue;
            }
            // 2.2、EWOULDBLOCK:阻塞,读缓冲区为空 
            if (errno == EWOULDBLOCK) {
                break;
            }
            // 其他错误,执行错误回调,删除该事件,关闭当前连接
            printf("read error fd = %d err = %s\n", fd, strerror(errno));
            if (e->error_fn)
                e->error_fn(fd, strerror(errno));
            del_event(e->r, e);
            close(fd);
            return 0;
        } 
        // 3、read>0, 正常,读取数据,处理业务逻辑
        else {
            printf("recv data from client:%s", buf);
            buffer_add(&e->in, buf, n);
        }
        num += n;
    }
    return num;
}
// 向对端发送数据
int _write_socket(event_t *e, void * buf, int sz) {
    int fd = e->fd;
    while (1) {
        int n = write(fd, buf, sz);
        // 1、write=-1,写异常
        if (n < 0) {
            // 2.1、EINTR:中断,重试
            if (errno == EINTR) {
                continue;
            }  
            // 2.2、EWOULDBLOCK:阻塞,需要注册写事件
            if (errno == EWOULDBLOCK) {
                break;
            }
            // 其他错误,执行错误回调,删除该事件,关闭当前连接   
            if (e->error_fn) {
                e->error_fn(fd, strerror(errno));
            }     
            del_event(e->r, e);
            close(e->fd);
        }
        return n;
    }
    return 0;
}
// 写数据
int event_buffer_write(event_t *e, void * buf, int sz) {
    // 指向用户写缓冲
    buffer_t *r = &e->out;
    // 1、用户写缓冲已满,开始发送
    if (buffer_len(r) == 0) {
        // 向对端发送数据
        int n = _write_socket(e, buf, sz);
        // 1.1、本次数据未发送完,未发送的数据写入缓冲,并注册写事件
        if (n == 0 || n < sz) {
            // 1.1、将没有发送完的数据写入缓冲区
            buffer_add(&e->out, (char *)buf + n, sz - n);
            // 1.2、注册写事件,等待下次事件触发接着发送
            enable_event(e->r, e, 1, 1);
            return 0;
        } 
        // 1.2、本次没有发送数据
        else if (n < 0) {
            return 0;
        }
        // 1.3、本次数据发送完成    
        return 1;
    }
    // 2、用户写缓冲未满,写入缓冲,等待发送
    buffer_add(&e->out, (char *)buf, sz);
    return 1;
}
#endif

第 2 步,实现 redis 适配器,主要是构建 redis 事件对象和适配 hiredis 的事件控制接口。

// adapter_async.h
 #ifndef _ADAPTER_
 #define _ADAPTER_
 #include <hiredis/hiredis.h>
 #include <hiredis/alloc.h>
 #include "reactor.h"
 // redis 事件对象
 typedef struct {
     event_t e;              // reactor 事件对象
     int mask;               // 存储注册的事件
     redisAsyncContext *ctx; // hiredis 事件对象
 } redis_event_t;
 // redis 对象读事件回调
 static void redisReadHandler(int fd, int events, void *privdata) {
     ((void)fd);
     ((void)events);
     event_t *e = (event_t*)privdata;
     redis_event_t *re = (redis_event_t *)(char *)e;
     redisAsyncHandleRead(re->ctx);
 }
 // redis 对象写事件读回调
 static void redisWriteHandler(int fd, int events, void *privdata) {
     ((void)fd);
     ((void)events);
     event_t *e = (event_t*)privdata;
     redis_event_t *re = (redis_event_t *)(char *)e;
     redisAsyncHandleWrite(re->ctx);
 }
 /**
  * @brief 对 reactor 管理的事件对象进行更新
  * @param privdata  redis 事件对象
  * @param flag      要设置的 epoll 事件类型 
  * @param remove    1 删除该事件 0 添加该事件
  */
 static void redisEventUpdate(void *privdata, int flag, int remove) {
     redis_event_t *re = (redis_event_t *)privdata;
     reactor_t *r = re->e.r;
     int prevMask = re->mask;
     int enable = 0;             
     // redis 事件对象删除该事件
     if (remove) {
         if ((re->mask & flag) == 0) {
             return;
         }
         re->mask &= ~flag;
         enable = 0;
     } 
     // redis 事件对象添加该事件
     else {
         if (re->mask & flag) {
             return;    
         }           
         re->mask |= flag;
         enable = 1;
     }
     // 对 reactor 事件对象的处理
     // 1、reactor 事件对象删除该事件
     if (re->mask == 0) {
         del_event(r, &re->e);
     } 
     // 2、reactor 事件对象添加该事件(第一次加入)
     else if (prevMask == 0) {
         add_event(r, re->mask, &re->e);
     } 
     // 3、reactor 事件对象修改该事件
     else {
         // 注册读事件
         if (flag & EPOLLIN) {
             enable_event(r, &re->e, enable, 0);
         } 
         // 注册写事件
         else if (flag & EPOLLOUT) {
             enable_event(r, &re->e, 0, enable);
         }
     }
 }
 // 需要适配的 hiredis 事件接口
 // 1、redis 事件对象添加读事件
 static void redisAddRead(void *privdata) {
     redis_event_t *re = (redis_event_t *)privdata;
     re->e.read_fn = redisReadHandler;
     redisEventUpdate(privdata, EPOLLIN, 0);
 }
 // 2、redis 事件对象删除读事件
 static void redisDelRead(void *privdata) {
     redis_event_t *re = (redis_event_t *)privdata;
     re->e.read_fn = 0;
     redisEventUpdate(privdata, EPOLLIN, 1);
 }
 // 3、redis 事件对象添加写事件
 static void redisAddWrite(void *privdata) {
     redis_event_t *re = (redis_event_t *)privdata;
     re->e.write_fn = redisWriteHandler;
     redisEventUpdate(privdata, EPOLLOUT, 0);
 }
 // 4、redis 事件对象删除写事件
 static void redisDelWrite(void *privdata) {
     redis_event_t *re = (redis_event_t *)privdata;
     re->e.write_fn = 0;
     redisEventUpdate(privdata, EPOLLOUT, 1);
 }
 // 5、redis 事件对象释放
 static void redisCleanup(void *privdata) {
     redis_event_t *re = (redis_event_t *)privdata;
     reactor_t *r = re->e.r;
     del_event(r, &re->e);
     hi_free(re);
 }
 // redis 事件对象绑定,reactor 对象和 redis 异步上下文
 static int redisAttach(reactor_t *r, redisAsyncContext *ac) { 
     redisContext *c = &(ac->c); // redis 同步上下文
     redis_event_t *re;          // redis 事件对象
     /* Nothing should be attached when something is already attached */
     if (ac->ev.data != NULL)
         return REDIS_ERR;
     /* Create container for ctx and r/w events */
     re = (redis_event_t*)hi_malloc(sizeof(*re));
     if (re == NULL) {
         return REDIS_ERR;
     }  
     // redis 事件对象绑定 reactor 对象和 redis 异步上下文
     re->ctx = ac;       // 绑定 redis 异步上下文
     re->e.fd = c->fd;   // 绑定 redis 的fd
     re->e.r = r;        // 绑定 reacotr
     re->mask = 0;       // 绑定事件
     // redis 异步上下文设置,需要适配事件控制
     // hiredis 提供事件接口,用户实现事件接口
     ac->ev.addRead = redisAddRead;
     ac->ev.delRead = redisDelRead;
     ac->ev.addWrite = redisAddWrite;
     ac->ev.delWrite = redisDelWrite;
     ac->ev.cleanup = redisCleanup;
     ac->ev.data = re;
     return REDIS_OK;
 }
 #endif

接下来,实现主体代码,实现功能

// redis-test-async.c
 // gcc redis-test-async.c chainbuffer/buffer.c -o async -lhiredis
 #include <hiredis/hiredis.h>
 #include <hiredis/async.h>
 #include <time.h>
 #include "reactor.h"
 #include "adapter_async.h"
 static reactor_t *R;
 static int cnt, before, num;
 int current_tick() {
     int t = 0;
     struct timespec ti;
     clock_gettime(CLOCK_MONOTONIC, &ti);
     t = (int)ti.tv_sec * 1000;
     t += ti.tv_nsec / 1000000;
     return t;
 }
 void getCallback(redisAsyncContext *c, void *r, void *privdata) {
     redisReply *reply = r;
     if (reply == NULL) return;
     printf("argv[%s]: %lld\n", (char*)privdata, reply->integer);
     /* Disconnect after receiving the reply to GET */
     cnt++;
     if (cnt == num) {
         int used = current_tick()-before;
         printf("after %d exec redis command, used %d ms\n", num, used);
         redisAsyncDisconnect(c);
     }
 }
 void connectCallback(const redisAsyncContext *c, int status) {
     if (status != REDIS_OK) {
         printf("Error: %s\n", c->errstr);
         stop_eventloop(R);
         return;
     }
     printf("Connected...\n");
 }
 void disconnectCallback(const redisAsyncContext *c, int status) {
     if (status != REDIS_OK) {
         printf("Error: %s\n", c->errstr);
         stop_eventloop(R);
         return;
     }
     printf("Disconnected...\n");
     stop_eventloop(R);
 }
 int main(int argc, char **argv) {
     redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
     if (c->err) {
         /* Let *c leak for now... */
         printf("Error: %s\n", c->errstr);
         return 1;
     }
     R = create_reactor();
     redisAttach(R, c);   
     redisAsyncSetConnectCallback(c, connectCallback);
     redisAsyncSetDisconnectCallback(c, disconnectCallback);
     before = current_tick();
     num = (argc > 1) ? atoi(argv[1]) : 1000;
     redisAsyncCommand(c, NULL, NULL, "auth 123456");  
     for (int i = 0; i < num; i++) {
         redisAsyncCommand(c, getCallback, "count", "INCR counter");
     }
     eventloop(R);
     release_reactor(R);
     return 0;
 }
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL Redis 数据库
Redis 连接
10月更文挑战第19天
38 0
|
11天前
|
NoSQL 应用服务中间件 API
Redis是如何建立连接和处理命令的
本文主要讲述 Redis 是如何监听客户端发出的set、get等命令的。
|
8天前
|
NoSQL Redis
Redis事务长什么样?一文带你全面了解
Redis事务是一组命令的有序队列,通过MULTI、EXEC、WATCH和DISCARD等命令实现原子性操作。事务中的命令在EXEC执行前不会实际运行,而是先进入队列,确保所有命令要么全部成功,要么全部失败。此外,Redis还支持Lua脚本实现类似事务的操作,通常更简单高效。事务适用于购物车结算、秒杀活动、排行榜更新等需要保证数据一致性的场景。
30 0
|
1月前
|
监控 NoSQL 网络协议
【Azure Redis】部署在AKS中的应用,连接Redis高频率出现timeout问题
查看Redis状态,没有任何异常,服务没有更新,Service Load, CPU, Memory, Connect等指标均正常。在排除Redis端问题后,转向了AKS中。 开始调查AKS的网络状态。最终发现每次Redis客户端出现超时问题时,几乎都对应了AKS NAT Gateway的更新事件,而Redis服务端没有任何异常。因此,超时问题很可能是由于NAT Gateway更新事件导致TCP连接被重置。
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
55 6
|
2月前
|
NoSQL 网络协议 算法
Redis 客户端连接
10月更文挑战第21天
45 1
|
NoSQL Java 关系型数据库
Redis_事务_锁机制_秒杀
Redis_事务_锁机制_秒杀
|
SQL NoSQL 关系型数据库
Redis(二十)-Redis的事务和锁机制
何为事务呢?我的理解是事务是一种机制,是一个不可分割的工作单元,要么都执行,要么都不执行。
173 0
Redis(二十)-Redis的事务和锁机制
|
NoSQL 关系型数据库 Redis
Redis的事务与锁机制
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。Redis 事务的主要作用就是串联多个命令,防止别的命令插队。
83 0
Redis的事务与锁机制
|
监控 NoSQL 关系型数据库
Redis——事务 & 锁机制
Redis——事务 & 锁机制
Redis——事务 & 锁机制