学习线程池原理从手写一个线程池开始

简介: 学习线程池原理从手写一个线程池开始

概述


线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记。


线程池框架设计


我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的。同时,线程也不是任意多创建的,因为活跃的线程会消耗系统资源,特别是内存,在一定的范围内,增加线程可以提高系统的吞吐率,如果超过了这个范围,反而会降低程序的执行速度。

因此,设计一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作, 达到下面的目标:

  • 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
  • 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
  • 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

线程池的核心思想: 线程复用,同一个线程可以被重复使用,来处理多个任务。

为了实现线程池功能,需要考虑下面几个设计要点:

  1. 线程池可以接口外部提交的任务执行
  2. 线程池有工作线程的数量,有任务执行,没有任务也空闲在那,等待任务过来,这样既避免线程频繁创建销毁带来的开销,同时也可以避免线程池无限制的创建线程
  3. 如果线程池接受提交的任务超过工作线程的数量了,该怎么办?可以用一个队列把任务存下来,等工作线程完成任务后去队列中获取任务,执行
  4. 那如果任务实在是太多太多了,达到了我们认为的队列最大值,怎么办,我们可以设计一种任务太多的策略,可以进行切换,比如直接丢弃任务、报错等等

看了上面的设计目标和要点,是不是能立刻想到一个非常经典的设计模型——生产者消费者模型。

1671195281016.jpg

  • 阻塞队列存储执行任务,比如外部main函数作为生产者向队列生产任务。
  • 线程池中的工作线程作为消费者获取任务执行。

现在我们将我们的设计思路转换为代码。


代码实现


阻塞队列的实现


  • 阻塞队列主要存放任务,有容量限制
  • 阻塞队列提供添加和删除任务的API, 如果超过容量,阻塞不能添加任务,如果没有任务,阻塞无法获取任务。
/**
 * <p>自定义任务队列, 用来存放任务 </p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  10:15
 * @version: 1.0.0
 */
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    // 容量
    private int capcity;
    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }
    // 阻塞的方式添加任务
    public void put(T task) {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.size() >= capcity) {
                log.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            log.debug("task add successfully");
            emptyWaitSet.signal();
        }  finally {
            lock.unlock();
        }
    }
    // 阻塞获取任务
    public T take() {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.isEmpty()) {
                try {
                    log.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            log.debug("take task successfully");
            // 从队列中获取元素
            return task;
        } finally {
            lock.unlock();
        }
    }
}
  • put()方法是向阻塞队列中添加任务
  • take()方法是向阻塞队列中获取任务


线程池消费端实现


  1. 定义执行器接口
/**
 * <p>定义一个执行器的接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  12:31
 * @version: 1.0.0
 */
public interface Executor {
    /**
     * 提交任务执行
     * @param task 任务
     */
    void execute(Runnable task);
}
  1. 定义线程池类实现该接口
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 核心工作线程数
     */
    private int coreSize;
    /**
     * 工作线程集合
     */
    private Set<Worker> workers = new HashSet<>();
    /**
     *  创建线程池
     * @param coreSize 工作线程数量
     * @param capcity 阻塞队列容量
     */
    public ThreadPool(int coreSize, int capcity) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
    }
    /**
     * 提交任务执行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程数小于阈值,直接开始任务执行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超过了阈值,加入到队列中
                taskQueue.put(task);
            }
        }
    }
    /**
     * 工作线程,对执行的任务做了一层包装处理
     */
    class Worker extends Thread {
        private Runnable task;
        public Worker(Runnable task) {
            this.task = task;
        }
        @Override
        public void run() {
            // 如果任务不为空,或者可以从队列中获取任务
            while (task != null || (task = taskQueue.take()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 执行完后,设置任务为空
                    task = null;
                }
            }
              // 移除工作线程
            synchronized (workers){
                log.debug("remove worker successfully");
                workers.remove(this);
            }
        }
    }
}
  • Worker类是工作线程类,包装了执行任务,里面实现了从队列获取任务,然后执行任务。
  • execute方法的实现中,如果工作线程数量小于阈值的话,直接创建新的工作线程,否则将任务添加到队列中。
  1. 演示
@Test
    public void testThreadPool1() throws InterruptedException {
        Executor executor = new ThreadPool(2, 4);
        // 提交任务
        for (int i = 0; i < 6; i++) {
            final  int j = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                    log.info("run task {}", j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(10);
        }
        Thread.sleep(10000);
    }

运行结果:

1671195309282.jpg


获取任务超时设计


目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略。

@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
    // 容量
    private int capcity;
    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    public TimeoutBlockingQueue(int capcity) {
        this.capcity = capcity;
    }
    // 带超时时间的获取
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()){
                try {
                    if (nanos<=0){
                        return null;
                    }
                    // 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            return deque.getFirst();
        }finally {
            lock.unlock();
        }
    }
    // 带超时时间的增加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.size() == capcity){
                try {
                    if (nanos<=0){
                        return false;
                    }
                    // 更新剩余需要等待的时间
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }
}
  • 新加TimeoutBlockingQueue类,添加offer和poll待超时的添加和获取任务的方法。


拒绝策略设计


目前的实现还是有个漏洞,无法自定义任务超出阈值的一个拒绝策略,我们可以通过利用函数式编程+策略模式去实现。

  1. 定义策略模式的函数式接口
/**
 * <p>拒绝策略的函数式接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  13:15
 * @version: 1.0.0
 */
@FunctionalInterface
public interface RejectPolicy<T> {
    /**
     * 拒绝策略的接口
     * @param queue
     * @param task
     */
    void reject(BlockingQueue<T> queue, T task);
}
  1. 添加函数式接口的调用入口

我们可以在阻塞队列添加任务新加一个api, 添加任务如果超过容量,调用函数式接口。

@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    ........
    /**
     * 尝试添加任务
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 如果队列超过容量
            if (deque.size()> capcity){
                log.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            }else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}
  1. 修改ThreadPool类
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
    .....
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;
    // 通过构造方法传入执行的拒绝策略
    public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
        this.rejectPolicy = rejectPolicy;
    }
    /**
     * 提交任务执行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程数小于阈值,直接开始任务执行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超过了阈值,加入到队列中
                //taskQueue.put(task);
                // 调用tryPut的方式
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
   ....
}
  • 通过构造方法的方式传入要执行的拒绝策略
  • 调用tryPut方法添加任务
  1. 演示

1671195333967.jpg


总结


jdk中的线程池实现远比这里手写的要复杂,里面还涉及救急线程、各种内置的拒绝策略以及不同的队列容器等等,但是他们的思想基本一致的,通过这个练习,在后面阅读线程池源码的时候会有很大的帮助。

目录
相关文章
|
2天前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
18 6
|
2月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
221 64
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
125 38
|
2月前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
87 4
|
2月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
123 2
|
2月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
447 2
|
24天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
53 1
|
3月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
68 1
|
3月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
47 3
|
3月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
32 2