从实战到原理,线程池的各类使用场景整合(上)

简介: 从实战到原理,线程池的各类使用场景整合(上)

在日常的开发工作中,我们经常会需要使用到线程池这类型的组件。例如下边几种应用场景:


线程池经典应用场景


异步发送邮件通知发送一个任务,然后注入到线程池中异步发送。


心跳请求任务创建一个任务,然后定时发送请求到线程池中。


类似的场景有很多,我们下边一步一步地来介绍不同的应用场景下,线程池的具体使用案例:


异步发送邮件场景


定义一个简单的邮件发送接口:


public interface SendEmailService {
    /**
     * 发送邮件
     *
     * @param emailDTO 邮件对象
     */
    void sendEmail(EmailDTO emailDTO);
}


接着是邮件发送的简单实现类:


@Service
public class SendEmailServiceImpl implements SendEmailService {
    @Resource
    private ExecutorService emailTaskPool;
    @Override
    public void sendEmail(EmailDTO emailDTO) {
        emailTaskPool.submit(() -> {
            try {
                System.out.printf("sending email .... emailDto is %s \n", emailDTO);
                Thread.sleep(1000);
                System.out.println("sended success");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}


邮件的发送逻辑通过一个简单的线程睡眠来模拟发送过程中的耗时操作。



然后是线程池方面的配置:


@Configuration
public class ThreadPoolConfig {
    @Bean
    public ExecutorService emailTaskPool() {
        return new ThreadPoolExecutor(2, 4,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
    }
}


controller模块的触发


@RestController
@RequestMapping(value = "/test")
public class TestController {
    @Resource
    private SendEmailService sendEmailService;
    @GetMapping(value = "/send-email")
    public boolean sendEmail() {
        EmailDTO emailDTO = new EmailDTO();
        emailDTO.setContent("测试文案");
        emailDTO.setReceiver("idea");
        emailDTO.setTitle("邮件标题");
        sendEmailService.sendEmail(emailDTO);
        return true;
    }
}


这是一个非常简单的案例,通过一个http请求,然后触发一个邮件的发送操作。


心跳请求场景


这类应用场景一般会在一些基础组件中使用到,例如一些具有心跳探活机制类型功能的中间件,如nacos。下边来看看对应的代码实践:首先是心跳模块代码:


public class HeartBeatInfo {
    private String info;
    private long nextSendTimeDelay;
    public String getInfo() {
        return info;
    }
    public void setInfo(String info) {
        this.info = info;
    }
    public long getNextSendTimeDelay() {
        return nextSendTimeDelay;
    }
    public void setNextSendTimeDelay(long nextSendTimeDelay) {
        this.nextSendTimeDelay = nextSendTimeDelay;
    }
    @Override
    public String toString() {
        return "HeartBeatInfo{" +
                "info='" + info + '\'' +
                ", nextSendTimeDelay=" + nextSendTimeDelay +
                '}';
    }
}


然后是模拟一个心跳包的发送服务接口定义:


public interface HeartBeatTaskService {
    void sendBeatInfo();
}


接下来是心跳任务的发送核心部分实现:


@Service
public class HeartBeatTaskServiceImpl implements HeartBeatTaskService {
    @Resource
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    @Override
    public void sendBeatInfo() {
        HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
        heartBeatInfo.setInfo("test-info");
        heartBeatInfo.setNextSendTimeDelay(1000);
        scheduledThreadPoolExecutor.schedule(new HeartBeatTask(heartBeatInfo),
                heartBeatInfo.getNextSendTimeDelay(), TimeUnit.MILLISECONDS);
    }
    class HeartBeatTask implements Runnable {
        private HeartBeatInfo heartBeatInfo;
        public HeartBeatTask(HeartBeatInfo heartBeatInfo) {
            this.heartBeatInfo = heartBeatInfo;
        }
        @Override
        public void run() {
            System.out.println("发送心跳数据包:" + heartBeatInfo.getInfo());
            HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
            heartBeatInfo.setInfo("test-info");
            heartBeatInfo.setNextSendTimeDelay(1000);
            scheduledThreadPoolExecutor.schedule(new HeartBeatTask(heartBeatInfo),
                    heartBeatInfo.getNextSendTimeDelay(), TimeUnit.MILLISECONDS);
        }
    }
}


在核心实现的内部有一个延时线程池ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor会在放入线程任务的一段指定的时间之后才触发任务的执行:


@Configuration
public class ThreadPoolConfig {
    @Bean
    public ScheduledThreadPoolExecutor  scheduledThreadPoolExecutor(){
        return new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("org.idea.threadpool.beat.sender");
                return thread;
            }
        });
    }
}


JDK内部线程池的设计


看了上边两个简单的案例之后,不知道你是否会有好奇:


到底线程池的内部运行机制会是怎样的呢?


简单手写一个单消费者任务处理模型


这里我们可以通过一段简单的代码来学习这部分的内容:首先,我们将需要处理的任务封装在一个对象内部,暂时定义如下所示:



public class AsyncHandlerData {
    private String dataInfo;
    public String getDataInfo() {
        return dataInfo;
    }
    public void setDataInfo(String dataInfo) {
        this.dataInfo = dataInfo;
    }
    @Override
    public String toString() {
        return "AsyncHandlerData{" +
                "dataInfo='" + dataInfo + '\'' +
                '}';
    }
}


然后会有一个专门消费这些个任务的service:


public interface AsyncHandlerService {
    /**
     * 任务放入队列中
     * 
     * @param asyncHandlerData
     */
    void putTask(AsyncHandlerData asyncHandlerData);
}


最后根据提前定义好的接口编写一个实现类,此时将相关的任务处理逻辑规整到了一个对象当中:


@Service
public class AsyncHandlerServiceImpl implements AsyncHandlerService, CommandLineRunner {
    private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler();
    @Override
    public void putTask(AsyncHandlerData asyncHandlerData) {
        taskQueueHandler.addTask(asyncHandlerData);
    }
    @Override
    public void run(String... args) throws Exception {
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }
    public class TaskQueueHandler implements Runnable {
        private BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        public void addTask(AsyncHandlerData asyncHandlerData) {
            tasks.offer(asyncHandlerData);
        }
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    System.out.println("异步处理任务数据:" + asyncHandlerData.getDataInfo());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


整个代码的思路逻辑比较简单,大致可以归整成下图所示:


image.png


整体的设计模式就是一端放入,由单个消费者取出。但是存在一个不足点,一旦消费者能力较弱,或者出现任务堵塞的话,就会导致任务队列出现堆积,然后越堆积越难处理地过来。



相关文章
|
9月前
|
存储 SQL 安全
Java 无锁方式实现高性能线程实战操作指南
本文深入探讨了现代高并发Java应用中单例模式的实现方式,分析了传统单例(如DCL)的局限性,并提出了多种无锁实现方案。包括基于ThreadLocal的延迟初始化、VarHandle原子操作、Record不可变对象、响应式编程(Reactor)以及CDI依赖注入等实现方式。每种方案均附有代码示例及适用场景,同时通过JMH性能测试对比各实现的优劣。最后,结合实际案例设计了一个高性能配置中心,展示了无锁单例在实际开发中的应用。总结中提出根据场景选择合适的实现方式,并遵循现代单例设计原则以优化性能和安全性。文中还提供了代码获取链接,便于读者实践与学习。
173 0
|
5月前
|
存储 缓存 监控
什么是线程池?它的工作原理?
我是小假 期待与你的下一次相遇 ~
344 1
|
5月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
395 1
|
10月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
595 0
|
5月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
505 0
|
7月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
582 1
|
9月前
|
算法 Java 测试技术
深度优化OSS上传性能:多线程分片上传 vs 断点续传实战对比
本文深入解析对象存储服务(OSS)文件上传性能优化技术,重点探讨多线程分片上传与断点续传两种方案。通过理论分析、代码实现和性能测试,对比其在不同场景下的表现差异,并提供选型建议与最佳实践,助力提升大文件上传效率与稳定性。
911 0
|
9月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
459 0

热门文章

最新文章