redis async client 与自有框架集成

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: redis async client 与自有框架集成

hiredis的异步接口已经支持ae libuv libev 和 libevent集成,具体头文件可以参见redis/deps/hiredis/adapters,样例参见redis/deps/hiredis/examples.


完整样例参见: https://github.com/DavadDi/study_example/tree/master/async_redis_client

 

参照hireids的异步接口和libevent的集成可以很容易和其他网络框架集成,例如asio或者ace等。 以下样例为自己编写reactor框架的集成方式,


支持自动重练和asyncRedisContext对象的创建和释放,重练使用退步算法,最大连接时间间隔为32秒。

 

使用方式:

  将redis_client.hpp 放到 hiredis的adapter目录即可。

#ifndef redis_client_h
#define redis_client_h
#include "reactor/define.hpp"
#include "reactor/event_handler.hpp"
#include "hiredis.h"
#include "async.h"
using namespace reactor;
static  void redisReactorAddRead(void *arg);
static  void redisReactorDelRead(void *arg);
static  void redisReactorAddWrite(void *arg);
static  void redisReactorDelWrite(void *arg);
static  void redisReactorCleanup(void *arg);
void connectCallBack(const redisAsyncContext *c, int status);
void disconnectCallBack(const redisAsyncContext *c, int status);
void get_call_fun(redisAsyncContext *c, void *r, void *arg)
{
    redisReply *reply = (redisReply *)r;
    std::string *key_str = (std::string *)arg;
    if (reply == NULL)
    {
        delete key_str;
        return;
    }
    LOG_INFO("[%s] -> %s\n", key_str->c_str(), reply->str);
    delete key_str;
    /* Disconnect after receiving the reply to GET */
    // redisAsyncDisconnect(c);
}
// -------------------------------------------------------------------
// !!NOTE, if obj conneted to server faild and unregister from epoll,
// prog exit, this object my leak memory
// -------------------------------------------------------------------
class CRedisClient : public CEventHandler
{
    public:
        // enum {MAX_BUF_SIZE = 4096};
        typedef CEventHandler super;
        CRedisClient(const char *srv_ip, uint16_t srv_port, reactor::CReactor *reactor)
            : super(reactor)
        {
            m_srv_ip_str = srv_ip;
            m_srv_port = srv_port;
        }
        int connect()
        {
            LOG_DEBUG("Enter CRedisClient connect()");
            m_client_status = CONNECT_STATUS::CLIENT_CONNECTING;
            clear_redis_context();
            m_redis_context = redisAsyncConnect(m_srv_ip_str.c_str(), m_srv_port);
            if (m_redis_context == nullptr)
            {
                return -1;
            }
            if (m_redis_context->err)
            {
                LOG_INFO("Connect to %s:%d Error: %s",
                        m_srv_ip_str.c_str(), m_srv_port, m_redis_context->errstr);
                return -1;
            }
            if (m_timer_id == 0)
            {
                m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
                LOG_DEBUG("Client regist timer to reactor id %d, timeout %d", m_timer_id, m_timeout_value);
            }
            this->attach();
            return 0;
        }
        virtual ~CRedisClient()
        {
            // maybe should not free redis context in deconstuct!!
            m_delete_redis_context = true;
            clear_redis_context();
        }
        virtual int open(void *data = nullptr)
        {
            m_client_status = CONNECT_STATUS::CLIENT_CONNECTED;
            m_delete_redis_context = false;
            if (m_timer_id == 0)
            {
                m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
                LOG_DEBUG("Client regist timer to reactor id %d, timeout %d",
                                                    m_timer_id, m_timeout_value);
            }
            LOG_INFO("Connect to RedisServer %s:%d succeed!!",
                                               m_srv_ip_str.c_str(), m_srv_port);
            return 0;
        }
        virtual int handle_input(socket_t socket)
        {
            redisAsyncHandleRead(m_redis_context);
            return 0;
        }
        virtual int handle_output(socket_t socket)
        {
            redisAsyncHandleWrite(m_redis_context);
            return 0;
        }
        virtual int handle_timeout(uint32_t tm, void *data = nullptr)
        {
            // LOG_DEBUG("Enter into timeout function....");
            if (m_client_status == CONNECT_STATUS::CLIENT_CONNECTED)
            {
                /* just for test */
                std::string key = std::to_string(tm);
                LOG_DEBUG("Set key %s", key.c_str());
                redisAsyncCommand(m_redis_context, NULL, NULL, "SET %s %s",key.c_str(), "aaa");
                redisAsyncCommand(m_redis_context, get_call_fun, (char*)new string(key), "GET %s", key.c_str());
            }
            else
            {
                static uint32_t last_tm = 0;
                if ((tm - last_tm) >= m_timeout_interval)
                {
                    //reconnect
                    LOG_DEBUG("Start reconnect now ...");
                    this->connect();
                    m_timeout_interval = m_timeout_interval * 2;
                    if (m_timeout_interval > 32)
                    {
                        m_timeout_interval = 1;
                    }
                    last_tm = tm;
                }
            }
            return 0;
        }
        virtual int handle_close(socket_t socket = INVALID_SOCKET, uint32_t mask = 0)
        {
            LOG_DEBUG("Enter into handle_close()");
            m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;
            // epoll call delete this handler
            if (mask & RE_MASK_DEL)
            {
                LOG_DEBUG("Call RE_MASK_DEL now");
                if (this->m_timer_id && (this->reactor() != nullptr))
                {
                    this->reactor()->unregist_timer(this->m_timer_id);
                    this->m_timer_id = 0;
                }
                delete this;
                return 0;
            }
            this->reactor()->del_event(this,0);
            return 0;
        }
        void clear_redis_context()
        {
            if (m_delete_redis_context && m_redis_context != nullptr)
            {
                LOG_DEBUG("Call redisAsynFree() now");
                redisAsyncFree(m_redis_context);
                m_redis_context = nullptr;
            }
        }
        int attach()
        {
            LOG_DEBUG("Enter attatch function... ");
            redisContext *context = &(m_redis_context->c);
            if (m_redis_context->ev.data != NULL)
            {
                return -1;
            }
            // set callback function
            redisAsyncSetConnectCallback(m_redis_context,connectCallBack);
            redisAsyncSetDisconnectCallback(m_redis_context,disconnectCallBack);
            this->set_handle(context->fd); // set handler
            m_redis_context->ev.addRead = redisReactorAddRead;
            m_redis_context->ev.delRead = redisReactorDelRead;
            m_redis_context->ev.addWrite = redisReactorAddWrite;
            m_redis_context->ev.delWrite = redisReactorDelWrite;
            m_redis_context->ev.cleanup = redisReactorCleanup;
            m_redis_context->ev.data = this;
            LOG_DEBUG("ac->ev.data %p", m_redis_context->ev.data);
            this->add_read();
            this->add_write();
            return 0;
        }
        void add_read()
        {
            LOG_TRACE_METHOD(__func__);
            if( (this->m_current_event_mask & reactor::EVENT_READ) > 0)
            {
                LOG_DEBUG("EV_READ(0x%0x) already in event_mask 0x%x",
                        reactor::EVENT_READ, this->m_current_event_mask);
                return;
            }
            this->reactor()->add_event(this, reactor::EVENT_READ);
        }
        void del_read()
        {
            LOG_TRACE_METHOD(__func__);
            this->reactor()->mod_event(this, this->m_current_event_mask&(~reactor::EVENT_READ));
        }
        void add_write()
        {
            LOG_TRACE_METHOD(__func__);
            this->schedule_write();
        }
        void del_write()
        {
            LOG_TRACE_METHOD(__func__);
            this->cancel_schedule_write();
        }
        void clean_up()
        {
            LOG_TRACE_METHOD(__func__);
        }
        // note!!!
        // connenct not succeed. we can free redis context. ]
        // But if connect succeed and borken, we don't connect
    protected:
        std::string m_srv_ip_str;
        uint16_t    m_srv_port;
        CONNECT_STATUS  m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;
        int m_timer_id  = 0;
        uint32_t m_timeout_value = 1;
        uint32_t  m_timeout_interval = 1;
        bool    m_delete_redis_context = true;
        redisAsyncContext *m_redis_context = nullptr;
};
static void redisReactorAddRead(void *arg)
{
    LOG_DEBUG("Enter redisReactorAddRead() arg %p", arg);
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->add_read();
}
static void redisReactorDelRead(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->del_read();
}
static void redisReactorAddWrite(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->add_write();
}
static void redisReactorDelWrite(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->del_write();
}
static void redisReactorCleanup(void *arg)
{
    CRedisClient *event_handler = (CRedisClient *)arg;
    event_handler->clean_up();
}
void connectCallBack(const redisAsyncContext *ac, int status)
{
    if (status != REDIS_OK)
    {
        LOG_ERROR("connectCallBack() Error: %s", ac->errstr);
        return;
    }
    CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
    event_handler->open();
    LOG_INFO("RedisClient Connected...");
}
void disconnectCallBack(const redisAsyncContext *ac, int status)
{
    CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
    event_handler->handle_close(0,0);
    if (status != REDIS_OK)
    {
        LOG_INFO("disconnectCallBack()!! Error: %s", ac->errstr);
        return;
    }
    LOG_INFO("RedisClient Disconnected...");
}
#endif /* redis_client_h */


使用的程序样例:


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
//#include <hiredis.h>
//#include <async.h>
#include <adapters/redis_client.hpp>
//#include "redis_client.hpp"
#include <signal.h>
static void signal_handler(int sig)
{
    if (sig == SIGINT)
    {
        reactor::CReactor::instance()->end_event_loop();
    }
}
/*
void get_call_fun(redisAsyncContext *c, void *r, void *arg)
{
    redisReply *reply = (redisReply *)r;
    std::string *key_str = (std::string *)arg;
    if (reply == NULL)
    {
        delete key_str;
        return;
    }
    LOG_INFO("[%s] -> %s\n", key_str->c_str(), reply->str);
    delete key_str;
    // Disconnect after receiving the reply to GET
    // redisAsyncDisconnect(c);
}
*/
int main (int argc, char **argv)
{
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, signal_handler);
    CLoggerMgr logger("reactor.prop");
    reactor::CReactor *rt = reactor::CReactor::instance();
    CRedisClient *redis_client = new CRedisClient("127.0.0.1", 6379, rt);
    redis_client->connect();
    rt->run_event_loop();
    return 0;
}



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
24天前
|
NoSQL Java Redis
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
266 0
|
29天前
|
NoSQL Java Redis
SpringBoot集成Redis
SpringBoot集成Redis
411 0
|
8天前
|
监控 测试技术 数据安全/隐私保护
如何将代理IP集成到自动化测试框架中?
如何将代理IP集成到自动化测试框架中?
|
25天前
|
NoSQL Java Redis
SpringBoot集成Redis
SpringBoot集成Redis
53 1
|
1月前
|
缓存 NoSQL Java
springboot中集成redis,二次封装成工具类
springboot中集成redis,二次封装成工具类
174 0
|
1月前
|
监控 NoSQL Java
Spring Boot集成Redis启动失败【Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.G】
Spring Boot集成Redis启动失败【Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.G】
|
1月前
|
SQL API 数据处理
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
本文整理自阿里云开源大数据平台吕宴全关于新一代实时数据集成框架 Flink CDC 3.0 的核心技术架构解析。
729 0
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
|
1月前
|
存储 NoSQL Java
如何使用Spring Boot与Redis集成
2月更文挑战第12天】
54 0
|
NoSQL C# Redis
更高效地提高redis client多线程操作的并发吞吐设计
Redis是一个非常高效的基于内存的NOSQL数据库,它提供非常高效的数据读写效能.在实际应用中往往是带宽和CLIENT库读写损耗过高导致无法更好地发挥出Redis更出色的能力.下面结合一些redis本身的特性和一些client操作上的改变来提高整个redis操作的交通.
1138 0

热门文章

最新文章