FFLIb Demo && CQRS

简介: 使用FFLIB 构建了一个demo,该demo模拟了一个常见的游戏后台架构,该demo主要有一下亮点: FFLIB 实现进程间通信非常方便 基于CQRS 思想构建LogicServer 使用Event Publish/Subscribe, 实现各个模块的解耦合 基于Event 实现实体对象的单元测试,在你gtest中,利用event做mock,同时利用event  做验证,单元测试就是一个Given(event,先提供条件), When(Command,触发操作), Expect(Event,期望结果是否发生)。

使用FFLIB 构建了一个demo,该demo模拟了一个常见的游戏后台架构,该demo主要有一下亮点:

  • FFLIB 实现进程间通信非常方便
  • 基于CQRS 思想构建LogicServer
  • 使用Event Publish/Subscribe 实现各个模块的解耦合
  • 基于Event 实现实体对象的单元测试,在你gtest中,利用eventmock,同时利用event  做验证,单元测试就是一个Givenevent,先提供条件), WhenCommand,触发操作), ExpectEvent,期望结果是否发生)。

模拟后台进程的通信

由于本demo 只在于演示fflibdemo中的细节没有做过多处理,主要通讯流程就是client – gatewayBroker – LogicServer

 

GatewayBroker 转发消息

Gatewaybroker 扮演的角色为接受连接,转发消息。示例代码如下:

int gateway_service_t::handle_common_logic(gate_msg_tool_t& msg_, socket_ptr_t sock_)
{
    struct lambda_t
    {
        static void callback(common_msg_t::out_t& msg_, long uid_)
        {
            //! send to client, add to gateway user map
            //! msg_sender_t::send_to_client(sock_, msg_);
        }
    };
    long uid = sock_->get_data<client_session_t>()->uid;
    common_msg_t::in_t dest_msg;
    dest_msg.uid = uid;
    dest_msg.content = msg_.packet_body;
    singleton_t<msg_bus_t>::instance().get_service_group("logic")
                                        ->get_service(0)
                                        ->async_call(dest_msg, binder_t::callback(&lambda_t::callback, uid));
    return 0;
}

LogicServer 各个逻辑模块处理请求

LogicServer 接收到消息后,将消息交由特定的逻辑模块处理,所有的逻辑模块接口都专门处理一种cmd,并且这些接口都已经注册到BUS中了。故LogicServer 将消息publishBUS中即可:

int logic_service_t::common_msg(common_msg_t::in_t& msg_, rpc_callcack_t<common_msg_t::out_t>& cb_)
{
    common_msg_t::out_t ret;
    cb_(ret);
    
    uint32_t* len = (uint32_t*)(msg_.content.c_str());
    string name(msg_.content.c_str()+4, *len);
    BUS.publish(name, msg_.content);
    return 0;
}

BUS 的细节

Service 中定义的接口,需要注册到BUS中,订阅相关的CMD,示例代码:

int task_service_t::start()
{
    subscriber_t subscriber;
    subscriber.reg<accept_task_cmd_t>(this)
              .reg<complete_task_cmd_t>(this);
    BUS.subscribe(subscriber);
    return 0;
}

void task_service_t::handle(const accept_task_cmd_t& cmd_)
{
    USER_MGR.get_user(cmd_.uid).get_tasks().accet_task(cmd_.tid);
}

void task_service_t::handle(const complete_task_cmd_t& cmd_)    
{
     USER_MGR.get_user(cmd_.uid).get_tasks().complete_task(cmd_.tid);
}

将特定的消息投递给特定接口只是BUS的功能之一,它也负责发布event eventcmd的区别是cmd是用户的操作,它会触发特定的实体逻辑,逻辑检查ok,将会创建某个或某些event,这些event会触发某些实体对象的数据改变。所有cmdevent都继承于type_i

class type_i
{
public:
    virtual ~ type_i(){}
    virtual int get_type_id() const { return -1; }
    virtual const string& get_type_name() const {static string foo; return foo; }
    
    virtual void   decode(const string& data_) {}
    virtual string encode()                    { return "";} 
};

 

其中typeidtypename都不需要使用者自己定义,有一个类event_t  会自动为其生成。示例代码如下:

class task_accepted_t: public event_t<task_accepted_t>
{
public:
    task_accepted_t(int task_id_ =0, int dest_value_ = 0):
        task_id(task_id_),
        dest_value(dest_value_)
    {}
    int task_id;
    int dest_value;
};

BUS event被发布时,所有的订阅者都会被调用:

virtual int publish(const event_i& event_)
    {
        return call(event_.get_type_id(), event_);
    }
    virtual int publish(const command_i& cmd_)
    {
        return call(cmd_.get_type_id(), cmd_);
    }
int call(int type_id_, const type_i& obj_)
    {
        int num = 0;
        pair<subscriber_t::callback_multimap_t::iterator, subscriber_t::callback_multimap_t::iterator> ret;
        ret = m_callbacks.equal_range(type_id_);

        for (subscriber_t::callback_multimap_t::iterator it = ret.first; it != ret.second; ++it)
        {
            try
            {
                ++num;
                it->second->callback(&obj_);
            }
            catch(exception& e)
            {
                cout <<"bus exception:" << e.what() <<"\n";
                continue;
            }
            return 0;
        }
        return num;
    } 

Logicserver 的细节

LogicServer的设计

LogicServer 是后台程序中最复杂的部分,应尽量保证其可扩展性。在本demo中,遵循如下原则:

  • 实体对象封装所有的业务逻辑,如Usertasks 封装用户所有的任务相关操作
  • 实体对象内部分成两部分,一部分为借口,如accept,用于验证用户操作是否有效,若无效抛出异常,若有效,创建evnet。另一部分专门处理event,当有event触发,修改对象内部数据,同时event也会被publishBUS 中,这样其他逻辑模块也可以进行其他处理。示例代码:
void user_tasks_t::accet_task(int task_id_)
{
    if (m_tasks.find(task_id_) != m_tasks.end()) throw task_exception_t("tid exist");
    apply_change(task_accepted_t(task_id_, 100));
}
void user_tasks_t::apply(const task_accepted_t& event_)
{
    task_ino_t task_info(event_.task_id, event_.dest_value, TASK_ACCEPTED);
    m_tasks.insert(make_pair(event_.task_id, task_info));
}
void apply_change(const T& event_, bool new_change_ = true)
    {
        apply(event_);
        if (new_change_)
        {
            BUS.publish(event_);
        }
}
  • Service 负责处理cmd,根据不同的cmd,调用实体对象的接口 
  • 使用Event做单元测试

单元测试流程

Given:

在测试实体对象特定的接口时,需要mock操作,由于实体对象的所有修改都是由Event 触发的,mock操作只是按照顺序提供给实体对象event即可:

//! 先 mock出数据 只需给对象提供相应的event即可
    task_accepted_t e;
    e.task_id = 100;
    e.dest_value = 200;
user_task.apply_change(e, false); 

When 

Event given完毕后,触发实体的接口,并且测试接口是否按照预定的逻辑操作,如验证失败是否抛出异常。

//! test interface

    EXPECT_THROW(user_task.accet_task(100), user_tasks_t::task_exception_t);

Expect

当调用实体对象时,若逻辑争取,会触发一些event产生,由于实体对象的数据不能被直接验证是否修改争取,但是可以通过验证event是否按照预想的顺序触发来达到目的。

class task_event_counter_t
{
public:
    task_event_counter_t():task_accepted_counter(0){}
    void  handle(const task_accepted_t& e_)
    {
        task_accepted_counter ++;
    }
    int task_accepted_counter;
};

#define EVENT_COUNTER (singleton_t<task_event_counter_t>::instance())
//! task_accepted_t will be trigger
    user_task.accet_task(200);
EXPECT_TRUE(EVENT_COUNTER.task_accepted_counter == 1);

如上代码所示, .accet_task()成功会触发, task_accepted_t 事件,通过验证此事件是否被触发,即可验证实体对象是否操作正常。

备注

示例代码地址:http://ffown.googlecode.com/svn/trunk/example/game_framework/

 

目录
相关文章
|
2月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
3月前
|
存储 设计模式 前端开发
|
3月前
|
消息中间件 SQL 关系型数据库
go-zero微服务实战系列(十、分布式事务如何实现)
go-zero微服务实战系列(十、分布式事务如何实现)
|
6月前
|
消息中间件 Dubbo Java
Simple RPC - 01 框架原理及总体架构初探
Simple RPC - 01 框架原理及总体架构初探
81 0
|
负载均衡 Dubbo Java
dubbo源码v2.7分析:结构、container入口及线程模型
Apache Dubbo 是一款高性能、轻量级的开源 Java 服务框架,提供了六大核心能力:面向接口代理的高性能RPC调用,智能容错和负载均衡,服务自动注册和发现,高度可扩展能力,运行期流量调度,可视化的服务治理与运维。
92 0
|
Dubbo Java 应用服务中间件
带你读《Apache Dubbo微服务开发从入门到精通》——四、 服务端异步执行
带你读《Apache Dubbo微服务开发从入门到精通》——四、 服务端异步执行
120 3
|
消息中间件 存储 设计模式
Seata 高性能 RPC 通信的实现- 巧用 reactor 模式
reactor 模式是一种事件驱动的应用层 I/O 处理模式,基于分而治之和事件驱动的思想,致力于构建一个高性能的可伸缩的 I/O 处理模式
181 0
|
运维 Dubbo 中间件
Dubbo3 源码解读-宋小生-6:Dubbo的SPI扩展机制之普通扩展对象的创建与Wrapper机制的源码解析
> Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。 > 本文是
206 0
|
消息中间件 运维 负载均衡
基于 RocketMQ 的 Dubbo-go 通信新范式
RocketMQ 新功能尝鲜!让 Dubbo-go 通过 RocketMQ 进行 RPC 通信来扩展 Dubbo-go 的通信方式,利用 Dubbo-go 丰富的服务治理能力和 RocketMQ 稳定的 RPC 通信能力,两者强强联合,打造 RPC 通信新范式。
236 0
基于 RocketMQ 的 Dubbo-go 通信新范式
|
负载均衡 程序员 Go
Go RPC入门指南:RPC的使用边界在哪里?如何实现跨语言调用?
就是因为无法在同一个进程内,或者无法在同一个服务器上通过本地调用的方式实现我们的需求。 HTTP能满足需求但是不够高效,所以我们需要使用RPC。
176 0
Go RPC入门指南:RPC的使用边界在哪里?如何实现跨语言调用?