在日常的开发工作中,我们经常会需要使用到线程池这类型的组件。例如下边几种应用场景:
线程池经典应用场景
异步发送邮件通知发送一个任务,然后注入到线程池中异步发送。
心跳请求任务创建一个任务,然后定时发送请求到线程池中。
类似的场景有很多,我们下边一步一步地来介绍不同的应用场景下,线程池的具体使用案例:
异步发送邮件场景
定义一个简单的邮件发送接口:
public interface SendEmailService { /** * 发送邮件 * * @param emailDTO 邮件对象 */ void sendEmail(EmailDTO emailDTO); }
接着是邮件发送的简单实现类:
@Service public class SendEmailServiceImpl implements SendEmailService { @Resource private ExecutorService emailTaskPool; @Override public void sendEmail(EmailDTO emailDTO) { emailTaskPool.submit(() -> { try { System.out.printf("sending email .... emailDto is %s \n", emailDTO); Thread.sleep(1000); System.out.println("sended success"); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
邮件的发送逻辑通过一个简单的线程睡眠来模拟发送过程中的耗时操作。
然后是线程池方面的配置:
@Configuration public class ThreadPoolConfig { @Bean public ExecutorService emailTaskPool() { return new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task")); } }
controller模块的触发
@RestController @RequestMapping(value = "/test") public class TestController { @Resource private SendEmailService sendEmailService; @GetMapping(value = "/send-email") public boolean sendEmail() { EmailDTO emailDTO = new EmailDTO(); emailDTO.setContent("测试文案"); emailDTO.setReceiver("idea"); emailDTO.setTitle("邮件标题"); sendEmailService.sendEmail(emailDTO); return true; } }
这是一个非常简单的案例,通过一个http请求,然后触发一个邮件的发送操作。
心跳请求场景
这类应用场景一般会在一些基础组件中使用到,例如一些具有心跳探活机制类型功能的中间件,如nacos。下边来看看对应的代码实践:首先是心跳模块代码:
public class HeartBeatInfo { private String info; private long nextSendTimeDelay; public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } public long getNextSendTimeDelay() { return nextSendTimeDelay; } public void setNextSendTimeDelay(long nextSendTimeDelay) { this.nextSendTimeDelay = nextSendTimeDelay; } @Override public String toString() { return "HeartBeatInfo{" + "info='" + info + '\'' + ", nextSendTimeDelay=" + nextSendTimeDelay + '}'; } }
然后是模拟一个心跳包的发送服务接口定义:
public interface HeartBeatTaskService { void sendBeatInfo(); }
接下来是心跳任务的发送核心部分实现:
@Service public class HeartBeatTaskServiceImpl implements HeartBeatTaskService { @Resource private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; @Override public void sendBeatInfo() { HeartBeatInfo heartBeatInfo = new HeartBeatInfo(); heartBeatInfo.setInfo("test-info"); heartBeatInfo.setNextSendTimeDelay(1000); scheduledThreadPoolExecutor.schedule(new HeartBeatTask(heartBeatInfo), heartBeatInfo.getNextSendTimeDelay(), TimeUnit.MILLISECONDS); } class HeartBeatTask implements Runnable { private HeartBeatInfo heartBeatInfo; public HeartBeatTask(HeartBeatInfo heartBeatInfo) { this.heartBeatInfo = heartBeatInfo; } @Override public void run() { System.out.println("发送心跳数据包:" + heartBeatInfo.getInfo()); HeartBeatInfo heartBeatInfo = new HeartBeatInfo(); heartBeatInfo.setInfo("test-info"); heartBeatInfo.setNextSendTimeDelay(1000); scheduledThreadPoolExecutor.schedule(new HeartBeatTask(heartBeatInfo), heartBeatInfo.getNextSendTimeDelay(), TimeUnit.MILLISECONDS); } } }
在核心实现的内部有一个延时线程池ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor会在放入线程任务的一段指定的时间之后才触发任务的执行:
@Configuration public class ThreadPoolConfig { @Bean public ScheduledThreadPoolExecutor scheduledThreadPoolExecutor(){ return new ScheduledThreadPoolExecutor(2, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("org.idea.threadpool.beat.sender"); return thread; } }); } }
JDK内部线程池的设计
看了上边两个简单的案例之后,不知道你是否会有好奇:
到底线程池的内部运行机制会是怎样的呢?
简单手写一个单消费者任务处理模型
这里我们可以通过一段简单的代码来学习这部分的内容:首先,我们将需要处理的任务封装在一个对象内部,暂时定义如下所示:
public class AsyncHandlerData { private String dataInfo; public String getDataInfo() { return dataInfo; } public void setDataInfo(String dataInfo) { this.dataInfo = dataInfo; } @Override public String toString() { return "AsyncHandlerData{" + "dataInfo='" + dataInfo + '\'' + '}'; } }
然后会有一个专门消费这些个任务的service:
public interface AsyncHandlerService { /** * 任务放入队列中 * * @param asyncHandlerData */ void putTask(AsyncHandlerData asyncHandlerData); }
最后根据提前定义好的接口编写一个实现类,此时将相关的任务处理逻辑规整到了一个对象当中:
@Service public class AsyncHandlerServiceImpl implements AsyncHandlerService, CommandLineRunner { private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler(); @Override public void putTask(AsyncHandlerData asyncHandlerData) { taskQueueHandler.addTask(asyncHandlerData); } @Override public void run(String... args) throws Exception { Thread thread = new Thread(taskQueueHandler); thread.setDaemon(true); thread.start(); } public class TaskQueueHandler implements Runnable { private BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(1024 * 1024); public void addTask(AsyncHandlerData asyncHandlerData) { tasks.offer(asyncHandlerData); } @Override public void run() { for (; ; ) { try { AsyncHandlerData asyncHandlerData = tasks.take(); System.out.println("异步处理任务数据:" + asyncHandlerData.getDataInfo()); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
整个代码的思路逻辑比较简单,大致可以归整成下图所示:
整体的设计模式就是一端放入,由单个消费者取出。但是存在一个不足点,一旦消费者能力较弱,或者出现任务堵塞的话,就会导致任务队列出现堆积,然后越堆积越难处理地过来。