从实战到原理,线程池的各类使用场景整合(下)

简介: 从实战到原理,线程池的各类使用场景整合(下)

线程池常用参数介绍


corePoolSize核心线程数,当往线程池内部提交任务的时候,线程池会创建一个线程来执行任务。即使此时有空闲的工作线程能够处理当前任务,只要总的工作线程数小于corePoolSize,也会创建新的工作线程。


maximumPoolSize当任务的堵塞队列满了之后,如果还有新的任务提交到线程池内部,此时倘若工作线程数小于maximumPoolSize,则会创建新的工作线程。


keepAliveTime上边我们说到了工作线程


Worker(java.util.concurrent.ThreadPoolExecutor.Worker),当工作线程处于空闲状态中,如果超过了keepAliveTime依然没有任务,那么就会销毁当前工作线程。如果工作线程需要一直处于执行任务,每个任务的连续间隔都比较短,那么这个keepAliveTime 属性可以适当地调整大一些。


unitkeepAliveTime对应的时间单位


workQueue工作队列,当工作线程数达到了核心线程数,那么此时新来的线程就会被放入到工作队列中。线程池内部的工作队列全部都是继承自阻塞队列的接口,对于常用的阻塞队列类型为:


  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • PriorityBlockingQueue


RejectedExecutionHandlerJDK内部的线程拒绝策略包含了多种许多种,这里我罗列一些常见的拒绝策略给大家认识下:


  • AbortPolicy 直接抛出异常
  • CallerRunsPolicy 任务的执行由注入的线程自己执行
  • DiscardOldestPolicy 直接抛弃掉堵塞队列中队列头部的任务,然后执行尝试将当前任务提交到堵塞队列中。
  • DiscardPolicy 直接抛弃这个任务


从线程池设计中的一些启发


多消费队列的设计场景应用:业务上游提交任务,然后任务被放进一个堵塞队列中,接下来消费者需要从堵塞队列中提取元素,并且将它们转发到多个子队列中,各个子队列分别交给不同的子消费者处理数据。例如下图所示:


image.png


public interface AsyncHandlerService {
    /**
     * 任务放入队列中
     * 
     * @param asyncHandlerData
     */
    boolean putTask(AsyncHandlerData asyncHandlerData);
    /**
     * 启动消费
     */
    void startJob();
}


多消费者分发处理实现类:


@Component("asyncMultiConsumerHandlerHandler")
public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService{
    private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10);
    @Override
    public boolean putTask(AsyncHandlerData asyncHandlerData) {
        return taskQueueHandler.addTask(asyncHandlerData);
    }
    @Override
    public void startJob(){
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }
    /**
     * 将任务分发给各个子队列去处理
     */
    static class TaskQueueHandler implements Runnable {
        private static BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11);
        public static BlockingQueue<AsyncHandlerData> getAllTaskInfo() {
            return tasks;
        }
        private TaskDispatcherHandler[] taskDispatcherHandlers;
        private int childConsumerSize = 0;
        public TaskQueueHandler(int childConsumerSize) {
            this.childConsumerSize = childConsumerSize;
            taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize];
            for (int i = 0; i < taskDispatcherHandlers.length; i++) {
                taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i);
                Thread thread = new Thread(taskDispatcherHandlers[i]);
                thread.setDaemon(false);
                thread.setName("taskQueueHandler-child-"+i);
                thread.start();
            }
        }
        public boolean addTask(AsyncHandlerData asyncHandlerData) {
            return tasks.offer(asyncHandlerData);
        }
        @Override
        public void run() {
            int index = 0;
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    index = (index == taskDispatcherHandlers.length) ? 0 : index;
                    taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
                    index++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class TaskDispatcherHandler implements Runnable {
        private BlockingQueue<AsyncHandlerData> subTaskQueue;
        private String childName;
        private AtomicLong taskCount = new AtomicLong(0);
        public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) {
            this.subTaskQueue = blockingQueue;
            this.childName = childName;
        }
        public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) {
            subTaskQueue.add(asyncHandlerData);
        }
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = subTaskQueue.take();
                    long count = taskCount.incrementAndGet();
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo() + count);
                    Thread.sleep(3000);
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo()+" 任务处理结束" + count);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


测试接口:


@GetMapping(value = "/send-async-data")
    public boolean sendAsyncData(){
        AsyncHandlerData asyncHandlerData = new AsyncHandlerData();
        asyncHandlerData.setDataInfo("data info");
        boolean status = asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
        if(!status){
            throw new RuntimeException("insert fail");
        }
        return status;
    }


这种设计模型适合用于对于请求吞吐量要求较高,每个请求都比较耗时的场景中。


自定义拒绝策略的应用根据具体的应用场景,通过实现java.util.concurrent.RejectedExecutionHandler接口,自定义拒绝策略,例如对于当抛出拒绝异常的时候,往数据库中记录一些信息或者日志。


相关案例代码:


public class MyRejectPolicy{
    static class MyTask implements Runnable{
        @Override
        public void run() {
            System.out.println("this is test");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS
                , new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务被拒绝:" + r.toString());
                //记录一些信息
            }
        });
        for(int i=0;i<100;i++){
            Thread thread = new Thread(new MyTask());
            threadPoolExecutor.execute(thread);
        }
        Thread.yield();
    }
}


统计线程池的详细信息


通过阅读线程池的源代码之后,可以借助重写beforeExecute、afterExecute、terminated 方法去对线程池的每个线程耗时做统计。以及通过继承 ThreadPoolExecutor 对象之后,对当前线程池的coreSIze、maxiMumSize等等属性进行监控。


相关案例代码:


public class SysThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private Logger logger = LoggerFactory.getLogger(SysThreadPool.class);
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.currentTimeMillis());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        long endTime = System.currentTimeMillis();
        long executeTime = endTime - startTime.get();
        logger.info("Thread {}: ExecuteTime {}", r, executeTime);
    }
    @Override
    public void shutdown() {
        super.shutdown();
    }
    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }
    public void getTaskInfo(){
        logger.info("coreSize: {}, maxSize: {}, activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size());
    }
    static class MyTestTask implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        SysThreadPool sysThreadPool = new SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(2));
        sysThreadPool.getTaskInfo();
        System.out.println("------------");
        for(int i=0;i<10;i++){
            Thread thread = new Thread(new MyTestTask());
            sysThreadPool.submit(thread);
            sysThreadPool.getTaskInfo();
        }
        System.out.println("------------");
        Thread.sleep(3000);
    }
}


通过日志打印记录线程池的参数变化:


image.png


通过这份案例代码不妨可以设想下通过一些定时上报逻辑来实现线程池的监控功能。

相关文章
|
8天前
|
Java
并发编程之线程池的底层原理的详细解析
并发编程之线程池的底层原理的详细解析
45 0
|
24天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
2月前
|
算法 Unix 调度
【Qt 线程】深入探究QThread线程优先级:原理、应用与最佳实践
【Qt 线程】深入探究QThread线程优先级:原理、应用与最佳实践
33 0
|
24天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
7天前
|
安全 Java 调度
Java线程:深入理解与实战应用
Java线程:深入理解与实战应用
27 0
|
4天前
|
人工智能 安全 Java
Python 多线程编程实战:threading 模块的最佳实践
Python 多线程编程实战:threading 模块的最佳实践
119 5
|
5天前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
12天前
|
Java 调度
《Java 多线程实战系列》- 01 基本概念与底层原理
《Java 多线程实战系列》- 01 基本概念与底层原理
14 0
|
14天前
|
监控 Java 关系型数据库
JVM工作原理与实战(十三):打破双亲委派机制-线程上下文类加载器
JVM作为Java程序的运行环境,其负责解释和执行字节码,管理内存,确保安全,支持多线程和提供性能监控工具,以及确保程序的跨平台运行。本文主要介绍了打破双亲委派机制的方法、线程上下文类加载器等内容。
15 2
|
18天前
|
Java
Java线程通信的精髓:解析通知等待机制的工作原理
Java线程通信的精髓:解析通知等待机制的工作原理
26 3
Java线程通信的精髓:解析通知等待机制的工作原理