Pre
我们在日常的工作中,都会使用到MQ这种组件, 某subscriber在消息中间件上注册了某个topic(主题),当有消息发送到了该topic上之后,注册在该topic上的所有subscriber都将会收到消息。
如图所示【消息中间件的消息订阅与发布】
消息中间件的核心作用是提供系统之间的异步消息处理机制。它可以在一个系统完成操作后,通过提交消息到消息中间件,触发其他依赖系统的后续处理,而不需要等待后续处理完全结束。
使用消息中间件的好处有:
提高系统处理效率,系统之间可以异步并行处理
降低系统耦合,通过消息进行解耦
提高系统故障隔离能力,一个系统故障不会影响其他系统
今天我们来实现一个Java进程内部的消息中间件Event Bus,它可以用于进程内不同组件之间的异步消息通信。
设计
Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event
register方法用来注册Event接收者(Subscriber)接受响应事件
EventBus采用同步的方式推送Event
AsyncEventBus采用异步的方式(Thread-Per-Message)推送Event
Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法用注解@Subscribe来标识。
Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber
Code
Bus接口
package com.artisan.busevent.intf; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world * @desc: Bus接口定义了EventBus的所有使用方法 */ public interface Bus { /** * 将某个对象注册到Bus上,从此之后该类就成为Subscriber了 */ void register(Object subscriber); /** * 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息 */ void unregister(Object subscriber); /** * 提交Event到默认的topic */ void post(Object event); /** * 提交Event到指定的topic */ void post(Object Event, String topic); /** * 关闭该bus */ void close(); /** * 返回Bus的名称标识 */ String getBusName(); }
Bus接口中定义了注册topic的方法和Event发送的方法,具体如下。
register(Object subscriber):将某个对象实例注册给Event Bus
unregister(Object subscriber):取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除
post(Object event):提交Event到Event Bus中,如果未指定topic则会将event广播给Event Bus默认的topic
post(Object Event,String topic):提交Event的同时指定了topic
close():销毁该Event Bus
getBusName():返回该Event Bus的名称
自定义注解 @Subscribe
注册对象给Event Bus的时候需要指定接收消息时的回调方法,采用注解的方式进行Event回调
package com.artisan.busevent.annotations; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Subscribe { String topic() default "default-topic"; }
@Subscribe
要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic(default-topic
)
同步EventBus
同步EventBus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式.
package com.artisan.busevent.impl; import com.artisan.busevent.intf.Bus; import com.artisan.busevent.intf.EventExceptionHandler; import java.util.concurrent.Executor; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class EventBus implements Bus { /** * 用于维护Subscriber的注册表 */ private final Registry registry = new Registry(); /** * Event Bus的名字 */ private String busName; /** * 默认的Event Bus的名字 */ private final static String DEFAULT_BUS_NAME = "default"; /** * 默认的topic的名字 */ private final static String DEFAULT_TOPIC = "default-topic"; /** * 用于分发广播消息到各个Subscriber的类 */ private final Dispatcher dispatcher; public EventBus() { this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE); } /** * @param busName */ public EventBus(String busName) { this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE); } EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) { this.busName = busName; this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor); } public EventBus(EventExceptionHandler exceptionHandler) { this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE); } /** * 将注册Subscriber的动作直接委托给Registry * * @param subscriber */ @Override public void register(Object subscriber) { this.registry.bind(subscriber); } /** * 解除注册同样委托给Registry * * @param subscriber */ @Override public void unregister(Object subscriber) { this.registry.unbind(subscriber); } /** * 提交Event到默认的topic * * @param event */ @Override public void post(Object event) { this.post(event, DEFAULT_TOPIC); } /** * 提交Event到指定的topic,具体的动作是由Dispatcher来完成的 * * @param event * @param topic */ @Override public void post(Object event, String topic) { this.dispatcher.dispatch(this, registry, event, topic); } /** * 关闭销毁Bus */ @Override public void close() { this.dispatcher.close(); } /** * 返回Bus的名称 * * @return */ @Override public String getBusName() { return this.busName; } }
EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。
registry和unregister都是通过Subscriber注册表来完成的。
Event的提交则是由Dispatcher来完成的。
Executor是使用JDK中的Executor接口,自定义的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。
异步EventBus
如果想要使用异步的方式进行推送,可使用EventBus的子类AsyncEventBus 。
异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可
package com.artisan.busevent.impl; import com.artisan.busevent.intf.EventExceptionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class AsyncEventBus extends EventBus { AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) { super(busName, exceptionHandler, executor); } public AsyncEventBus(String busName, ThreadPoolExecutor executor) { this(busName, null, executor); } public AsyncEventBus(ThreadPoolExecutor executor) { this("default-async", null, executor); } public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) { this("default-async", exceptionHandler, executor); } }
重写了父类EventBus的构造函数,使用ThreadPoolExecutor替代Executor 。
Subscriber注册表Registry
注册表维护了topic和subscriber之间的关系,当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口
package com.artisan.busevent.impl; import com.artisan.busevent.annotations.Subscribe; import com.artisan.busevent.relations.Subscriber; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ class Registry { /** * 存储Subscriber集合和topic之间关系的map */ private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>(); public void bind(Object subscriber) { //获取Subscriber Object的方法集合然后进行绑定 List<Method> subscribeMethods = getSubscribeMethods(subscriber); subscribeMethods.forEach(m -> tierSubscriber(subscriber, m)); } public void unbind(Object subscriber) { //unbind为了提高速度,只对Subscriber进行失效操作 subscriberContainer.forEach((key, queue) -> queue.forEach(s -> { if (s.getSubscribeObject() == subscriber) { s.setDisable(true); } })); } public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) { return subscriberContainer.get(topic); } private void tierSubscriber(Object subscriber, Method method) { final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class); String topic = subscribe.topic(); //当某topic没有Subscriber Queue的时候创建一个 subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>()); //创建一个Subscriber并且加入Subscriber列表中 subscriberContainer.get(topic).add(new Subscriber(subscriber, method)); } private List<Method> getSubscribeMethods(Object subscriber) { final List<Method> methods = new ArrayList<>(); Class<?> temp = subscriber.getClass(); //不断获取当前类和父类的所有@Subscribe方法 while (temp != null) { //获取所有的方法 Method[] declaredMethods = temp.getDeclaredMethods(); //只有public方法 &&有一个入参 &&最重要的是被@Subscribe标记的方法才符合回调方法 Arrays.stream(declaredMethods) .filter(m -> m.isAnnotationPresent(Subscribe.class) && m.getParameterCount() == 1 && m.getModifiers() == Modifier.PUBLIC) .forEach(methods::add); temp = temp.getSuperclass(); } return methods; } }
由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类,所设计的EventBus对Subscriber没有做任何限制,但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic),同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息,比如
/** *非常普通的对象 */ public class SimpleObject { /** *subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数 */ @Subscribe(topic = "artisan-topic") public void test2(Integer x) { } @Subscribe(topic = "test-topic") public void test3(Integer x) { } }
SimpleObject的实例被注册到了Event Bus之后,test2和test3这两个方法将会被加入到注册表中,分别用来接受来自artisan-topic和test-topic的event
Event广播Dispatcher
Dispatcher的主要作用是将EventBus post的event推送给每一个注册到topic上的subscriber上,具体的推送其实就是执行被@Subscribe注解的方法.
package com.artisan.busevent.impl; import com.artisan.busevent.relations.Subscriber; import com.artisan.busevent.intf.Bus; import com.artisan.busevent.intf.EventContext; import com.artisan.busevent.intf.EventExceptionHandler; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class Dispatcher { private final Executor executorService; private final EventExceptionHandler exceptionHandler; public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE; public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE; private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) { this.executorService = executorService; this.exceptionHandler = exceptionHandler; } public void dispatch(Bus bus, Registry registry, Object event, String topic) { //根据topic获取所有的Subscriber列表 ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic); if (null == subscribers) { if (exceptionHandler != null) { exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"), new BaseEventContext(bus.getBusName(), null, event)); } return; } //遍历所有的方法,并且通过反射的方式进行方法调用 subscribers.stream() .filter(subscriber -> !subscriber.isDisable()) .filter(subscriber -> { Method subscribeMethod = subscriber.getSubscribeMethod(); Class<?> aClass = subscribeMethod.getParameterTypes()[0]; return (aClass.isAssignableFrom(event.getClass())); }).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus)); } private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) { Method subscribeMethod = subscriber.getSubscribeMethod(); Object subscribeObject = subscriber.getSubscribeObject(); executorService.execute(() -> { try { subscribeMethod.invoke(subscribeObject, event); } catch (Exception e) { if (null != exceptionHandler) { exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event)); } } }); } public void close() { if (executorService instanceof ExecutorService) { ((ExecutorService) executorService).shutdown(); } } static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) { return new Dispatcher(executor, exceptionHandler); } static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) { return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler); } static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) { return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler); } /** * 顺序执行的ExecutorService */ private static class SeqExecutorService implements Executor { private final static SeqExecutorService INSTANCE = new SeqExecutorService(); @Override public void execute(Runnable command) { command.run(); } } /** * 每个线程负责一次消息推送 */ private static class PreThreadExecutorService implements Executor { private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService(); @Override public void execute(Runnable command) { new Thread(command).start(); } } /** * 默认的EventContext实现 */ private static class BaseEventContext implements EventContext { private final String eventBusName; private final Subscriber subscriber; private final Object event; private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) { this.eventBusName = eventBusName; this.subscriber = subscriber; this.event = event; } @Override public String getSource() { return this.eventBusName; } @Override public Object getSubscriber() { return subscriber != null ? subscriber.getSubscribeObject() : null; } @Override public Method getSubscribe() { return subscriber != null ? subscriber.getSubscribeMethod() : null; } @Override public Object getEvent() { return this.event; } } }
在Dispatcher中,除了从Registry中获取对应的Subscriber执行之外,我们还定义了几个静态内部类,其主要是实现了Executor接口和EventContent接口。
测试
简单的Subscriber
package com.artisan.busevent.consumer; import com.artisan.busevent.annotations.Subscribe; import com.artisan.busevent.entity.Artisan; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Slf4j public class SubscriberA { /** * 消费默认主题的数据 ,接受的数据类型为 String类型 * * @param message */ @SneakyThrows @Subscribe public void consumerDefaultTopic(String message) { log.info("consumerDefaultTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getName()); log.info("SubscriberA-->consumerDefaultTopic(模拟执行5秒)-->收到Message(字符串)--> {}", message); TimeUnit.SECONDS.sleep(5); log.info("-----------------------consumerDefaultTopic OVER---------------------------------"); } /** * 消费 test 的数据 , 接受的数据类型 为 artisan对象 * * @param artisan */ @SneakyThrows @Subscribe(topic = "test") public void consumerSpecTopic(Artisan artisan) { log.info("consumerSpecTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getName()); log.info("SubscriberA-->consumerSpecTopic(模拟执行10秒)-->收到Message-(对象)-> {}", artisan); TimeUnit.SECONDS.sleep(10); log.info("-----------------------consumerSpecTopic OVER---------------------------------"); } }
package com.artisan.busevent.consumer; import com.artisan.busevent.annotations.Subscribe; import com.artisan.busevent.entity.Artisan; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Slf4j public class SubscriberB { /** * 消费默认主题的数据 * @param message */ @SneakyThrows @Subscribe public void consumerDefaultTopic(String message) { log.info("consumerDefaultTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getName()); log.info("SubscriberB-->consumerDefaultTopic(模拟执行3秒)-->收到Message(字符串)--> {}", message); TimeUnit.SECONDS.sleep(3); log.info("-----------------------consumerDefaultTopic OVER---------------------------------"); } /** * 消费 test 主题的数据 * @param artisan */ @SneakyThrows @Subscribe(topic = "test") public void consumerSpecTopic(Artisan artisan) { log.info("consumerSpecTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getName()); log.info("SubscriberB-->consumerSpecTopic(模拟执行5秒)-->收到Message(对象)--> {} ", artisan); TimeUnit.SECONDS.sleep(5); log.info("-----------------------consumerSpecTopic OVER---------------------------------"); } }
package com.artisan.busevent.entity; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.List; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Data @NoArgsConstructor @AllArgsConstructor() public class Artisan { private String name; private Integer age; private List<String> hobbies; }
同步Event Bus
package com.artisan.busevent.producer; import com.artisan.busevent.consumer.SubscriberA; import com.artisan.busevent.consumer.SubscriberB; import com.artisan.busevent.entity.Artisan; import com.artisan.busevent.impl.EventBus; import com.artisan.busevent.intf.Bus; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import java.util.Arrays; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Slf4j public class EventBus_SyncTest { @Test public void testSync() { Bus bus = new EventBus("TestBus"); bus.register(new SubscriberA()); bus.register(new SubscriberB()); log.info("--------默认 主题 ----"); // 默认主题 bus.post("Hello 小工匠"); log.info("--------test 主题 ----"); // 指定topic bus.post(new Artisan("artisan", 18, Arrays.asList("JAVA", "AIGC")), "test"); } }
异步Event Bus
package com.artisan.busevent.producer; import com.artisan.busevent.consumer.SubscriberA; import com.artisan.busevent.consumer.SubscriberB; import com.artisan.busevent.entity.Artisan; import com.artisan.busevent.impl.AsyncEventBus; import com.artisan.busevent.impl.EventBus; import com.artisan.busevent.intf.Bus; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Slf4j public class EventBus_AsyncTest { @Test public void testSync() throws InterruptedException { Bus bus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10)); bus.register(new SubscriberA()); bus.register(new SubscriberB()); log.info("--------默认 主题 ----"); // 默认主题 bus.post("Hello 小工匠"); log.info("--------test 主题 ----"); // 指定topic bus.post(new Artisan("artisan", 18, Arrays.asList("JAVA", "AIGC")), "test"); TimeUnit.SECONDS.sleep(20); } }
小结
EventBus有点类似于GOF设计模式中的监听者模式,但是EventBus提供的功能更加强大,使用起来也更加灵活,EventBus中的Subscriber不需要继承任何类或者实现任何接口,在使用EventBus时只需要持有Bus的引用即可。
在EventBus的设计中有三个非常重要的角色(Bus、Registry和Dispatcher),
Bus主要提供给外部使用的操作方法,
Registry注册表用来整理记录所有注册在EventBus上的Subscriber,
Dispatcher主要负责对Subscriber消息进行推送(用反射的方式执行方法),但是考虑到程序的灵活性,Dispatcher方法中又提供了Executor的多态方式。