redis async client 与自有框架集成

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: redis async client 与自有框架集成

hiredis的异步接口已经支持ae libuv libev 和 libevent集成,具体头文件可以参见redis/deps/hiredis/adapters,样例参见redis/deps/hiredis/examples.


完整样例参见: https://github.com/DavadDi/study_example/tree/master/async_redis_client

 

参照hireids的异步接口和libevent的集成可以很容易和其他网络框架集成,例如asio或者ace等。 以下样例为自己编写reactor框架的集成方式,


支持自动重练和asyncRedisContext对象的创建和释放,重练使用退步算法,最大连接时间间隔为32秒。

 

使用方式:

  将redis_client.hpp 放到 hiredis的adapter目录即可。

#ifndef redis_client_h
#define redis_client_h
#include "reactor/define.hpp"
#include "reactor/event_handler.hpp"
#include "hiredis.h"
#include "async.h"
using namespace reactor;
static  void redisReactorAddRead(void *arg);
static  void redisReactorDelRead(void *arg);
static  void redisReactorAddWrite(void *arg);
static  void redisReactorDelWrite(void *arg);
static  void redisReactorCleanup(void *arg);
void connectCallBack(const redisAsyncContext *c, int status);
void disconnectCallBack(const redisAsyncContext *c, int status);
void get_call_fun(redisAsyncContext *c, void *r, void *arg)
{
    redisReply *reply = (redisReply *)r;
    std::string *key_str = (std::string *)arg;
    if (reply == NULL)
    {
        delete key_str;
        return;
    }
    LOG_INFO("[%s] -> %s\n", key_str->c_str(), reply->str);
    delete key_str;
    /* Disconnect after receiving the reply to GET */
    // redisAsyncDisconnect(c);
}
// -------------------------------------------------------------------
// !!NOTE, if obj conneted to server faild and unregister from epoll,
// prog exit, this object my leak memory
// -------------------------------------------------------------------
class CRedisClient : public CEventHandler
{
    public:
        // enum {MAX_BUF_SIZE = 4096};
        typedef CEventHandler super;
        CRedisClient(const char *srv_ip, uint16_t srv_port, reactor::CReactor *reactor)
            : super(reactor)
        {
            m_srv_ip_str = srv_ip;
            m_srv_port = srv_port;
        }
        int connect()
        {
            LOG_DEBUG("Enter CRedisClient connect()");
            m_client_status = CONNECT_STATUS::CLIENT_CONNECTING;
            clear_redis_context();
            m_redis_context = redisAsyncConnect(m_srv_ip_str.c_str(), m_srv_port);
            if (m_redis_context == nullptr)
            {
                return -1;
            }
            if (m_redis_context->err)
            {
                LOG_INFO("Connect to %s:%d Error: %s",
                        m_srv_ip_str.c_str(), m_srv_port, m_redis_context->errstr);
                return -1;
            }
            if (m_timer_id == 0)
            {
                m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
                LOG_DEBUG("Client regist timer to reactor id %d, timeout %d", m_timer_id, m_timeout_value);
            }
            this->attach();
            return 0;
        }
        virtual ~CRedisClient()
        {
            // maybe should not free redis context in deconstuct!!
            m_delete_redis_context = true;
            clear_redis_context();
        }
        virtual int open(void *data = nullptr)
        {
            m_client_status = CONNECT_STATUS::CLIENT_CONNECTED;
            m_delete_redis_context = false;
            if (m_timer_id == 0)
            {
                m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
                LOG_DEBUG("Client regist timer to reactor id %d, timeout %d",
                                                    m_timer_id, m_timeout_value);
            }
            LOG_INFO("Connect to RedisServer %s:%d succeed!!",
                                               m_srv_ip_str.c_str(), m_srv_port);
            return 0;
        }
        virtual int handle_input(socket_t socket)
        {
            redisAsyncHandleRead(m_redis_context);
            return 0;
        }
        virtual int handle_output(socket_t socket)
        {
            redisAsyncHandleWrite(m_redis_context);
            return 0;
        }
        virtual int handle_timeout(uint32_t tm, void *data = nullptr)
        {
            // LOG_DEBUG("Enter into timeout function....");
            if (m_client_status == CONNECT_STATUS::CLIENT_CONNECTED)
            {
                /* just for test */
                std::string key = std::to_string(tm);
                LOG_DEBUG("Set key %s", key.c_str());
                redisAsyncCommand(m_redis_context, NULL, NULL, "SET %s %s",key.c_str(), "aaa");
                redisAsyncCommand(m_redis_context, get_call_fun, (char*)new string(key), "GET %s", key.c_str());
            }
            else
            {
                static uint32_t last_tm = 0;
                if ((tm - last_tm) >= m_timeout_interval)
                {
                    //reconnect
                    LOG_DEBUG("Start reconnect now ...");
                    this->connect();
                    m_timeout_interval = m_timeout_interval * 2;
                    if (m_timeout_interval > 32)
                    {
                        m_timeout_interval = 1;
                    }
                    last_tm = tm;
                }
            }
            return 0;
        }
        virtual int handle_close(socket_t socket = INVALID_SOCKET, uint32_t mask = 0)
        {
            LOG_DEBUG("Enter into handle_close()");
            m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;
            // epoll call delete this handler
            if (mask & RE_MASK_DEL)
            {
                LOG_DEBUG("Call RE_MASK_DEL now");
                if (this->m_timer_id && (this->reactor() != nullptr))
                {
                    this->reactor()->unregist_timer(this->m_timer_id);
                    this->m_timer_id = 0;
                }
                delete this;
                return 0;
            }
            this->reactor()->del_event(this,0);
            return 0;
        }
        void clear_redis_context()
        {
            if (m_delete_redis_context && m_redis_context != nullptr)
            {
                LOG_DEBUG("Call redisAsynFree() now");
                redisAsyncFree(m_redis_context);
                m_redis_context = nullptr;
            }
        }
        int attach()
        {
            LOG_DEBUG("Enter attatch function... ");
            redisContext *context = &(m_redis_context->c);
            if (m_redis_context->ev.data != NULL)
            {
                return -1;
            }
            // set callback function
            redisAsyncSetConnectCallback(m_redis_context,connectCallBack);
            redisAsyncSetDisconnectCallback(m_redis_context,disconnectCallBack);
            this->set_handle(context->fd); // set handler
            m_redis_context->ev.addRead = redisReactorAddRead;
            m_redis_context->ev.delRead = redisReactorDelRead;
            m_redis_context->ev.addWrite = redisReactorAddWrite;
            m_redis_context->ev.delWrite = redisReactorDelWrite;
            m_redis_context->ev.cleanup = redisReactorCleanup;
            m_redis_context->ev.data = this;
            LOG_DEBUG("ac->ev.data %p", m_redis_context->ev.data);
            this->add_read();
            this->add_write();
            return 0;
        }
        void add_read()
        {
            LOG_TRACE_METHOD(__func__);
            if( (this->m_current_event_mask & reactor::EVENT_READ) > 0)
            {
                LOG_DEBUG("EV_READ(0x%0x) already in event_mask 0x%x",
                        reactor::EVENT_READ, this->m_current_event_mask);
                return;
            }
            this->reactor()->add_event(this, reactor::EVENT_READ);
        }
        void del_read()
        {
            LOG_TRACE_METHOD(__func__);
            this->reactor()->mod_event(this, this->m_current_event_mask&(~reactor::EVENT_READ));
        }
        void add_write()
        {
            LOG_TRACE_METHOD(__func__);
            this->schedule_write();
        }
        void del_write()
        {
            LOG_TRACE_METHOD(__func__);
            this->cancel_schedule_write();
        }
        void clean_up()
        {
            LOG_TRACE_METHOD(__func__);
        }
        // note!!!
        // connenct not succeed. we can free redis context. ]
        // But if connect succeed and borken, we don't connect
    protected:
        std::string m_srv_ip_str;
        uint16_t    m_srv_port;
        CONNECT_STATUS  m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;
        int m_timer_id  = 0;
        uint32_t m_timeout_value = 1;
        uint32_t  m_timeout_interval = 1;
        bool    m_delete_redis_context = true;
        redisAsyncContext *m_redis_context = nullptr;
};
static void redisReactorAddRead(void *arg)
{
    LOG_DEBUG("Enter redisReactorAddRead() arg %p", arg);
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->add_read();
}
static void redisReactorDelRead(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->del_read();
}
static void redisReactorAddWrite(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->add_write();
}
static void redisReactorDelWrite(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->del_write();
}
static void redisReactorCleanup(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->clean_up();
}
void connectCallBack(const redisAsyncContext *ac, int status)
{
    if (status != REDIS_OK)
    {
        LOG_ERROR("connectCallBack() Error: %s", ac->errstr);
        return;
    }
    CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
    event_handler->open();
    LOG_INFO("RedisClient Connected...");
}
void disconnectCallBack(const redisAsyncContext *ac, int status)
{
    CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
    event_handler->handle_close(0,0);
    if (status != REDIS_OK)
    {
        LOG_INFO("disconnectCallBack()!! Error: %s", ac->errstr);
        return;
    }
    LOG_INFO("RedisClient Disconnected...");
}
#endif /* redis_client_h */


使用的程序样例:


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
//#include <hiredis.h>
//#include <async.h>
#include <adapters/redis_client.hpp>
//#include "redis_client.hpp"
#include <signal.h>
static void signal_handler(int sig)
{
    if (sig == SIGINT)
    {
        reactor::CReactor::instance()->end_event_loop();
    }
}
/*
void get_call_fun(redisAsyncContext *c, void *r, void *arg)
{
    redisReply *reply = (redisReply *)r;
    std::string *key_str = (std::string *)arg;
    if (reply == NULL)
    {
        delete key_str;
        return;
    }
    LOG_INFO("[%s] -> %s\n", key_str->c_str(), reply->str);
    delete key_str;
    // Disconnect after receiving the reply to GET
    // redisAsyncDisconnect(c);
}
*/
int main (int argc, char **argv)
{
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, signal_handler);
    CLoggerMgr logger("reactor.prop");
    reactor::CReactor *rt = reactor::CReactor::instance();
    CRedisClient *redis_client = new CRedisClient("127.0.0.1", 6379, rt);
    redis_client->connect();
    rt->run_event_loop();
    return 0;
}



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
6天前
|
缓存 NoSQL Java
Redis应用—8.相关的缓存框架
本文介绍了Ehcache和Guava Cache两个缓存框架及其使用方法,以及如何自定义缓存。主要内容包括:Ehcache缓存框架、Guava Cache缓存框架、自定义缓存。总结:Ehcache适合用作本地缓存或与Redis结合使用,Guava Cache则提供了更灵活的缓存管理和更高的并发性能。自定义缓存可以根据具体需求选择不同的数据结构和引用类型来实现特定的缓存策略。
Redis应用—8.相关的缓存框架
|
1月前
|
缓存 NoSQL Java
Redis的操作以及SpringCache框架
以及如何在Spring Boot应用中使用Spring Cache框架集成Redis。Redis提供了丰富的数据结构和高效的内存存储能力,结合Spring Cache框架,可以显著提高应用的性能和响应速度。
47 7
|
2月前
|
人工智能 达摩院 并行计算
VideoRefer:阿里达摩院开源视频对象感知与推理框架,可集成 VLLM 提升其空间和时间理解能力
VideoRefer 是浙江大学与阿里达摩学院联合推出的视频对象感知与推理技术,支持细粒度视频对象理解、复杂关系分析及多模态交互,适用于视频剪辑、教育、安防等多个领域。
190 17
VideoRefer:阿里达摩院开源视频对象感知与推理框架,可集成 VLLM 提升其空间和时间理解能力
|
4月前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
82 2
|
5月前
|
Java 程序员 API
Android|集成 slf4j + logback 作为日志框架
做个简单改造,统一 Android APP 和 Java 后端项目打印日志的体验。
226 1
|
5月前
|
开发框架 监控 搜索推荐
GoFly快速开发框架集成ZincSearch全文搜索引擎 - Elasticsearch轻量级替代为ZincSearch全文搜索引擎
本文介绍了在项目开发中使用ZincSearch作为全文搜索引擎的优势,包括其轻量级、易于安装和使用、资源占用低等特点,以及如何在GoFly快速开发框架中集成和使用ZincSearch,提供了详细的开发文档和实例代码,帮助开发者高效地实现搜索功能。
331 0
|
5月前
|
存储 缓存 NoSQL
深入理解Django与Redis的集成实践
深入理解Django与Redis的集成实践
140 0
|
5月前
|
存储 监控 NoSQL
Redis的实现二: c、c++的网络通信编程技术,让服务器处理多个client
本文讨论了在C/C++中实现服务器处理多个客户端的技术,重点介绍了事件循环和非阻塞IO的概念,以及如何在Linux上使用epoll来高效地监控和管理多个文件描述符。
59 0
|
5月前
|
NoSQL 网络协议 Linux
Redis的实现一:c、c++的网络通信编程技术,先实现server和client的通信
本文介绍了使用C/C++进行网络通信编程的基础知识,包括创建socket、设置套接字选项、绑定地址、监听连接以及循环接受和处理客户端请求的基本步骤。
87 6
|
5月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
125 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。