从实战到原理,线程池的各类使用场景整合(下)

简介: 从实战到原理,线程池的各类使用场景整合(下)

线程池常用参数介绍


corePoolSize核心线程数,当往线程池内部提交任务的时候,线程池会创建一个线程来执行任务。即使此时有空闲的工作线程能够处理当前任务,只要总的工作线程数小于corePoolSize,也会创建新的工作线程。


maximumPoolSize当任务的堵塞队列满了之后,如果还有新的任务提交到线程池内部,此时倘若工作线程数小于maximumPoolSize,则会创建新的工作线程。


keepAliveTime上边我们说到了工作线程


Worker(java.util.concurrent.ThreadPoolExecutor.Worker),当工作线程处于空闲状态中,如果超过了keepAliveTime依然没有任务,那么就会销毁当前工作线程。如果工作线程需要一直处于执行任务,每个任务的连续间隔都比较短,那么这个keepAliveTime 属性可以适当地调整大一些。


unitkeepAliveTime对应的时间单位


workQueue工作队列,当工作线程数达到了核心线程数,那么此时新来的线程就会被放入到工作队列中。线程池内部的工作队列全部都是继承自阻塞队列的接口,对于常用的阻塞队列类型为:


  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • PriorityBlockingQueue


RejectedExecutionHandlerJDK内部的线程拒绝策略包含了多种许多种,这里我罗列一些常见的拒绝策略给大家认识下:


  • AbortPolicy 直接抛出异常
  • CallerRunsPolicy 任务的执行由注入的线程自己执行
  • DiscardOldestPolicy 直接抛弃掉堵塞队列中队列头部的任务,然后执行尝试将当前任务提交到堵塞队列中。
  • DiscardPolicy 直接抛弃这个任务


从线程池设计中的一些启发


多消费队列的设计场景应用:业务上游提交任务,然后任务被放进一个堵塞队列中,接下来消费者需要从堵塞队列中提取元素,并且将它们转发到多个子队列中,各个子队列分别交给不同的子消费者处理数据。例如下图所示:


image.png


public interface AsyncHandlerService {
    /**
     * 任务放入队列中
     * 
     * @param asyncHandlerData
     */
    boolean putTask(AsyncHandlerData asyncHandlerData);
    /**
     * 启动消费
     */
    void startJob();
}


多消费者分发处理实现类:


@Component("asyncMultiConsumerHandlerHandler")
public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService{
    private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10);
    @Override
    public boolean putTask(AsyncHandlerData asyncHandlerData) {
        return taskQueueHandler.addTask(asyncHandlerData);
    }
    @Override
    public void startJob(){
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }
    /**
     * 将任务分发给各个子队列去处理
     */
    static class TaskQueueHandler implements Runnable {
        private static BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11);
        public static BlockingQueue<AsyncHandlerData> getAllTaskInfo() {
            return tasks;
        }
        private TaskDispatcherHandler[] taskDispatcherHandlers;
        private int childConsumerSize = 0;
        public TaskQueueHandler(int childConsumerSize) {
            this.childConsumerSize = childConsumerSize;
            taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize];
            for (int i = 0; i < taskDispatcherHandlers.length; i++) {
                taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i);
                Thread thread = new Thread(taskDispatcherHandlers[i]);
                thread.setDaemon(false);
                thread.setName("taskQueueHandler-child-"+i);
                thread.start();
            }
        }
        public boolean addTask(AsyncHandlerData asyncHandlerData) {
            return tasks.offer(asyncHandlerData);
        }
        @Override
        public void run() {
            int index = 0;
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    index = (index == taskDispatcherHandlers.length) ? 0 : index;
                    taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
                    index++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class TaskDispatcherHandler implements Runnable {
        private BlockingQueue<AsyncHandlerData> subTaskQueue;
        private String childName;
        private AtomicLong taskCount = new AtomicLong(0);
        public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) {
            this.subTaskQueue = blockingQueue;
            this.childName = childName;
        }
        public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) {
            subTaskQueue.add(asyncHandlerData);
        }
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = subTaskQueue.take();
                    long count = taskCount.incrementAndGet();
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo() + count);
                    Thread.sleep(3000);
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo()+" 任务处理结束" + count);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


测试接口:


@GetMapping(value = "/send-async-data")
    public boolean sendAsyncData(){
        AsyncHandlerData asyncHandlerData = new AsyncHandlerData();
        asyncHandlerData.setDataInfo("data info");
        boolean status = asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
        if(!status){
            throw new RuntimeException("insert fail");
        }
        return status;
    }


这种设计模型适合用于对于请求吞吐量要求较高,每个请求都比较耗时的场景中。


自定义拒绝策略的应用根据具体的应用场景,通过实现java.util.concurrent.RejectedExecutionHandler接口,自定义拒绝策略,例如对于当抛出拒绝异常的时候,往数据库中记录一些信息或者日志。


相关案例代码:


public class MyRejectPolicy{
    static class MyTask implements Runnable{
        @Override
        public void run() {
            System.out.println("this is test");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS
                , new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务被拒绝:" + r.toString());
                //记录一些信息
            }
        });
        for(int i=0;i<100;i++){
            Thread thread = new Thread(new MyTask());
            threadPoolExecutor.execute(thread);
        }
        Thread.yield();
    }
}


统计线程池的详细信息


通过阅读线程池的源代码之后,可以借助重写beforeExecute、afterExecute、terminated 方法去对线程池的每个线程耗时做统计。以及通过继承 ThreadPoolExecutor 对象之后,对当前线程池的coreSIze、maxiMumSize等等属性进行监控。


相关案例代码:


public class SysThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private Logger logger = LoggerFactory.getLogger(SysThreadPool.class);
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.currentTimeMillis());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        long endTime = System.currentTimeMillis();
        long executeTime = endTime - startTime.get();
        logger.info("Thread {}: ExecuteTime {}", r, executeTime);
    }
    @Override
    public void shutdown() {
        super.shutdown();
    }
    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }
    public void getTaskInfo(){
        logger.info("coreSize: {}, maxSize: {}, activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size());
    }
    static class MyTestTask implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        SysThreadPool sysThreadPool = new SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(2));
        sysThreadPool.getTaskInfo();
        System.out.println("------------");
        for(int i=0;i<10;i++){
            Thread thread = new Thread(new MyTestTask());
            sysThreadPool.submit(thread);
            sysThreadPool.getTaskInfo();
        }
        System.out.println("------------");
        Thread.sleep(3000);
    }
}


通过日志打印记录线程池的参数变化:


image.png


通过这份案例代码不妨可以设想下通过一些定时上报逻辑来实现线程池的监控功能。

相关文章
|
9月前
|
存储 SQL 安全
Java 无锁方式实现高性能线程实战操作指南
本文深入探讨了现代高并发Java应用中单例模式的实现方式,分析了传统单例(如DCL)的局限性,并提出了多种无锁实现方案。包括基于ThreadLocal的延迟初始化、VarHandle原子操作、Record不可变对象、响应式编程(Reactor)以及CDI依赖注入等实现方式。每种方案均附有代码示例及适用场景,同时通过JMH性能测试对比各实现的优劣。最后,结合实际案例设计了一个高性能配置中心,展示了无锁单例在实际开发中的应用。总结中提出根据场景选择合适的实现方式,并遵循现代单例设计原则以优化性能和安全性。文中还提供了代码获取链接,便于读者实践与学习。
173 0
|
5月前
|
存储 缓存 监控
什么是线程池?它的工作原理?
我是小假 期待与你的下一次相遇 ~
344 1
|
5月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
395 1
|
10月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
595 0
|
5月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
505 0
|
7月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
582 1
|
9月前
|
算法 Java 测试技术
深度优化OSS上传性能:多线程分片上传 vs 断点续传实战对比
本文深入解析对象存储服务(OSS)文件上传性能优化技术,重点探讨多线程分片上传与断点续传两种方案。通过理论分析、代码实现和性能测试,对比其在不同场景下的表现差异,并提供选型建议与最佳实践,助力提升大文件上传效率与稳定性。
911 0
|
9月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
459 0

热门文章

最新文章