ThreadPoolExecutor 使用

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: ThreadPoolExecutor 使用

ThreadPoolExecutor 介绍

简写:

package com.vipsoft.Thread;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolExecutorTest {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2),
                new ThreadFactory() {
                    private final AtomicInteger mThreadNum = new AtomicInteger(1);
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "Thread-" + mThreadNum.getAndIncrement());
                        System.out.println(t.getName() + " has been created");
                        return t;
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // 可做日志记录等
                        System.err.println(r.toString() + " rejected " + executor.getTaskCount());
                    }
                });
        executor.prestartAllCoreThreads(); // 预启动所有核心线程
        for (int i = 1; i <= 10; i++) {
            final String taskName = String.valueOf(i);
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(this.toString() + " is running!");
                        Thread.sleep(3000); //让任务执行慢点
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                @Override
                public String toString() {
                    return "MyTask [name=" + taskName + "]";
                }
            });
        }
        //可读性好些
//        Runnable call = new Runnable() {
//            @Override
//            public void run() {
//                try {
//                    System.out.println(this.toString() + " is running!");
//                    Thread.sleep(3000); //让任务执行慢点
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }
//            @Override
//            public String toString() {
//                return "MyTask [name=" + taskName + "]";
//            }
//        }
//        for (int i = 1; i <= 10; i++) {
//            final String taskName = String.valueOf(i);
//            executor.execute(call);
//        }
        
        System.in.read(); //阻塞主线程
    }
}

 

常规写法:

package com.vipsoft.Thread;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolExecutorTest {
    public static void main(String[] args) throws Exception {
        int corePoolSize = 2;
        int maximumPoolSize = 5;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3);  //定义一个大小为2的队列,只等有一个任务在排队等,多出来的需要开新线程
        ThreadFactory threadFactory = new MyTreadFactory();
        RejectedExecutionHandler handler = new MyPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        System.out.println("预启动线程(备战)");
        executor.prestartAllCoreThreads(); // 预启动所有核心线程,处于备战
        System.out.println("预启动线程数(备战):" + executor.getPoolSize());
        for (int i = 1; i <= 10; i++) {
            System.out.println(System.currentTimeMillis() + "  " + "开始 下发任务:" + i + " 当前线程总数:" + executor.getPoolSize());
            MyTask task = new MyTask(String.valueOf(i));
            executor.execute(task);
            System.out.println(System.currentTimeMillis() + "  " + "完成 下发任务:" + i + " 当前线程总数:" + executor.getPoolSize() + " 队列中的线程数量:" + workQueue.size());
            Thread.sleep(1); //停1毫秒,日志记录,时间后方便分析
            if (i == 9) {
               //TODO Thread.sleep(3000); //任务9下发后【会被拒绝】,停3秒,等队列或线程释放后,再下发任务10,这时候任务10不会被拒绝
            }
        }
        System.in.read(); //阻塞主线程
    }
    static class MyTreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "Thread-" + mThreadNum.getAndIncrement());
            System.out.println(System.currentTimeMillis() + "  " + t.getName() + " has been created");
            return t;
        }
    }
    public static class MyPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 可做日志记录等
            System.err.println(System.currentTimeMillis() + "  " + r.toString() + " rejected from " + e.toString());
        }
    }
    static class MyTask implements Runnable {
        private String name;
        public MyTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(System.currentTimeMillis() + "  " + this.toString() + " 开始运行! " + Thread.currentThread().getName());
                Thread.sleep(3000); //让任务执行慢点
                System.out.println(System.currentTimeMillis() + "  " + this.toString() + " 运行结束! " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString() {
            return "MyTask [name=" + this.name + "]";
        }
    }
}

 

运行结果:忽略图片内容

将日志按时间排序后解读:可容纳任务数 = 最大线程数 + 最大队列数

预启动线程(备战)
预启动线程数(备战):2
1654760985275  Thread-1 has been created    //预启动创建线程
1654760985275  Thread-2 has been created    //预启动创建线程
1654760985275  开始 下发任务:1 当前线程总数:2   //下发任务,当前有2线程(预启动创建)
1654760985276  MyTask [name=1] 开始运行! Thread-2
1654760985276  完成 下发任务:1 当前线程总数:2   //下发任务,当前有2线程(预启动创建)
1654760985278  MyTask [name=2] 开始运行! Thread-1
1654760985278  开始 下发任务:2 当前线程总数:2   
1654760985278  完成 下发任务:2 当前线程总数:2   //下发任务,当前有2线程(预启动创建)
1654760985280  开始 下发任务:3 当前线程总数:2
1654760985280  完成 下发任务:3 当前线程总数:2   //下发任务,当前有2线程(预启动创建)
1654760985282  开始 下发任务:4 当前线程总数:2
1654760985282  完成 下发任务:4 当前线程总数:2   //下发任务,当前有2线程(预启动创建)
1654760985284  开始 下发任务:5 当前线程总数:2
1654760985284  完成 下发任务:5 当前线程总数:2   //下发任务,当前有2线程(预启动创建)
1654760985286  MyTask [name=6] 开始运行! Thread-3
1654760985286  Thread-3 has been created
1654760985286  开始 下发任务:6 当前线程总数:2
1654760985286  完成 下发任务:6 当前线程总数:3   //下发任务,当前有2线程,3个队列,第6个任务,已经不够放了,新建一个线程
1654760985288  MyTask [name=7] 开始运行! Thread-4
1654760985288  Thread-4 has been created
1654760985288  开始 下发任务:7 当前线程总数:3
1654760985288  完成 下发任务:7 当前线程总数:4   //下发任务,当前有3线程,3个队列,第8个任务,已经不够放了,新建一个线程
1654760985290  MyTask [name=8] 开始运行! Thread-5
1654760985290  Thread-5 has been created
1654760985290  开始 下发任务:8 当前线程总数:4
1654760985290  完成 下发任务:8 当前线程总数:5   //下发任务,当前有4线程,3个队列,第8个任务,已经不够放了,新建一个线程
1654760985292  MyTask [name=9] rejected from java.util.concurrent.ThreadPoolExecutor@4b1210ee[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
1654760985292  开始 下发任务:9 当前线程总数:5
1654760985292  完成 下发任务:9 当前线程总数:5   //下发任务,当前有5线程,3个队列,第9个任务,已经不够放了,新建一个线程(超过了最大线程数 maximumPoolSize + 队列数 workQueue )拒绝任务9
1654760985294  MyTask [name=10] rejected from java.util.concurrent.ThreadPoolExecutor@4b1210ee[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
1654760985294  开始 下发任务:10 当前线程总数:5
1654760985294  完成 下发任务:10 当前线程总数:5  //下发任务,当前有5线程,3个队列,第10个任务,已经不够放了,新建一个线程(超过了最大线程数+队列数)拒绝任务10
1654760988276  MyTask [name=1] 运行结束! Thread-2   //等有任务结束后,再去下发任务9、10,就不会抛弃任务了
1654760988276  MyTask [name=3] 开始运行! Thread-2
1654760988278  MyTask [name=2] 运行结束! Thread-1
1654760988278  MyTask [name=4] 开始运行! Thread-1
1654760988286  MyTask [name=5] 开始运行! Thread-3
1654760988286  MyTask [name=6] 运行结束! Thread-3
1654760988288  MyTask [name=7] 运行结束! Thread-4
1654760988291  MyTask [name=8] 运行结束! Thread-5
1654760991288  MyTask [name=3] 运行结束! Thread-2
1654760991288  MyTask [name=4] 运行结束! Thread-1
1654760991288  MyTask [name=5] 运行结束! Thread-3

 

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
6月前
|
监控 Java 调度
Java线程池ThreadPoolExecutor初略探索
Java线程池ThreadPoolExecutor初略探索
|
4月前
|
监控 Java
ThreadPoolExecutor 介绍
ThreadPoolExecutor 介绍
45 0
|
6月前
|
Java
线程池ThreadPoolExecutor总结
线程池ThreadPoolExecutor总结
|
7月前
|
缓存 搜索推荐 Java
线程池之ThreadPoolExecutor
线程池之ThreadPoolExecutor
75 0
|
机器学习/深度学习 消息中间件 存储
ThreadPoolExecutor解读
ThreadPoolExecutor解读
|
算法 安全 Java
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
|
存储 缓存 监控
ThreadPoolExecutor:线程池不允许使用Executors创建
ThreadPoolExecutor:线程池不允许使用Executors创建
384 0
ThreadPoolExecutor:线程池不允许使用Executors创建
|
存储 缓存 监控
线程池 ThreadPoolExecutor 详解
对于操作系统而言,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要重新从内存中读取信息,破坏了数据的局部性。因此在并发编程中,当线程创建过多时,会影响程序性能,甚至引起程序崩溃。 而线程池属于池化管理模式,具有以下优点: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的性能消耗。 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。 提高线程的可管理性:能够对线程进行统一分配、调优和监控。
220 0
|
网络协议 Java
Java并发:线程池详解(ThreadPoolExecutor)
Java并发:线程池详解(ThreadPoolExecutor)
173 0
|
Java 调度
Java并发系列之7 深入理解线程池ThreadPoolExecutor
Java并发系列之7 深入理解线程池ThreadPoolExecutor