从实战到原理,线程池的各类使用场景整合(下)

简介: 从实战到原理,线程池的各类使用场景整合(下)

线程池常用参数介绍


corePoolSize核心线程数,当往线程池内部提交任务的时候,线程池会创建一个线程来执行任务。即使此时有空闲的工作线程能够处理当前任务,只要总的工作线程数小于corePoolSize,也会创建新的工作线程。


maximumPoolSize当任务的堵塞队列满了之后,如果还有新的任务提交到线程池内部,此时倘若工作线程数小于maximumPoolSize,则会创建新的工作线程。


keepAliveTime上边我们说到了工作线程


Worker(java.util.concurrent.ThreadPoolExecutor.Worker),当工作线程处于空闲状态中,如果超过了keepAliveTime依然没有任务,那么就会销毁当前工作线程。如果工作线程需要一直处于执行任务,每个任务的连续间隔都比较短,那么这个keepAliveTime 属性可以适当地调整大一些。


unitkeepAliveTime对应的时间单位


workQueue工作队列,当工作线程数达到了核心线程数,那么此时新来的线程就会被放入到工作队列中。线程池内部的工作队列全部都是继承自阻塞队列的接口,对于常用的阻塞队列类型为:


  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • PriorityBlockingQueue


RejectedExecutionHandlerJDK内部的线程拒绝策略包含了多种许多种,这里我罗列一些常见的拒绝策略给大家认识下:


  • AbortPolicy 直接抛出异常
  • CallerRunsPolicy 任务的执行由注入的线程自己执行
  • DiscardOldestPolicy 直接抛弃掉堵塞队列中队列头部的任务,然后执行尝试将当前任务提交到堵塞队列中。
  • DiscardPolicy 直接抛弃这个任务


从线程池设计中的一些启发


多消费队列的设计场景应用:业务上游提交任务,然后任务被放进一个堵塞队列中,接下来消费者需要从堵塞队列中提取元素,并且将它们转发到多个子队列中,各个子队列分别交给不同的子消费者处理数据。例如下图所示:


image.png


public interface AsyncHandlerService {
    /**
     * 任务放入队列中
     * 
     * @param asyncHandlerData
     */
    boolean putTask(AsyncHandlerData asyncHandlerData);
    /**
     * 启动消费
     */
    void startJob();
}


多消费者分发处理实现类:


@Component("asyncMultiConsumerHandlerHandler")
public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService{
    private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10);
    @Override
    public boolean putTask(AsyncHandlerData asyncHandlerData) {
        return taskQueueHandler.addTask(asyncHandlerData);
    }
    @Override
    public void startJob(){
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }
    /**
     * 将任务分发给各个子队列去处理
     */
    static class TaskQueueHandler implements Runnable {
        private static BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11);
        public static BlockingQueue<AsyncHandlerData> getAllTaskInfo() {
            return tasks;
        }
        private TaskDispatcherHandler[] taskDispatcherHandlers;
        private int childConsumerSize = 0;
        public TaskQueueHandler(int childConsumerSize) {
            this.childConsumerSize = childConsumerSize;
            taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize];
            for (int i = 0; i < taskDispatcherHandlers.length; i++) {
                taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i);
                Thread thread = new Thread(taskDispatcherHandlers[i]);
                thread.setDaemon(false);
                thread.setName("taskQueueHandler-child-"+i);
                thread.start();
            }
        }
        public boolean addTask(AsyncHandlerData asyncHandlerData) {
            return tasks.offer(asyncHandlerData);
        }
        @Override
        public void run() {
            int index = 0;
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    index = (index == taskDispatcherHandlers.length) ? 0 : index;
                    taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
                    index++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class TaskDispatcherHandler implements Runnable {
        private BlockingQueue<AsyncHandlerData> subTaskQueue;
        private String childName;
        private AtomicLong taskCount = new AtomicLong(0);
        public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) {
            this.subTaskQueue = blockingQueue;
            this.childName = childName;
        }
        public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) {
            subTaskQueue.add(asyncHandlerData);
        }
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = subTaskQueue.take();
                    long count = taskCount.incrementAndGet();
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo() + count);
                    Thread.sleep(3000);
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo()+" 任务处理结束" + count);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


测试接口:


@GetMapping(value = "/send-async-data")
    public boolean sendAsyncData(){
        AsyncHandlerData asyncHandlerData = new AsyncHandlerData();
        asyncHandlerData.setDataInfo("data info");
        boolean status = asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
        if(!status){
            throw new RuntimeException("insert fail");
        }
        return status;
    }


这种设计模型适合用于对于请求吞吐量要求较高,每个请求都比较耗时的场景中。


自定义拒绝策略的应用根据具体的应用场景,通过实现java.util.concurrent.RejectedExecutionHandler接口,自定义拒绝策略,例如对于当抛出拒绝异常的时候,往数据库中记录一些信息或者日志。


相关案例代码:


public class MyRejectPolicy{
    static class MyTask implements Runnable{
        @Override
        public void run() {
            System.out.println("this is test");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS
                , new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务被拒绝:" + r.toString());
                //记录一些信息
            }
        });
        for(int i=0;i<100;i++){
            Thread thread = new Thread(new MyTask());
            threadPoolExecutor.execute(thread);
        }
        Thread.yield();
    }
}


统计线程池的详细信息


通过阅读线程池的源代码之后,可以借助重写beforeExecute、afterExecute、terminated 方法去对线程池的每个线程耗时做统计。以及通过继承 ThreadPoolExecutor 对象之后,对当前线程池的coreSIze、maxiMumSize等等属性进行监控。


相关案例代码:


public class SysThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private Logger logger = LoggerFactory.getLogger(SysThreadPool.class);
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.currentTimeMillis());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        long endTime = System.currentTimeMillis();
        long executeTime = endTime - startTime.get();
        logger.info("Thread {}: ExecuteTime {}", r, executeTime);
    }
    @Override
    public void shutdown() {
        super.shutdown();
    }
    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }
    public void getTaskInfo(){
        logger.info("coreSize: {}, maxSize: {}, activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size());
    }
    static class MyTestTask implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        SysThreadPool sysThreadPool = new SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(2));
        sysThreadPool.getTaskInfo();
        System.out.println("------------");
        for(int i=0;i<10;i++){
            Thread thread = new Thread(new MyTestTask());
            sysThreadPool.submit(thread);
            sysThreadPool.getTaskInfo();
        }
        System.out.println("------------");
        Thread.sleep(3000);
    }
}


通过日志打印记录线程池的参数变化:


image.png


通过这份案例代码不妨可以设想下通过一些定时上报逻辑来实现线程池的监控功能。

相关文章
|
20天前
|
并行计算 算法 安全
面试必问的多线程优化技巧与实战
多线程编程是现代软件开发中不可或缺的一部分,特别是在处理高并发场景和优化程序性能时。作为Java开发者,掌握多线程优化技巧不仅能够提升程序的执行效率,还能在面试中脱颖而出。本文将从多线程基础、线程与进程的区别、多线程的优势出发,深入探讨如何避免死锁与竞态条件、线程间的通信机制、线程池的使用优势、线程优化算法与数据结构的选择,以及硬件加速技术。通过多个Java示例,我们将揭示这些技术的底层原理与实现方法。
71 3
|
2月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
78 6
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
4月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
177 29
|
2月前
三种线程的使用场景
三种创建多线程的使用场景 1、继承的方式:适合于这个任务只想被一个线程的对象执行的情况 2、实现Runnable接口方式:适合于一个任务想被多个线程执行的情况 3、实现Callable接口方式:也适合一个任务想被多个线程执行的情况,你还想得倒任务的执行结果
30 0
|
3月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
47 3
|
3月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
49 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
4月前
|
Java
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
51 7
|
4月前
|
Java Android开发 UED
🧠Android多线程与异步编程实战!告别卡顿,让应用响应如丝般顺滑!🧵
在Android开发中,为应对复杂应用场景和繁重计算任务,多线程与异步编程成为保证UI流畅性的关键。本文将介绍Android中的多线程基础,包括Thread、Handler、Looper、AsyncTask及ExecutorService等,并通过示例代码展示其实用性。AsyncTask适用于简单后台操作,而ExecutorService则能更好地管理复杂并发任务。合理运用这些技术,可显著提升应用性能和用户体验,避免内存泄漏和线程安全问题,确保UI更新顺畅。
153 5
|
3月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
72 0