小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(下)

简介: 小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(下)

AbstractApplicationEventMulticaster 时间发布器的抽象实现


它是对事件发布器的抽象实现,如果你自己想自定义一个时间发布器,可以继承它


// @since 1.2.3
// 提供基本的侦听器注册功能   比如处理代理对象类型~~~
public abstract class AbstractApplicationEventMulticaster
    implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {
  // Retriever:猎犬  
  // 它是一个内部类,内部持有applicationListeners和applicationListenerBeans的引用
  // 是一个类似包装的类,详细可参加下面具体分析
  private final ListenerRetriever defaultRetriever = new 
ListenerRetriever(false);
  // 显然它是一个缓存:key由eventType, sourceType唯一确定~
  final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);
  // retrieval的互斥锁
  private Object retrievalMutex = this.defaultRetriever;
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    this.beanFactory = beanFactory;
    if (beanFactory instanceof ConfigurableBeanFactory) {
      ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;
      if (this.beanClassLoader == null) {
        this.beanClassLoader = cbf.getBeanClassLoader();
      }
      // 互斥锁 用容器里面的互斥锁
      this.retrievalMutex = cbf.getSingletonMutex();
    }
  }
  // 向容器内注册一个监听器~~~~
  // 需要注意的是,添加进来的监听器都是保存到defaultRetriever里面的
  // 最后getApplicationListeners就是从这里拿的(注册进来多少  最终返回多少~~~)
  @Override
  public void addApplicationListener(ApplicationListener<?> listener) {
    synchronized (this.retrievalMutex) {
      // 这一步:若类型是SingletonTargetSource也给拿出来~~~
      // 如果不是被代理的对象Advised,那就返回null
      Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
      if (singletonTarget instanceof ApplicationListener) {
        // 从默认的持有的applicationListeners里把它移除~~
        // 下面一句肯定又是会添加进来的,所以可议保证它在顶部~~~
        this.defaultRetriever.applicationListeners.remove(singletonTarget);
      }
      this.defaultRetriever.applicationListeners.add(listener);
      // 没加一个进来  都清空了缓存~~~~~~~~~~~~~~~~
      this.retrieverCache.clear();
    }
  }
  // 同样的 根据名称添加一个监听器也是可以的
  @Override
  public void addApplicationListenerBean(String listenerBeanName) {
    synchronized (this.retrievalMutex) {
      this.defaultRetriever.applicationListenerBeans.add(listenerBeanName);
      this.retrieverCache.clear();
    }
  }
  ...// remove方法类似
  // 如果不传参数,那就是返回defaultRetriever这个里面的值即可~
  protected Collection<ApplicationListener<?>> getApplicationListeners() {
    synchronized (this.retrievalMutex) {
      return this.defaultRetriever.getApplicationListeners();
    }
  }
  // 如果指定了event事件和eventType,那就这个方法   绝大多数情况下都是这里~~~
  // 获取该事件对应的监听者:相当于只会获取supportsEvent() = true支持的这种事件~
  protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {
    Object source = event.getSource();
    Class<?> sourceType = (source != null ? source.getClass() : null);
    // 这个key是它俩共同决定的~~~~
    ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
    // Quick check for existing entry on ConcurrentHashMap...
    // 缓存里若存在  直接返回即可~~~~
    ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
    if (retriever != null) {
      return retriever.getApplicationListeners();
    }
    // 这里面~~~ 有个缓存安全的特殊处理,其最为核心的方法,其实还是retrieveApplicationListeners
    // 若是缓存安全的,才会缓存它  否则直接return即可~~~~
    // 什么叫缓存安全isCacheSafe:原理很简单,就是判断该类型是否在指定classloader或者其parent classloader中
    if (this.beanClassLoader == null || (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
            (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
      // Fully synchronized building and caching of a ListenerRetriever
      synchronized (this.retrievalMutex) {
        retriever = this.retrieverCache.get(cacheKey);
        if (retriever != null) {
          return retriever.getApplicationListeners();
        }
        retriever = new ListenerRetriever(true);
        // 需要缓存起来,所以才需要把retriever传过去,否则传null即可~(下面传的null)
        Collection<ApplicationListener<?>> listeners = retrieveApplicationListeners(eventType, sourceType, retriever);
        // 每个事件对应的Listener,都缓存在此处了~(注意:首次get的才给与缓存)
        // 因为有的是个体的beanName,有的是给的Bean,所以首次去拿时候缓存吧~~~
        this.retrieverCache.put(cacheKey, retriever);
        return listeners;
      }
    } else {
      // No ListenerRetriever caching -> no synchronization necessary
      return retrieveApplicationListeners(eventType, sourceType, null);
    }
  }
  // 关于`retrieveApplicationListeners`方法,它就是从defaultRetriever把applicationListeners和beanNames都拿出来合并了
  // 摒弃萃取出supportsEvent() 只支持这种类型的事件~
  // 最终它还`AnnotationAwareOrderComparator.sort(allListeners);`,证明监听器是支持排序接口的~
}


这就是我们getApplicationListeners的具体内容,我们发现:它只会拿注册到本容器的监听器(注册在谁身上就是谁的~~~),并不会去父类的拿的,所以这点一定要注意,你自己写监听器的时候也是需要注意这一点的,避免一些重复执行吧~~~


@EventListener使用中的小细节

  • @EventListener注解用在接口或者父类上都是没有任何问题的(这样子类就不用再写了,在接口层进行控制)
  • @EventListener标注的方法,无视访问权限
  • AbstractApplicationEventMulticaster的相关方法比如addApplicationListenerBean、removeApplicationListener。。。都是线程安全的。
  • 若想要异步执行事件,请自己配置@Bean这个Bean。然后setTaskExecutor()一个进去

需要注意的是,若你注册在接口上,请保证你使用的是JDK的动态代理机制,否则可能导致问题,一般并不建议这么干(虽然可以这么干)


@Component
public class MyAllEventListener implements MyAllEventListenerInterface {
    @Override
    public void handChild(Child c) {
        System.out.println(c.getName() + " 发来了事件");
    }
}
// 注解写在接口上,也是能正常work的~~~
interface MyAllEventListenerInterface {
    @EventListener(Child.class)
    void handChild(Child c);
}


ApplicationListener和@EventListener的区别


@EventListener存在漏事件的现象,但是ApplicationListener能监听到所有的相关事件


上面这句话怎么理解呢?这个和ApplicationListener什么时候注册有关。上面已经讲述了AbstractApplicationEventMulticaster是怎么获取到当前的所有的监听器的,那么他们的区别就在于:它俩注册的时机不一样(此处统一不考虑手动注册时间的情况):


ApplicationListener的注册时机


它是靠一个后置处理器:ApplicationListenerDetector它来处理的。它有两个方法处理:

// @since 4.3.4 出现得还是比较晚的~~~
class ApplicationListenerDetector implements DestructionAwareBeanPostProcessor, MergedBeanDefinitionPostProcessor {
  ...
  // 这个方法会在merge Bean的定义信息时候执行,缓存下该Bean是否是单例Bean
  // 因为后面注册的时候:只有单例Bean才给注册为监听器~~~
  @Override
  public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
    if (this.applicationContext != null) {
      this.singletonNames.put(beanName, beanDefinition.isSingleton());
    }
  }
  ...
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (this.applicationContext != null && bean instanceof ApplicationListener) {
      // 显然  只有单例Bean才会add进去  注册进去    
      if (Boolean.TRUE.equals(flag)) {
        this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
      } else if (Boolean.FALSE.equals(flag)) {
        // 输出一个warn日志:
        if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
          // 提示用户这个Bean实现了ApplicationListener  但是并不是单例的
          logger.warn("...");
        }
        // 不是单例的就从缓存移除吧~~~~
        this.singletonNames.remove(beanName);
      }
    }
    return bean;
  }
  ...
}


因为它是以Bean定义的形式注册进工厂的,并且refresh()中有一步registerListeners()它负责注册所有的监听器(Bean形式的),然后才是finishBeanFactoryInitialization(beanFactory),所以它是不会落掉事件的。


如果你的Bean被误伤:提前初始化了,那就不属于这个讨论范畴了。

参考:【小家Spring】注意BeanPostProcessor启动时对依赖Bean的“误伤”陷阱(is not eligible for getting processed by all…)


@EventListener的注册时机


注册它的是EventListenerMethodProcessor,它是一个SmartInitializingSingleton,它一直到preInstantiateSingletons()所有的单例Bean全部实例化完成了之后,它才被统一注册进去。所以它注册的时机是挺晚的。


由此知道,如果你在普通的单例Bean初始化期间(比如给属性赋值时、构造函数内。。。)发出了一个时间,@EventListener这种方式的监听器很有可能是监听不到的。


比如我遇到的一个例子:

@RestController
public class xxxController {
  ...
  // 此处它是一个@FeignClient,所以在初始化xxxController 的时候肯定会顺带初始化`StaffClient` 
    @Autowired
    private StaffClient staffClient;
}


如上,StaffClient这个@FeignClient会创建出一个Feign的子容器(它的父容器为boot容器),而此时我们的监听器为:


@Component
public class MyListener {
    @EventListener(classes = ContextRefreshedEvent.class)
    public void list1(ContextRefreshedEvent event) {
        ApplicationContext applicationContext = event.getApplicationContext();
        int beanDefinitionCount = applicationContext.getBeanDefinitionCount();
        System.out.println("当前容器的Bean总数:" + beanDefinitionCount);
    }
}


因为它是@EventListener,且MyListener这个Bean是交给SpringBoot容器管理的,而feign子容器创建的时候,其实还处于Boot容器流程的内部,所以此时@EventListener肯定是没有注册上的,因此此方法代表的监听器就不会生效了。


其实绝大多数情况下我们都可议采用@EventListener去监听事件,一般使用ApplicationListener的时候,大都只需要监听本容器发出的时间,比如我们监听ContextRefreshedEvent很多时候都会加上这么一句:

@Component
public class MyListener implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private ApplicationContext applicationContext;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
      // 相当于只监听本容器发出来的时间,别的容器的我不管~~~~~~~
        if (applicationContext == event.getApplicationContext()) {
      ...
        }
    }
}


了解事件的执行时机原理,能够避免很多的误伤,以及为啥监听器没生效,一看便知。


@EventListener注册不上去的小坑

上面说了,它的注册依赖于EventListenerMethodProcessor,它的执行是发生在容器对所有的单例Bean已经全部完成初始化了~比如我们这样介入一下:

@Component
public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            // 让EventListenerMethodProcessor惰性加载~~~~
            if (beanDefinitionName.equals(AnnotationConfigUtils.EVENT_LISTENER_PROCESSOR_BEAN_NAME)) {
                beanFactory.getBeanDefinition(beanDefinitionName).setLazyInit(true);
            }
        }
    }
}


这样容器完成所有的单例实例化步骤后,其实EventListenerMethodProcessor这个Bean并没有完成真正的实例化的。而beanFactory.preInstantiateSingletons()方法最后一步为:


public void preInstantiateSingletons() throws BeansException {
  ...
    // Trigger post-initialization callback for all applicable beans...
    // 执行所有的SmartInitializingSingleton  这里面最为核心的就在于~~~~
    // getSingleton(beanName)这个方法,是直接去Map里找,只有被实例化的的单例Bean才会返回true,否则是false
    // 不知为何Spring此处不用getBean()   我个人认为  这是Spring为了提高速度的一个疏忽吧~~~~~
    for (String beanName : beanNames) {
      Object singletonInstance = getSingleton(beanName);
      if (singletonInstance instanceof SmartInitializingSingleton) {
        final SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
        if (System.getSecurityManager() != null) {
          AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
              smartSingleton.afterSingletonsInstantiated();
              return null;
            }
          }, getAccessControlContext());
        }
        else {
          smartSingleton.afterSingletonsInstantiated();
        }
      }
    }
}


如上:getSingleton方法是直接去DefaultSingletonBeanRegistry的Map<String, Object> singletonObjects里找的(含singletonFactories)。显然EventListenerMethodProcessor因为是Lazy加载,所以目前还仅仅是Bean的定义信息,所以就不会检测@EventListener的方法,因此它就不生效了


这是一个坑,当然一般情况下我们不会这么做,但是若真的出现了此情况(比如我们希望提高启动速度,全局惰性加载就会有问题了),希望可以快速定位到原因

各大模式大比拼

  • 观察者模式:它是设计模式里的一个术语。是一个非常经典的行为型设计模式。。猫叫了,主人醒了,老鼠跑了,这一经典的例子,是事件驱动模型在设计层面的体现。
  • 发布订阅模式:很多人认为等同于观察者模式。但我的理解是两者唯一区别,是发布订阅模式需要有一个调度中心,而观察者模式不需要(观察者的列表可以直接由被观察者维护)。 但它俩混用没问题,一般都不会在表达上有歧义
  • 消息队列MQ:中间件级别的消息队列(ActiveMQ,RabbitMQ),可以认为是发布订阅模式的一个具体体现

事件驱动->发布订阅->MQ,从抽象到具体。 因此MQ算是一个落地的产品了


  • EventSourcing:这个要关联到领域驱动设计。DDD对事件驱动也是非常地青睐,领域对象的状态完全是由事件驱动来控制。比如有著名的CQRS架构~~~

CQRS架构和微服务的关系:微服务的目的是为了从业务角度拆分(职责分离)当前业务领域的不同业务模块到不同的服务,每个微服务之间的数据完全独立,它们之间的交互可以通过SOA RPC调用(耦合比较高),也可以通过EDA 消息驱动(耦合比较低,比如我们常用的分布式产品:MQ)。


这类模式的优缺点


有点:


1.支持简单的广播通信,自动通知所有已经订阅过的对象

2.目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用(保持职责单一,解耦)

3.观察者模式分离了观察者和被观察者二者的责任,这样让类之间各自维护自己的功能,专注于自己的功能,会提高系统的可维护性和可重用性。


缺点:

  • 如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
  • 如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃


总结


本文暂时只介绍了Spring中的一些简单的事件驱动机制,相信如果之后再看到Event,Publisher,EventListener·一类的单词后缀时,也能立刻和事件机制联系上了

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
24天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
64 2
|
11天前
|
缓存 Java 数据库连接
深入探讨:Spring与MyBatis中的连接池与缓存机制
Spring 与 MyBatis 提供了强大的连接池和缓存机制,通过合理配置和使用这些机制,可以显著提升应用的性能和可扩展性。连接池通过复用数据库连接减少了连接创建和销毁的开销,而 MyBatis 的一级缓存和二级缓存则通过缓存查询结果减少了数据库访问次数。在实际应用中,结合具体的业务需求和系统架构,优化连接池和缓存的配置,是提升系统性能的重要手段。
27 4
|
16天前
|
Java 开发者 Spring
深入解析:Spring AOP的底层实现机制
在现代软件开发中,Spring框架的AOP(面向切面编程)功能因其能够有效分离横切关注点(如日志记录、事务管理等)而备受青睐。本文将深入探讨Spring AOP的底层原理,揭示其如何通过动态代理技术实现方法的增强。
45 8
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件 设计模式 缓存
spring源码设计模式分析(四)-观察者模式
spring源码设计模式分析(四)-观察者模式
|
3月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
51 0
手撸MQ消息队列——循环数组
|
4月前
|
Java 开发工具 Spring
Spring的Factories机制介绍
Spring的Factories机制介绍
82 1
|
4月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
183 1
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
114 0

相关产品

  • 云消息队列 MQ