从实战到原理,线程池的各类使用场景整合(中)

简介: 从实战到原理,线程池的各类使用场景整合(中)

线程池内部的源代码分析


我们在项目里使用线程池的时候,通常都会先创建一个具体实现Bean来定义线程池,例如:


@Bean
public ExecutorService emailTaskPool() {
    return new ThreadPoolExecutor(2, 4,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
}


ThreadPoolExecutor的父类是AbstractExecutorService,然后AbstractExecutorService的顶层接口是:ExecutorService。


就例如发送邮件接口而言,当线程池触发了submit函数的时候,实际上会调用到父类AbstractExecutorService对象的java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)方法,然后进入到ThreadPoolExecutor#execute部分。


@Override
public void sendEmail(EmailDTO emailDTO) {
    emailTaskPool.submit(() -> {
        try {
            System.out.printf("sending email .... emailDto is %s \n", emailDTO);
            Thread.sleep(1000);
            System.out.println("sended success");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}


java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable) 源代码位置:


/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


这里面你会看到返回的是一个future对象供调用方判断线程池内部的函数到底是否有完全执行成功。因此如果有时候如果需要判断线程池执行任务的结果话,可以这样操作:


Future future = emailTaskPool.submit(() -> {
          try {
              System.out.printf("sending email .... emailDto is %s \n", emailDTO);
              Thread.sleep(1000);
              System.out.println("sended success");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      });
      //todo something
      future.get();
}


在jdk8源代码中,提交任务的执行逻辑部分如下所示:新增线程任务的时候代码:


public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //工作线程数小于核心线程的时候,可以填写worker线程
        if (workerCountOf(c) < corePoolSize) {
              //新增工作线程的时候会加锁
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池的状态正常,切任务放入就绪队列正常
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //如果当前线程池处于关闭状态,则抛出拒绝异常
                reject(command);
            //如果工作线程数超过了核心线程数,那么就需要考虑新增工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果新增的工作线程已经达到了最大线程数限制的条件下,需要触发拒绝策略的抛出
        else if (!addWorker(command, false))
            reject(command);
    }


通过深入阅读工作线程主要存放在了一个hashset集合当中, 添加工作线程部分的逻辑代

码如下所示:


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //确保当前线程池没有进入到一个销毁状态中
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
              // 如果传入的core属性是false,则这里需要比对maximumPoolSize参数
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                //通过cas操作去增加线程池的工作线程数亩
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       //真正需要指定的任务是firstTask,它会被注入到worker对象当中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        //加入了锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一个hashset集合,会往里面新增工作线程    
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //worker本身是一个线程,但是worker对象内部还有一个线程的参数,
                //这个t才是真正的任务内容
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果worker线程创建好了,但是内部的真正任务还没有启动,此时突然整个
        //线程池的状态被关闭了,那么这时候workerStarted就会为false,然后将
        //工作线程的数目做自减调整。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


进过理解之后,整体执行的逻辑以及先后顺序如下图所示:


image.png


首先判断线程池内部的现场是否都有任务需要执行。如果不是,则使用一个空闲的工作线程用于任务执行。否则会判断当前的堵塞队列是否已经满了,如果没有满则往队列里面投递任务,等待工作线程去处理。


如果堵塞队列已经满了,此时会判断工作线程数是否大于最大线程数,如果没有,则继续创建工作线程,如果已经达到则根据饱和策略去判断是果断抛出异常还是其他方式来进行处理。

相关文章
|
1月前
|
人工智能 JSON 前端开发
【Spring boot实战】Springboot+对话ai模型整体框架+高并发线程机制处理优化+提示词工程效果展示(按照框架自己修改可对接市面上百分之99的模型)
【Spring boot实战】Springboot+对话ai模型整体框架+高并发线程机制处理优化+提示词工程效果展示(按照框架自己修改可对接市面上百分之99的模型)
|
3月前
|
数据采集 并行计算 JavaScript
实战指南:在 Node.js 中利用多线程提升性能
在 Node.js 的世界中,多线程技术一直是一个受到广泛关注的领域。最初,Node.js 设计为单线程模式。随着技术发展,Node.js 引入了多线程支持,进而利用多核处理器的强大性能,提升了应用性能。接下来的内容将深入探讨 Node.js 如何实现多线程,以及在何种场合应该采用这种技术。
|
20天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
3天前
|
安全 Java 调度
Java线程:深入理解与实战应用
Java线程:深入理解与实战应用
21 0
|
1天前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
10天前
|
监控 Java 关系型数据库
JVM工作原理与实战(十三):打破双亲委派机制-线程上下文类加载器
JVM作为Java程序的运行环境,其负责解释和执行字节码,管理内存,确保安全,支持多线程和提供性能监控工具,以及确保程序的跨平台运行。本文主要介绍了打破双亲委派机制的方法、线程上下文类加载器等内容。
14 2
|
1月前
|
Java API 开发者
springboot 多线程的使用原理与实战
在Spring Boot中实现多线程,主要依赖于Spring框架的@Async注解以及底层Java的并发框架。这里将深入剖析Spring Boot多线程的原理,包括@Async注解的工作方式、任务执行器的角色以及如何通过配置来调整线程行为。
40 5
|
1月前
|
Java 数据处理
Java8的新特性parallelStream()的概念、对比线程优势与实战
parallelStream() 是 Java 8 中新增的一个方法,它是 Stream 类的一种扩展,提供了将集合数据并行处理的能力。普通的 stream() 方法是使用单线程对集合数据进行顺序处理,而 parallelStream() 方法则可以将集合数据分成多个小块,分配到多个线程并行处理,从而提高程序的执行效率。
57 3
|
2月前
|
安全 Java 开发者
Python多线程编程实战:提高程序执行效率的策略
Python多线程编程实战:提高程序执行效率的策略
125 1
|
4月前
|
XML Java 调度
Android App网络通信中通过runOnUiThread快速操纵界面以及利用线程池Executor调度异步任务实战(附源码 简单易懂)
Android App网络通信中通过runOnUiThread快速操纵界面以及利用线程池Executor调度异步任务实战(附源码 简单易懂)
31 0