线程的创建、Lambda函数式接口?Runnable和Callable之间的适配?动态修改线程任务?这里带你图解Java线程池

简介: 上面只是提到了对于Thread执行任务的一种动态实现方法,肯定还有其他的。那么动态实现有什么好处呢?当我们有很多个任务的时候,我们如果一直使用new,再让gc的话,那么对于系统资源的消耗无疑是巨大的。那么这个时候,如果我们固定一下,专门拿几个线程来处理并发任务呢?但是当并发任务很多又该怎么办?这个时候就引入了池化思想 —— Pool什么是池?在学JDBC的时候我们知道了连接池,在学Spring的时候,我们又接触到了对象池。其实按理来说线程池应该是大家在初学JavaSE的时候应该就遇到的,这里我们再来讲一下。线程池,就是用一个容器来管理线程,这个容器叫做池(Poo

线程的创建、Lambda函数式接口?Runnable和Callable之间的适配?动态修改线程任务?这里带你图解Java线程池

Java线程创建的方式

  • 继承Thread类,重写run方法
  • 重写Runnable接口,交给Tread类静态代理实现
  • 实现Callable接口,用FutureTask封装
  • 实现Runnable接口,用FutureTask封装
  • 继承FutureTask类,重写run方法(猜想、偏方,你非要实现其实也可以,hhh)
  • 线程池创建

前三种创建方式

这里为了便于叙述,毕竟不是本次的重点,我直接上源码,没基础的可以去找些其他资料来补一补

public class ThreadpoolApplication {
   
   

    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
   
        // 1. 继承Thread类实现
        Thread impl =new ThreadImpl();
        impl.start();
        // 2. 实现Runnable接口
        // 对于有 @FunctionalInterface 的类 or 接口,我们可以使用lambda表达式来简化,当然
        // 没有这个注解也可以,但是一定要符合函数式编程的规范
        //      1. 只能有1个待实现的方法
        //      2. 允许有默认方法
        //      3. 只能是接口
        // @FunctionalInterface 可有可无,但是为了规范建议写上,起一个标记作用,告诉编译器这是一个函数式接口
        // 可以让IDE帮你检测你的函数式接口是否符合规范
        Thread runnableImpl = new Thread(new Runnable() {
   
   

            // 这里可以用函数式接口lambda表达式来简写,具体的内容这里不做过多解释,
            // 你可以理解lambda实现为是对接口的简单实现,因为你用lambda返回的也是个Runnable实现对象
            // 后面我就直接用Lambda表达式来简写了。
            @Override
            public void run() {
   
   

            }
        });

        runnableImpl.start();

        // 实现Callable接口,带返回值
        FutureTask<Integer> futureTask= new FutureTask<>(()->{
   
   
            return 1;
        });
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
        Integer result=0;

        // 实现Runnable接口,带返回值
        futureTask=new FutureTask<>(()->{
   
   

        },result);
        new Thread(futureTask).start();
        futureTask.get();    // 其实这个值就是你设置的值,可以去看一下源码,这里只是为了方便任务管理

        return ;
    }

}

浅说函数式编程 —— Lambda的魔法

大家可能对函数式编程有点懵,其实就是符合上面所说的规范

对于有 @FunctionalInterface 的类 or 接口,我们可以使用lambda表达式来简化,当然没有这个注解也可以,但是一定要符合函数式编程的规范

  • 只能有1个待实现的方法
  • 允许有默认方法
  • 只能是接口,抽象类也不行

@FunctionalInterface 可有可无,但是为了规范建议写上,起一个标记作用,告诉编译器这是一个函数式接口可以让IDE帮你检测你的函数式接口是否符合规范

比如这样子:

@FunctionalInterface
public interface MethodInterface {
   
   
    void print();

    default MethodInterface andThen(MethodInterface methodInterface){
   
   
        // 想一下有什么区别
        //  print();
        //  methodInterface.print();
        //  return methodInterface;
        return ()->{
   
   
            print();
            methodInterface.print();;
        };
        // 不用lambda表达式,那么就是在andThen里面进行执行的,每次执行andThen的时候都会自动执行print方法,而且总体上每次             
        // methodInterface.print()会执行两次,那么methodInterface.print()不写的话最后需要再执行一次applay
        // 如果你是使用的lambda表达式返回,那么返回的是一个全新的接口,如果我们需要链式调用完,那么在最后还要执行一下
        // print方法,当然你也可以选择实现一个end()方法来表示结束,相当于另外起一个名
    }
}

其实上面的函数式接口和Cusumer一样,不过Cusumer多了一个判空的过程,除此之外还有另外几个常用的函数式接口(统一规范),如下:

Cusumer

@FunctionalInterface
public interface Consumer<T> {
   
   


    void accept(T t);


    default Consumer<T> andThen(Consumer<? super T> after) {
   
   
        Objects.requireNonNull(after);
        return (T t) -> {
   
    accept(t); after.accept(t); };
    }
}

Function

@FunctionalInterface
public interface Function<T, R> {
   
   

    R apply(T t);

    // 执行一个before操作,类似于AOP思想
    default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
   
   
        Objects.requireNonNull(before);
        // 将执行完成before的结果再带入当前的Fuction进行运算
        // nnd, 这不是指针啊!!!,lambda表达是只有一行的时候可以不用写大括号
        return (V v) -> apply(before.apply(v));
        // 相当于这样
        // return (V v) -> {
   
   
        //     apply(before.apply(v));
        // }
    }

    default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
   
   
        Objects.requireNonNull(after);
        // 在本次执行完之后再执行after
        return (T t) -> after.apply(apply(t));
    }

    // 这又是个什么玩意儿,别急,看我代码补全,构造一个传入和返回同类型的接口
    // 这有啥用,呃呃呃,简写而已,这个是对apply的实现就是给啥还啥,但是只能过用在Function接口上
    // 其他接口不适用,因为接口返回类型,但是你都可以用 t -> t 来表示
    static <T> Function<T, T> identity() {
   
   
        // return t -> return t;
        return t -> t;
    }
}

BiFunction

@FunctionalInterface
public interface BiFunction<T, U, R> {
   
   

    // 对t和u进行运算,返回R类型,nnd,不就是sort函数的第三个cmp参数吗,不说了
    R apply(T t, U u);

    // 这一块就不说了
    default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
   
   
        Objects.requireNonNull(after);
        return (T t, U u) -> after.apply(apply(t, u));
    }

Supplier

工厂模式,要什么可以现在这类面配置好,最后再来一个beanFactory.get()就行

@FunctionalInterface
public interface Supplier<T> {
   
   

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

Predicate

@FunctionalInterface
public interface Predicate<T> {
   
   

    // 实现一个原子性的判断,也就是说我们可以把这个放在if里面来玩儿
    boolean test(T t);

    // 与运算   
    default Predicate<T> and(Predicate<? super T> other) {
   
   
        Objects.requireNonNull(other);
        return (t) -> test(t) && other.test(t);
    }

    // 非
    default Predicate<T> negate() {
   
   
        return (t) -> !test(t);
    }

    // 或
    default Predicate<T> or(Predicate<? super T> other) {
   
   
        Objects.requireNonNull(other);
        return (t) -> test(t) || other.test(t);
    }

    // 判断相等
    static <T> Predicate<T> isEqual(Object targetRef) {
   
   
        return (null == targetRef)
                ? Objects::isNull
                : object -> targetRef.equals(object);
    }

      // 类似于上面Function的identity()方法
    @SuppressWarnings("unchecked")
    static <T> Predicate<T> not(Predicate<? super T> target) {
   
   
        Objects.requireNonNull(target);
        return (Predicate<T>)target.negate();
    }
}

也许你有一个疑问,为什么最后又会返回一个函数式接口?而不是使用直接执行,拿这个方法来说

 default MethodInterface andThen(MethodInterface methodInterface){
   
   
        // 想一下有什么区别
        //  print();
        //  return methodInterface;
        return ()->{
   
   
            print();
            methodInterface.print();;
        };
        // 不用lambda表达式,那么就是在andThen里面进行执行的,每次执行andThen的时候都会自动执行print方法,而且总体上每次             
        // methodInterface.print()会执行两次,那么methodInterface.print()不写的话最后需要再执行一次applay
        // 如果你是使用的lambda表达式返回,那么返回的是一个全新的接口,如果我们需要链式调用完,那么在最后还要执行一下
        // print方法,当然你也可以选择实现一个end()方法来表示结束,相当于另外起一个名
    }

对于直接返回method,最后再来执行一次,那么就像排队一样,一步一步执行,那么你调试的时候,就会频繁的在上下文跳来跳去。

如果你使用接口封装,那么调用print()相当于是一次组合,什么还不懂?

那么好,如果说我要用这个结合来做很多次(比如放入异步任务,当然这里不可能,我是说如果),你不用接口组合,而是用面向过程来实现

那么我们每次放进去都要放

a.andThen()
    .andThen()
    .andThen()
    .andThen()
    ...
    .andThen().print();

// 重新运行只能这样做
a.andThen()
    .andThen()
    .andThen()
    .andThen()
    ...
    .andThen().print();

那么如果我们是用组合

我们只需要这样做

MethodInterface mi=a.andThen()
    .andThen()
    .andThen()
    .andThen()
    ...
    .andThen();

// 每次要重复执行的时候只需要运行一下,一行代码解决
mi.print();

关于函数式编程在stream中的运用,我给大家推荐一篇文章,这里我就不做过多的讲解了,毕竟就是写调用的问题,上面的能看懂,那么我相信你看源代码的时候都能够迎刃而解!

FutureTask源码浅析

一个线程一个Thread对象,那么Thread对象的创建和销毁就太频繁了,那么有没有优化的方案呢?

之前我们不是将FutureTask给放入了Thread进行运行吗?也就是说FutureTask能够管理我们的任务方法

没错,我们可以用FutureTask来对线程进行封装,再放入Thread,每次任务结束的时候控制FutrueTask就可以了,完全不用管Thread对象的创建

那么FutureTask的任务创建是啥?

来看看源码吧,先来看看是如何做到Runnable和Callable之间的适配的

public FutureTask(Callable<V> callable) {
   
   
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;        // 如果是callable,那么直接静态代理,能够理解
        this.state = NEW;       // ensure visibility of callable
    }

public FutureTask(Runnable runnable, V result) {
   
   
    this.callable = Executors.callable(runnable, result);    // 如果是runnable,那么采用适配器模式转换一下
    /**
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    **/
    this.state = NEW;       // ensure visibility of callable
}

// runnable适配器
private static final class RunnableAdapter<T> implements Callable<T> {
   
   
        private final Runnable task;
        private final T result;
        RunnableAdapter(Runnable task, T result) {
   
   
            this.task = task;
            this.result = result;
        }
        public T call() {
   
   
            task.run();
            return result;    // 这里可以看到只是使用适配器模式对task进行一个代理,返回的result的值是固定的,当然有异常的话就没有返回
        }
        public String toString() {
   
   
            return super.toString() + "[Wrapped task = " + task + "]";
        }
    }
        public void run() {
   
   
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
   
   
            Callable<V> c = callable;
            if (c != null && state == NEW) {
   
   
                V result;
                boolean ran;
                try {
   
   
                    result = c.call();        // 不管是哪个类型,都适配成call来被代理
                    ran = true;
                } catch (Throwable ex) {
   
   
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
   
   
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

来看看FutureTask的继承

image-20230810122254018

但是找了一圈儿也没有找到一个可以设置callable的方法,都是初始化,那么就自己实现吧,参考上面的实现一个

package fyi.wzl.threadpool.threadimpl;

import org.springframework.util.ObjectUtils;

import java.util.concurrent.Callable;

public class CallableImpl<T> implements Callable {
   
   

    private Callable<T> callable;
    @Override
    public T call() throws Exception {
   
   
        if (ObjectUtils.isEmpty(callable)) return null;
        return callable.call();
    }

    public CallableImpl(Callable<T> callable) {
   
   
        this.callable = callable;
    }

    public CallableImpl(Runnable runnable,T res) {
   
   
        this.callable = new RunnableAdpter(runnable,res);
    }

    public CallableImpl<T> setCallable(Callable<T> callable) {
   
   
        this.callable = callable;
        return this;
    }

    public CallableImpl<T> setCallable(Runnable runnable,T res) {
   
   
        this.callable = new RunnableAdpter(runnable,res);
        return this;
    }

    private class RunnableAdpter implements Callable<T> {
   
   
        Runnable runnable;
        T res;

        public RunnableAdpter(Runnable runnable, T res) {
   
   
            this.runnable = runnable;
            this.res = res;
        }

        @Override
        public T call() throws Exception {
   
   
            this.runnable.run();
            return res;
        }
    }
}

当然,你也可以写一个CallableAdpter来适配Callable为Runnable

因为Java引用传递,那么我们可以通过修改callble来达到修改任务的目的,那么貌似不用FutureTask也可以了?当然你也可以实现一个类似FutrueTask的实现,这样就不用FutrueTask,直接交给Thread进行管理也可以。

那么FutrueTask的作用是啥?

FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提供 start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消。

再看看FutrueTask的源码,又有AQS这些东东,这些东西我们在以后的文章里面细说。

如何动态修改Thread的run方法 —— 线程池实现的猜想(结果肯定是事与愿违,只是鼠人的一种猜想罢了)

好了,刚才讲到,我们可以用FutrueTask来封装Runnable和Callable(那么我封装自己也可以吧),那么我们也可以手写一个实现来实现可变换任务的封装(其实这里不太严谨,但是为了照顾新手,就讲的简单点,我们直接使用)

  CallableImpl<Integer> callable = new CallableImpl<>(() -> {
   
   
            System.out.println("没改");
            return 1;
        });
        FutureTask futureTask = new FutureTask<Integer>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();
        callable.setCallable(()->{
   
   
            System.out.println("改了");
            return 2;
        });
        System.out.println(futureTask.get());

但是注意的是Thread中start只能执行一次,那么怎么做?

重写Thread来判断任务状态,然后没有任务的时候进行阻塞,有任务的时候进行唤醒,下面来简单实现一下(其实这里也可以实现一个任务队列,然后异步监控来判断实现notifyall)

public class ThreadpoolApplication {
   
   

    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
   
        CallableImpl<Integer> callable = new CallableImpl<>(() -> {
   
   
            System.out.println("没改,线程ID为:"+Thread.currentThread().getId());
            return 1;
        });
        ThreadImpl thread=new ThreadImpl(callable);
        // 线程开始后是死循环,没执行一次后进行阻塞,等到新任务进入后进再执行
        thread.start();
        AtomicInteger i= new AtomicInteger(0);
        while(true){
   
   
            // 这里是每次设定不同的任务
                callable.setCallable(thread.getId(),()->{
   
   
                    System.out.println("改了"+i.getAndIncrement()+"线程ID为:"+Thread.currentThread().getId());
                    return 2;
                });
        }
    }
}

来看一看对于Thread的重新实现吧

public class ThreadImpl extends Thread{
   
   

    // 自己实现的一个Callble接口,模仿FutrueTask实现
    CallableImpl callable=null;

    public ThreadImpl(CallableImpl callable) {
   
   
        this.callable = callable;
    }

    // 其实主要是对run方法进行的修改,这里有一点AOP那个味道了,好好好
    @Override
    public void run() {
   
   
        while (true){
   
   
            // 多线程代码
            String intern = String.valueOf(this.getId()).intern();
            synchronized (intern){
   
   
              try {
   
   
                  this.callable.call();
              } catch (Exception e) {
   
   
                  throw new RuntimeException(e);
              } finally {
   
   
                  try {
   
   
                      intern.wait();
                  } catch (InterruptedException e) {
   
   
                      throw new RuntimeException(e);
                  }
              }
          }

        }
    }
}
// 这个方法可能有点偏离原始实现Thread的思想,现在感觉应该是以实现RUnnable接口为主,难得改了,有兴趣的朋友可以写一下
public class CallableImpl<T> implements Callable {
   
   

    private Callable<T> callable;
    @Override
    public T call() throws Exception {
   
   
        if (ObjectUtils.isEmpty(callable)) return null;
        return callable.call();
    }

    public CallableImpl(Callable<T> callable) {
   
   
        this.callable = callable;
    }

    public CallableImpl(Runnable runnable,T res) {
   
   
        this.callable = new RunnableAdpter(runnable,res);
    }

    public CallableImpl<T> setCallable(long threadId,Callable<T> callable) throws InterruptedException {
   
   
        String id=String.valueOf(threadId).intern();
        synchronized (id) {
   
   
            this.callable = callable;
            id.intern().notify();
            // notify需要等待锁释放完成后才会对其它锁进行唤醒操作,不信的话可以把注释删掉跑一边
//            while (true){
   
   
//                if (id==null)break;
//            }
        }
        return this;
    }

    public CallableImpl<T> setCallable(Runnable runnable,T res) {
   
   
        this.callable = new RunnableAdpter(runnable,res);
        return this;
    }

    private class RunnableAdpter implements Callable<T> {
   
   
        Runnable runnable;
        T res;

        public RunnableAdpter(Runnable runnable, T res) {
   
   
            this.runnable = runnable;
            this.res = res;
        }

        @Override
        public T call() throws Exception {
   
   
            this.runnable.run();
            return res;
        }
    }
}

看看执行结果

image-20230810164019956

第四种线程创建方式 —— 线程池

核心概念

上面只是提到了对于Thread执行任务的一种动态实现方法,肯定还有其他的。

那么动态实现有什么好处呢?

当我们有很多个任务的时候,我们如果一直使用new,再让gc的话,那么对于系统资源的消耗无疑是巨大的。

那么这个时候,如果我们固定一下,专门拿几个线程来处理并发任务呢?但是当并发任务很多又该怎么办?

这个时候就引入了池化思想 —— Pool

什么是池?

在学JDBC的时候我们知道了连接池,在学Spring的时候,我们又接触到了对象池。

其实按理来说线程池应该是大家在初学JavaSE的时候应该就遇到的,这里我们再来讲一下。

线程池,就是用一个容器来管理线程,这个容器叫做池(Pool)。

pool里面的线程数量是固定的,我们拿着固定的线程数量区执行不同的任务,下面来看一个思维图。

image-20230810220812599

这里要引入几概念:

  • 最大线程数

    线程池允许创建的最大线程数量。

  • 核心线程数

    线程池维护的最小线程数量,核心线程创建后不会被回收(注意:设置allowCoreThreadTimeout=true后,空闲的核心线程超过存活时间也会被回收)。

  • 阻塞队列

    一个用于存放任务的队列

当最开始有任务到达的时候,会抢先占用核心线程,当核心线程占用满了以后进入任务队列,任务队列满了以后还有新的线程,那么启用临时线程来进行处理,注意的是临时线程不会处理阻塞队列中的任务,并且临时线程拥有一个存活时间,当长时间没有任务的时候,就会进行自动销毁。

如果临时线程也满了,就是说超过了最大线程数,这也代表着阻塞队列满了,那么采用拒绝策略。

拒绝策略

下面我们堆四大拒绝策略来展开说说

AbortPolicy

终止策略,这是ThreadPoolExecutor线程池默认的拒绝策略,程序将会抛出RejectedExecutionException异常。

    public static class AbortPolicy implements RejectedExecutionHandler {
   
   

        public AbortPolicy() {
   
    }


        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   
   
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

CallerRunsPolicy

调用者运行策略,线程池中没办法运行,那么就由提交任务的这个线程运行(嗯,也就相当于变成了原来的单线程串行执行)。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
   
   
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() {
   
    }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   
   
            if (!e.isShutdown()) {
   
   
                r.run();    // 直接就在这里运行了,并且不会爬出异常
            }
        }
    }

DiscardOldestPolicy

丢弃最早未处理请求策略,丢弃最先进入阻塞队列的任务以腾出空间让新的任务入队列。
有一点像滑动窗口那个感觉了

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
   
   

        public DiscardOldestPolicy() {
   
    }


        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   
   
            if (!e.isShutdown()) {
   
   
                e.getQueue().poll();
                e.execute(r);        // 在这里面执行了入队操作
            }
        }
    }

DiscardPolicy

丢弃策略,什么都不做,即丢弃新提交的任务。

    public static class DiscardPolicy implements RejectedExecutionHandler {
   
   

        public DiscardPolicy() {
   
    }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   
   
        }
    }

总结一下,上面其实可以看出一个优先级:

任务总是按照如下优先级进入:

核心线程>阻塞队列>临时线程>拒绝策略

创建线程池

说了这么多,那么我们如何创建线程池呢?

我们只需要new一个对象就可以了

ThreadPoolExecutor

这里我们来玩儿个好玩儿的,玩儿玩儿阻塞,哈哈,组省赛策略我们选择CallerRunsPlicy,也就是哪来的回哪里去

public class ThreadpoolApplication {
   
   

    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
   
        int corePoolSize=3;
        int maximumPoolSize=6;
        long keepAliveTime=7000L;
        TimeUnit unit=TimeUnit.MILLISECONDS;
        BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(10);
        ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
        int i=0;
        Thread local=Thread.currentThread();
        while (i++<3){
   
   
            int j=0;
            while(j++<6){
   
   
                executorService.execute(()->{
   
   
                    Thread thread = Thread.currentThread();
                    System.out.println(thread +"完成,时间:"+new Date());
                    try {
   
   
                        if (!thread.equals(local)){
   
   
                            Thread.sleep(5000);
                        }
                        else {
   
   
                            Thread.sleep(2000);
                        }
                    } catch (InterruptedException e) {
   
   
                        throw new RuntimeException(e);
                    }
                });
            }
        }
    }
}

我们可以看一下运行结果

Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[main,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:55:03 CST 2023
Thread[main,5,main]完成,时间:Fri Aug 11 05:55:05 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:55:08 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:55:13 CST 2023

可以看到我们在主线程也产生了阻塞,那么我们把阻塞时间给换一下,看看会发生啥(qwq,就是玩儿)

这个时候的输出如下

Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[main,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:56:43 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-6,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:56:45 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-4,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 05:56:47 CST 2023
Thread[pool-1-thread-5,5,main]完成,时间:Fri Aug 11 05:56:48 CST 2023

其实这里也可以看到,lambda表达式其实就是将函数封装成接口,但是让我们用函数的方法来书写,方便看,然后直接传入,其实任务的提交很快,先提交,再消费,所以说主线程就算阻塞了也不会影响其他线程的运行

其实出了execute以外还有一个方法。

executorService.submit

这里有一个submit和一个execute,有什么区别呢?

submit会有一个Futrue\<?>的返回值,我们可以通过Futrue\.get()来获取T类型的返回值,来看看演示吧

我们稍微改一下循环

        while (i++<3){
   
   
            int j=0;
            while(j++<6){
   
   
                int finalJ = j;
                Future<?> submit = executorService.submit(() -> {
   
   
                    Thread thread = Thread.currentThread();
                    System.out.println(thread + "完成,时间:" + new Date());
                    try {
   
   
                        if (!thread.equals(local)) {
   
   
                            Thread.sleep(5000);
                        } else {
   
   
                            Thread.sleep(2000);
                        }
                    } catch (InterruptedException e) {
   
   
                        throw new RuntimeException(e);
                    }
                    return finalJ;
                });
                System.out.println(String.valueOf(i)+","+submit.get()+",时间:"+new Date());
            }

运行结果

Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:10 CST 2023
1,1,时间:Fri Aug 11 06:18:15 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:18:15 CST 2023
1,2,时间:Fri Aug 11 06:18:20 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:18:20 CST 2023
1,3,时间:Fri Aug 11 06:18:25 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:25 CST 2023
1,4,时间:Fri Aug 11 06:18:30 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:18:30 CST 2023
1,5,时间:Fri Aug 11 06:18:35 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:18:35 CST 2023
1,6,时间:Fri Aug 11 06:18:40 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:40 CST 2023
2,1,时间:Fri Aug 11 06:18:45 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:18:45 CST 2023
2,2,时间:Fri Aug 11 06:18:50 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:18:50 CST 2023
2,3,时间:Fri Aug 11 06:18:55 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:18:55 CST 2023
2,4,时间:Fri Aug 11 06:19:00 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:19:00 CST 2023
2,5,时间:Fri Aug 11 06:19:05 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:19:05 CST 2023
2,6,时间:Fri Aug 11 06:19:10 CST 2023
Thread[pool-1-thread-1,5,main]完成,时间:Fri Aug 11 06:19:10 CST 2023
3,1,时间:Fri Aug 11 06:19:15 CST 2023
Thread[pool-1-thread-2,5,main]完成,时间:Fri Aug 11 06:19:15 CST 2023
3,2,时间:Fri Aug 11 06:19:20 CST 2023
Thread[pool-1-thread-3,5,main]完成,时间:Fri Aug 11 06:19:20 CST 2023
...

通过调试,发现get方法是阻塞的,没错和上面FutrueTask一样,因为它们都实现于Futrue接口,当然各自也有实现。

那么这里我们如何才能异步获取结果而不印象主线程的运行呢,把Futrue放在一个数组里面就可以了,哈哈。

这里其实也有一些小细节,看代码都能理解,但还是得根据实际来解释

public class ThreadpoolApplication {
   
   
    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
   
        int corePoolSize=3;
        int maximumPoolSize=6;
        long keepAliveTime=7000L;
        TimeUnit unit=TimeUnit.MILLISECONDS;
        BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(10);
        ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
        int i=0;
        Thread local=Thread.currentThread();
        List<List<Future>> list=new ArrayList<>();
        list.add(new ArrayList<>());
        System.out.println("任务提交开始,时间:"+new Date().getTime());
        while (i++<3){
   
   
            list.add(new ArrayList<>());
            int j=0;
            while(j++<6){
   
   
                int finalJ = j;
                int finalI1 = i;
                System.out.println(finalI1 +","+finalJ+","+Thread.currentThread()+"提交,时间:" + new Date().getTime());
                Future<?> submit = executorService.submit(() -> {
   
   
                    Thread thread = Thread.currentThread();
                    try {
   
   
                        System.out.println(finalI1 +","+finalJ+","+thread +"开始处理,时间:" + new Date().getTime());
                        if (!thread.equals(local)) {
   
   
                            Thread.sleep(5000);
                        } else {
   
   
                            Thread.sleep(2000);
                        }
                    } catch (InterruptedException e) {
   
   
                        throw new RuntimeException(e);
                    } finally {
   
   
                        System.out.println(finalI1 +","+finalJ+","+thread + "完成,时间:" + new Date().getTime());
                    }
                    return finalJ;
                });
                list.get(i).add(submit);
            }
        }
        System.out.println("任务提交完成,时间:"+new Date().getTime());
        for (i=1;i<list.size();i++){
   
   
            int finalI = i;
            list.get(i).forEach(v->{
   
   
                try {
   
   
                    System.out.println(finalI+","+v.get()+",获取结果,时间:"+new Date().getTime());
                } catch (InterruptedException e) {
   
   
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
   
   
                    throw new RuntimeException(e);
                }
            });
        }
    }
}

来看看结果(时间戳前面的几位1691710我就删了,都一样,懒得看)

任务提交开始,时间:1691713108082
1,1,Thread[main,5,main]提交,时间:1691713108082
1,2,Thread[main,5,main]提交,时间:1691713108161
1,3,Thread[main,5,main]提交,时间:1691713108161
1,1,Thread[pool-1-thread-1,5,main]开始处理,时间:1691713108161
1,2,Thread[pool-1-thread-2,5,main]开始处理,时间:1691713108161
1,4,Thread[main,5,main]提交,时间:1691713108161
1,5,Thread[main,5,main]提交,时间:1691713108161
1,6,Thread[main,5,main]提交,时间:1691713108162
1,3,Thread[pool-1-thread-3,5,main]开始处理,时间:1691713108162
2,1,Thread[main,5,main]提交,时间:1691713108162
2,2,Thread[main,5,main]提交,时间:1691713108162
2,3,Thread[main,5,main]提交,时间:1691713108162
2,4,Thread[main,5,main]提交,时间:1691713108162
2,5,Thread[main,5,main]提交,时间:1691713108162
2,6,Thread[main,5,main]提交,时间:1691713108162
3,1,Thread[main,5,main]提交,时间:1691713108162
3,2,Thread[main,5,main]提交,时间:1691713108162
3,3,Thread[main,5,main]提交,时间:1691713108163
3,4,Thread[main,5,main]提交,时间:1691713108163
3,2,Thread[pool-1-thread-4,5,main]开始处理,时间:1691713108163
3,5,Thread[main,5,main]提交,时间:1691713108163
3,5,Thread[main,5,main]开始处理,时间:1691713108163
3,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691713108163
3,4,Thread[pool-1-thread-6,5,main]开始处理,时间:1691713108163
3,5,Thread[main,5,main]完成,时间:1691713110163
3,6,Thread[main,5,main]提交,时间:1691713110163
3,6,Thread[main,5,main]开始处理,时间:1691713110163
3,6,Thread[main,5,main]完成,时间:1691713112164
任务提交完成,时间:1691713112164
1,1,Thread[pool-1-thread-1,5,main]完成,时间:1691713113162
1,2,Thread[pool-1-thread-2,5,main]完成,时间:1691713113162
1,1,获取结果,时间:1691713113162
1,4,Thread[pool-1-thread-1,5,main]开始处理,时间:1691713113162
1,5,Thread[pool-1-thread-2,5,main]开始处理,时间:1691713113162
1,2,获取结果,时间:1691713113162
1,3,Thread[pool-1-thread-3,5,main]完成,时间:1691713113163
1,6,Thread[pool-1-thread-3,5,main]开始处理,时间:1691713113163
1,3,获取结果,时间:1691713113163
3,2,Thread[pool-1-thread-4,5,main]完成,时间:1691713113164
3,4,Thread[pool-1-thread-6,5,main]完成,时间:1691713113164
3,3,Thread[pool-1-thread-5,5,main]完成,时间:1691713113164
2,2,Thread[pool-1-thread-6,5,main]开始处理,时间:1691713113164
2,1,Thread[pool-1-thread-4,5,main]开始处理,时间:1691713113164
2,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691713113164
1,4,Thread[pool-1-thread-1,5,main]完成,时间:1691713118163
1,5,Thread[pool-1-thread-2,5,main]完成,时间:1691713118163
2,4,Thread[pool-1-thread-1,5,main]开始处理,时间:1691713118163
1,4,获取结果,时间:1691713118163
2,5,Thread[pool-1-thread-2,5,main]开始处理,时间:1691713118163
1,5,获取结果,时间:1691713118163
1,6,Thread[pool-1-thread-3,5,main]完成,时间:1691713118164
2,6,Thread[pool-1-thread-3,5,main]开始处理,时间:1691713118164
1,6,获取结果,时间:1691713118164
2,1,Thread[pool-1-thread-4,5,main]完成,时间:1691713118165
2,3,Thread[pool-1-thread-5,5,main]完成,时间:1691713118165
2,2,Thread[pool-1-thread-6,5,main]完成,时间:1691713118165
2,1,获取结果,时间:1691713118165
3,1,Thread[pool-1-thread-4,5,main]开始处理,时间:1691713118165
2,2,获取结果,时间:1691713118165
2,3,获取结果,时间:1691713118165
2,4,Thread[pool-1-thread-1,5,main]完成,时间:1691713123164
2,5,Thread[pool-1-thread-2,5,main]完成,时间:1691713123164
2,4,获取结果,时间:1691713123164
2,5,获取结果,时间:1691713123164
2,6,Thread[pool-1-thread-3,5,main]完成,时间:1691713123165
2,6,获取结果,时间:1691713123165
3,1,Thread[pool-1-thread-4,5,main]完成,时间:1691713123166
3,1,获取结果,时间:1691713123166
3,2,获取结果,时间:1691713123166
3,3,获取结果,时间:1691713123166
3,4,获取结果,时间:1691713123166
3,5,获取结果,时间:1691713123166
3,6,获取结果,时间:1691713123166

从这个结果我们可以看出我们把全部把任务提交后任务,接下来我们跟着结果来分析一下

从上面可以看出

核心线程如下

  • pool-1-thread-1
  • pool-1-thread-2
  • pool-1-thread-3

临时线程

  • pool-1-thread-4

  • pool-1-thread-5

  • pool-1-thread-6

    这里为了方便看结果,我简单处理了下文本

任务提交开始,时间:08082
1,1,从【主线程】提交,时间:08082
1,2,从【主线程】提交,时间:08161
1,3,从【主线程】提交,时间:08161
1,1,在 【核心线程1】(线程1)开始处理,时间:08161
1,2,在 【核心线程2】(线程2)开始处理,时间:08161
# 后面的10个进程进入阻塞队列
1,4,从【主线程】提交,时间:08161
1,5,从【主线程】提交,时间:08161
1,6,从【主线程】提交,时间:08162
1,3,在 【核心线程3】(线程3)开始处理,时间:08162
2,1,从【主线程】提交,时间:08162
2,2,从【主线程】提交,时间:08162
2,3,从【主线程】提交,时间:08162
2,4,从【主线程】提交,时间:08162
2,5,从【主线程】提交,时间:08162
2,6,从【主线程】提交,时间:08162
3,1,从【主线程】提交,时间:08162
# 阻塞队列满了
3,2,从【主线程】提交,时间:08162
3,3,从【主线程】提交,时间:08163
3,4,从【主线程】提交,时间:08163
3,2,在 【临时线程1】(线程4)开始处理,时间:08163
3,5,从【主线程】提交,时间:08163        # 核心线程、阻塞队列、临时线程都满了,执行拒绝策略
3,5,从【主线程】开始处理,时间:08163       # 在 主线程 处理
3,3,在 【临时线程2】(线程5)开始处理,时间:08163
3,4,在 【临时线程3】(线程6)开始处理,时间:08163
3,5,从【主线程】完成,时间:10163
3,6,从【主线程】提交,时间:10163
3,6,从【主线程】开始处理,时间:10163
3,6,从【主线程】完成,时间:12164        # 注意,这里开始主线程的任务完了
任务提交完成,时间:12164
1,1,在 【核心线程1】(线程1)完成,时间:13162
1,2,在 【核心线程2】(线程2)完成,时间:13162
1,1,获取结果,时间:13162
1,4,在 【核心线程1】(线程1)开始处理,时间:13162
1,5,在 【核心线程2】(线程2)开始处理,时间:13162
1,2,获取结果,时间:13162
1,3,在 【核心线程3】(线程3)完成,时间:13163
1,6,在 【核心线程3】(线程3)开始处理,时间:13163
1,3,获取结果,时间:13163
3,2,在 【临时线程1】(线程4)完成,时间:13164    # 临时线程没有销毁又进入临时线程了
3,4,在 【临时线程3】(线程6)完成,时间:13164
3,3,在 【临时线程2】(线程5)完成,时间:13164
2,2,在 【临时线程3】(线程6)开始处理,时间:13164
2,1,在 【临时线程1】(线程4)开始处理,时间:13164
2,3,在 【临时线程2】(线程5)开始处理,时间:13164
1,4,在 【核心线程1】(线程1)完成,时间:18163
1,5,在 【核心线程2】(线程2)完成,时间:18163
2,4,在 【核心线程1】(线程1)开始处理,时间:18163
1,4,获取结果,时间:18163
2,5,在 【核心线程2】(线程2)开始处理,时间:18163
1,5,获取结果,时间:18163
1,6,在 【核心线程3】(线程3)完成,时间:18164
2,6,在 【核心线程3】(线程3)开始处理,时间:18164
1,6,获取结果,时间:18164
2,1,在 【临时线程1】(线程4)完成,时间:18165
2,3,在 【临时线程2】(线程5)完成,时间:18165
2,2,在 【临时线程3】(线程6)完成,时间:18165
2,1,获取结果,时间:18165
3,1,在 【临时线程1】(线程4)开始处理,时间:18165
2,2,获取结果,时间:18165
2,3,获取结果,时间:18165
2,4,在 【核心线程1】(线程1)完成,时间:23164
2,5,在 【核心线程2】(线程2)完成,时间:23164
2,4,获取结果,时间:23164
2,5,获取结果,时间:23164
2,6,在 【核心线程3】(线程3)完成,时间:23165
2,6,获取结果,时间:23165
3,1,在 【临时线程1】(线程4)完成,时间:23166
3,1,获取结果,时间:23166
3,2,获取结果,时间:23166
3,3,获取结果,时间:23166
3,4,获取结果,时间:23166
3,5,获取结果,时间:23166
3,6,获取结果,时间:23166

从上面可以看出当核心线程和临时线程同时存在,如果这个时候开始消费阻塞队列里的任务是,临时线程可以和核心线程一起处理,神奇!

核心线程会被销毁吗?

那么我们再来想想,这里我们如果没有任务进入的话,那么核心线程会不会销毁?

试一试。

我们来写一个方法,来获取线程池里面的信息

    public static void printThreadPloolInfo(ThreadPoolExecutor threadPoolExecutor){
   
   
        System.out.println("当前线程池大小:"+threadPoolExecutor.getPoolSize()+
                " 核心线程数:"+threadPoolExecutor.getCorePoolSize()+
                " 活跃线程数:"+threadPoolExecutor.getActiveCount());
    }

同样,修改一下线程池创建的代码

 ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,new ThreadPoolExecutor.CallerRunsPolicy());

最后的代码:

public class ThreadpoolApplication {
   
   
    public static void printThreadPloolInfo(ThreadPoolExecutor threadPoolExecutor){
   
   
        System.out.println("当前线程池大小:"+threadPoolExecutor.getPoolSize()+
                " 核心线程数:"+threadPoolExecutor.getCorePoolSize()+
                " 活跃线程数:"+threadPoolExecutor.getActiveCount());
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
   
        int corePoolSize=3;
        int maximumPoolSize=6;
        long keepAliveTime=7000L;
        TimeUnit unit=TimeUnit.MILLISECONDS;
        BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(10);
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
        int i=0;
        Thread local=Thread.currentThread();
        List<List<Future>> list=new ArrayList<>();
        list.add(new ArrayList<>());
        System.out.println("任务提交开始,时间:"+new Date().getTime());
        while (i++<3){
   
   
            list.add(new ArrayList<>());
            int j=0;
            while(j++<6){
   
   
                int finalJ = j;
                int finalI1 = i;
                System.out.println(finalI1 +","+finalJ+","+Thread.currentThread()+"提交,时间:" + new Date().getTime());
                Future<?> submit = executorService.submit(() -> {
   
   
                    Thread thread = Thread.currentThread();
                    try {
   
   
                        System.out.println(finalI1 +","+finalJ+","+thread +"开始处理,时间:" + new Date().getTime());
                        if (!thread.equals(local)) {
   
   
                            Thread.sleep(5000);
                        } else {
   
   
                            Thread.sleep(2000);
                        }
                    } catch (InterruptedException e) {
   
   
                        throw new RuntimeException(e);
                    } finally {
   
   
                        System.out.println(finalI1 +","+finalJ+","+thread + "完成,时间:" + new Date().getTime());
                    }
                    return finalJ;
                });
                list.get(i).add(submit);
            }
            printThreadPloolInfo(executorService);
        }
        System.out.println("任务提交完成,时间:"+new Date().getTime());
        for (i=1;i<list.size();i++){
   
   
            int finalI = i;
            list.get(i).forEach(v->{
   
   
                try {
   
   
                    System.out.println(finalI+","+v.get()+",获取结果,时间:"+new Date().getTime());
                } catch (InterruptedException e) {
   
   
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
   
   
                    throw new RuntimeException(e);
                }
            });
        }
        long current = new Date().getTime();
        while(true){
   
   
            printThreadPloolInfo(executorService);
            if(new Date().getTime()-current>10*1000)break;
            Thread.sleep(500);
        }
        return;
    }
}

结果:

任务提交开始,时间:1691722972284
1,1,Thread[main,5,main]提交,时间:1691722972284
1,2,Thread[main,5,main]提交,时间:1691722972339
1,1,Thread[pool-1-thread-1,5,main]开始处理,时间:1691722972339
1,3,Thread[main,5,main]提交,时间:1691722972339
1,2,Thread[pool-1-thread-2,5,main]开始处理,时间:1691722972339
1,4,Thread[main,5,main]提交,时间:1691722972339
1,5,Thread[main,5,main]提交,时间:1691722972339
1,6,Thread[main,5,main]提交,时间:1691722972339
1,3,Thread[pool-1-thread-3,5,main]开始处理,时间:1691722972339
当前线程池大小:3 核心线程数:3 活跃线程数:3
2,1,Thread[main,5,main]提交,时间:1691722972340
2,2,Thread[main,5,main]提交,时间:1691722972340
2,3,Thread[main,5,main]提交,时间:1691722972340
2,4,Thread[main,5,main]提交,时间:1691722972340
2,5,Thread[main,5,main]提交,时间:1691722972340
2,6,Thread[main,5,main]提交,时间:1691722972340
当前线程池大小:3 核心线程数:3 活跃线程数:3
3,1,Thread[main,5,main]提交,时间:1691722972340
3,2,Thread[main,5,main]提交,时间:1691722972340
3,3,Thread[main,5,main]提交,时间:1691722972340
3,2,Thread[pool-1-thread-4,5,main]开始处理,时间:1691722972340
3,4,Thread[main,5,main]提交,时间:1691722972340
3,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691722972341
3,5,Thread[main,5,main]提交,时间:1691722972341
3,5,Thread[main,5,main]开始处理,时间:1691722972341
3,4,Thread[pool-1-thread-6,5,main]开始处理,时间:1691722972341
3,5,Thread[main,5,main]完成,时间:1691722974341
3,6,Thread[main,5,main]提交,时间:1691722974341
3,6,Thread[main,5,main]开始处理,时间:1691722974341
3,6,Thread[main,5,main]完成,时间:1691722976342
当前线程池大小:6 核心线程数:3 活跃线程数:6
任务提交完成,时间:1691722976342
1,2,Thread[pool-1-thread-2,5,main]完成,时间:1691722977340
1,3,Thread[pool-1-thread-3,5,main]完成,时间:1691722977340
1,1,Thread[pool-1-thread-1,5,main]完成,时间:1691722977340
1,4,Thread[pool-1-thread-3,5,main]开始处理,时间:1691722977340
1,5,Thread[pool-1-thread-2,5,main]开始处理,时间:1691722977340
1,1,获取结果,时间:1691722977340
1,6,Thread[pool-1-thread-1,5,main]开始处理,时间:1691722977340
1,2,获取结果,时间:1691722977340
1,3,获取结果,时间:1691722977340
3,4,Thread[pool-1-thread-6,5,main]完成,时间:1691722977342
3,2,Thread[pool-1-thread-4,5,main]完成,时间:1691722977342
2,2,Thread[pool-1-thread-4,5,main]开始处理,时间:1691722977342
3,3,Thread[pool-1-thread-5,5,main]完成,时间:1691722977342
2,1,Thread[pool-1-thread-6,5,main]开始处理,时间:1691722977342
2,3,Thread[pool-1-thread-5,5,main]开始处理,时间:1691722977342
1,5,Thread[pool-1-thread-2,5,main]完成,时间:1691722982340
1,6,Thread[pool-1-thread-1,5,main]完成,时间:1691722982340
1,4,Thread[pool-1-thread-3,5,main]完成,时间:1691722982340
2,5,Thread[pool-1-thread-1,5,main]开始处理,时间:1691722982340
2,4,Thread[pool-1-thread-2,5,main]开始处理,时间:1691722982340
1,4,获取结果,时间:1691722982340
2,6,Thread[pool-1-thread-3,5,main]开始处理,时间:1691722982340
1,5,获取结果,时间:1691722982340
1,6,获取结果,时间:1691722982340
2,2,Thread[pool-1-thread-4,5,main]完成,时间:1691722982342
2,1,Thread[pool-1-thread-6,5,main]完成,时间:1691722982342
3,1,Thread[pool-1-thread-4,5,main]开始处理,时间:1691722982342
2,1,获取结果,时间:1691722982342
2,3,Thread[pool-1-thread-5,5,main]完成,时间:1691722982342
2,2,获取结果,时间:1691722982342
2,3,获取结果,时间:1691722982342
2,6,Thread[pool-1-thread-3,5,main]完成,时间:1691722987341
2,4,Thread[pool-1-thread-2,5,main]完成,时间:1691722987341
2,4,获取结果,时间:1691722987341
2,5,Thread[pool-1-thread-1,5,main]完成,时间:1691722987341
2,5,获取结果,时间:1691722987341
2,6,获取结果,时间:1691722987341
3,1,Thread[pool-1-thread-4,5,main]完成,时间:1691722987343
3,1,获取结果,时间:1691722987343
3,2,获取结果,时间:1691722987343
3,3,获取结果,时间:1691722987343
3,4,获取结果,时间:1691722987343
3,5,获取结果,时间:1691722987343
3,6,获取结果,时间:1691722987343
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0

我们处理一下

当前线程池大小:3 核心线程数:3 活跃线程数:3
当前线程池大小:6 核心线程数:3 活跃线程数:6
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:6 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0
当前线程池大小:3 核心线程数:3 活跃线程数:0

可以看到,核心线程并不会被销毁,所谓核心线程,只是保证了线程池的最小线程数而已。

那么核心线程能否被回收呢?

我们可以看到构造器中有一个方法

executorService.allowCoreThreadTimeOut(true);

那我们开启试一试

我们会看到最后又有新的东西出现

当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:4 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0
当前线程池大小:0 核心线程数:3 活跃线程数:0

这里也就是说corePool只是一个阙值,用户可以自定义是否销毁还是不销毁。

核心线程只是按照开发者的想法起的一个名称而已,和临时线程没有什么区别。

那么线程存放在哪里的呢?

线程的存放——深入剖析ThreadPoolExecutor源码

这个地方我们跟随源代码来看一看。

我们在ThreadPoolExecutor的源码里面发现了一个东西

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

看到没有,线程其实是存放在一个HashSet里面去的

我们来看看submit和execute

submit()和execute()

  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
   
   
        return new FutureTask<T>(callable);
    }

public <T> Future<T> submit(Callable<T> task) {
   
   
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);    // submit也会交给execute进行处理
        return ftask;
    }
class Integer{
   
   
       /**
     * The number of bits used to represent an {@code int} value in two's
     * complement binary form.
     *
     * @since 1.5
     */
    @Native public static final int SIZE = 32;
}

/**

主要的池控制状态ctl是一个原子整数,包含两个概念字段:

workerCount,表示有效线程数。
runState,表示运行状态,关闭状态等。

**/

// 运行状态存储在高位比特中
/***
RUNNING:运行中的状态,所有线程都可接受新任务。
SHUTDOWN:关闭状态,不再接受新任务,但会处理已排队的任务。
STOP:停止状态,不再接受新任务,不处理已排队的任务,且会中断正在执行的任务。
TIDYING:整理状态,所有任务已终止,线程池正在清理资源。
TERMINATED:终止状态,线程池已经终止,所有任务已完成。
***/
private static final int COUNT_BITS = Integer.SIZE - 3;


// 这个地方参考HashMap如何通过哈希寻找位置,COUNT_BITS是哈希掩码,https://www.wzl1.top/2023/07/21/996/
// 看到这里你以为是这样?放屁!还记得计算机网络如何通过网络掩码来计算网络号和主机号吗?这个就是相似的原理
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static int workerCountOf(int c)  {
   
    return c & CAPACITY; }

public void execute(Runnable command) {
   
   
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
   
       // 当前任务数量小于核心线程数
            if (addWorker(command, true))    // 将任务加入到核心线程池里面
                return;
            c = ctl.get();           
            /**
                private static boolean isRunning(int c) {
                           return c < SHUTDOWN;
                    }
            **/
            // 判断运行状态,并且加入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
   
   
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);    // 那我不执行,没问题吧,具体的addworker代码看后文
        }
        else if (!addWorker(command, false))//阻塞队列满了,启动非核心
            reject(command);
    }

    private volatile RejectedExecutionHandler handler;

    final void reject(Runnable command) {
   
   
        handler.rejectedExecution(command, this);    // 拒绝策略
    }

    // 拒绝策略的实现就看上面了
    public interface RejectedExecutionHandler {
   
   
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }

我们再来看看addWork

addWork()

/*
runState提供了主要的生命周期控制,可以取以下值:

RUNNING:接受新任务并处理排队的任务。
SHUTDOWN:不接受新任务,但处理排队的任务。
STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务。
TIDYING:所有任务已终止,workerCount为零,转换到TIDYING状态的线程将运行terminated()钩子方法。
TERMINATED:terminated()已完成。
这些值之间的数值顺序很重要,以允许有序比较。runState随时间单调递增,但不必达到每个状态。转换如下:

RUNNING -> SHUTDOWN:调用shutdown()时,可能隐式地在finalize()中调用。
(RUNNING或SHUTDOWN) -> STOP:调用shutdownNow()时。
SHUTDOWN -> TIDYING:队列和池都为空时。
STOP -> TIDYING:池为空时。
TIDYING -> TERMINATED:完成了terminated()钩子方法。
*/


private static final int RUNNING    = -1 << COUNT_BITS;        // 运行
private static final int SHUTDOWN   =  0 << COUNT_BITS;        // 关闭
private static final int STOP       =  1 << COUNT_BITS;        // 暂停
private static final int TIDYING    =  2 << COUNT_BITS;        // 整理
private static final int TERMINATED =  3 << COUNT_BITS;        // 终止

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Packing and unpacking ctl
    // CAPACITY = 00000000011111111111111111(前面自动补零,有30个左右的零,后面的1同理)
    private static int runStateOf(int c)     {
   
    return c & ~CAPACITY; }        // 取高位
    private static int workerCountOf(int c)  {
   
    return c & CAPACITY; }        // 取低位
    private static int ctlOf(int rs, int wc) {
   
    return rs | wc; }            // 合并

    private boolean compareAndIncrementWorkerCount(int expect) {
   
   
        return ctl.compareAndSet(expect, expect + 1);
    }

private boolean addWorker(Runnable firstTask, boolean core) {
   
   
        retry:
        for (;;) {
   
   
            int c = ctl.get();
            int rs = runStateOf(c);            // 获取运行状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))    
                // 如果不再接受任务的状态,并且满足下面条件之一直接return false
                // 1. 当前状态不是shutdown (shutdown都不是了,那么我也不能接受新的任务)
                // 2. firstTask不为空    (在这个条件满足之前,一定会满足为非运行态并且是shutdown状态,虽然你不会空,但是我也无法接受任务了啊)
                // 3. workQueue为空(好好好,shutdown状态,firstTask为空,这个时候没有workQueue了,那么我也不用处理了对不对)
                // rs != shutdown || firstTask != null || workQueue.isEmpty()
                // 唯一能够进行下去的就是, rs是shutdown的状态,fitstTask为空,并且有任务可以处理
                return false;

            for (;;) {
   
   
                int wc = workerCountOf(c);        // 获取任务数量
                if (wc >= CAPACITY ||            // OOM溢出,CAPACITY虽然是掩码,每一位为1,那么是不是可以看成任务数量的最大值
                    wc >= (core ? corePoolSize : maximumPoolSize))    // 加入核心线程还是临时线程
                    return false;
                if (compareAndIncrementWorkerCount(c))    // 如果这个时候c的值没有变化,那么自增(CAS乐观锁)
                    break retry;    // 语法糖,跳出retry,的所有循环,直接进入try那里,和goto类似,你在这里可以看作是对外层循环的控制
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)    // 运行状态改变,重试
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
   
   
            w = new Worker(firstTask);        // 新建一个work
            /**
            这里是构造函数,内部使用的工厂模式来创建的线程
          Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

            **/
            final Thread t = w.thread;    // 获取线程
            if (t != null) {
   
   
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
   
   
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());    // 再次看运行状态

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
   
   
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);        // 放入work集合
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;    // 修改最大线程数
                        workerAdded = true;
                    }
                } finally {
   
   
                    mainLock.unlock();
                }
                if (workerAdded) {
   
   
                    t.start();        // 执行线程
                    workerStarted = true;
                }
            }
        } finally {
   
   
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

对于Worker,来说,有一个现成制造工厂,其实我们可以在线程池创建的时候去设定,这里我们看看默认工厂是如何实现的

DefaultThreadFactor

static class DefaultThreadFactory implements ThreadFactory {
   
   
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
   
   
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
   
   
        // 在这里,我们把runnable传进去的
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);    // 默认模式不开启守护线程
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

而这里的r,我们从worker里面传入的是this

 Worker(Runnable firstTask) {
   
   
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

我这里直接把Worker这个类给放出来

Worker

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
   
   
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
   
   
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
   
   
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
   
   
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
   
   
        if (compareAndSetState(0, 1)) {
   
   
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
   
   
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        {
   
    acquire(1); }
    public boolean tryLock()  {
   
    return tryAcquire(1); }
    public void unlock()      {
   
    release(1); }
    public boolean isLocked() {
   
    return isHeldExclusively(); }

    void interruptIfStarted() {
   
   
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
   
   
            try {
   
   
                t.interrupt();
            } catch (SecurityException ignore) {
   
   
            }
        }
    }
}

既然是这样,那么t.start()一定会调用run()方法,我们再来看看runWoker(this)

runWokrer()

final void runWorker(Worker w) {
   
   
    Thread wt = Thread.currentThread();    // 获取当前线程信息
    Runnable task = w.firstTask;        // 获取当前Worker任务
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
   
   
        // 从当前的worker里面获取,或者从workQueue里面获取
        while (task != null || (task = getTask()) != null) {
   
   
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
   
   
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
   
   
                    task.run();            // 执行这个任务
                } catch (RuntimeException x) {
   
   
                    thrown = x; throw x;
                } catch (Error x) {
   
   
                    thrown = x; throw x;
                } catch (Throwable x) {
   
   
                    thrown = x; throw new Error(x);
                } finally {
   
   
                    afterExecute(task, thrown);
                }
            } finally {
   
   
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
   
   
        processWorkerExit(w, completedAbruptly);
    }
}

好了,现在我们知道大概的调用流程了

submit()->execute()->addWorker()->new Worker()->t=worker.thread=ThreadFactory.newThread(this)->t.start()执行worker.run()->runwark(this)->this.task.run()

总得来说就是把当前任务交给了worker来进行代理,那么阻塞队列的消费呢?上面的逻辑是在addWorker成功的情况下,如果失败了,就放在workQueue里面,那个offer()方法(祝大家拿到心仪的offer),然后我们在addWorker(null)。

那么我们找找workQueue.take()在哪里。

getTask()

private Runnable getTask() {
   
   
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
   
   
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
   
   
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);    // 获取线程数

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;    // 判断可用线程是否有时间限制

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
   
   
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
   
   
            /**
            public E poll(long timeout, TimeUnit unit) throws 
        参数:此方法采用两个强制性参数:
            timeout–等待的时间,以单位为单位。
            unit–超时参数的TimeUnit。
        返回值:此方法从此LinkedBlockingQueue的头部检索并删除元素,如果在元素可用之前经过了指定的等待时间,则为null。
        异常:如果在等待元素可用时方法被中断,则此方法将引发InterruptedException。
            **/
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    //如果我们等待任务的时间都已经超过了线程存活时间,那么也没有必要                                                                           处理了,直接处理null就行
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
   
   
            timedOut = false;
        }
    }
}

那么在那里又用到了getTask()呢?

哦,是runWorker(),也就是是说上面我们的流程又要变一下了。

submit()->execute()->addWorker()(如果添加失败的话放入队列,addWorker(null))->new Worker()->t=worker.thread=ThreadFactory.newThread(this)->t.start()执行worker.run()->runwark(this) (如果task是null的话从阻塞队列里面拿队首的来执行)->this.task.run()

那么问题又来了,我们是如何保证一定是被指定的线程执行的呢?

这就要从我们的线程是从哪里来的说起,我们创建线程的时候是在addWorker()方法,然后通过线程工厂,但是默认线程工厂的代码及其简单,给我感觉很强的应该是里面的ThreadGroup,我们看看什么时候用到了。

原来是构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
   
   
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

再来回顾下DefaultThreadFactor

static class DefaultThreadFactory implements ThreadFactory {
   
   
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
   
   
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
   
   
        // 在这里,我们把runnable传进去的
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);    // 默认模式不开启守护线程
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

那么好吧,我们继续看看ThreadGroup

ThreadGroup

这一块就是Java SE的内容了,不做过多的阐述

public class ThreadGroup implements Thread.UncaughtExceptionHandler {
   
   
    private final ThreadGroup parent;
    String name;
    int maxPriority;
    boolean destroyed;
    boolean daemon;
    boolean vmAllowSuspension;

    int nUnstartedThreads = 0;
    int nthreads;
    Thread threads[];

    int ngroups;
    ThreadGroup groups[];

    ...    // 忽略其他方法
}

原来这里秒你就有一个线程数组,那么ThreadGroup哪里有用?

Thread的一个构造方法

public Thread(ThreadGroup group, Runnable target, String name,
              long stackSize) {
   
   
    init(group, target, name, stackSize);
}

我们再来看看init

private void init(ThreadGroup g, Runnable target, String name,
                  long stackSize) {
   
   
    init(g, target, name, stackSize, null, true);
}

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
   
   
        if (name == null) {
   
   
            throw new NullPointerException("name cannot be null");
        }

        this.name = name;

        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
   
   
            /* Determine if it's an applet or not */

            /* If there is a security manager, ask the security manager
               what to do. */
            if (security != null) {
   
   
                g = security.getThreadGroup();
            }

            /* If the security doesn't have a strong opinion of the matter
               use the parent thread group. */
            if (g == null) {
   
   
                g = parent.getThreadGroup();
            }
        }

        /* checkAccess regardless of whether or not threadgroup is
           explicitly passed in. */
        g.checkAccess();

        /*
         * Do we have the required permissions?
         */
        if (security != null) {
   
   
            if (isCCLOverridden(getClass())) {
   
   
                security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
            }
        }

        g.addUnstarted();
        // 对新的这个线程初始化
        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }

但是,你这只是说明了线程组啊,线程数量是在哪里限制的呢?addWork()

那么如何将任务退出的呢?

我们碰一碰processWorkerExit()吧

processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   
   
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
   
   
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
   
   
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
   
   
        if (!completedAbruptly) {
   
   
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

从上面可以看出,线程池的本质其实是一直控制new的线程的数量,只不过线程的ID、名称一直没有变过,所以是给人一种只是切换任务的错觉。

Executors

另外JUC为我们提供了一个构造器Executors,之里面有几种线程池的实现,底层还是靠的new ThreadPoolExecutor(),但是《阿里巴巴开发手册》里面不建议这样做,这里还是简单的介绍一下。

Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

newFiexedThreadPool(int Threads)

创建固定数目线程的线程池。

  • newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程。

  • 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子

  • 和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器

  • 从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
    fixed池线程数固定,并且是0秒IDLE(无IDLE)

    cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE

    public static ExecutorService newFixedThreadPool(int nThreads) {
   
   
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
   
   
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

  • 缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中
  • 缓存型池子通常用于执行一些生存期很短的异步型任务
    因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。
  • 能reuse的线程,必须是timeout IDLE内的池中线程,缺省 timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
          注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。
    
    public static ExecutorService newCachedThreadPool() {
   
   
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
   
   
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

newSingleThreadExecutor()

创建一个单线程化的Executor。

单例线程,任意时间池中只能有一个线程
用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

    public static ExecutorService newSingleThreadExecutor() {
   
   
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
   
   
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

调度型线程池

这个池子里的线程可以按schedule依次delay执行,或周期执行

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   
   
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
   
   
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

总结

线程池,总算是搞懂了,所谓的核心线程数,只是一个定界指针,而线程存活时间也是靠按照时间来获取任务来实现的。

而任务的执行,并不是靠更换线程的仍无,仍然是靠new线程, 当执行完了以后进行processWorkerExit()进行释放

同时大家也可以看到,这里面的阻塞队列,有多种实现,具体的各种实现我们下期再谈。

参考文章:

目录
相关文章
|
3天前
|
Java 开发者
Java一分钟之-Lambda表达式与函数式接口
【5月更文挑战第12天】Java 8引入的Lambda表达式简化了函数式编程,与函数式接口结合,实现了代码高效编写。本文介绍了Lambda的基本语法,如参数列表、箭头符号和函数体,并展示了如何使用Lambda实现`Runnable`接口。函数式接口仅有一个抽象方法,可与Lambda搭配使用。`@FunctionalInterface`注解用于确保接口具有单一抽象方法。文章还讨论了常见的问题和易错点,如非函数式接口、类型冲突以及Lambda表达式的局部变量可见性,并提供了避免这些问题的策略。通过理解Lambda和函数式接口,开发者能提高代码可读性和效率。
42 4
|
3天前
|
Java 数据库
【Java多线程】对线程池的理解并模拟实现线程池
【Java多线程】对线程池的理解并模拟实现线程池
16 1
|
1天前
|
Java 测试技术 Python
Python的多线程允许在同一进程中并发执行任务
【5月更文挑战第17天】Python的多线程允许在同一进程中并发执行任务。示例1展示了创建5个线程打印&quot;Hello World&quot;,每个线程调用同一函数并使用`join()`等待所有线程完成。示例2使用`ThreadPoolExecutor`下载网页,创建线程池处理多个URL,打印出每个网页的大小。Python多线程还可用于线程间通信和同步,如使用Queue和Lock。
14 1
|
2天前
|
存储 Java 编译器
Java中的抽象类与接口,在阿里工作5年了
Java中的抽象类与接口,在阿里工作5年了
|
2天前
|
消息中间件 前端开发 Java
美团面试:如何实现线程任务编排?
线程任务编排指的是对多个线程任务按照一定的逻辑顺序或条件进行组织和安排,以实现协同工作、顺序执行或并行执行的一种机制。 ## 1.线程任务编排 VS 线程通讯 有同学可能会想:那线程的任务编排是不是问的就是线程间通讯啊? 线程间通讯我知道了,它的实现方式总共有以下几种方式: 1. Object 类下的 wait()、notify() 和 notifyAll() 方法; 2. Condition 类下的 await()、signal() 和 signalAll() 方法; 3. LockSupport 类下的 park() 和 unpark() 方法。 但是,**线程通讯和线程的任务编排是
|
3天前
|
Java API 容器
Java8函数式编程接口:Consumer、Supplier、Function、Predicate
Java8函数式编程接口:Consumer、Supplier、Function、Predicate
8 1
|
3天前
|
Java ice
【Java开发指南 | 第二十九篇】Java接口
【Java开发指南 | 第二十九篇】Java接口
9 0
|
3天前
|
Java
【Java开发指南 | 第九篇】访问实例变量和方法、继承、接口
【Java开发指南 | 第九篇】访问实例变量和方法、继承、接口
14 4
|
3天前
|
安全 Java 数据安全/隐私保护
Java一分钟之-Java反射机制:动态操作类与对象
【5月更文挑战第12天】本文介绍了Java反射机制的基本用法,包括获取Class对象、创建对象、访问字段和调用方法。同时,讨论了常见的问题和易错点,如忽略访问权限检查、未捕获异常以及性能损耗,并提供了相应的避免策略。理解反射的工作原理和合理使用有助于提升代码灵活性,但需注意其带来的安全风险和性能影响。
23 4
|
3天前
|
Java 调度
Java一分钟之线程池:ExecutorService与Future
【5月更文挑战第12天】Java并发编程中,`ExecutorService`和`Future`是关键组件,简化多线程并提供异步执行能力。`ExecutorService`是线程池接口,用于提交任务到线程池,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。通过`submit()`提交任务并返回`Future`对象,可检查任务状态、获取结果或取消任务。注意处理`ExecutionException`和避免无限等待。实战示例展示了如何异步执行任务并获取结果。理解这些概念对提升并发性能至关重要。
18 5