深度解析 EventBus 事件总线原理

简介:

一、问题描述

在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知)。如果是同一个jvm里面通知的话,就可以使用EventBus。由于EventBus使用起来简单、便捷,因此,工作中会经常用到。深入理解该框架的原理就很有必要。

二、框架解析

2.1、组织结构

eventbus的组织结构如下:

eventbus主要有以下几部分组成:

1、eventbus、asyncEventBus:事件发送器。

2、event:事件承载单元。

3、SubscriberRegistry:订阅者注册器,将订阅者注册到event上,即将有注解Subscribe的方法和event绑定起来。

4、Dispatcher:事件分发器,将事件的订阅者调用来执行。

5、Subscriber、SynchronizedSubscriber:订阅者,并发订阅还是同步订阅。

2.2、运行原理

1、eventbus是基于注册监听的方式来运行的,因此,首先需要将eventbus,然后才会有事件及监听者。新建eventbus或者AsyncEventBus的方式如下:

 EventBus eventBus = new EventBus();

或者

 BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,
                30, TimeUnit.SECONDS, workQueue);
        AsyncEventBus asyncEventBus = new AsyncEventBus(executor);

2、注册监听者。

eventBus.register(eventListener);

底层就是将类eventListener中所有注解有Subscribe的方法与其Event对放在一个map中(一个event可以对应多个Subscribe的方法)。实现如下:

void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

3、事件发送:执行指定事件类型的订阅者(包含了method),从订阅者中获取指定事件的订阅者,然后按照规则(同步、异步)执行指定的方法。

public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

上述代码说明,如果事件没有监听者,就当作死亡事件来对待。

 /** Dispatches {@code event} to this subscriber using the proper executor. */
  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }

这里就说明,最后就是被订阅的方法被调用。

4、EventBus与AsyncEventBus的区别

从字面上看,AsyncEventBus是异步的EventBus,那么EventBus应该就是同步的了。EventBus的executor为MoreExecutors.directExecutor(),其实现如下:

 public static Executor directExecutor() {
    return DirectExecutor.INSTANCE;
  }

  /** See {@link #directExecutor} for behavioral notes. */
  private enum DirectExecutor implements Executor {
    INSTANCE;

    @Override
    public void execute(Runnable command) {
      command.run();
    }

    @Override
    public String toString() {
      return "MoreExecutors.directExecutor()";
    }
  }

其execute方法直接执行线程的run方法,即同步调用run方法执行。EventBus的dispatcher为PerThreadQueuedDispatcher。其dispatch方法如下:

@Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));

      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }

dispatchEvent的实现如下:

 final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

因此,整个执行过程如下:

整个过程都是同步方式执行,因此,EventBus是同步的。

AsyncEventBus的dispatcher为LegacyAsyncDispatcher,executor为自己指定的线程池。运行流程如下:

虚线为线程池异步调度,因此,AsyncEventBus为异步方式。

5、AllowConcurrentEvents的作用

它所在的代码为:

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }

  private static boolean isDeclaredThreadSafe(Method method) {
    return method.getAnnotation(AllowConcurrentEvents.class) != null;
  }

即如果订阅者方法上有注解AllowConcurrentEvents,则返回Subscriber,否则,返回SynchronizedSubscriber。SynchronizedSubscriber的字面意思为同步订阅者,它的实现代码为:

@Override
    void invokeSubscriberMethod(Object event) throws InvocationTargetException {
      synchronized (this) {
        super.invokeSubscriberMethod(event);
      }
    }

即没有使用注解AllowConcurrentEvents的订阅者,在并发环境中,都是串行执行。这在高并发环境中,会严重影响性能。

三、使用案例

3.1、eventbus定义

@Configuration
public class ConfigBean {

    @Bean
    public EventBus executorService() {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,
                30, TimeUnit.SECONDS, workQueue);
        return new AsyncEventBus(executor);
    }
}

3.2、注册与事件发送

@Service
public class TestService implements InitializingBean {

    @Autowired
    private EventListener eventListener ;

    @Autowired
    private EventBus eventBus ;

    public void postEvent(){
        eventBus.post(new LoginEvent("iwill","123456"));
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        eventBus.register(eventListener);
    }
}

3.3、订阅者定义

package com.iwill.eventBus.listener;

import com.google.common.eventbus.Subscribe;
import com.iwill.eventBus.event.LoginEvent;
import com.iwill.eventBus.event.RegisterEvent;
import org.springframework.stereotype.Component;

@Component
public class EventListener {

    @Subscribe
    public void subscribeLoginEvent1(LoginEvent event){
        System.out.println("method 1 : receive login event ");
    }

    @Subscribe
    public void subscribeLoginEvent2(LoginEvent event){
        System.out.println("method 2 : receive login event ");
    }

    @Subscribe
    public void subscribeRegisterEvent(RegisterEvent event){
        try{
            Thread.sleep(10000L);
        }catch (Exception exp){
            exp.printStackTrace();
        }
        System.out.println("method  : receive register event ");
    }
}

四、注意事项

1、在高并发的环境下使用AsyncEventBus时,发送事件可能会出现异常,因为它使用的线程池,当线程池的线程不够用时,会拒绝接收任务,就会执行线程池的拒绝策略,如果需要关注是否提交事件成功,就需要将线程池的拒绝策略设为抛出异常,并且try-catch来捕获异常。如下:

try {
            eventBus.post(new LoginEvent("iwill", "123456"));
        }catch (Exception exp){
            //TODO 落表或者其他处理
        }

2、本文用到的guava版本如下:

  <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>26.0-jre</version>
        </dependency>

本文来自云栖社区合作伙伴“开源中国”

本文作者:王练

原文链接

相关文章
|
12月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
701 86
|
9月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
11月前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
352 14
|
8月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
617 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
9月前
|
机器学习/深度学习 算法 数据挖掘
解析静态代理IP改善游戏体验的原理
静态代理IP通过提高网络稳定性和降低延迟,优化游戏体验。具体表现在加快游戏网络速度、实时玩家数据分析、优化游戏设计、简化更新流程、维护网络稳定性、提高连接可靠性、支持地区特性及提升访问速度等方面,确保更流畅、高效的游戏体验。
233 22
解析静态代理IP改善游戏体验的原理
|
8月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
1088 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
9月前
|
编解码 缓存 Prometheus
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
630 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
|
8月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
619 2
|
10月前
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
12198 46
|
9月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
1169 12

推荐镜像

更多
  • DNS