从实战到原理,线程池的各类使用场景整合(中)

简介: 从实战到原理,线程池的各类使用场景整合(中)

线程池内部的源代码分析


我们在项目里使用线程池的时候,通常都会先创建一个具体实现Bean来定义线程池,例如:


@Bean
public ExecutorService emailTaskPool() {
    return new ThreadPoolExecutor(2, 4,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
}


ThreadPoolExecutor的父类是AbstractExecutorService,然后AbstractExecutorService的顶层接口是:ExecutorService。


就例如发送邮件接口而言,当线程池触发了submit函数的时候,实际上会调用到父类AbstractExecutorService对象的java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)方法,然后进入到ThreadPoolExecutor#execute部分。


@Override
public void sendEmail(EmailDTO emailDTO) {
    emailTaskPool.submit(() -> {
        try {
            System.out.printf("sending email .... emailDto is %s \n", emailDTO);
            Thread.sleep(1000);
            System.out.println("sended success");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}


java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable) 源代码位置:


/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


这里面你会看到返回的是一个future对象供调用方判断线程池内部的函数到底是否有完全执行成功。因此如果有时候如果需要判断线程池执行任务的结果话,可以这样操作:


Future future = emailTaskPool.submit(() -> {
          try {
              System.out.printf("sending email .... emailDto is %s \n", emailDTO);
              Thread.sleep(1000);
              System.out.println("sended success");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      });
      //todo something
      future.get();
}


在jdk8源代码中,提交任务的执行逻辑部分如下所示:新增线程任务的时候代码:


public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //工作线程数小于核心线程的时候,可以填写worker线程
        if (workerCountOf(c) < corePoolSize) {
              //新增工作线程的时候会加锁
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池的状态正常,切任务放入就绪队列正常
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //如果当前线程池处于关闭状态,则抛出拒绝异常
                reject(command);
            //如果工作线程数超过了核心线程数,那么就需要考虑新增工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果新增的工作线程已经达到了最大线程数限制的条件下,需要触发拒绝策略的抛出
        else if (!addWorker(command, false))
            reject(command);
    }


通过深入阅读工作线程主要存放在了一个hashset集合当中, 添加工作线程部分的逻辑代

码如下所示:


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //确保当前线程池没有进入到一个销毁状态中
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
              // 如果传入的core属性是false,则这里需要比对maximumPoolSize参数
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                //通过cas操作去增加线程池的工作线程数亩
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       //真正需要指定的任务是firstTask,它会被注入到worker对象当中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        //加入了锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一个hashset集合,会往里面新增工作线程    
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //worker本身是一个线程,但是worker对象内部还有一个线程的参数,
                //这个t才是真正的任务内容
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果worker线程创建好了,但是内部的真正任务还没有启动,此时突然整个
        //线程池的状态被关闭了,那么这时候workerStarted就会为false,然后将
        //工作线程的数目做自减调整。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


进过理解之后,整体执行的逻辑以及先后顺序如下图所示:


image.png


首先判断线程池内部的现场是否都有任务需要执行。如果不是,则使用一个空闲的工作线程用于任务执行。否则会判断当前的堵塞队列是否已经满了,如果没有满则往队列里面投递任务,等待工作线程去处理。


如果堵塞队列已经满了,此时会判断工作线程数是否大于最大线程数,如果没有,则继续创建工作线程,如果已经达到则根据饱和策略去判断是果断抛出异常还是其他方式来进行处理。

相关文章
|
3月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
144 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
19天前
三种线程的使用场景
三种创建多线程的使用场景 1、继承的方式:适合于这个任务只想被一个线程的对象执行的情况 2、实现Runnable接口方式:适合于一个任务想被多个线程执行的情况 3、实现Callable接口方式:也适合一个任务想被多个线程执行的情况,你还想得倒任务的执行结果
20 0
|
2月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
31 3
|
2月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
33 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
3月前
|
Java
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
领略Lock接口的风采,通过实战演练,让你迅速掌握这门高深武艺,成为Java多线程领域的武林盟主
38 7
|
3月前
|
Java Android开发 UED
🧠Android多线程与异步编程实战!告别卡顿,让应用响应如丝般顺滑!🧵
在Android开发中,为应对复杂应用场景和繁重计算任务,多线程与异步编程成为保证UI流畅性的关键。本文将介绍Android中的多线程基础,包括Thread、Handler、Looper、AsyncTask及ExecutorService等,并通过示例代码展示其实用性。AsyncTask适用于简单后台操作,而ExecutorService则能更好地管理复杂并发任务。合理运用这些技术,可显著提升应用性能和用户体验,避免内存泄漏和线程安全问题,确保UI更新顺畅。
103 5
|
2月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
61 0
|
2月前
|
Java 应用服务中间件 API
nginx线程池原理
nginx线程池原理
37 0
|
3月前
|
Java 开发者
Java中的多线程编程基础与实战
【9月更文挑战第6天】本文将通过深入浅出的方式,带领读者了解并掌握Java中的多线程编程。我们将从基础概念出发,逐步深入到代码实践,最后探讨多线程在实际应用中的优势和注意事项。无论你是初学者还是有一定经验的开发者,这篇文章都能让你对Java多线程有更全面的认识。
32 1
|
3月前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
55 0