【Nacos源码之配置管理 二】Nacos中的事件发布与订阅--观察者模式

简介: EventDispatcher在Nacos中是一个事件发布与订阅的类,也就是我们经常使用的Java设计模式——观察者模式一般发布与订阅主要有三个角色• 事件: 表示某些类型的事件动作,例如Nacos中的 本地数据发生变更事件 LocalDataChangeEvent• 事件源 : 事件源可以看成是一个动作,某个事件发生的动作,例如Nacos中本地数据发生了变更,就会通知给所有监听该事件的监听器• 事件监听器: 事件监听器监听到事件源之后,会执行自己的一些业务处理,监听器必须要有回调方法供事件源回调一个监听器可以监听多个事件,一个事件也可以被多个监听器监听那我们看看这个类中的角

1EventDispatcher

EventDispatcher在Nacos中是一个事件发布与订阅的类,也就是我们经常使用的Java设计模式——观察者模式

一般发布与订阅主要有三个角色

  • 事件: 表示某些类型的事件动作,例如Nacos中的 本地数据发生变更事件 LocalDataChangeEvent
  • 事件源 :   事件源可以看成是一个动作,某个事件发生的动作,例如Nacos中本地数据发生了变更,就会通知给所有监听该事件的监听器
  • 事件监听器:  事件监听器监听到事件源之后,会执行自己的一些业务处理,监听器必须要有回调方法供事件源回调

一个监听器可以监听多个事件,一个事件也可以被多个监听器监听

那我们看看这个类中的角色

2事件

Event

/**事件定义接口,所有事件继承这个空接口**/
    public interface Event {
    }

LocalDataChangeEvent

/**
 * 本地数据发生变更的事件。
 * @author Nacos
 */
public class LocalDataChangeEvent implements Event {
}

事件监听器

AbstractEventListener

/**抽象事件监听器; 每个监听器需要实现onEvent()处理事件,和interest()将要监听的事件列表**/
    static public abstract class AbstractEventListener {
        public AbstractEventListener() {
            /*自动注册到*/
            EventDispatcher.addEventListener(this);
        }
        /**感兴趣的事件列表**/
        abstract public List<Class<? extends Event>> interest();
  /**处理事件**/
        abstract public void onEvent(Event event);
    }

LongPollingService

/**
 * 长轮询服务。负责处理
 *
 * @author Nacos
 */
@Service
public class LongPollingService extends AbstractEventListener {
 @Override
    public List<Class<? extends Event>> interest() {
        List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>();
        eventTypes.add(LocalDataChangeEvent.class);
        return eventTypes;
    }
    @Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            if (event instanceof LocalDataChangeEvent) {
                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }
}

事件分发类

EventDispatcher

public class EventDispatcher {
    /**事件与事件监听器的数据中心; 一个事件可以对应着多个监听器**/
    static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();
 /**
     * 新增监听器
     */
    static public void addEventListener(AbstractEventListener listener) {
        for (Class<? extends Event> type : listener.interest()) {
            getEntry(type).listeners.addIfAbsent(listener);
        }
    }
    /**
     * 事件源调用的接口动作,告知某个事件发生了
     */
    static public void fireEvent(Event event) {
        if (null == event) {
            throw new IllegalArgumentException();
        }
        for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }
    }

事件源

例如当删除配置文件的时候,就需要触发本地数据变更事件,则需要调用

EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));

调用了fireEvent之后所有监听这个Event的监听器都将执行listener.onEvent(event);


事件发布与订阅的使用方法有很多,但是基本模式都是一样的---观察者模式; 我们介绍一下其他的用法

3Google Guava 中的EventBus

EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。

EventBucket

我们自定义一个类EventBucket,来初始化及注册一些监听器(订阅者)

@Component
public class EventBucket {
    private static Logger logger = LoggerFactory.getLogger(EventBucket.class);
    /**事件总线**/
    private static EventBus asyncEventBus = new AsyncEventBus("asyncEventBus", Executors.newFixedThreadPool(5));
    private static AtomicBoolean isInit = new AtomicBoolean(false);
    private final List<AsyncListener> asyncListenerList;
    /**将所有类型为AsyncListener的监听器注入**/
    @Autowired
    public EventBucket(List<AsyncListener> asyncListenerList) {
        this.asyncListenerList = asyncListenerList;
    }
    @PostConstruct
    public synchronized void init() {
        //要将所有的事件监听者都在 EventBus中去注册
        if (isInit.compareAndSet(false, true)) {
            asyncListenerList.forEach(a -> asyncEventBus.register(a));
        }
    }
    /**发送事件**/
    public static void post(BaseEvent event) {
        try {
            asyncEventBus.post(event);
        } catch (Throwable e) {
            logger.error("EventBucket发送事件出错: " + e.getMessage(), e);
        }
    }
}

BaseEvent

定义BaseEvent 这个类有个post方法,用来发送事件的;所有的**事件必须继承此类

public class BaseEvent {
    public void post(){
        EventBucket.post(this);
    }
}

AsyncListener

定义一个监听器空接口,所有继承此接口的监听器类都将被注册到EventBus中;

public interface AsyncListener {
}

上面定义好了基本的类,那我们下面测试怎么使用发布以及订阅 首先订阅一个事件 TestEvent

public class TestEvent extends BaseEvent {
    private Integer id;
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
}

然后定义一个监听器 TestAsyncListener

@Component
public class TestAsyncListener implements AsyncListener {
    @Subscribe
    public void testEvent(TestEvent testEvent){
        System.out.print("我是TestAsyncListener接收到了TestEvent通知"+testEvent.toString());
    }
    @Subscribe
    public void test2Event(Test2Event test2Event){
        System.out.print("我是TestAsyncListener接收到了test2Event通知"+test2Event.toString());
    }
}

用注解@Subscribe 就可以直接订阅事件了; 那么接下来开始发送一个事件;我们再SpringBoot启动完之后调用一下发送事件通知

@SpringBootApplication
public class ClientsApplication {
    public static void main(String[] args) {
        SpringApplication.run(ClientsApplication.class, args);
        TestEvent event = new TestEvent();
        event.setId(1);
        //发布通知
        event.post();
    }
}

启动完成之后,立马打印了

4Spring事件驱动机制

这篇博客写的比较详细可以前往阅读Spring事件驱动机制

5Nacos中使用的监听扩展接口

  • [ ] SpringApplicationRunListener
  • [ ] ApplicationListener

SpringApplicationRunListener

SpringApplicationRunListener 接口的作用主要就是在Spring Boot 启动初始化的过程中可以通过SpringApplicationRunListener接口回调来让用户在启动的各个流程中可以加入自己的逻辑。 它也是 观察者模式,Spring为我们提供了这个监听器的扩展接口;它监听的就是SpringBoot启动初始化中下面的各个事件

SpringBoot启动过程的关键事件(按照触发顺序)包括:1. 开始启动2. Environment构建完成3. ApplicationContext构建完成4. ApplicationContext完成加载5. ApplicationContext完成刷新并启动6. 启动完成7. 启动失败

package org.springframework.boot;
public interface SpringApplicationRunListener {
    // 在run()方法开始执行时,该方法就立即被调用,可用于在初始化最早期时做一些工作
    void starting();
    // 当environment构建完成,ApplicationContext创建之前,该方法被调用
    void environmentPrepared(ConfigurableEnvironment environment);
    // 当ApplicationContext构建完成时,该方法被调用
    void contextPrepared(ConfigurableApplicationContext context);
    // 在ApplicationContext完成加载,但没有被刷新前,该方法被调用
    void contextLoaded(ConfigurableApplicationContext context);
    // 在ApplicationContext刷新并启动后,CommandLineRunners和ApplicationRunner未被调用前,该方法被调用
    void started(ConfigurableApplicationContext context);
    // 在run()方法执行完成前该方法被调用
    void running(ConfigurableApplicationContext context);
    // 当应用运行出错时该方法被调用
    void failed(ConfigurableApplicationContext context, Throwable exception);
}

StartingSpringApplicationRunListener

- 在这个监听类中,主要是做了一些系统属性的设置;如:

nacos.mode=stand alone / cluster
nacos.function.mode=All/config/naming
nacos.local.ip=InetUtils.getSelfIp()
@Override
    public void environmentPrepared(ConfigurableEnvironment environment) {
        if (STANDALONE_MODE) {
            System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "stand alone");
        } else {
            System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "cluster");
        }
        if (FUNCTION_MODE == null) {
           System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, "All");
        } else if(SystemUtils.FUNCTION_MODE_CONFIG.equals(FUNCTION_MODE)){
            System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_CONFIG);
        } else if(SystemUtils.FUNCTION_MODE_NAMING.equals(FUNCTION_MODE)) {
            System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_NAMING);
        }
        System.setProperty(LOCAL_IP_PROPERTY_KEY, LOCAL_IP);
    }
  • 还有顺便再启动结束之前,每秒中打印一次日志 Nacos is starting...

ApplicationListener

ApplicationListener 就是spring的监听器,能够用来监听事件,典型的观察者模式

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
 /**
  * Handle an application event.
  * @param event the event to respond to
  */
 void onApplicationEvent(E event);
}

ApplicationEvent事件的抽象类; 具体的事件必须继承这个类;

Nacos中

StandaloneProfileApplicationListener

public class StandaloneProfileApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>,
    PriorityOrdered {
    private static final Logger logger = LoggerFactory.getLogger(StandaloneProfileApplicationListener.class);
    @Override
    public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
        ConfigurableEnvironment environment = event.getEnvironment();
        if (environment.getProperty(STANDALONE_MODE_PROPERTY_NAME, boolean.class, false)) {
            environment.addActiveProfile(STANDALONE_SPRING_PROFILE);
        }
        if (logger.isInfoEnabled()) {
            logger.info("Spring Environment's active profiles : {} in standalone mode : {}",
                Arrays.asList(environment.getActiveProfiles()),
                STANDALONE_MODE
            );
        }
    }
    @Override
    public int getOrder() {
        return HIGHEST_PRECEDENCE;
    }
}

这里的监听器的泛型传的ApplicationEnvironmentPreparedEvent这个事件是SpringBoot内置的事件;

SpringApplication启动并且Environment首次可用于检查和修改时发布的事件,也就是说通过ApplicationEnvironmentPreparedEvent可以拿到Environment的属性;

在这里这个监听器的作用就是拿到 ConfigurableEnvironment,然后如果是单机模式standalone就设置一下ActiveProfile

6EnvironmentPostProcessor加载外部配置文件

SpringBoot支持动态的读取文件,留下的扩展接口org.springframework.boot.env.EnvironmentPostProcessor。这个接口是spring包下的,使用这个进行配置文件的集中管理,而不需要每个项目都去配置配置文件。

NacosDefaultPropertySourceEnvironmentPostProcessor加载Nacos配置文件

加载这个类是加载core模块下面的META-INF/nacos-default.properties 配置文件的;

public class NacosDefaultPropertySourceEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered {
@Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        ResourceLoader resourceLoader = getResourceLoader(application);
        processPropertySource(environment, resourceLoader);
    }
}

7Spring Factories SPI扩展机制

Spring Boot中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照Java中的SPI扩展机制来实现的

简单的总结下java SPI机制的思想。我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制;

在Dubbo中也定义了SPI机制;

在Spring中也有一种类似与Java SPI的加载机制。它在META-INF/spring.factories文件中配置接口的实现类名称,然后在程序中读取这些配置文件并实例化。

这种自定义的SPI机制是Spring Boot Starter实现的基础。

我们看上面说的Nacos中的几个类,并没有打上@Component等等Spring中的注解,没有这些注解那么他们是怎么被加载到Spring容器中被管理的呢?

我们打开文件 core/resources/META-INF/spring.factories

# ApplicationListener
org.springframework.context.ApplicationListener=\
com.alibaba.nacos.core.listener.StandaloneProfileApplicationListener
# EnvironmentPostProcessor
org.springframework.boot.env.EnvironmentPostProcessor=\
com.alibaba.nacos.core.env.NacosDefaultPropertySourceEnvironmentPostProcessor
# SpringApplicationRunListener
org.springframework.boot.SpringApplicationRunListener=\
com.alibaba.nacos.core.listener.LoggingSpringApplicationRunListener,\
com.alibaba.nacos.core.listener.StartingSpringApplicationRunListener

上面提及到的几个类的全类名都在这个文件中;

  • Spring Factories实现原理是什么 spring-core包里定义了SpringFactoriesLoader类,这个类实现了检索META-INF/spring.factories文件,并获取指定接口的配置的功能

具体的实现原理,后面可以再写一篇文章;

相关文章
|
12月前
|
网络协议 Java Nacos
Nacos—配置管理
Nacos—配置管理
504 0
|
2月前
|
Cloud Native Java Nacos
微服务时代的新宠儿!Spring Cloud Nacos实战指南,带你玩转服务发现与配置管理,拥抱云原生潮流!
【8月更文挑战第29天】Spring Cloud Nacos作为微服务架构中的新兴之星,凭借其轻量、高效的特点,迅速成为服务发现、配置管理和治理的首选方案。Nacos(命名和配置服务)由阿里巴巴开源,为云原生应用提供了动态服务发现及配置管理等功能,简化了服务间的调用与依赖管理。本文将指导你通过五个步骤在Spring Boot项目中集成Nacos,实现服务注册、发现及配置动态管理,从而轻松搭建出高效的微服务环境。
160 0
|
3月前
|
Java 数据库连接 Nacos
nacos配置管理拉取不到配置异常
在搭建Nacos配置时遇到异常,因配置了`file-extension: yaml`,服务尝试拉取`shared-jdbc.yaml`, `shared-log.yaml`, `shared-swagger.yaml`,但Nacos中这些共享配置的Data ID无后缀。修正方法是确保Data ID与预期文件名一致,包括.yaml扩展名。在验证中,修改了部分Data ID并导致服务因找不到未加后缀的`jdbc`配置而报错,提示在配置Data ID时应包含文件扩展名。
96 1
|
2月前
|
关系型数据库 MySQL Java
“惊呆了!无需改动Nacos源码,轻松实现SGJDBC连接MySQL?这操作太秀了,速来围观,错过等哭!”
【8月更文挑战第7天】在使用Nacos进行服务治理时,常需连接MySQL存储数据。使用特定的SGJDBC驱动连接MySQL时,一般无需修改Nacos源码。需确保SGJDBC已添加至类路径,并在Nacos配置文件中指定使用SGJDBC的JDBC URL。示例中展示如何配置Nacos使用MySQL及SGJDBC,并在应用中通过Nacos API获取配置信息建立数据库连接,实现灵活集成不同JDBC驱动的目标。
61 0
|
4月前
|
Java Nacos 数据格式
Spring Cloud Nacos 详解:服务注册与发现及配置管理平台
Spring Cloud Nacos 详解:服务注册与发现及配置管理平台
155 3
|
4月前
|
开发框架 .NET Nacos
使用 Nacos 在 C# (.NET Core) 应用程序中实现高效配置管理和服务发现
使用 Nacos 在 C# (.NET Core) 应用程序中实现高效配置管理和服务发现
315 0
|
4月前
|
SpringCloudAlibaba 安全 Java
SpringCloudalibaba之Nacos的配置管理
如图所示,nacos-config-example被192.168.56.1获取过。
128 0
|
5月前
|
负载均衡 Nacos 数据库
【Nacos】配置管理、微服务配置拉取、实现配置热更新、多环境配置
【Nacos】配置管理、微服务配置拉取、实现配置热更新、多环境配置
116 1
|
11月前
|
Java Nacos 数据库
nacos源码打包及相关配置
nacos源码打包及相关配置
282 4
|
5月前
|
负载均衡 Java Nacos
Nacos作为一个服务发现与配置管理工具,它本身不直接依赖于`ribbon-loadbalancer`包
Nacos作为一个服务发现与配置管理工具,它本身不直接依赖于`ribbon-loadbalancer`包【1月更文挑战第18天】【1月更文挑战第89篇】
71 4

热门文章

最新文章