【高薪程序员必看】万字长文拆解Java并发编程!(9-1):并发工具-线程池

简介: 🌟 ​大家好,我是摘星!​ 🌟 今天为大家带来中的并发编程的强力并发工具-线程池,废话不多说让我们直接开始。

 

image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来中的并发编程的强力并发工具-线程池,废话不多说让我们直接开始。

目录

9. 并发工具

9.1. 线程池

9.1.1. 高并发问题

9.1.2. 自定义线程池

9.1.3. Tomcat连接池

9.1.4. Fork-Join


9. 并发工具

9.1. 线程池

9.1.1. 高并发问题

  1. 面对高并发场景时,手动的为每一个任务开启线程是一个极其消耗资源的方式
  2. 线程也不是创建得越多越好,从CPU也没有这么多时间片分配,那么就会有线程进入阻塞状态,造成线程上下文切换的问题,频繁的线程上下文切换会导致性能降低

线程池就是创建了一批线程并重复利用这些线程完成任务

  • 减少了线程的创建,提高了性能

9.1.2. 自定义线程池

image.gif 编辑

自定义线程池使用的是生产者消费者模式

  • Thread Pool:存放线程,充当消费者;存放任务队列,充当生产者.
  • Blocking Queue:中和消费者和生产者速率的阻塞队列
  • 当没有任务供线程池中的线程执行时,线程就会进去阻塞队列中等待
  • 当有大量任务但线程池中的线程完成速率跟不上时,多余的任务就进入阻塞队列中等待
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}
@Slf4j
public class CustomThreadPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{
            //            queue.getTask();//死等
            //            queue.getTask(1500,TimeUnit.MILLISECONDS);//超时等待
            //            log.debug("放弃{}",task);//放弃
            //            throw new RuntimeException("任务执行失败"+task);//抛出异常
            task.run();//自己执行
        });
        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                log.debug("任务{}", j+1);
            });
        }
    }
}
@Slf4j
class ThreadPool {
    //任务队列
    private BlockingQueue<Runnable> taskQueue;
    //线程集合
    private HashSet<Worker> workers = new HashSet<>();
    //核心线程数
    private int coreSize;
    //任务超时时间
    private int timeout;
    //时间单位
    private TimeUnit timeUnit;
    //拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;
    public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit, int capacity,RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }
    public void execute(Runnable task) {
        //如果线程数小于coreSize,直接执行
        if (workers.size() < coreSize) {
            Worker worker = new Worker(task);
            workers.add(worker);
            log.debug("新增worker:{}", worker);
            worker.start();
        } else {
            //如果线程数大于coreSIze 选择权有很多
            //1.死等任务
            //            taskQueue.addTask(task);
            //2.超时等待任务
            //3.调用者放弃任务
            //4.调用者抛出异常
            //5.调用者自己执行
            //使用策略模式将具体的交给调用者
            taskQueue.tryAddTask(rejectPolicy, task);
        }
    }
    //工作线程
    class Worker extends Thread {
        private Runnable task;
        public Worker(Runnable task) {
            this.task = task;
        }
        @Override
        public void run() {
            //如果当前任务不为空,直接执行
            //如果当前任务为空,去任务队列中获取任务并执行
            while (task != null || (task = taskQueue.getTask(timeout, timeUnit)) != null) {
                try {
                    task.run();
                    log.debug("正在执行{}", task);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("任务结束,移除线程{}", this);
                workers.remove(this);
            }
        }
    }
}
@Slf4j
class BlockingQueue<T> {
    //任务队列:使用Deque双向链表,有两种实现ArrayDeque和LinkedList,这里选用性能更优ArrayDeque
    private Deque<T> queue = new ArrayDeque<>();
    //锁:保证每个任务只能被一个线程执行
    private ReentrantLock lock = new ReentrantLock();
    //消费者条件变量
    private Condition fullWaitSet = lock.newCondition();
    //生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    //队列容量
    private int capacity;
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }
    //获取任务
    public T getTask(long timeout, TimeUnit unit) {
        try {
            lock.lock();
            long nanos = unit.toNanos(timeout);
            //队列空时阻塞
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                    //返回的是剩余等待时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //唤醒添加任务的线程
            fullWaitSet.signalAll();
            //获取第一个任务并移除
            return queue.removeFirst();
        } finally {
            lock.unlock();
        }
    }
    //获取任务
    public T getTask() {
        try {
            lock.lock();
            //队列空时阻塞
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //唤醒添加任务的线程
            fullWaitSet.signalAll();
            //获取第一个任务并移除
            return queue.removeFirst();
        } finally {
            lock.unlock();
        }
    }
    public void tryAddTask(RejectPolicy<T> rejectPolicy, T task) {
        try {
            lock.lock();
            //判断队列是否已满
            if (queue.size() == capacity){
                rejectPolicy.reject(this,task);
            }else {//队列未满
                emptyWaitSet.signalAll();
                queue.addLast(task);
                log.debug("加入任务队列:{}", task);
            }
        } finally {
            lock.unlock();
        }
    }
    //添加任务
    public void addTask(T task) {
        try {
            lock.lock();
            //队列满时阻塞
            while (queue.size() == capacity) {
                try {
                    log.debug("等待加入任务队列:{}.......", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            //唤醒获取任务的线程
            emptyWaitSet.signalAll();
            //添加任务到队列尾部
            queue.addLast(task);
            log.debug("加入任务队列:{}", task);
        } finally {
            lock.unlock();
        }
    }
    //添加任务
    public boolean addTask(T task, long timeout, TimeUnit unit) {
        try {
            lock.lock();
            long nanos = unit.toNanos(timeout);
            //队列满时阻塞
            while (queue.size() == capacity) {
                try {
                    log.debug("等待加入任务队列:{}.......", task);
                    if (nanos <= 0) {
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            //唤醒获取任务的线程
            emptyWaitSet.signalAll();
            //添加任务到队列尾部
            queue.addLast(task);
            log.debug("加入任务队列:{}", task);
            return true;
        } finally {
            lock.unlock();
        }
    }
    //获取当前队列大小
    public int size() {
        try {
            lock.lock();
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

image.gif

9.1.3. Tomcat连接池

Tomcat分为两个部分:Connector(对外沟通)和Container(实现Servlet规范)

浏览器向Tomcat服务器发送请求后的流程如下:

image.gif 编辑

  1. LimitLatch:用于限流,控制最大连接数,类似于JUC中的Semaphore
  2. Acceptor:负责接受客户端的连接请求,并将连接交给Poller处理。
  3. Poller:负责管理连接的IO事件,例如读取,写入操作,将IO事件封装成一个任务对象提交给Executor处理
  4. Executor:负责处理请求的业务逻辑
  5. AcceptorPollerExecutor本质上都是ThreadPoolExecutor线程池,Tomcat将不同任务分配给这些线程池,保证工作效率,实现高并发。
  • Tomcat线程池对ThreadPoolExecutor进行了扩展,当总线程数达到最大线程数时,拒绝策略不会立即抛出异常,而是尝试将任务放入队列,如果失败,才会抛出异常。
  1. ConnectorContainer的配置

    image.gif 编辑

    image.gif 编辑

9.1.4. Fork-Join

Fork-Join是Java7后引入的一个并行处理框架,体现的是一种分治思想,其核心算法是工作窃取算法,通过将一个大任务拆分成在算法上相同的小算法,直至不能拆分可以直接求解,适用于CPU密集型运算,Fork-Join默认创建于CPU核数相同的线程池

Fork-Join的主要实现类是ForkJoinPoolRecursiveTaskRecursiveAction

  • ForkJoinPool是一个线程池,用于管理工作线程并执行任务,可以根据CPU核心数动态调整线程数量
  • RecursiveTask是一个抽象类,用于表示可以并行的有执行结果的任务。RecursiveAction是一个抽象类,用于表示可以并行的无执行结果的任务。通过实现RecursiveTask接口,实现compute()方法来定义任务。compute()会将任务拆分成小任务交给工作线程执行,并将执行结果合并交给父任务。
public class ForkJoinTest {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        Integer result = pool.invoke(new myTask(5));
        System.out.println(result);
    }
}
@Slf4j(topic = "task")
//1-n之间整数的和
class myTask extends RecursiveTask<Integer> {
    private int n;
    public myTask(int n) {
        this.n = n;
    }
    @Override
    public String toString() {
        return "{" + n + '}';
    }
    @Override
    protected Integer compute() {
        if (n == 1) {
            log.debug("join():{}", n);
            return 1;
        }
        //任务拆分
        myTask myTask = new myTask(n - 1);
        //将拆分的任务交给其他线程处理
        myTask.fork();
        log.debug("fork():{}+{}", n, myTask);
        //获取执行的结果
        Integer join = myTask.join();
        //将结果合并并返回给父任务
        int result = n + join;
        log.debug("join():{}+{}={}", n, myTask, result);
        return result;
    }
}

image.gif

@Slf4j(topic = "task2")
class Task2 extends RecursiveTask<Integer> {
    private int begin;
    private int end;
    public Task2(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }
    @Override
    public String toString() {
        return "{" + begin + ", " + end + '}';
    }
    @Override
    protected Integer compute() {
        if (begin == end){
            log.debug("join():{}",begin);
            return begin;
        }
        if (end - begin == 1){
            log.debug("join():{}+{}={}",begin,end,begin+end);
            return begin + end;
        }
        int mid = (begin + end) / 2;
        Task2 task1 = new Task2(begin, mid);
        task1.fork();
        Task2 task2 = new Task2(mid + 1, end);
        task2.fork();
        log.debug("fork():{}+{}=?",task1,task2);
        int result = task1.join() + task2.join();
        log.debug("join():{}+{}={}",task1,task2,result);
        return result;
    }
}

image.gif

目录
相关文章
|
24天前
|
缓存 NoSQL Java
Redis+Caffeine构建高性能二级缓存
大家好,我是摘星。今天为大家带来的是Redis+Caffeine构建高性能二级缓存,废话不多说直接开始~
173 0
|
24天前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
66 0
|
24天前
|
设计模式 缓存 安全
【高薪程序员必看】万字长文拆解Java并发编程!(8):设计模式-享元模式设计指南
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的经典对象复用设计模式-享元模式,废话不多说让我们直接开始。
48 0
|
24天前
|
存储 缓存 Java
【高薪程序员必看】万字长文拆解Java并发编程!(5):深入理解JMM:Java内存模型的三大特性与volatile底层原理
JMM,Java Memory Model,Java内存模型,定义了主内存,工作内存,确保Java在不同平台上的正确运行主内存Main Memory:所有线程共享的内存区域,所有的变量都存储在主存中工作内存Working Memory:每个线程拥有自己的工作内存,用于保存变量的副本.线程执行过程中先将主内存中的变量读到工作内存中,对变量进行操作之后再将变量写入主内存,jvm概念说明主内存所有线程共享的内存区域,存储原始变量(堆内存中的对象实例和静态变量)工作内存。
60 0
|
24天前
|
Java 程序员 应用服务中间件
【高薪程序员必看】万字长文拆解Java并发编程!(2 2-2)
📌 核心痛点暴击:1️⃣ 面了8家都被问synchronized锁升级?一张图看懂偏向锁→重量级锁全过程!2️⃣ 线程池参数不会配?高并发场景下这些参数调优救了项目组命!3️⃣ volatile双重检测单例模式到底安不安全?99%人踩过的内存可见性大坑!💡 独家亮点抢先看:✅ 图解JVM内存模型(JMM)三大特性,看完再也不怕指令重排序✅ 手撕ReentrantLock源码,AQS队列同步器实现原理大揭秘✅ 全网最细线程状态转换图(附6种状态转换触发条件表)
48 0
|
24天前
|
Java 程序员
【高薪程序员必看】万字长文拆解Java并发编程!(3-2):并发共享问题的解决与分析
wait方法和notify方法都是Object类的方法:让当前获取锁的线程进入waiting状态,并进入waitlist队列:让当前获取锁的线程进入waiting状态,并进入waitlist队列,等待n秒后自动唤醒:在waitlist队列中挑一个线程唤醒:唤醒所有在waitlist队列中的线程它们都是之间协作的手段,只有拥有对象锁的线程才能调用这些方法,否则会出现IllegalMonitorStateException异常park方法和unpark方法是LockSupport类中的方法。
38 0
|
24天前
|
网络协议 Java 大数据
【高薪程序员必看】万字长文拆解Java并发编程!(1)
📌 核心痛点暴击:1️⃣ 面了8家都被问synchronized锁升级?一张图看懂偏向锁→重量级锁全过程!2️⃣ 线程池参数不会配?高并发场景下这些参数调优救了项目组命!3️⃣ volatile双重检测单例模式到底安不安全?99%人踩过的内存可见性大坑!💡 独家亮点抢先看:✅ 图解JVM内存模型(JMM)三大特性,看完再也不怕指令重排序✅ 手撕ReentrantLock源码,AQS队列同步器实现原理大揭秘✅ 全网最细线程状态转换图(附6种状态转换触发条件表)
45 0
|
24天前
|
安全 Java 程序员
【高薪程序员必看】万字长文拆解Java并发编程!(2 2-1)
🔥【高薪程序员必看】万字长文拆解Java并发编程!面试官看了直呼内行,90%人不知道的线程安全骚操作!💻🚀《16个高频面试灵魂拷问+底层源码暴击》🔥👉戳这里看如何用1个月经验吊打3年程序员!📌 核心痛点暴击:1️⃣ 面了8家都被问synchronized锁升级?一张图看懂偏向锁→重量级锁全过程!2️⃣ 线程池参数不会配?高并发场景下这些参数调优救了项目组命!3️⃣ volatile双重检测单例模式到底安不安全?99%人踩过的内存可见性大坑!
36 0
|
24天前
|
存储 安全 Java
【高薪程序员必看】万字长文拆解Java并发编程!(4-1):悲观锁底层原理与性能优化实战
目录4. JVM字节码文件4.1. 字节码文件-组成4.1.1. 组成-基础信息4.1.1.1. 基础信息-魔数4.1.1.2. 基础信息-主副版本号4.1.2. 组成-常量池4.1.3. 组成-方法4.1.3.1. 方法-工作流程4.1.4. 组成-字段4.1.5. 组成-属性4.2. 字节码文件-查看工具4.2.1. javap4.2.2. jclasslib4.2.3. 阿里Arthas
34 0
|
24天前
|
监控 安全 Java
【高薪程序员必看】万字长文拆解Java并发编程!(4-2):悲观锁底层原理与性能优化实战
获取锁,不可被打打断:释放锁:可打断,获取锁:获取锁失败进入阻塞状态时,可以被其他线程的interrput方法打断:尝试获得锁,返回值是boolean,true获取锁成功,false获取锁失败:尝试获得锁,获取不到锁等待n单位时间,时间到了还没获取到锁就返回false,时间内获取到锁还是返回true,支持可打断:设置公平锁,默认为falseCondition condition = lock.newCondition()//Condition可以创建多个也就是支持多个条变量。
42 0