小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(上)

简介: 小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(上)

前言


说到事件驱动,我心里一直就有一个不解的疑问:它和我们老生长谈的一些概念比如:【观察者模式】【发布订阅模式】【消息队列MQ】【消息驱动】【EventSourcing】等等是一回事吗?


可能很多小伙伴会回答:差不多。确实,很有深意的三字回答。


那么本文将以Spring的事件驱动机制为引子,好好的聊聊这里面的关系和差异~


JDK中的事件驱动机制


在了解其它之前,有必要先了解下JDK为我们提供的事件驱动(EventListener、EventObject)、观察者模式(Observer)。


JDK不仅提供了Observable类、Observer接口支持观察者模式,而且也提供了EventObject、EventListener接口来支持事件监听模式。


这些类都属于java.util下的


观察者模式(Observable和Observer) JDK1.0提供

被观察对象:观察者 = 1:n (观察者可以有N个嘛)


观察者(Observer)相当于事件监听者(监听器),被观察者(Observable)相当于事件源和事件,执行逻辑时通知observer即可触发oberver的update,同时可传被观察者和参数。简化了事件-监听模式的实现。

// 观察者,实现此接口即可
public interface Observer {
  // 当被观察的对象发生变化时候,这个方法会被调用
  //Observable o:被观察的对象
  // Object arg:传入的参数
    void update(Observable o, Object arg);
}
// 它是一个Class
public class Observable {
  // 是否变化,决定了后面是否调用update方法
    private boolean changed = false;
    // 用来存放所有`观察自己的对象`的引用,以便逐个调用update方法
    // 需要注意的是:1.8的jdk源码为Vector(线程安全的),有版本的源码是ArrayList的集合实现; 
    private Vector<Observer> obs;
    public Observable() {
        obs = new Vector<>();
    }
  public synchronized void addObserver(Observer o); //添加一个观察者 注意调用的是addElement方法,添加到末尾   所以执行时是倒序执行的
  public synchronized void deleteObserver(Observer o);
  public synchronized void deleteObservers(); //删除所有的观察者
  // 循环调用所有的观察者的update方法
  public void notifyObservers();
  public void notifyObservers(Object arg);
    public synchronized int countObservers() {
        return obs.size();
    }
  // 修改changed的值
    protected synchronized void setChanged() {
        changed = true;
    }
    protected synchronized void clearChanged() {
        changed = false;
    }
    public synchronized boolean hasChanged() {
        return changed;
    }
}


它的使用非常的便捷,看个例子就能明白;


class Person extends Observable {
    public String name;
    public Person(String name) {
        this.name = name;
    }
    // 给鱼:这样所有观察的猫都会过来了
    // fishType: 鱼的名字
    public void giveFish(String fishName) {
        setChanged(); // 这个一定不能忘
        notifyObservers(fishName);
    }
}
class Cat implements Observer {
    public String name;
    public Cat(String name) {
        this.name = name;
    }
    @Override
    public void update(Observable o, Object arg) {
        String preffix = o.toString();
        if (o instanceof Person) {
            preffix = ((Person) o).name;
        }
        System.out.println(preffix + "主人放 " + arg + "~了," + name + "去吃鱼吧");
    }
}
// 测试方法如下:
    public static void main(String[] args) {
        Person person = new Person("fsx");
        // 来10只猫 观察这个人
        for (int i = 0; i < 10; i++) {
            person.addObserver(new Cat("cat" + i));
        }
        //开始放fish,这时候观察的猫就应该都过来了
        person.giveFish("草鱼");
    }
// 输出
fsx主人放 草鱼~了,cat9去吃鱼吧
fsx主人放 草鱼~了,cat8去吃鱼吧
fsx主人放 草鱼~了,cat7去吃鱼吧
fsx主人放 草鱼~了,cat6去吃鱼吧
fsx主人放 草鱼~了,cat5去吃鱼吧
fsx主人放 草鱼~了,cat4去吃鱼吧
fsx主人放 草鱼~了,cat3去吃鱼吧
fsx主人放 草鱼~了,cat2去吃鱼吧
fsx主人放 草鱼~了,cat1去吃鱼吧
fsx主人放 草鱼~了,cat0去吃鱼吧


JDK的观察者模式使用起来确实非常的方便,我们只需要面对两个对象即可。内部观察者队列啥的都交给Observable去处理了。 并且,它是线程安全的


发布订阅模式(EventListener和EventObject) JDK1.1提供


Spring中的事件驱动机制


事件机制一般包括三个部分:EventObject,EventListener和Source。

EventObject:事件状态对象的基类,它封装了事件源对象以及和事件相关的信息。所有java的事件类都需要继承该类

EventListener:是一个标记接口,就是说该接口内是没有任何方法的。所有事件监听器都需要实现该接口。事件监听器注册在事件源上,当事件源的属性或状态改变的时候,调用相应监听器内的回调方法(自己写)。

Source:一个普通的POJO。事件最初发生的地方,他里面必须含有监听它的监听器们


class MyEvent extends EventObject {
    public MyEvent(Object source) {
        super(source);
    }
}
// 状态改变事件
class StatusChangedListener implements EventListener {
    public void handleEvent(MyEvent event) {
        System.out.println(event.getSource() + " 的状态改变啦~");
    }
}
// 状态没变化事件
class StateSameListener implements EventListener {
    public void handleEvent(MyEvent event) {
        System.out.println(event.getSource() + " 的状态没有任何变化~");
    }
}
class MySource {
    private int status;
    List<EventListener> eventListeners = new ArrayList<>();
    public int getStatus() {
        return status;
    }
    public void setStatus(int status) {
        this.status = status;
    }
    public void addListener(EventListener listener) {
        eventListeners.add(listener);
    }
    // 调用所有的合适的监听器
    public void notifyListeners(int oldStatus, int newStatus) {
        eventListeners.forEach(l -> {
            if (oldStatus == newStatus) {
                // doSamething
            } else {
                // doSamething
            }
        });
    }
}
// 测试方法
    public static void main(String[] args) {
        MySource mySource = new MySource();
        mySource.addListener(new StatusChangedListener());
        mySource.addListener(new StateSameListener());
        int oldStatus = mySource.getStatus();
        mySource.setStatus(1);
        int newStatus = mySource.getStatus();
        // 触发所有的监听者们
        mySource.notifyListeners(oldStatus, newStatus);
    }


对弈上面的观察者模式,监听模式使用起来确实非常的繁琐,且还线程安全问题还得自己考虑解决。我个人觉得JDK的源生的事件、监听模式非常难用(不太建议使用,它最大的败笔在于EventListener接口没有定义一个抽象方法,不知道是作何考虑的,应该是为了更加抽象吧)。因此接下来,大行其道的Spring事件机制就很好的解决使用上的问题~~~它也是今天的主菜


Spring中事件驱动机制



Spring提供了ApplicationEventPublisher接口作为事件发布者(ApplicationContext接口继承了该接口,担当着事件发布者的角色)。

Spring提供了ApplicationEventMulticaster接口,负责管理ApplicationListener和真正发布ApplicationEvent(ApplicationContext是委托给它完成的)


ApplicationListener实现了JDK的EventListener,但它抽象出一个onApplicationEvent方法,使用更方便。ApplicationEvent继承自EventObject。 Spring这么做我觉得完全是为了兼容Java规范~

ApplicationEventPublisher最终都是委托给ApplicationEventMulticaster去完成的。当然你也可以自己去实现一个ApplicationEventMulticaster


在博文:【小家Spring】Spring IOC容器启动流程 AbstractApplicationContext#refresh()方法源码分析(二),Spring容器启动/刷新的完整总结

这里讲解IoC容器refresh()的时候,第八步:initApplicationEventMulticaster()和第十步:registerListeners()和第十二步:inishRefresh()方法里的publishEvent(new ContextRefreshedEvent(this))都是和时间机制相关的方法。


initApplicationEventMulticaster():我们向容器注册了一个SimpleApplicationEventMulticaster(若我们自己没指定的话),因此若我们希望手动控制时间的发布,是可以@Autowired进来的

registerListeners():会把所有的ApplicationListener添加进ApplicationEventMulticaster进行管理(注意此处并不包括@EventListener标注的注解方法)

publishEvent:发布事件。因为ApplicationContext继承了ApplicationEventMulticaster,因此我们一般发布时间建议用它就成了


public abstract class ApplicationEvent extends EventObject {
  private static final long serialVersionUID = 7099057708183571937L;  
  private final long timestamp;
  public ApplicationEvent(Object source) {
    super(source);
    this.timestamp = System.currentTimeMillis();
  }
  public final long getTimestamp() {
    return this.timestamp;
  }
}
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
  // 此子接口提供了泛型,和提供了统一的处理方法
  void onApplicationEvent(E event);
}
@FunctionalInterface
public interface ApplicationEventPublisher {
  default void publishEvent(ApplicationEvent event) {
    publishEvent((Object) event);
  }
  // 这个接口是Spring4.2后提供的,可以发布任意的事件对象(即使不是ApplicationEvent的子类了)
  // 当这个对象不是一个ApplicationEvent,我们会使用PayloadApplicationEvent来包装一下再发送
  // 比如后面会建讲到的@EventListener注解标注的放 就是使用的它
  void publishEvent(Object event);
}

我们知道Spring4.2后提供了@EventListener注解,让我们更便捷的使用监听了,非常非常非常的方便:


ApplicationListener类模式的演示和原理解析


它的继承树如下:


image.png


这里只是纯的Spring环境,若你是SpringBoot和Spring Cloud环境,实现类将非常非常之多,课件事件驱动模式还是蛮重要的~


GenericApplicationListener和SmartApplicationListener


// @since 3.0
public interface SmartApplicationListener extends ApplicationListener<ApplicationEvent>, Ordered {
  boolean supportsEventType(Class<? extends ApplicationEvent> eventType);
  boolean supportsSourceType(Class<?> sourceType);
}
// @since 4.2
public interface GenericApplicationListener extends ApplicationListener<ApplicationEvent>, Ordered {
  boolean supportsEventType(ResolvableType eventType);
  boolean supportsSourceType(Class<?> sourceType);
}


它俩没啥,只是更多的关注了事件的细节些。


GenericApplicationListener是4.2才支持的。若你出现了java.lang.ClassNotFoundException: org.springframework.context.event.GenericApplicationListener这种异常,请检查是不是你Maven的版本冲突引起~


这是Spring最早期就提供了的一种事件监听方式。实现起来也非常的简单。


通过Spring源码我们了解到,Spring容器刷新的时候会发布ContextRefreshedEvent事件,因此若我们需要监听此事件,直接写个监听类即可:

@Slf4j
@Component
public class ApplicationRefreshedEventListener implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Object source = event.getSource();
        // 此处的source就是ApplicationContext这个对象
        System.out.println(source); //WebApplicationContext for namespace 'dispatcher-servlet': startup date [Tue Mar 26 14:26:27 CST 2019]; parent: Root WebApplicationContext
        //容器此时已经准备好了,可以做你该做的事了~......(请注意:若存在父子容器或者多个容器情况,此方法会被执行多次,请务必注意是否幂等)
    }
}


若是web环境,FrameworkServlet在处理完每一次i请求,也会发出一个事件:ServletRequestHandledEvent


自己发布一个事件,然后自己监听~~~~


public class MyAppEvent extends ApplicationEvent {
    public MyAppEvent(Object source) {
        super(source);
    }
}
// 写个监听器,然后交给容器管理即可
@Slf4j
@Component
public class MyEventListener implements ApplicationListener<MyAppEvent> {
    @Override
    public void onApplicationEvent(MyAppEvent event) {
        Object source = event.getSource();
        long timestamp = event.getTimestamp();
        System.out.println(source);
        System.out.println(timestamp);
        //doSomething
    }
}
    public static void main(String[] args) {
        ApplicationContext applicationContext = new AnnotationConfigApplicationContext(RootConfig.class);
        // 发布自己的事件
        applicationContext.publishEvent(new MyAppEvent("this is my event"));
    }
// 输出:
this is my event
1553581974928


Spring内置的事件讲解



image.png

Web相关事件:


  • RequestHandledEvent:Web相关事件,只能应用于使用DispatcherServlet的Web应用。在使用Spring作为前端的MVC控制器时,当Spring处理用户请求结束后,系统会自动触发该事件(即ServletRequestHandledEvent)


ApplicationContextEvent:应用本身的事件

  • ContextRefreshedEvent:容器初始化完成刷新时触发。此时所有的Bean已经初始化完成、后置处理器等都已经完成
  • ContextStartedEvent:AbstractApplicationContext#strart()被调用时。 需要手动调用,个人觉得没啥卵用
  • ContextStoppedEvent:容器的stop方法被手动调用时。 也没啥卵用
  • ContextClosedEvent:close() 关闭容器时候发布。一个已关闭的上下文到达生命周期末端;它不能被刷新或重启


@EventListener注解方法模式演示


在任意方法上标注@EventListener注解,指定 classes,即需要处理的事件类型,一般就是 ApplicationEven 及其子类(当然任意事件也是Ok的,比如下面的MyAppEvent就是个普通的POJO),可以设置多项。

public class MyAppEvent {
    private String name;
    public MyAppEvent(String name) {
        this.name = name;
    }
}
// 显然此处,它会收到两个时间,分别进行处理
@Component
public class MyAllEventListener {
    //value必须给值,但可以不用是ApplicationEvent的子类  任意事件都ok
    // 也可以给一个入参,代表事件的Event
    @EventListener(value = {ContextRefreshedEvent.class, MyAppEvent.class}
            // confition的使用,若同一个事件进行区分同步异步 等等条件的可以使用此confition 支持spel表达式  非常强大
            /*,condition = "#event.isAsync == false"*/)
    public void handle(Object o) {
        System.out.println(o);
        System.out.println("事件来了~");
    }
}
    public static void main(String[] args) {
        ApplicationContext applicationContext = new AnnotationConfigApplicationContext(RootConfig.class);
        // 发布自己的事件
        applicationContext.publishEvent(new MyAppEvent("this is my event"));
    }


显然这种方式更被推崇,因为它是方法级别的,更轻便了。(Spring4.2之后提出)


@EventListener的使用注意事项


不乏有小伙伴在启动的时候看到过这样的异常


Caused by: java.lang.IllegalStateException: Need to invoke method 'applicationContextEvent' declared on target class 'HelloServiceImpl', but not found in any interface(s) of the exposed proxy type. Either pull the method up to an interface or switch to CGLIB proxies by enforcing proxy-target-class mode in your configuration.
  at org.springframework.core.MethodIntrospector.selectInvocableMethod(MethodIntrospector.java:132)
  at org.springframework.aop.support.AopUtils.selectInvocableMethod(AopUtils.java:134)
  at org.springframework.context.event.EventListenerMethodProcessor.processBean(EventListenerMethodProcessor.java:177)
  at org.springframework.context.event.EventListenerMethodProcessor.afterSingletonsInstantiated(EventListenerMethodProcessor.java:133)


那是因为:你把@EventListener写在XXXImpl实现类里面了,形如这样:


@Slf4j
@Service
public class HelloServiceImpl implements HelloService {
  ...
    private ApplicationContext applicationContext;
    @EventListener(classes = ContextRefreshedEvent.class)
    public void applicationContextEvent(ContextRefreshedEvent event) {
        applicationContext = event.getApplicationContext();
    }
    ...
}


根本原因:Spring在解析标注有此注解的方法的时候是这么解析的:


public class EventListenerMethodProcessor implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {
  ...
  private void processBean(final String beanName, final Class<?> targetType) {
    ...
      Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
    ...
  }
  ...
}


这里的context.getType(beanName)就是问题的关键。因为Spring默认给我们使用的是JDK Proxy代理(此处只考虑被代理的情况,我相信没有人的应用不使用代理的吧),所以此处getType拿到的默认就是个Proxy,显然它是它是找不到我们对应方法的(因为方法在impl的实现类里,接口里可以木有)

其实Spring的异常信息里已经说得很清楚了错误原因,再一次感叹Spring的错误消息的完善性,真的非常非常赞,特别有助于我们定位问题和解决问题


另外有一个小细节:标注有@EventListener注解(包括@TransactionalEventListener)的方法的访问权限最低是protected的

另外可以在监听方法上标注@Order来控制执行顺序哦,一般人我不告诉他~

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9天前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
32 0
|
26天前
|
XML Java 数据格式
编织Spring魔法:解读核心容器中的Beans机制【beans 一】
编织Spring魔法:解读核心容器中的Beans机制【beans 一】
31 0
|
26天前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
23 1
|
2月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
20 1
|
26天前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
30 0
|
26天前
|
消息中间件 NoSQL Java
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
44 1
|
2月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
24 0
|
2月前
|
消息中间件 网络架构
【面试问题】什么是 MQ topic 交换器(模式匹配) ?
【1月更文挑战第27天】【面试问题】什么是 MQ topic 交换器(模式匹配) ?
|
2月前
|
物联网 Go 网络性能优化
MQTT协议本身支持多种消息收发模式
MQTT协议本身支持多种消息收发模式【1月更文挑战第24天】【1月更文挑战第120篇】
23 3
|
2月前
|
消息中间件 Java Spring
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
48 0

相关产品

  • 云消息队列 MQ