消息总线重构之EventBus

简介: 最近花了不少时间对消息总线进行了重构。重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景: (1)改进广播通知 (2)业务逻辑串联,用事件驱动替代责任链模式 EventBus简介 EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读。

最近花了不少时间对消息总线进行了重构。重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景:

(1)改进广播通知

(2)业务逻辑串联,用事件驱动替代责任链模式

EventBus简介

EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读。总得来说,EventBus是观察者模型的实现,利用它你既可以实现观察者模型的业务场景,还可以基于它的事件驱动机制来实现应用程序内组件之间的解耦与通信。

改进广播通知

广播通知是消息总线提供的功能之一。在重构之前,客户端接收广播通知是通过消息总线客户端SDK的一个API来实现的:

public void setNotificationListener(IMessageReceiveListener notificationListener);

但之前的广播通知设计并不合理。它受限于之前的基于RabbitMQ的树形路由拓扑模型:


这个拓扑结构中有些只发送不接受的“虚拟队列”并不是真实存在的队列。这些消息生产者无法接收消息,这是非常大的一个缺陷。我一直在想办法重新设计它,之前的关注点都集中在RabbitMQ上,想在MQ上找到一种解决方案,但这很难,除非摈弃“虚拟队列”的设计。于是,我将关注点转移到消息总线中另一个可以提供pub/sub的组件上(后称之为pubsuber),该组件目前可以是redis也可以是zookeeper。因为每个client(更准确得说是每个创建client的pool)都会以长连接的方式挂在pubsuber上。所以,它本身就是一个很不错的广播渠道,并且因为它脱离RabbitMQ单独实现,跟虚拟队列的设计不相冲突。

上面的思路没有问题,但语义与实现上并不对等。通知的收发从语义上来说应该是Client API级别的。而PubSuber接收到的广播事件却是Pool级别的,并不依赖client(Pool创建PubSuber以及Client)。我们不应该在Pool层面上接收广播事件。因此这里存在一个事件的截获与二次转发的过程。这是我们针对EventBus的第一个应用场景:用它转发PubSuber接收到的广播通知给client。

PubSuber接收到广播消息之后通过EventBus 作二次转发:

    public class NotifyHandler implements IPubSubListener {

        @Override
        public void onChange(String channel, byte[] data, Map<String, Object> params) {
            NotifyEvent notifyEvent = new NotifyEvent();
            Message broadcastMsg = pubsuberManager.deserialize(data, Message.class);
            if (broadcastMsg != null && broadcastMsg.getMessageType().equals(MessageType.BroadcastMessage)) {
                notifyEvent.setMsg(broadcastMsg);
                getComponentEventBus().post(notifyEvent);
            }
        }
    }

事件发布完了之后,EventBus会将其分发到该事件的订阅者处理,这里需要注意的是创建的EventBus是一个异步EventBus的实例,它在一个独立的线程上执行事件处理器方法。而所有的事件处理器都需要通过Client进行注册:

    public void registerEventProcessor(Object eventProcessor) {
        componentEventBus.register(eventProcessor);
    }

以上这一步,就将消息通知跟Client关联起来。而且对多个client注册不同的事件处理器,还可以起到多播的作用(原来在Pool级别是一个事件,现在在Client级别,多个Client可以应对若干个处理器)。

EventBus通过注解来解析事件处理器与事件之间的关联关系,更多的实现细节,请参考之前的文章。下面就是订阅广播通知的方式:

    public static class NotificationEventProcessor {

        @Subscribe
        public void onNotification(NotifyEvent event) {
            logger.info("onNotification");
            Message message = event.getMsg();
            assertNotNull(message);
            assertEquals("test", new String(message.getContent(), Constants.CHARSET_OF_UTF8));
        }

    }

仅仅需要一个注解即可。当然最后别忘记移除注册,如果你不再希望接收通知的话,整个过程如下:

    public void testBroadcast() throws Exception {
        String secret = "kljasdoifqoikjhhhqwhebasdfasdf";

        Message msg = MessageFactory.createMessage(MessageType.BroadcastMessage);
        msg.setContentType("text/plain");
        msg.setContentEncoding("utf-8");

        msg.setContent("test".getBytes(Constants.CHARSET_OF_UTF8));

        NotificationEventProcessor eventProcessor = new NotificationEventProcessor();
        client.registerEventProcessor(eventProcessor);

        client.broadcast(secret, new Message[]{msg});

        TimeUnit.SECONDS.sleep(10);

        client.unregisterEventProcessor(eventProcessor);
    }

这样,原先的拓扑结构就不再包含广播通知的实现了:


事件驱动替代责任链模式

客户端跟消息总线的一次通信,需要经历多个业务逻辑环节。这些业务逻辑有些有顺序关系,有些没有。我们希望将逻辑进行拆分、自由组合搭配并且能够互不干扰得扩展。在此之前的实现基于责任链模式,有一点问题:当长连接消费时,因为真正的消费通常是chain的最后一个调用(方式是:阻塞,一直等到超过设定的时间),所以整个递归链都阻在最后一个调用。而递归调用的实现是基于栈,因此如果最后一个调用不返回(很多时候这种长连接的生命周期跟应用的生命周期相同),整个调用链以及调用中的局部变量一直都不被释放,某种程度上这有点像内存泄露了。这个问题,我曾在之前的文章中探讨过,但一直没找到太好的解决方法,除非我们放弃使用责任链模式。

但基于同步事件驱动的方式似乎能起到跟责任链模式一样的效果。它通过事件分发来驱动业务逻辑调用。将chain的每一个调用都看做是一个事件处理方法,一个单向通信逻辑(比如produce)对应一个事件处理器(produceEventProcessor)。因为此处的EventBus是同步的(事件处理逻辑在调用线程上执行,执行顺序跟事件发生的顺序相同),所以只要编排好事件顺序,一一触发事件,事件处理器也就会一一按照事件触发的顺序执行。

我们以消息生产者来看一下通过EventBus改造后的业务逻辑是什么样子。

首先我们定义一个生产消息的事件处理器:

public class ProduceEventProcessor extends CommonEventProcessor {

}

为了使得逻辑关系紧凑,我们将事件以内部类的方式定义在生产消息的事件处理器内部:

//region events definition
    public static class ValidateEvent extends CarryEvent {
    }


    public static class PermissionCheckEvent extends CarryEvent {
    }

    public static class ProduceEvent extends CarryEvent {
    }
    //endregion

定义每个事件的事件处理方法:

@Subscribe
public void onValidate(ValidateEvent event) {
}

@Subscribe
public void onPermissionCheckEvent(PermissionCheckEvent event) {
}

@Subscribe
public void onProduce(ProduceEvent event) {

}

在client被调用以生产消息时,首先创建该事件处理器的实例,然后向EventBus注册事件处理器:

        EventBus carryEventBus = this.getContext().getCarryEventBus();

        //register event processor
        ProduceEventProcessor eventProcessor = new ProduceEventProcessor();
        carryEventBus.register(eventProcessor);

只有注册了该实例,在发布事件时,才会触发该实例的事件处理方法。注册完成该实例之后,需要初始化事件对象,这里事件之间以及事件处理器之间没有必然联系,我们以一个消息上下文对象的引用来让它们以共享“内存”的方式进行数据交换:

        //init events
        ProduceEventProcessor.ValidateEvent validateEvent = new ProduceEventProcessor.ValidateEvent();
        ProduceEventProcessor.MsgBodySizeCheckEvent msgBodySizeCheckEvent = new ProduceEventProcessor.MsgBodySizeCheckEvent();
        ProduceEventProcessor.PermissionCheckEvent permissionCheckEvent = new ProduceEventProcessor.PermissionCheckEvent();
        ProduceEventProcessor.MsgIdGenerateEvent msgIdGenerateEvent = new ProduceEventProcessor.MsgIdGenerateEvent();
        ProduceEventProcessor.MsgBodyCompressEvent msgBodyCompressEvent = new ProduceEventProcessor.MsgBodyCompressEvent();
        ProduceEventProcessor.ProduceEvent produceEvent = new ProduceEventProcessor.ProduceEvent();

        validateEvent.setMessageContext(ctx);
        msgBodySizeCheckEvent.setMessageContext(ctx);
        permissionCheckEvent.setMessageContext(ctx);
        msgIdGenerateEvent.setMessageContext(ctx);
        msgBodyCompressEvent.setMessageContext(ctx);
        produceEvent.setMessageContext(ctx);

准备工作就绪,现在开始发布事件。这里事件的发布顺序跟执行顺序是一致的,所以我们需要根据业务逻辑来编排事件,以形成原先的串联调用的效果:

//arrange event order and emit!
carryEventBus.post(validateEvent);
carryEventBus.post(msgBodySizeCheckEvent);
carryEventBus.post(permissionCheckEvent);
carryEventBus.post(msgIdGenerateEvent);
carryEventBus.post(msgBodyCompressEvent);
carryEventBus.post(produceEvent);

这就是重构的整个过程。我们发现这里不再存在链式(递归)调用了,各个事件处理器方法之间也没有耦合性,它们通过MessageContext来共享上下文。如果我们要增加新的业务逻辑,如何扩展?四步走:

(1)定义一个新事件对象

(2)定义一个新的事件处理器方法

(3)实例化该事件对象

(4)根据需要插入原先的编排过的事件中去并发布该事件

跟原先的事件没有任何关系。

更多实现,可以查看项目源码:banyan


原文发布时间为:2015-06-30


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
9月前
|
消息中间件 druid Java
web后端-SpringCloud-Bus消息总线组件
web后端-SpringCloud-Bus消息总线组件
|
9月前
|
设计模式 JavaScript 前端开发
js设计模式-观察者模式与发布/订阅模式
观察者模式和发布/订阅模式是JavaScript中的两种设计模式,用于处理对象间的通信和事件处理。观察者模式中,一个主题对象状态改变会通知所有观察者。实现包括定义主题和观察者对象,以及在主题中添加、删除和通知观察者的功能。发布/订阅模式则引入事件管理器,允许发布者发布事件,订阅者通过订阅接收通知。
|
9月前
|
消息中间件 缓存 监控
【C++ 观察者模式的应用】跨进程观察者模式实战:结合ZeroMQ和传统方法
【C++ 观察者模式的应用】跨进程观察者模式实战:结合ZeroMQ和传统方法
281 1
|
设计模式 消息中间件 Java
SpringBoot事件监听机制及观察者/发布订阅模式详解
介绍观察者模式和发布订阅模式的区别。 SpringBoot快速入门事件监听。 什么是观察者模式? 观察者模式是经典行为型设计模式之一。 在GoF的《设计模式》中,观察者模式的定义:在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。如果你觉得比较抽象,接下来这个例子应该会让你有所感觉:
|
消息中间件 设计模式 Java
SpringBoot事件监听机制及观察者模式/发布订阅模式
SpringBoot事件监听机制及观察者模式/发布订阅模式
438 0
|
设计模式 开发者
设计模式之订阅发布模式
设计模式之订阅发布模式
265 0
|
Java UED Spring
如何实现业务解耦?spring中事件监听了解一下
耦合这个词在平常的开发工作中应该不陌生,简单理解就是代码中各部分关联度过高。
如何实现业务解耦?spring中事件监听了解一下
|
JavaScript
手写代码:实现一个EventBus
EventBus,事件总线。总线一词来自于《计算机组成原理》中的”系统总线“,是指用于连接多个部件的信息传输线,各部件共享的传输介质。我们通常把事件总线也成为自定义事件,一般包含`on`、`once`、`emit`、`off`等方法。在Vue2中想要实现EventBus比较简单,直接暴露出一个`new Vue()`实例即可,以此为思路,我们应该如何自定义实现EventBus呢?
550 0
手写代码:实现一个EventBus
|
消息中间件 存储 关系型数据库
基于领域事件实现微服务解耦
基于领域事件实现微服务解耦
207 0
基于领域事件实现微服务解耦
|
设计模式 存储 前端开发
Java设计模式-观察者模式(订阅发布模式)
Java设计模式-观察者模式(订阅发布模式)
372 0
Java设计模式-观察者模式(订阅发布模式)

热门文章

最新文章