从实战到原理,线程池的各类使用场景整合(上)

简介: 从实战到原理,线程池的各类使用场景整合(上)

在日常的开发工作中,我们经常会需要使用到线程池这类型的组件。例如下边几种应用场景:


线程池经典应用场景


异步发送邮件通知发送一个任务,然后注入到线程池中异步发送。


心跳请求任务创建一个任务,然后定时发送请求到线程池中。


类似的场景有很多,我们下边一步一步地来介绍不同的应用场景下,线程池的具体使用案例:


异步发送邮件场景


定义一个简单的邮件发送接口:


public interface SendEmailService {
    /**
     * 发送邮件
     *
     * @param emailDTO 邮件对象
     */
    void sendEmail(EmailDTO emailDTO);
}


接着是邮件发送的简单实现类:


@Service
public class SendEmailServiceImpl implements SendEmailService {
    @Resource
    private ExecutorService emailTaskPool;
    @Override
    public void sendEmail(EmailDTO emailDTO) {
        emailTaskPool.submit(() -> {
            try {
                System.out.printf("sending email .... emailDto is %s \n", emailDTO);
                Thread.sleep(1000);
                System.out.println("sended success");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}


邮件的发送逻辑通过一个简单的线程睡眠来模拟发送过程中的耗时操作。



然后是线程池方面的配置:


@Configuration
public class ThreadPoolConfig {
    @Bean
    public ExecutorService emailTaskPool() {
        return new ThreadPoolExecutor(2, 4,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
    }
}


controller模块的触发


@RestController
@RequestMapping(value = "/test")
public class TestController {
    @Resource
    private SendEmailService sendEmailService;
    @GetMapping(value = "/send-email")
    public boolean sendEmail() {
        EmailDTO emailDTO = new EmailDTO();
        emailDTO.setContent("测试文案");
        emailDTO.setReceiver("idea");
        emailDTO.setTitle("邮件标题");
        sendEmailService.sendEmail(emailDTO);
        return true;
    }
}


这是一个非常简单的案例,通过一个http请求,然后触发一个邮件的发送操作。


心跳请求场景


这类应用场景一般会在一些基础组件中使用到,例如一些具有心跳探活机制类型功能的中间件,如nacos。下边来看看对应的代码实践:首先是心跳模块代码:


public class HeartBeatInfo {
    private String info;
    private long nextSendTimeDelay;
    public String getInfo() {
        return info;
    }
    public void setInfo(String info) {
        this.info = info;
    }
    public long getNextSendTimeDelay() {
        return nextSendTimeDelay;
    }
    public void setNextSendTimeDelay(long nextSendTimeDelay) {
        this.nextSendTimeDelay = nextSendTimeDelay;
    }
    @Override
    public String toString() {
        return "HeartBeatInfo{" +
                "info='" + info + '\'' +
                ", nextSendTimeDelay=" + nextSendTimeDelay +
                '}';
    }
}


然后是模拟一个心跳包的发送服务接口定义:


public interface HeartBeatTaskService {
    void sendBeatInfo();
}


接下来是心跳任务的发送核心部分实现:


@Service
public class HeartBeatTaskServiceImpl implements HeartBeatTaskService {
    @Resource
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    @Override
    public void sendBeatInfo() {
        HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
        heartBeatInfo.setInfo("test-info");
        heartBeatInfo.setNextSendTimeDelay(1000);
        scheduledThreadPoolExecutor.schedule(new HeartBeatTask(heartBeatInfo),
                heartBeatInfo.getNextSendTimeDelay(), TimeUnit.MILLISECONDS);
    }
    class HeartBeatTask implements Runnable {
        private HeartBeatInfo heartBeatInfo;
        public HeartBeatTask(HeartBeatInfo heartBeatInfo) {
            this.heartBeatInfo = heartBeatInfo;
        }
        @Override
        public void run() {
            System.out.println("发送心跳数据包:" + heartBeatInfo.getInfo());
            HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
            heartBeatInfo.setInfo("test-info");
            heartBeatInfo.setNextSendTimeDelay(1000);
            scheduledThreadPoolExecutor.schedule(new HeartBeatTask(heartBeatInfo),
                    heartBeatInfo.getNextSendTimeDelay(), TimeUnit.MILLISECONDS);
        }
    }
}


在核心实现的内部有一个延时线程池ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor会在放入线程任务的一段指定的时间之后才触发任务的执行:


@Configuration
public class ThreadPoolConfig {
    @Bean
    public ScheduledThreadPoolExecutor  scheduledThreadPoolExecutor(){
        return new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("org.idea.threadpool.beat.sender");
                return thread;
            }
        });
    }
}


JDK内部线程池的设计


看了上边两个简单的案例之后,不知道你是否会有好奇:


到底线程池的内部运行机制会是怎样的呢?


简单手写一个单消费者任务处理模型


这里我们可以通过一段简单的代码来学习这部分的内容:首先,我们将需要处理的任务封装在一个对象内部,暂时定义如下所示:



public class AsyncHandlerData {
    private String dataInfo;
    public String getDataInfo() {
        return dataInfo;
    }
    public void setDataInfo(String dataInfo) {
        this.dataInfo = dataInfo;
    }
    @Override
    public String toString() {
        return "AsyncHandlerData{" +
                "dataInfo='" + dataInfo + '\'' +
                '}';
    }
}


然后会有一个专门消费这些个任务的service:


public interface AsyncHandlerService {
    /**
     * 任务放入队列中
     * 
     * @param asyncHandlerData
     */
    void putTask(AsyncHandlerData asyncHandlerData);
}


最后根据提前定义好的接口编写一个实现类,此时将相关的任务处理逻辑规整到了一个对象当中:


@Service
public class AsyncHandlerServiceImpl implements AsyncHandlerService, CommandLineRunner {
    private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler();
    @Override
    public void putTask(AsyncHandlerData asyncHandlerData) {
        taskQueueHandler.addTask(asyncHandlerData);
    }
    @Override
    public void run(String... args) throws Exception {
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }
    public class TaskQueueHandler implements Runnable {
        private BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        public void addTask(AsyncHandlerData asyncHandlerData) {
            tasks.offer(asyncHandlerData);
        }
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    System.out.println("异步处理任务数据:" + asyncHandlerData.getDataInfo());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


整个代码的思路逻辑比较简单,大致可以归整成下图所示:


image.png


整体的设计模式就是一端放入,由单个消费者取出。但是存在一个不足点,一旦消费者能力较弱,或者出现任务堵塞的话,就会导致任务队列出现堆积,然后越堆积越难处理地过来。



相关文章
|
2月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
139 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
11天前
三种线程的使用场景
三种创建多线程的使用场景 1、继承的方式:适合于这个任务只想被一个线程的对象执行的情况 2、实现Runnable接口方式:适合于一个任务想被多个线程执行的情况 3、实现Callable接口方式:也适合一个任务想被多个线程执行的情况,你还想得倒任务的执行结果
18 0
|
1月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
31 3
|
1月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
30 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
2月前
|
Java
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
36 7
|
2月前
|
Java Android开发 UED
🧠Android多线程与异步编程实战!告别卡顿,让应用响应如丝般顺滑!🧵
在Android开发中,为应对复杂应用场景和繁重计算任务,多线程与异步编程成为保证UI流畅性的关键。本文将介绍Android中的多线程基础,包括Thread、Handler、Looper、AsyncTask及ExecutorService等,并通过示例代码展示其实用性。AsyncTask适用于简单后台操作,而ExecutorService则能更好地管理复杂并发任务。合理运用这些技术,可显著提升应用性能和用户体验,避免内存泄漏和线程安全问题,确保UI更新顺畅。
96 5
|
1月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
59 0
|
1月前
|
Java 应用服务中间件 API
nginx线程池原理
nginx线程池原理
33 0
|
2月前
|
Java 开发者
Java中的多线程编程基础与实战
【9月更文挑战第6天】本文将通过深入浅出的方式,带领读者了解并掌握Java中的多线程编程。我们将从基础概念出发,逐步深入到代码实践,最后探讨多线程在实际应用中的优势和注意事项。无论你是初学者还是有一定经验的开发者,这篇文章都能让你对Java多线程有更全面的认识。
31 1
|
2月前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
45 0