从实战到原理,线程池的各类使用场景整合(下)

简介: 从实战到原理,线程池的各类使用场景整合(下)

线程池常用参数介绍


corePoolSize核心线程数,当往线程池内部提交任务的时候,线程池会创建一个线程来执行任务。即使此时有空闲的工作线程能够处理当前任务,只要总的工作线程数小于corePoolSize,也会创建新的工作线程。


maximumPoolSize当任务的堵塞队列满了之后,如果还有新的任务提交到线程池内部,此时倘若工作线程数小于maximumPoolSize,则会创建新的工作线程。


keepAliveTime上边我们说到了工作线程


Worker(java.util.concurrent.ThreadPoolExecutor.Worker),当工作线程处于空闲状态中,如果超过了keepAliveTime依然没有任务,那么就会销毁当前工作线程。如果工作线程需要一直处于执行任务,每个任务的连续间隔都比较短,那么这个keepAliveTime 属性可以适当地调整大一些。


unitkeepAliveTime对应的时间单位


workQueue工作队列,当工作线程数达到了核心线程数,那么此时新来的线程就会被放入到工作队列中。线程池内部的工作队列全部都是继承自阻塞队列的接口,对于常用的阻塞队列类型为:


  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • PriorityBlockingQueue


RejectedExecutionHandlerJDK内部的线程拒绝策略包含了多种许多种,这里我罗列一些常见的拒绝策略给大家认识下:


  • AbortPolicy 直接抛出异常
  • CallerRunsPolicy 任务的执行由注入的线程自己执行
  • DiscardOldestPolicy 直接抛弃掉堵塞队列中队列头部的任务,然后执行尝试将当前任务提交到堵塞队列中。
  • DiscardPolicy 直接抛弃这个任务


从线程池设计中的一些启发


多消费队列的设计场景应用:业务上游提交任务,然后任务被放进一个堵塞队列中,接下来消费者需要从堵塞队列中提取元素,并且将它们转发到多个子队列中,各个子队列分别交给不同的子消费者处理数据。例如下图所示:


image.png


public interface AsyncHandlerService {
    /**
     * 任务放入队列中
     * 
     * @param asyncHandlerData
     */
    boolean putTask(AsyncHandlerData asyncHandlerData);
    /**
     * 启动消费
     */
    void startJob();
}


多消费者分发处理实现类:


@Component("asyncMultiConsumerHandlerHandler")
public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService{
    private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10);
    @Override
    public boolean putTask(AsyncHandlerData asyncHandlerData) {
        return taskQueueHandler.addTask(asyncHandlerData);
    }
    @Override
    public void startJob(){
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }
    /**
     * 将任务分发给各个子队列去处理
     */
    static class TaskQueueHandler implements Runnable {
        private static BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11);
        public static BlockingQueue<AsyncHandlerData> getAllTaskInfo() {
            return tasks;
        }
        private TaskDispatcherHandler[] taskDispatcherHandlers;
        private int childConsumerSize = 0;
        public TaskQueueHandler(int childConsumerSize) {
            this.childConsumerSize = childConsumerSize;
            taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize];
            for (int i = 0; i < taskDispatcherHandlers.length; i++) {
                taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i);
                Thread thread = new Thread(taskDispatcherHandlers[i]);
                thread.setDaemon(false);
                thread.setName("taskQueueHandler-child-"+i);
                thread.start();
            }
        }
        public boolean addTask(AsyncHandlerData asyncHandlerData) {
            return tasks.offer(asyncHandlerData);
        }
        @Override
        public void run() {
            int index = 0;
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    index = (index == taskDispatcherHandlers.length) ? 0 : index;
                    taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
                    index++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class TaskDispatcherHandler implements Runnable {
        private BlockingQueue<AsyncHandlerData> subTaskQueue;
        private String childName;
        private AtomicLong taskCount = new AtomicLong(0);
        public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) {
            this.subTaskQueue = blockingQueue;
            this.childName = childName;
        }
        public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) {
            subTaskQueue.add(asyncHandlerData);
        }
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = subTaskQueue.take();
                    long count = taskCount.incrementAndGet();
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo() + count);
                    Thread.sleep(3000);
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo()+" 任务处理结束" + count);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


测试接口:


@GetMapping(value = "/send-async-data")
    public boolean sendAsyncData(){
        AsyncHandlerData asyncHandlerData = new AsyncHandlerData();
        asyncHandlerData.setDataInfo("data info");
        boolean status = asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
        if(!status){
            throw new RuntimeException("insert fail");
        }
        return status;
    }


这种设计模型适合用于对于请求吞吐量要求较高,每个请求都比较耗时的场景中。


自定义拒绝策略的应用根据具体的应用场景,通过实现java.util.concurrent.RejectedExecutionHandler接口,自定义拒绝策略,例如对于当抛出拒绝异常的时候,往数据库中记录一些信息或者日志。


相关案例代码:


public class MyRejectPolicy{
    static class MyTask implements Runnable{
        @Override
        public void run() {
            System.out.println("this is test");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS
                , new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务被拒绝:" + r.toString());
                //记录一些信息
            }
        });
        for(int i=0;i<100;i++){
            Thread thread = new Thread(new MyTask());
            threadPoolExecutor.execute(thread);
        }
        Thread.yield();
    }
}


统计线程池的详细信息


通过阅读线程池的源代码之后,可以借助重写beforeExecute、afterExecute、terminated 方法去对线程池的每个线程耗时做统计。以及通过继承 ThreadPoolExecutor 对象之后,对当前线程池的coreSIze、maxiMumSize等等属性进行监控。


相关案例代码:


public class SysThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private Logger logger = LoggerFactory.getLogger(SysThreadPool.class);
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.currentTimeMillis());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        long endTime = System.currentTimeMillis();
        long executeTime = endTime - startTime.get();
        logger.info("Thread {}: ExecuteTime {}", r, executeTime);
    }
    @Override
    public void shutdown() {
        super.shutdown();
    }
    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }
    public void getTaskInfo(){
        logger.info("coreSize: {}, maxSize: {}, activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size());
    }
    static class MyTestTask implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        SysThreadPool sysThreadPool = new SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(2));
        sysThreadPool.getTaskInfo();
        System.out.println("------------");
        for(int i=0;i<10;i++){
            Thread thread = new Thread(new MyTestTask());
            sysThreadPool.submit(thread);
            sysThreadPool.getTaskInfo();
        }
        System.out.println("------------");
        Thread.sleep(3000);
    }
}


通过日志打印记录线程池的参数变化:


image.png


通过这份案例代码不妨可以设想下通过一些定时上报逻辑来实现线程池的监控功能。

相关文章
|
9天前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
51 1
|
5月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
357 0
|
8月前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
7月前
|
数据采集 存储 安全
Python爬虫实战:利用短效代理IP爬取京东母婴纸尿裤数据,多线程池并行处理方案详解
本文分享了一套结合青果网络短效代理IP和多线程池技术的电商数据爬取方案,针对京东母婴纸尿裤类目商品信息进行高效采集。通过动态代理IP规避访问限制,利用多线程提升抓取效率,同时确保数据采集的安全性和合法性。方案详细介绍了爬虫开发步骤、网页结构分析及代码实现,适用于大规模电商数据采集场景。
|
9月前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
192 6
|
10月前
|
并行计算 算法 安全
面试必问的多线程优化技巧与实战
多线程编程是现代软件开发中不可或缺的一部分,特别是在处理高并发场景和优化程序性能时。作为Java开发者,掌握多线程优化技巧不仅能够提升程序的执行效率,还能在面试中脱颖而出。本文将从多线程基础、线程与进程的区别、多线程的优势出发,深入探讨如何避免死锁与竞态条件、线程间的通信机制、线程池的使用优势、线程优化算法与数据结构的选择,以及硬件加速技术。通过多个Java示例,我们将揭示这些技术的底层原理与实现方法。
518 3
|
11月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
216 8
|
11月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
136 3
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
146 0

热门文章

最新文章

下一篇
oss教程