项目源码
真诚的希望能给我项目一个stars!!!
线程池概念
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如Tomcat。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
系统无法合理管理内部的资源分布,会降低系统的稳定性。
为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
使用线程池的好处如下:
降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
提高响应速度:任务到达时,无需等待线程创建即可立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
过多的概念以及线程池中的核心参数这些问题我就不讲解了,可以直接看下面这篇文章
ThreadPoolExecutor介绍
先看一下ThreadPoolExecutor的类图。
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
ThreadPoolExecutor的执行机制如下图。
(来源网络)
上图表明了当一个任务过来的时候,三种处理情况,直接拒绝,直接执行,或者是放入缓冲区。
线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。**线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。**阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
上文中也提到了几个BlockingQueue了,这里不做赘述。
这里也不在讲解线程池的任务申请流程了。
Nacos
我们知道,使用Nacos作为注册中心和配置中心的好处在于,如果我们的配置修改了,我们的项目是可以实时监控到的并且进行配置的同步更新。
我们可以按照如上的方式来配置线程池的参数,这是完全没问题的。
我们也知道,我们把这里的参数修改之后,我们的代码里面也会同步的修改这些值为新值。
但是有一个问题,我们知道,我们创建线程池的时候都是new一个线程池出来,那么这里有一个问题就在于,我们使用的是spring项目,我们不会去说当nacos的配置文件修改之后,然后去通知项目然后new一个线程池出来,而是希望在原有的线程池的参数上进行修改。
恰巧,ThreadPoolExecutor也提供了这些方法给我们来让我们运行时修改线程池的参数。
那么现在问题就变成了,我们如何做到nacos的配置更新之后,我们如何对已经创建的线程池进行参数的修改呢?
肯定能最快想到的就是事件通知机制了,我们猜测nacos能实现本地代码配置的实时更新,大概率也是使用了通知功能,所以我们翻看nacos的源码可以发现nacos有一个方法可以添加一个监听器来监听配置文件的更新。
实现对Nacos配置文件更新的事件监听机制
那么接下来的代码来实现Nacos的事件监听机制
spring: application: name: dynamic-thread-pool profiles: active: dev cloud: nacos: discovery: namespace: test group: DYNAMIC_THREADPOOL server-addr: xxxx # 填写nacos地址 config: server-addr: xxxx # 填写nacos地址 group: DYNAMIC_THREADPOOL namespace: test file-extension: properties shared-configs: - data-id: dynamic-thread-pool-dev.properties refresh: true server: port: 8080
然后我们编写一个配置类,当然,这个配置类其实可有可无
package zhang.blossom.dynamic.threadpool.config; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import java.util.Properties; /** * nacos配置 * * @author 张锦标 */ @Slf4j @Configuration public class NacosConfig { @Value("${spring.cloud.nacos.config.server-addr}") private String serverAddr; @Value("${spring.cloud.nacos.config.namespace}") private String namespace; @Bean @Primary public ConfigService configService() { Properties properties = new Properties(); properties.put("serverAddr", serverAddr); properties.put("namespace", namespace); try { return NacosFactory.createConfigService(properties); } catch (NacosException e) { log.error(e.toString(), e); } return null; } }
之后我们创建一个监听器,有多种方法,我们直接@Bean创建一个我们配置好的ConfigService也可以,当然,我这里选择的是实现ApplicationRunner接口,这样子可以做到项目启动成功后会自动执行run方法。
代码如下
package zhang.blossom.dynamic.threadpool.listener; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import zhang.blossom.dynamic.threadpool.core.ResizableCapacityLinkedBlockIngQueue; import javax.annotation.Resource; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 项目启动后添加监听,实现ApplicationRunner,在项目成功后会自动执行run方法 * * @author wangfenglei */ @Slf4j @Component public class NacosListener implements ApplicationRunner { @Resource private ConfigService configService; @Value("${spring.cloud.nacos.config.group}") private String groupId; public static final String DATA_ID = "dynamic-thread-pool-dev.properties"; @Autowired @Qualifier("commonThreadPool") private ThreadPoolExecutor threadPoolExecutor; @Override public void run(ApplicationArguments args) throws Exception { //添加nacos配置文件监听 listenerNacosConfig(); } /** * 监听数据源变化 * * @throws Exception 异常 */ private void listenerNacosConfig() throws Exception { configService.addListener(DATA_ID, groupId, new Listener() { @Override public void receiveConfigInfo(String configInfo) { //configInfo是一个字符串,它的内容就是你配置文件里所有的内容 //这里推荐配置文件使用properties方式,不然后续不好处理 } @Override public Executor getExecutor() { return null; } }); } /** * 向nacos发布内容 * 会直接覆写原本的配置文件,请谨慎使用 * @param content 内容 * @throws Exception 异常 */ private void publishConfig(String content) throws Exception { //发布内容 configService.publishConfig(DATA_ID, groupId, content); } }
ok,然后我现在创建一个配置类,来读取nacos上面对应的参数。
package zhang.blossom.dynamic.threadpool.config; import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; /** * @author: 张锦标 * @date: 2023/6/15 11:25 * ThreadPoolProperty类 */ //@Configuration @RefreshScope @Component @ConfigurationProperties("dynamic.threadpool") public class ThreadPoolProperty { //@Value("${dynamic.threadpool.corePoolSize}") private Integer corePoolSize; //@Value("${dynamic.threadpool.maximumPoolSize}") private Integer maximumPoolSize; //@Value("${dynamic.threadpool.queueCapacity}") private Integer queueCapacity; public Integer getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(Integer corePoolSize) { this.corePoolSize = corePoolSize; } public Integer getMaximumPoolSize() { return maximumPoolSize; } public void setMaximumPoolSize(Integer maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } public Integer getQueueCapacity() { return queueCapacity; } public void setQueueCapacity(Integer queueCapacity) { this.queueCapacity = queueCapacity; } }
之后, 启动项目就可以得到我们在nacos上配置的参数的值了。
下面是一个测试:
package zhang.blossom.dynamic.threadpool.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import zhang.blossom.dynamic.threadpool.config.ThreadPoolProperty; /** * @author: 张锦标 * @date: 2023/6/15 11:59 * TestController类 */ @RestController public class TestController { @Autowired private ThreadPoolProperty threadPoolProperty; @GetMapping("/get") public String getValue(){ return threadPoolProperty.getMaximumPoolSize() +" "+ threadPoolProperty.getCorePoolSize()+" "+ threadPoolProperty.getQueueCapacity(); } }
定时通知功能
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.1</version> </dependency>
使用xxl-job来实现定时通知功能
package zhang.blossom.dynamic.threadpool.handler; import com.xxl.job.core.handler.annotation.XxlJob; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import zhang.blossom.dynamic.threadpool.service.SendMailService; import java.util.concurrent.ThreadPoolExecutor; /** * @author 张锦标 * @version 1.0 */ @Component public class SimpleXxxJob { @Autowired private SendMailService sendMailService; @Autowired @Qualifier("commonThreadPool") private ThreadPoolExecutor threadPoolExecutor; //使用XXLJOB注解定义一个job @XxlJob(value = "sendMailHandler", init = "initHandler", destroy = "destroyHandler") public void sendMailHandler() { sendMailService.sendMailToWarn(threadPoolExecutor); System.out.println("发送短信成功。。。。。。。。。。。"); } //任务初始化方法 public void initHandler() { System.out.println("任务调用初始化方法执行"); } public void destroyHandler() { System.out.println("任务执行器被销毁"); } }
邮件发送通知功能
package zhang.blossom.dynamic.threadpool.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.mail.SimpleMailMessage; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.stereotype.Service; import java.util.concurrent.ThreadPoolExecutor; /** * @author: 张锦标 * @date: 2023/6/15 15:24 * SendMailToWarn类 */ @Service public class SendMailService{ @Autowired private JavaMailSender javaMailSender; @Value("${warn.recipient}") public String recipient; @Value("${warn.addresser}") public String addresser; public boolean sendMailToWarn(ThreadPoolExecutor threadPoolExecutor){ SimpleMailMessage simpleMailMessage = new SimpleMailMessage(); simpleMailMessage.setTo(recipient); simpleMailMessage.setFrom(addresser); simpleMailMessage.setSubject("线程池情况汇报"); String s = "CorePoolSize="+threadPoolExecutor.getCorePoolSize()+" "+ "LargestPoolSize="+threadPoolExecutor.getLargestPoolSize()+" "+ "MaximumPoolSize="+threadPoolExecutor.getMaximumPoolSize(); simpleMailMessage.setText(s); javaMailSender.send(simpleMailMessage); return true; } }
开始测试
这里特别需要先强调一个点,JDK提供的线程池虽然有获得工作队列的方法,但是目前目前实现的消息队列都是不能动态修改容量的,所以你需要自己实现一个工作队列。
具体可以看我项目中的代码实现。
(动动你发财的小手给我的项目点一个stars!!!)
这是项目一开始启动的时候的线程池的参数情况。
ok,然后我们修改nacos的配置文件,使其发生监听事件
然后,我们的xxl-job也会定时的给我们进行消息提示
到此为止,整个项目完成。