概述
在工作中,我们都会使用到MQ 比如 Apache Kafka等,某subscriber在消息中间件上注册了某个topic(主题),当有消息发送到了该topic上之后,注册在该topic上的所有subscriber都将会收到消息 。
消息中间件提供了系统之间的异步处理机制。 主业务完成后即可向用户返回成功的通知,然后提交各种消息至消息中间件,这样注册在消息中间件的其他系统就可以顺利地接收通知了,然后执行各自的业务逻辑。消息中间件主要用于解决进程之间消息异步处理的解决方案,这里,我们使用消息中间件的思想设计一个Java进程内部的消息中间件——Event Bus。
EventBus架构类图
- Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event,register方法用来注册Event接收者(Subscriber)接受响应事件
- EventBus采用同步的方式推送Event,AsyncEventBus采用异步的方式(Thread-Per-Message)推送Event。
- Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法我们用注解@Subscribe来标识。
- Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber
Code
Bus接口 (定义注册topic以及发送event接口)
/** * Bus接口定义了EventBus的所有使用方法 * * @author artisan */ public interface Bus { /** * 将某个对象注册到Bus上,从此之后该类就成为Subscriber了 */ void register(Object subscriber); /** * 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息 */ void unregister(Object subscriber); /** * 提交Event到默认的topic */ void post(Object event); /** * 提交Event到指定的topic */ void post(Object event, String topic); /** * 关闭该bus */ void close(); /** * 返回Bus的名称标识 */ String getBusName(); }
Bus接口中定义了注册topic的方法和Event发送的方法
register(Object subscriber)
:将某个对象实例注册给Event Bus。unregister(Object subscriber)
:取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除。post(Object event)
:提交Event到Event Bus中,如果未指定topic则会将event广播给Event Bus默认的topic。post(Object event, String topic)
:提交Event的同时指定了topic。close()
:销毁该Event Bus。getBusName()
:返回该Event Bus的名称
自定义注解-回调方法及topic
注册对象给Event Bus的时候需要指定接收消息时的回调方法,我们采用注解的方式进行Event回调
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author artisan */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Subscribe { String topic() default "default-topic"; }
@Subscribe要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic 【default-topic】
同步EventBus
同步EventBus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式
/** * @author 小工匠 * @version 1.0 * @description: Bus实现类 * @date 2021/12/1 23:00 * @mark: show me the code , change the world */ public class EventBus implements Bus { /** * 用于维护Subscriber的注册表 */ private final Registry registry = new Registry(); /** * Event Bus的名字 */ private String busName; /** * 默认的Event Bus的名字 */ private final static String DEFAULT_BUS_NAME = "default"; /** * 默认的topic的名字 */ private final static String DEFAULT_TOPIC = "default-topic"; /** * 用于分发广播消息到各个Subscriber的类 */ private final Dispatcher dispatcher; /** * 构造函数 */ public EventBus() { this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE); } public EventBus(String busName) { this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE); } EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) { this.busName = busName; this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor); } public EventBus(EventExceptionHandler exceptionHandler) { this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE); } /** * 将注册Subscriber的动作直接委托给Registry * * @param subscriber */ @Override public void register(Object subscriber) { this.registry.bind(subscriber); } /** * 接触注册同样委托给Registry * * @param subscriber */ @Override public void unregister(Object subscriber) { this.registry.unbind(subscriber); } /** * 提交Event到默认的topic * * @param event */ @Override public void post(Object event) { this.post(event, DEFAULT_TOPIC); } /** * 提交Event到指定的topic,具体的动作是由Dispatcher来完成的 * * @param event * @param topic */ @Override public void post(Object event, String topic) { this.dispatcher.dispatch(this, registry, event, topic); } /** * 关闭销毁Bus */ @Override public void close() { this.dispatcher.close(); } /** * 返回Bus的名称 * * @return */ @Override public String getBusName() { return this.busName; } }
有几个点需要注意一下
- EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。
- registry和unregister都是通过Subscriber注册表来完成的。
- Event的提交则是由Dispatcher来完成的
- Executor使用JDK中的Executor接口,如果我们自己开发的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。
异步EventBus
异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可
import java.util.concurrent.ThreadPoolExecutor; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/2 10:59 * @mark: show me the code , change the world */ public class AsyncEventBus extends EventBus { AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) { super(busName, exceptionHandler, executor); } public AsyncEventBus(String busName, ThreadPoolExecutor executor) { this(busName, null, executor); } public AsyncEventBus(ThreadPoolExecutor executor) { this("default-async", null, executor); } public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) { this("default-async", exceptionHandler, executor); } }
可以看到AsyncEventBus 重写了父类EventBus的构造函数,使用ThreadPoolExecutor替代Executor。
Subscriber注册表Registry (维护topic和subscriber之间的关系)
注册表Registry维护了topic和subscriber之间的关系。
当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口
import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/1 23:42 * @mark: show me the code , change the world */ class Registry { /** * 存储Subscriber集合和topic之间关系的map */ private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>(); /** * 获取Subscriber Object的方法集合然后进行绑定 * * @param subscriber */ public void bind(Object subscriber) { List<Method> subscribeMethods = getSubscribeMethods(subscriber); subscribeMethods.forEach(m -> tierSubscriber(subscriber, m)); } public void unbind(Object subscriber) { //unbind为了提高速度,只对Subscriber进行失效操作 subscriberContainer.forEach((key, queue) -> queue.forEach(s -> { if (s.getSubscribeObject() == subscriber) { s.setDisable(true); } })); } public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) { return subscriberContainer.get(topic); } private void tierSubscriber(Object subscriber, Method method) { final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class); String topic = subscribe.topic(); //当某topic没有Subscriber Queue的时候创建一个 subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>()); //创建一个Subscriber并且加入Subscriber列表中 subscriberContainer.get(topic).add(new Subscriber(subscriber, method)); } private List<Method> getSubscribeMethods(Object subscriber) { final List<Method> methods = new ArrayList<>(); Class<?> temp = subscriber.getClass(); //不断获取当前类和父类的所有@Subscribe方法 while (temp != null) { //获取所有的方法 Method[] declaredMethods = temp.getDeclaredMethods(); //只有public方法 &&有一个入参 &&最重要的是被@Subscribe标记的方法才符合回调方法 Arrays.stream(declaredMethods) .filter(m -> m.isAnnotationPresent(Subscribe.class) && m.getParameterCount() == 1 && m.getModifiers() == Modifier.PUBLIC) .forEach(methods::add); temp = temp.getSuperclass(); } return methods; } }
由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类。
我们所设计的EventBus对Subscriber没有做任何限制,但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic)
同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息
public class SimpleObject { /** * subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数 */ @Subscribe(topic = "artisan-topic") public void test2(Integer x) { } @Subscribe(topic = "test-topic") public void test3(Integer x) { } }
SimpleObject的实例被注册到了Event Bus之后,test2和test3这两个方法将会被加入到注册表中,分别用来接受来自artisan-topic和test-topic的event 。
Event广播Dispatcher
Dispatcher的主要作用是将EventBus post的event推送给每一个注册到topic上的subscriber上,具体的推送其实就是执行被@Subscribe注解的方法。
import java.lang.reflect.Method; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/2 00:36 * @mark: show me the code , change the world */ public class Dispatcher { private final Executor executorService; private final EventExceptionHandler exceptionHandler; public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE; public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE; private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) { this.executorService = executorService; this.exceptionHandler = exceptionHandler; } public void dispatch(Bus bus, Registry registry, Object event, String topic) { //根据topic获取所有的Subscriber列表 ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic); if (null == subscribers) { if (exceptionHandler != null) { exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"), new BaseEventContext(bus.getBusName(), null, event)); } return; } //遍历所有的方法,并且通过反射的方式进行方法调用 subscribers.stream() .filter(subscriber -> !subscriber.isDisable()) .filter(subscriber -> { Method subscribeMethod = subscriber.getSubscribeMethod(); Class<?> aClass = subscribeMethod.getParameterTypes()[0]; return (aClass.isAssignableFrom(event.getClass())); }).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus)); } private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) { Method subscribeMethod = subscriber.getSubscribeMethod(); Object subscribeObject = subscriber.getSubscribeObject(); executorService.execute(() -> { try { subscribeMethod.invoke(subscribeObject, event); } catch (Exception e) { if (null != exceptionHandler) { exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event)); } } }); } public void close() { if (executorService instanceof ExecutorService) { ((ExecutorService) executorService).shutdown(); } } static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) { return new Dispatcher(executor, exceptionHandler); } static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) { return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler); } static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) { return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler); } /** * 顺序执行的ExecutorService */ private static class SeqExecutorService implements Executor { private final static SeqExecutorService INSTANCE = new SeqExecutorService(); @Override public void execute(Runnable command) { command.run(); } } /** * 每个线程负责一次消息推送 */ private static class PreThreadExecutorService implements Executor { private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService(); @Override public void execute(Runnable command) { new Thread(command).start(); } } /** * 默认的EventContext实现 */ private static class BaseEventContext implements EventContext { private final String eventBusName; private final Subscriber subscriber; private final Object event; private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) { this.eventBusName = eventBusName; this.subscriber = subscriber; this.event = event; } @Override public String getSource() { return this.eventBusName; } @Override public Object getSubscriber() { return subscriber != null ? subscriber.getSubscribeObject() : null; } @Override public Method getSubscribe() { return subscriber != null ? subscriber.getSubscribeMethod() : null; } @Override public Object getEvent() { return this.event; } } }
在Dispatcher中,除了从Registry中获取对应的Subscriber执行之外,我们还定义了几个静态内部类,其主要是实现了Executor接口和EventContent。
其他类接口设计
除了上面一些比较核心的类之外,还需要Subscriber封装类以及EventContext、Event-ExceptionHandler接口
【Subscriber类】
import java.lang.reflect.Method; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/1 23:33 * @mark: show me the code , change the world */ public class Subscriber { private final Object subscribeObject; private final Method subscribeMethod; private boolean disable = false; public Subscriber(Object subscribeObject, Method subscribeMethod) { this.subscribeObject = subscribeObject; this.subscribeMethod = subscribeMethod; } public Object getSubscribeObject() { return subscribeObject; } public Method getSubscribeMethod() { return subscribeMethod; } public boolean isDisable() { return disable; } public void setDisable(boolean disable) { this.disable = disable; } }
Subscriber类封装了对象实例和被@Subscribe标记的方法,也就是说一个对象实例有可能会被封装成若干个Subscriber
【EventExceptionHandler接口】
EventBus会将方法的调用交给Runnable接口去执行,我们都知道Runnable接口不能抛出checked异常信息,并且在每一个subscribe方法中,也不允许将异常抛出从而影响EventBus对后续Subscriber进行消息推送,但是异常信息又不能被忽略掉,因此注册一个异常回调接口就可以知道在进行消息广播推送时都发生了什么
public interface EventExceptionHandler { void handle(Throwable cause, EventContext context); }
【EventContext接口】
EventContext接口提供了获取消息源、消息体,以及该消息是由哪一个Subscriber的哪个subscribe方法所接受,主要用于消息推送出错时被回调接口EventExceptionHandler使用
import java.lang.reflect.Method; public interface EventContext { String getSource(); Object getSubscriber(); Method getSubscribe(); Object getEvent(); }
测试
我们简单地定义两个普通对象SimpleSubscriber1和SimpleSubscriber2
/** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/2 11:06 * @mark: show me the code , change the world */ public class SimpleSubscriber1 { @Subscribe public void method1(String message) { System.out.println(String.format("线程 %s , SimpleSubscriber2#method1 called --- %s ",Thread.currentThread().getName() ,message)); } @Subscribe(topic = "test") public void method2(String message) { System.out.println(String.format("线程:%s: Test Topic | SimpleSubscriber2#method2 called --- %s", Thread.currentThread().getName(), message)); } }
/** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/2 11:27 * @mark: show me the code , change the world */ public class SimpleSubscriber2 { @Subscribe public void method1(String message) { System.out.println(String.format("线程 %s , SimpleSubscriber2#method1 called --- %s ",Thread.currentThread().getName() ,message)); } @Subscribe(topic = "test") public void method2(String message) { System.out.println(String.format("线程:%s: Test Topic | SimpleSubscriber2#method2 called --- %s", Thread.currentThread().getName(), message)); } }
模拟复杂消息
/** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/2 11:28 * @mark: show me the code , change the world */ public class SimpleSubscriber3 { @Subscribe public void method1(Object message) { if (message instanceof WildMessage){ System.out.println(String.format("线程 %s , SimpleSubscriber3#method1 called --- %s ",Thread.currentThread().getName() ,((WildMessage)message).getData())); } } }
/** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/2 13:16 * @mark: show me the code , change the world */ public class WildMessage { private String data; public WildMessage(String data) { this.data = data; } public String getData() { return data; } public void setData(String data) { this.data = data; } }
同步&异步 Event Bus
import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/2 13:32 * @mark: show me the code , change the world */ public class Test { public static void main(String[] args) { Bus bus = new EventBus("TestBus"); // 注册 bus.register(new SimpleSubscriber1()); bus.register(new SimpleSubscriber2()); // 发布消息 bus.post("Hello"); bus.post("Hello", "test"); bus.register(new SimpleSubscriber3()); bus.post(new WildMessage("SourceMessage")); System.out.println("\n\n\n\n"); System.out.println("-------异步-----"); Bus asyncEventBus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10)); asyncEventBus.register(new SimpleSubscriber1()); asyncEventBus.register(new SimpleSubscriber2()); asyncEventBus.post("Hello"); asyncEventBus.post("Hello", "test"); } }
解析下结果哈: 同步的EventBus,将三个普通的对象注册给了bus,当bus发送Event的时候topic相同,Event类型相同的subscribe方法将被执行。
同步的Event Bus有个缺点,若其中的一个subscribe方法运行时间比较长,则会影响下一个subscribe方法的执行,因此采用AsyncEventBus是另外一个比较好的选择。