一、如果要实现一个动态线程池,如何实现?
1)如果要实现一个动态线程池,首先需要考虑的是将线程池的相关配置信息外置。这样出现问题的时候,能够基于配置修改,实现热部署。修改配置后,就能生效。因此,可以考虑的配置方式有多种:nacos、apollo、zookeeper、consul、etcd等。
2)如果线程池出现问题或者完成修改后,能够基于监控的信息,进行通知和告警。这样就需要考虑通知和告警的方式的多样性:比如基于钉钉、微信、飞书、电子邮件等渠道进行通知和告警。
二、 dynamic-tp动态线程池的实现思路
1.事件发布
根据引入的dynamic-tp-spring-cloud-starter-nacos或者dynamic-tp-spring-boot-starter-nacos依赖
以nacos为例:
<dependency><groupId>cn.dynamictp</groupId><artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId><version>1.0.9</version></dependency>
可以看到自动装配的文件:DtpAutoConfiguration与NacosRefresher.
在DtpAutoConfiguration中,我们可以看到导入的配置信息:
BaseBeanAutoConfiguration.class}) ({
基于这个注解,关注BaseBeanAutoConfiguration这个类:
这个类主要干了下面这几件事请
DtpProperties 线程池相关配置 上下文holder dtpApplicationContextHolder dtpBanner打印 dtpBannerPrinter dtp后置处理器 dtpPostProcessor dtp注册 dtpRegistry dtp监控 dtpMonitor dtpEndpoint dtpEndpoint
其中
1)dtpBanner是做控制台启动项目时的banner打印操作。
2)dtpPostProcessor dtp后置处理器 处理所有相关bean
如果bean是执行器,则注册dtp,此时会注册到DTP_REGISTRY 中, 数据结构:Map
否则会基于ApplicationHolder拿到基于DynamicTp注解的class,如果当前基于DynamicTp的methodMetadata 为空,则返回bean,否则拿到dtpAnnotationVal。poolName也即dtpAnnotationVal。
如果当前的bean属于线程池任务执行器,则注册task执行器。包装执行器,放入通知信息notifyItems。registerCommon 执行注册。
3)dtpRegistry 注册dtp DTP_REGISTRY 数据结构:Map
其中最为重要的方法是刷新方法:
获取dtp执行器,对执行器进行转换为DtpMainProp。执行刷新。
4)dtp监控 dtpMonitor会执行监控发布:里面有有2个方法需要注意:
检查监控 checkAlarm
collect 收集监控指标信息
其中:检查监控的时候,会基于当前的发送告警的信息:基于对应的渠道进行消息发送。
此时会发布两个事件:publishAlarmCheckEvent、publishCollectEvent
NacosRefresher中存在的方法Refresh:
刷新当监听到配置发生改变的时候,doNoticeAsync 执行异步通知,通知业务方,此时发生了配置的变更。
刷新完成后,执行RefreshEvent刷新事件发布。
2.事件监听
发布完成后,可以看到对应的监听是在
com.dtp.starter.adapter.common.autoconfigure.AdapterCommonAutoConfiguration
适配器公共自动装配中
publicvoidonApplicationEvent(ApplicationEventevent) { try { if (eventinstanceofRefreshEvent) { doRefresh(((RefreshEvent) event).getDtpProperties()); } elseif (eventinstanceofCollectEvent) { doCollect(((CollectEvent) event).getDtpProperties()); } elseif (eventinstanceofAlarmCheckEvent) { doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties()); } } catch (Exceptione) { log.error("DynamicTp adapter, event handle failed.", e); } }
可以看到我们关心的三个发布事件,都在此进行了监听:
doRefresh、doCollect、doAlarmCheck
其中
刷新事件会执行相关渠道的通知
收集日志会执行对应的打印
告警信息会执行告警
三、使用
使用方式:以nacos为例,可以看到其基于@EnableDynamicTp实现对dtp相关bean的注册。DtpBeanDefinitionRegistrar即是完成注册的类。其主要是创建dtp配置对象DtpProperties,绑定dtp配置,获取执行器。拿到执行器后,遍历执行,绑定对应的信息,构建构造函数,注册bean信息。方便后续对线程池的操作。
privateThreadPoolExecutordtpExecutor1; "/dtp-nacos-example/test") (publicStringtest() throwsInterruptedException { task(); return"success"; } //获取dtp执行器publicvoidtask() throwsInterruptedException { //获取dtp执行器DtpExecutordtpExecutor2=DtpRegistry.getDtpExecutor("dtpExecutor2"); for (inti=0; i<100; i++) { Thread.sleep(100); dtpExecutor1.execute(() -> { log.info("i am dynamic-tp-test-1 task"); }); dtpExecutor2.execute(NamedRunnable.of(() -> { try { Thread.sleep(1000); } catch (InterruptedExceptione) { e.printStackTrace(); } log.info("i am dynamic-tp-test-2 task"); }, "task-"+i)); } }
由此可以看到实现了两个最为主要的功能:对线程池进行动态变更和对线程池的监控告警。使用了观察者模式、适配器模式。