作者:石臻臻, CSDN博客之星Top5、Kafka Contributor 、nacos Contributor、华为云 MVP ,腾讯云TVP, 滴滴Kafka技术专家 、KnowStreaming。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!。
1任务管理类
因为Nacos中有很多地方使用了这个TaskManager,所以我们得先了解一下这个类是干啥用的,方便后面阅读源码时候不会吃力;
先说结论:TaskManager 可以看成是一个待执行的任务集合,用于处理一定要执行成功的任务 单线程的方式处理任务,保证任务一定被成功处理; 如果执行失败了,任务会被重新放入集合中等待下一次被消费;
AbstractTask
AbstractTask是个抽象类,所有的需要被执行的任务都继续这个类; 这个类主要提供执行任务所需要的数据和方法;例如
/* 一个任务两次处理的间隔,单位是毫秒*/ private long taskInterval; /*任务上次被处理的时间,用毫秒表示*/ private long lastProcessTime; /* TaskManager 判断当前是否需要处理这个Task,子类可以Override这个函数实现自己的逻辑 */ public boolean shouldProcess() { return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval); }
TaskProcessor任务处理器
TaskProcessor 是任务处理器接口,它有个方法
boolean process(String taskType, AbstractTask task);
用于执行对应的AbstractTask任务类; 不同的任务类型,可以实现自己的执行任务逻辑;
TaskManager任务管理类
TaskManager 是个任务管理类; 它里面有两个属性保存了待消费的任务AbstractTask,和任务执行需要的TaskProcessor;
/**待消费的任务AbstractTask**/ private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>(); /**任务AbstractTask对应的任务执行器TaskProcessor**/ private final ConcurrentHashMap<String, TaskProcessor> taskProcessors =new ConcurrentHashMap<String, TaskProcessor>();
如果taskProcessors中没有找到对应的任务执行器,那么它里面有一个默认执行器会执行
/**默认执行器**/ private TaskProcessor defaultTaskProcessor;
2使用用例
Nacos配置中心模块很重要一个功能就是,在初始化的时候以及每隔一段时间就会去数据库中把所有数据Dump到磁盘中;Dump就是一个任务类AbstractTask; 我们上面说过AbstractTask就是一个信息承载对象,主要给TaskProcessor提供执行所需要的数据;我们看看DumpTask;
DumpTask
DumpTask定义了自己的一些属性; 再看看其他的例如DumpAllTask、DumpAllBetaTask
DumpProcessor
DumpProcessor 是DumpTask任务的执行器;执行器中的方法
public boolean process(String taskType, AbstractTask task)
代码太长就不在这里分析了,它里面主要做的操作就是 保存配置文件到本地磁盘中,并缓存md5详细可以看文章 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中
对应DumpAllTask、DumpAllBetaTask 任务的任务执行器有DumpAllProcessor、DumpAllBetaProcessor
3DumpAllTask任务触发执行的地方
上面是DumpAllTask的定义和DumpAllTaskProcessor执行器的定义;定义好了之后是怎么被触发的呢?
DumpService初始化Dump配置信息
这个类就是专门Dump配置信息的服务类;上面提及的DumpAll就是在这里被调用的;我们来看下他主要方法;
@PostConstruct public void init() { DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this); /**在new这个TaskManager类的时候,专门执行任务的一个线程就已经开始启动了,这不过这个时候还没有任务Task添加进去**/ dumpAllTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpAllTaskManager"); dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor); Runnable dumpAll = new Runnable() { @Override public void run() { dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask()); } }; /**每10分钟执行一次DumpAll操作**/ TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); }
DumpService在初始化的时候回调用这个init方法; 1.先new了一个DumpAllProcessor执行器; 2.再new 了一个TaskManager任务管理器;在new这个任务管理器的时候,就会启动一个线程专门去执行所有待执行的任务;只不过这个时候还没有添加任务; 3.将这个任务管理器的默认执行器设置为DumpAllProcessor; 4.每十分钟执行一次往TaskManager中添加一个DumpAllTask的任务;一经添加就会被TaskManager中的线程 processingThread
执行process方法;