小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(下)

简介: 小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(下)

AbstractApplicationEventMulticaster 时间发布器的抽象实现


它是对事件发布器的抽象实现,如果你自己想自定义一个时间发布器,可以继承它


// @since 1.2.3
// 提供基本的侦听器注册功能   比如处理代理对象类型~~~
public abstract class AbstractApplicationEventMulticaster
    implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {
  // Retriever:猎犬  
  // 它是一个内部类,内部持有applicationListeners和applicationListenerBeans的引用
  // 是一个类似包装的类,详细可参加下面具体分析
  private final ListenerRetriever defaultRetriever = new 
ListenerRetriever(false);
  // 显然它是一个缓存:key由eventType, sourceType唯一确定~
  final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);
  // retrieval的互斥锁
  private Object retrievalMutex = this.defaultRetriever;
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    this.beanFactory = beanFactory;
    if (beanFactory instanceof ConfigurableBeanFactory) {
      ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;
      if (this.beanClassLoader == null) {
        this.beanClassLoader = cbf.getBeanClassLoader();
      }
      // 互斥锁 用容器里面的互斥锁
      this.retrievalMutex = cbf.getSingletonMutex();
    }
  }
  // 向容器内注册一个监听器~~~~
  // 需要注意的是,添加进来的监听器都是保存到defaultRetriever里面的
  // 最后getApplicationListeners就是从这里拿的(注册进来多少  最终返回多少~~~)
  @Override
  public void addApplicationListener(ApplicationListener<?> listener) {
    synchronized (this.retrievalMutex) {
      // 这一步:若类型是SingletonTargetSource也给拿出来~~~
      // 如果不是被代理的对象Advised,那就返回null
      Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
      if (singletonTarget instanceof ApplicationListener) {
        // 从默认的持有的applicationListeners里把它移除~~
        // 下面一句肯定又是会添加进来的,所以可议保证它在顶部~~~
        this.defaultRetriever.applicationListeners.remove(singletonTarget);
      }
      this.defaultRetriever.applicationListeners.add(listener);
      // 没加一个进来  都清空了缓存~~~~~~~~~~~~~~~~
      this.retrieverCache.clear();
    }
  }
  // 同样的 根据名称添加一个监听器也是可以的
  @Override
  public void addApplicationListenerBean(String listenerBeanName) {
    synchronized (this.retrievalMutex) {
      this.defaultRetriever.applicationListenerBeans.add(listenerBeanName);
      this.retrieverCache.clear();
    }
  }
  ...// remove方法类似
  // 如果不传参数,那就是返回defaultRetriever这个里面的值即可~
  protected Collection<ApplicationListener<?>> getApplicationListeners() {
    synchronized (this.retrievalMutex) {
      return this.defaultRetriever.getApplicationListeners();
    }
  }
  // 如果指定了event事件和eventType,那就这个方法   绝大多数情况下都是这里~~~
  // 获取该事件对应的监听者:相当于只会获取supportsEvent() = true支持的这种事件~
  protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {
    Object source = event.getSource();
    Class<?> sourceType = (source != null ? source.getClass() : null);
    // 这个key是它俩共同决定的~~~~
    ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
    // Quick check for existing entry on ConcurrentHashMap...
    // 缓存里若存在  直接返回即可~~~~
    ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
    if (retriever != null) {
      return retriever.getApplicationListeners();
    }
    // 这里面~~~ 有个缓存安全的特殊处理,其最为核心的方法,其实还是retrieveApplicationListeners
    // 若是缓存安全的,才会缓存它  否则直接return即可~~~~
    // 什么叫缓存安全isCacheSafe:原理很简单,就是判断该类型是否在指定classloader或者其parent classloader中
    if (this.beanClassLoader == null || (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
            (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
      // Fully synchronized building and caching of a ListenerRetriever
      synchronized (this.retrievalMutex) {
        retriever = this.retrieverCache.get(cacheKey);
        if (retriever != null) {
          return retriever.getApplicationListeners();
        }
        retriever = new ListenerRetriever(true);
        // 需要缓存起来,所以才需要把retriever传过去,否则传null即可~(下面传的null)
        Collection<ApplicationListener<?>> listeners = retrieveApplicationListeners(eventType, sourceType, retriever);
        // 每个事件对应的Listener,都缓存在此处了~(注意:首次get的才给与缓存)
        // 因为有的是个体的beanName,有的是给的Bean,所以首次去拿时候缓存吧~~~
        this.retrieverCache.put(cacheKey, retriever);
        return listeners;
      }
    } else {
      // No ListenerRetriever caching -> no synchronization necessary
      return retrieveApplicationListeners(eventType, sourceType, null);
    }
  }
  // 关于`retrieveApplicationListeners`方法,它就是从defaultRetriever把applicationListeners和beanNames都拿出来合并了
  // 摒弃萃取出supportsEvent() 只支持这种类型的事件~
  // 最终它还`AnnotationAwareOrderComparator.sort(allListeners);`,证明监听器是支持排序接口的~
}


这就是我们getApplicationListeners的具体内容,我们发现:它只会拿注册到本容器的监听器(注册在谁身上就是谁的~~~),并不会去父类的拿的,所以这点一定要注意,你自己写监听器的时候也是需要注意这一点的,避免一些重复执行吧~~~


@EventListener使用中的小细节

  • @EventListener注解用在接口或者父类上都是没有任何问题的(这样子类就不用再写了,在接口层进行控制)
  • @EventListener标注的方法,无视访问权限
  • AbstractApplicationEventMulticaster的相关方法比如addApplicationListenerBean、removeApplicationListener。。。都是线程安全的。
  • 若想要异步执行事件,请自己配置@Bean这个Bean。然后setTaskExecutor()一个进去

需要注意的是,若你注册在接口上,请保证你使用的是JDK的动态代理机制,否则可能导致问题,一般并不建议这么干(虽然可以这么干)


@Component
public class MyAllEventListener implements MyAllEventListenerInterface {
    @Override
    public void handChild(Child c) {
        System.out.println(c.getName() + " 发来了事件");
    }
}
// 注解写在接口上,也是能正常work的~~~
interface MyAllEventListenerInterface {
    @EventListener(Child.class)
    void handChild(Child c);
}


ApplicationListener和@EventListener的区别


@EventListener存在漏事件的现象,但是ApplicationListener能监听到所有的相关事件


上面这句话怎么理解呢?这个和ApplicationListener什么时候注册有关。上面已经讲述了AbstractApplicationEventMulticaster是怎么获取到当前的所有的监听器的,那么他们的区别就在于:它俩注册的时机不一样(此处统一不考虑手动注册时间的情况):


ApplicationListener的注册时机


它是靠一个后置处理器:ApplicationListenerDetector它来处理的。它有两个方法处理:

// @since 4.3.4 出现得还是比较晚的~~~
class ApplicationListenerDetector implements DestructionAwareBeanPostProcessor, MergedBeanDefinitionPostProcessor {
  ...
  // 这个方法会在merge Bean的定义信息时候执行,缓存下该Bean是否是单例Bean
  // 因为后面注册的时候:只有单例Bean才给注册为监听器~~~
  @Override
  public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
    if (this.applicationContext != null) {
      this.singletonNames.put(beanName, beanDefinition.isSingleton());
    }
  }
  ...
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (this.applicationContext != null && bean instanceof ApplicationListener) {
      // 显然  只有单例Bean才会add进去  注册进去    
      if (Boolean.TRUE.equals(flag)) {
        this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
      } else if (Boolean.FALSE.equals(flag)) {
        // 输出一个warn日志:
        if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
          // 提示用户这个Bean实现了ApplicationListener  但是并不是单例的
          logger.warn("...");
        }
        // 不是单例的就从缓存移除吧~~~~
        this.singletonNames.remove(beanName);
      }
    }
    return bean;
  }
  ...
}


因为它是以Bean定义的形式注册进工厂的,并且refresh()中有一步registerListeners()它负责注册所有的监听器(Bean形式的),然后才是finishBeanFactoryInitialization(beanFactory),所以它是不会落掉事件的。


如果你的Bean被误伤:提前初始化了,那就不属于这个讨论范畴了。

参考:【小家Spring】注意BeanPostProcessor启动时对依赖Bean的“误伤”陷阱(is not eligible for getting processed by all…)


@EventListener的注册时机


注册它的是EventListenerMethodProcessor,它是一个SmartInitializingSingleton,它一直到preInstantiateSingletons()所有的单例Bean全部实例化完成了之后,它才被统一注册进去。所以它注册的时机是挺晚的。


由此知道,如果你在普通的单例Bean初始化期间(比如给属性赋值时、构造函数内。。。)发出了一个时间,@EventListener这种方式的监听器很有可能是监听不到的。


比如我遇到的一个例子:

@RestController
public class xxxController {
  ...
  // 此处它是一个@FeignClient,所以在初始化xxxController 的时候肯定会顺带初始化`StaffClient` 
    @Autowired
    private StaffClient staffClient;
}


如上,StaffClient这个@FeignClient会创建出一个Feign的子容器(它的父容器为boot容器),而此时我们的监听器为:


@Component
public class MyListener {
    @EventListener(classes = ContextRefreshedEvent.class)
    public void list1(ContextRefreshedEvent event) {
        ApplicationContext applicationContext = event.getApplicationContext();
        int beanDefinitionCount = applicationContext.getBeanDefinitionCount();
        System.out.println("当前容器的Bean总数:" + beanDefinitionCount);
    }
}


因为它是@EventListener,且MyListener这个Bean是交给SpringBoot容器管理的,而feign子容器创建的时候,其实还处于Boot容器流程的内部,所以此时@EventListener肯定是没有注册上的,因此此方法代表的监听器就不会生效了。


其实绝大多数情况下我们都可议采用@EventListener去监听事件,一般使用ApplicationListener的时候,大都只需要监听本容器发出的时间,比如我们监听ContextRefreshedEvent很多时候都会加上这么一句:

@Component
public class MyListener implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private ApplicationContext applicationContext;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
      // 相当于只监听本容器发出来的时间,别的容器的我不管~~~~~~~
        if (applicationContext == event.getApplicationContext()) {
      ...
        }
    }
}


了解事件的执行时机原理,能够避免很多的误伤,以及为啥监听器没生效,一看便知。


@EventListener注册不上去的小坑

上面说了,它的注册依赖于EventListenerMethodProcessor,它的执行是发生在容器对所有的单例Bean已经全部完成初始化了~比如我们这样介入一下:

@Component
public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            // 让EventListenerMethodProcessor惰性加载~~~~
            if (beanDefinitionName.equals(AnnotationConfigUtils.EVENT_LISTENER_PROCESSOR_BEAN_NAME)) {
                beanFactory.getBeanDefinition(beanDefinitionName).setLazyInit(true);
            }
        }
    }
}


这样容器完成所有的单例实例化步骤后,其实EventListenerMethodProcessor这个Bean并没有完成真正的实例化的。而beanFactory.preInstantiateSingletons()方法最后一步为:


public void preInstantiateSingletons() throws BeansException {
  ...
    // Trigger post-initialization callback for all applicable beans...
    // 执行所有的SmartInitializingSingleton  这里面最为核心的就在于~~~~
    // getSingleton(beanName)这个方法,是直接去Map里找,只有被实例化的的单例Bean才会返回true,否则是false
    // 不知为何Spring此处不用getBean()   我个人认为  这是Spring为了提高速度的一个疏忽吧~~~~~
    for (String beanName : beanNames) {
      Object singletonInstance = getSingleton(beanName);
      if (singletonInstance instanceof SmartInitializingSingleton) {
        final SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
        if (System.getSecurityManager() != null) {
          AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
              smartSingleton.afterSingletonsInstantiated();
              return null;
            }
          }, getAccessControlContext());
        }
        else {
          smartSingleton.afterSingletonsInstantiated();
        }
      }
    }
}


如上:getSingleton方法是直接去DefaultSingletonBeanRegistry的Map<String, Object> singletonObjects里找的(含singletonFactories)。显然EventListenerMethodProcessor因为是Lazy加载,所以目前还仅仅是Bean的定义信息,所以就不会检测@EventListener的方法,因此它就不生效了


这是一个坑,当然一般情况下我们不会这么做,但是若真的出现了此情况(比如我们希望提高启动速度,全局惰性加载就会有问题了),希望可以快速定位到原因

各大模式大比拼

  • 观察者模式:它是设计模式里的一个术语。是一个非常经典的行为型设计模式。。猫叫了,主人醒了,老鼠跑了,这一经典的例子,是事件驱动模型在设计层面的体现。
  • 发布订阅模式:很多人认为等同于观察者模式。但我的理解是两者唯一区别,是发布订阅模式需要有一个调度中心,而观察者模式不需要(观察者的列表可以直接由被观察者维护)。 但它俩混用没问题,一般都不会在表达上有歧义
  • 消息队列MQ:中间件级别的消息队列(ActiveMQ,RabbitMQ),可以认为是发布订阅模式的一个具体体现

事件驱动->发布订阅->MQ,从抽象到具体。 因此MQ算是一个落地的产品了


  • EventSourcing:这个要关联到领域驱动设计。DDD对事件驱动也是非常地青睐,领域对象的状态完全是由事件驱动来控制。比如有著名的CQRS架构~~~

CQRS架构和微服务的关系:微服务的目的是为了从业务角度拆分(职责分离)当前业务领域的不同业务模块到不同的服务,每个微服务之间的数据完全独立,它们之间的交互可以通过SOA RPC调用(耦合比较高),也可以通过EDA 消息驱动(耦合比较低,比如我们常用的分布式产品:MQ)。


这类模式的优缺点


有点:


1.支持简单的广播通信,自动通知所有已经订阅过的对象

2.目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用(保持职责单一,解耦)

3.观察者模式分离了观察者和被观察者二者的责任,这样让类之间各自维护自己的功能,专注于自己的功能,会提高系统的可维护性和可重用性。


缺点:

  • 如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
  • 如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃


总结


本文暂时只介绍了Spring中的一些简单的事件驱动机制,相信如果之后再看到Event,Publisher,EventListener·一类的单词后缀时,也能立刻和事件机制联系上了

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4天前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
22 0
|
21天前
|
XML Java 数据格式
编织Spring魔法:解读核心容器中的Beans机制【beans 一】
编织Spring魔法:解读核心容器中的Beans机制【beans 一】
31 0
|
21天前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
21 1
|
1月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
19 1
|
21天前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
25 0
|
1月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
24 0
|
1月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
25 0
|
4天前
|
NoSQL Java Redis
Spring boot 实现监听 Redis key 失效事件
【2月更文挑战第2天】 Spring boot 实现监听 Redis key 失效事件
27 0
|
25天前
|
消息中间件 网络架构
【面试问题】什么是 MQ topic 交换器(模式匹配) ?
【1月更文挑战第27天】【面试问题】什么是 MQ topic 交换器(模式匹配) ?
|
29天前
|
物联网 Go 网络性能优化
MQTT协议本身支持多种消息收发模式
MQTT协议本身支持多种消息收发模式【1月更文挑战第24天】【1月更文挑战第120篇】
23 3

相关产品

  • 云消息队列 MQ