线程池的经典应用场景(下)

简介: 线程池的经典应用场景(下)

线程池内部的源代码分析



我们在项目里使用线程池的时候,通常都会先创建一个具体实现Bean来定义线程池,例如:


@Bean
public ExecutorService emailTaskPool() {
    return new ThreadPoolExecutor(2, 4,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
}
复制代码


ThreadPoolExecutor的父类是AbstractExecutorService,然后AbstractExecutorService的顶层接口是:ExecutorService。


就例如发送邮件接口而言,当线程池触发了submit函数的时候,实际上会调用到父类AbstractExecutorService对象的


java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)方法,然后进入到ThreadPoolExecutor#execute部分。


@Override
public void sendEmail(EmailDTO emailDTO) {
    emailTaskPool.submit(() -> {
        try {
            System.out.printf("sending email .... emailDto is %s \n", emailDTO);
            Thread.sleep(1000);
            System.out.println("sended success");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
复制代码


java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable) 源代码位置:


/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
复制代码


这里面你会看到返回的是一个future对象供调用方判断线程池内部的函数到底是否有完全执行成功。因此如果有时候如果需要判断线程池执行任务的结果话,可以这样操作:


Future future = emailTaskPool.submit(() -> {
          try {
              System.out.printf("sending email .... emailDto is %s \n", emailDTO);
              Thread.sleep(1000);
              System.out.println("sended success");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      });
      //todo something
      future.get();
}
复制代码


在jdk8源代码中,提交任务的执行逻辑部分如下所示:


新增线程任务的时候代码:


public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //工作线程数小于核心线程的时候,可以填写worker线程
        if (workerCountOf(c) < corePoolSize) {
              //新增工作线程的时候会加锁
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池的状态正常,切任务放入就绪队列正常
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //如果当前线程池处于关闭状态,则抛出拒绝异常
                reject(command);
            //如果工作线程数超过了核心线程数,那么就需要考虑新增工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果新增的工作线程已经达到了最大线程数限制的条件下,需要触发拒绝策略的抛出
        else if (!addWorker(command, false))
            reject(command);
    }
复制代码


通过深入阅读工作线程主要存放在了一个hashset集合当中,

添加工作线程部分的逻辑代码如下所示:


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //确保当前线程池没有进入到一个销毁状态中
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
              // 如果传入的core属性是false,则这里需要比对maximumPoolSize参数
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                //通过cas操作去增加线程池的工作线程数亩
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       //真正需要指定的任务是firstTask,它会被注入到worker对象当中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        //加入了锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一个hashset集合,会往里面新增工作线程    
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //worker本身是一个线程,但是worker对象内部还有一个线程的参数,
                //这个t才是真正的任务内容
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果worker线程创建好了,但是内部的真正任务还没有启动,此时突然整个
        //线程池的状态被关闭了,那么这时候workerStarted就会为false,然后将
        //工作线程的数目做自减调整。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码


进过理解之后,整体执行的逻辑以及先后顺序如下图所示:


网络异常,图片无法展示
|


首先判断线程池内部的现场是否都有任务需要执行。如果不是,则使用一个空闲的工作线程用于任务执行。否则会判断当前的工作队列是否已经满了,如果没有满则往队列里面投递一个任务,等待线程去处理。如果工作队列已经满了,此时会根据饱和策略去判断,是否需要创建新的线程还是果断抛出异常等方式来进行处理。


线程池常用参数介绍


corePoolSize

核心线程数,当往线程池内部提交任务的时候,线程池会创建一个线程来执行任务。即使此时有空闲的工作线程能够处理当前任务,只要总的工作线程数小于corePoolSize,也会创建新的工作线程。


maximumPoolSize

当任务的堵塞队列满了之后,如果还有新的任务提交到线程池内部,此时倘若工作线程数小于maximumPoolSize,则会创建新的工作线程。


keepAliveTime

上边我们说到了工作线程Worker(java.util.concurrent.ThreadPoolExecutor.Worker),当工作线程处于空闲状态中,如果超过了keepAliveTime依然没有任务,那么就会销毁当前工作线程。


如果工作线程需要一直处于执行任务,每个任务的连续间隔都比较短,那么这个keepAliveTime

属性可以适当地调整大一些。


unit

keepAliveTime对应的时间单位


workQueue

工作队列,当工作线程数达到了核心线程数,那么此时新来的线程就会被放入到工作队列中。


线程池内部的工作队列全部都是继承自阻塞队列的接口,对于常用的阻塞队列类型为:


  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue
  • PriorityBlockingQueue


RejectedExecutionHandler


JDK内部的线程拒绝策略包含了多种许多种,这里我罗列一些常见的拒绝策略给大家认识下:


  • AbortPolicy
    直接抛出异常
  • CallerRunsPolicy
    任务的执行由注入的线程自己执行
  • DiscardOldestPolicy
    直接抛弃掉堵塞队列中队列头部的任务,然后执行尝试将当前任务提交到堵塞队列中。
  • DiscardPolicy
    直接抛弃这个任务


从线程池设计中的一些启发


多消费队列的设计


场景应用:


业务上游提交任务,然后任务被放进一个堵塞队列中,接下来消费者需要从堵塞队列中提取元素,并且将它们转发到多个子队列中,各个子队列分别交给不同的子消费者处理数据。例如下图所示:


网络异常,图片无法展示
|


public interface AsyncHandlerService {
    /**
     * 任务放入队列中
     * 
     * @param asyncHandlerData
     */
    boolean putTask(AsyncHandlerData asyncHandlerData);
    /**
     * 启动消费
     */
    void startJob();
}
复制代码


多消费者分发处理实现类:


@Component("asyncMultiConsumerHandlerHandler")
public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService{
    private volatile TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10);
    @Override
    public boolean putTask(AsyncHandlerData asyncHandlerData) {
        return taskQueueHandler.addTask(asyncHandlerData);
    }
    @Override
    public void startJob(){
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }
    /**
     * 将任务分发给各个子队列去处理
     */
    static class TaskQueueHandler implements Runnable {
        private static BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11);
        public static BlockingQueue<AsyncHandlerData> getAllTaskInfo() {
            return tasks;
        }
        private TaskDispatcherHandler[] taskDispatcherHandlers;
        private int childConsumerSize = 0;
        public TaskQueueHandler(int childConsumerSize) {
            this.childConsumerSize = childConsumerSize;
            taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize];
            for (int i = 0; i < taskDispatcherHandlers.length; i++) {
                taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i);
                Thread thread = new Thread(taskDispatcherHandlers[i]);
                thread.setDaemon(false);
                thread.setName("taskQueueHandler-child-"+i);
                thread.start();
            }
        }
        public boolean addTask(AsyncHandlerData asyncHandlerData) {
            return tasks.offer(asyncHandlerData);
        }
        @Override
        public void run() {
            int index = 0;
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    index = (index == taskDispatcherHandlers.length) ? 0 : index;
                    taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
                    index++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class TaskDispatcherHandler implements Runnable {
        private BlockingQueue<AsyncHandlerData> subTaskQueue;
        private String childName;
        private AtomicLong taskCount = new AtomicLong(0);
        public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) {
            this.subTaskQueue = blockingQueue;
            this.childName = childName;
        }
        public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) {
            subTaskQueue.add(asyncHandlerData);
        }
        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = subTaskQueue.take();
                    long count = taskCount.incrementAndGet();
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo() + count);
                    Thread.sleep(3000);
                    System.out.println("【" + childName + "】子任务队列处理:" + asyncHandlerData.getDataInfo()+" 任务处理结束" + count);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
复制代码


测试接口:


@GetMapping(value = "/send-async-data")
    public boolean sendAsyncData(){
        AsyncHandlerData asyncHandlerData = new AsyncHandlerData();
        asyncHandlerData.setDataInfo("data info");
        boolean status = asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
        if(!status){
            throw new RuntimeException("insert fail");
        }
        return status;
    }
复制代码


这种设计模型适合用于对于请求吞吐量要求较高,每个请求都比较耗时的场景中。


自定义拒绝策略的应用


根据具体的应用场景,通过实现java.util.concurrent.RejectedExecutionHandler接口,自定义拒绝策略,例如对于当抛出拒绝异常的时候,往数据库中记录一些信息或者日志。


相关案例代码:


public class MyRejectPolicy{
    static class MyTask implements Runnable{
        @Override
        public void run() {
            System.out.println("this is test");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS
                , new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务被拒绝:" + r.toString());
                //记录一些信息
            }
        });
        for(int i=0;i<100;i++){
            Thread thread = new Thread(new MyTask());
            threadPoolExecutor.execute(thread);
        }
        Thread.yield();
    }
}
复制代码


统计线程池的详细信息


通过阅读线程池的源代码之后,可以借助重写beforeExecute、afterExecute、terminated 方法去对线程池的每个线程耗时做统计。以及通过继承 ThreadPoolExecutor 对象之后,对当前线程池的coreSIze、maxiMumSize等等属性进行监控。


相关案例代码:


public class SysThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private Logger logger = LoggerFactory.getLogger(SysThreadPool.class);
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.currentTimeMillis());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        long endTime = System.currentTimeMillis();
        long executeTime = endTime - startTime.get();
        logger.info("Thread {}: ExecuteTime {}", r, executeTime);
    }
    @Override
    public void shutdown() {
        super.shutdown();
    }
    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }
    public void getTaskInfo(){
        logger.info("coreSize: {}, maxSize: {}, activeCount:{},blockQueueSize:{}",super.getCorePoolSize(),super.getMaximumPoolSize(),super.getActiveCount(),super.getQueue().size());
    }
    static class MyTestTask implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        SysThreadPool sysThreadPool = new SysThreadPool(2,5,5000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(2));
        sysThreadPool.getTaskInfo();
        System.out.println("------------");
        for(int i=0;i<10;i++){
            Thread thread = new Thread(new MyTestTask());
            sysThreadPool.submit(thread);
            sysThreadPool.getTaskInfo();
        }
        System.out.println("------------");
        Thread.sleep(3000);
    }
}
复制代码


通过日志打印记录线程池的参数变化:


网络异常,图片无法展示
|


网络异常,图片无法展示
|
通过这份案例代码不妨可以设想下通过一些定时上报逻辑来实现线程池的监控功能。

目录
相关文章
|
7月前
|
Java
线程池是什么?线程池在实际工作中的应用
总的来说,线程池是一种有效的多线程处理方式,它可以提高系统的性能和稳定性。在实际工作中,我们需要根据任务的特性和系统的硬件能力来合理设置线程池的大小,以达到最佳的效果。
177 18
|
12月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
793 6
|
11月前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
221 2
|
12月前
|
数据采集 存储 数据处理
Python中的多线程编程及其在数据处理中的应用
本文深入探讨了Python中多线程编程的概念、原理和实现方法,并详细介绍了其在数据处理领域的应用。通过对比单线程与多线程的性能差异,展示了多线程编程在提升程序运行效率方面的显著优势。文章还提供了实际案例,帮助读者更好地理解和掌握多线程编程技术。
|
12月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
221 8
|
12月前
|
存储 监控 安全
深入理解ThreadLocal:线程局部变量的机制与应用
在Java的多线程编程中,`ThreadLocal`变量提供了一种线程安全的解决方案,允许每个线程拥有自己的变量副本,从而避免了线程间的数据竞争。本文将深入探讨`ThreadLocal`的工作原理、使用方法以及在实际开发中的应用场景。
247 2
|
12月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
256 5
|
12月前
|
缓存 监控 Java
Java 线程池在高并发场景下有哪些优势和潜在问题?
Java 线程池在高并发场景下有哪些优势和潜在问题?
229 2
|
监控 Java
在实际应用中选择线程异常捕获方法的考量
【10月更文挑战第15天】选择最适合的线程异常捕获方法需要综合考虑多种因素。没有一种方法是绝对最优的,需要根据具体情况进行权衡和选择。在实际应用中,还需要不断地实践和总结经验,以提高异常处理的效果和程序的稳定性。
205 3

热门文章

最新文章

下一篇
开通oss服务