【Java】实现一个简单的线程池

简介: ,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。

三、代码实现
因为我们只是简单的实现,所以有一些情况和实际不太相似。

1.BlockingQueue
先来看看我们阻塞队列当中的一些参数,为了在多线程环境下防止并发问题,我使用了ReentrantLock,使用它的目的是为了创建多个不同的阻塞条件。

在我们调用一个对象的await()方法后,我们的当前线程就会加入到一个特定的队列当中去等待,直到有调用了这个对象的notify()方法后才会从这个队列中抽取一个线程唤醒。

举个例子,我们去医院的时候,一个医生同一时间只能看一个病人,剩下的人都只能等待,如果只有一个大厅的话,看不同病的病人都只能等待在一个候诊室中。使用ReentrentLock的意思就是为了创建多个不同的候诊室,将不同医生的病人分开在不同的候诊室当中。

//1.阻塞队列
private Deque<T> deque = new ArrayDeque<>();
//2.实现阻塞的锁
private ReentrantLock lock = new ReentrantLock();
//3. 生产者等待条件
private Condition fullWaitSet = lock.newCondition();
//4.消费者等待条件
private Condition emptyWaitSet = lock.newCondition();
//5.阻塞队列的大小
private  int CAPACITY;

在自定义的阻塞队列中,我使用了一个双向队列来存储任务,并且设置了一个队列大小的属性,在我们创建这个队列的时候我们可以进行初始化。

先来看看阻塞队列任务的添加过程。这个逻辑并不难,我们在代码的上方上锁,在finally中解锁。如果这时我们的队列是满的,就无法在继续添加任务了,这个时候我们就把当前线程挂起(注意我们的挂起条件)。如果队列不是满的话那我们就加入到队尾,同时把另一类挂起的线程唤醒(这类线程在队列为空的时候挂起,等待任务的添加)。

// 生产者放入数据
public void put(T t) {
lock.lock();
try {
while (deque.size() == CAPACITY) {
fullWaitSet.await();
}
deque.addLast(t);
emptyWaitSet.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
在看看我们取任务的过程。同样加锁,当我们的队列为空的时候,线程挂起,等待任务的添加之后线程唤醒,如果队列不为空的话,我们从队列头部取出一个任务,并且唤起一类线程(这类线程在任务已经满了的时候无法在添加任务了,进行挂起,等待队列不为满)。

// 消费者从线程池当中获取任务
public T take(){
T t = null;
lock.lock();
try {
while(deque.size() == 0){
emptyWaitSet.await();
}
t = deque.removeFirst();
fullWaitSet.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return t;
}

我们上边的代码展示的队列的存取的过程都是死等状态,什么是死等状态?就是任务添加不进去或者取不出来的时候,线程会被一直挂起。真实并不是如此,这里只是简单的展示。

阻塞队列需要的就是这两个存取的过程。

2.ThreadPool
先看看线程池当中的属性。把刚才创建的任务队列加进去,因为线程池要时常和任务队列沟通。然后创建了一个HashSet结构用于存储我们的线程。下边的都是我们线程池需要的一些参数了,拒绝策略在这里没有写。

// 任务队列
private BlockedQueue<Runnable> taskQueue;

// 线程集合
private HashSet<Worker> workers = new HashSet<>();

//核心线程数
private int coreSize;

// 超时时间
private int timeout;

// 超时单位
private TimeUnit timeUnit;

来看看我们的线程池是如何工作的吧,可以看到我们线程池保存的是Worker对象,我们来看看这个Worker对象是干啥的。这个Worker对象实现了Runnable接口,我们可以把这个类当作线程类,这个类中有一个task属性,因为我们线程池当中的线程是要获取任务执行的,这个任务就用这个task属性代表。

这个Worker类一直在干一件事情,就是不断地从我们的任务队列当中获取任务(Worker类是ThreadPool的内部类),如果获取的任务不为空的话就执行任务,一旦没有任务可以执行那么就把当前的线程从线程池当中移除。

class Worker implements Runnable{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
while(task!=null || (task = taskQueue.take())!=null){
System.out.println("取出的任务是"+task);
try {
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
synchronized (workers){
workers.remove(this);
}
}
}
}

那什么时候用到这个Worker类呢?当我们调用ThreadPool中的execute()方法时,线程池中的线程会就调用这个run()方法。

来看我们的execute()方法。当我们的线程数小于我们的核心线程数的时候,我们可以直接创建一个新的线程,并且把我们的任务直接交给这个核心线程。反之我们不能创建,而是把任务添加到我们的任务队列当中,等待核心线程去执行这个任务。

// 任务执行
public void execute(Runnable task){
synchronized (workers){
if(workers.size() < coreSize){
// 创建核心线程
Worker worker = new Worker(task);
workers.add(worker);
Thread thread = new Thread(worker);
thread.start();
}else {

            taskQueue.put(task);
        }
    }

}

写完了上边的代码我们测试一下。

public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.MILLISECONDS,10);
for(int i = 0;i<12;i++){ int j = i; threadPool.execute(()->{
System.out.println("当前线程"+Thread.currentThread().getName()+"task "+j+" is running");
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}

方法运行了之后,即使任务全部执行,线程也不会结束。这是因为我们的worker类中的run方法调用了任务队列的take()方法,而take方法是会一直挂起的。

我们现在换一种带超时获取,在规定时间内获取不到任务就自动结束任务。这时候就用到我们传入的时间参数了,我们不再调用await()方法了,而是调用awaitNanos()方法,方法可以接收一个时间参数,这个方法可以消耗我们的nanos时间,在这个时间内如果获取不到的话线程就不在挂起了,这时还会进入到我们的while循环当中,判断我们的nanos是不是被消耗完了,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。

// 带超时获取
public T poll(int timeout,TimeUnit timeUnit){
T t = null;

相关文章
|
1天前
|
并行计算 Java 大数据
Java中的高效并行计算与多线程编程技术
Java中的高效并行计算与多线程编程技术
|
1天前
|
算法 安全 Java
Java小白教学—五千字带你了解多线程机制及线程安全问题
Java小白教学—五千字带你了解多线程机制及线程安全问题
|
1天前
|
安全 Java
解决Java中多线程同步问题的方案
解决Java中多线程同步问题的方案
|
1天前
|
安全 Java 开发者
Java多线程编程实践中的常见问题与解决方案
Java多线程编程实践中的常见问题与解决方案
|
2天前
|
安全 Java
Java多线程编程实践中的常见问题与解决方案
Java多线程编程实践中的常见问题与解决方案
|
2天前
|
设计模式 Java 容器
Java多线程编程中的设计模式与挑战
Java多线程编程中的设计模式与挑战
|
3天前
|
Java 数据处理 调度
Java多线程编程入门指南
Java多线程编程入门指南
|
3天前
|
安全 Java 开发者
Java并发编程中的线程安全策略
在现代软件开发中,Java语言的并发编程特性使得多线程应用成为可能。然而,随着线程数量的增加,如何确保数据的一致性和系统的稳定性成为开发者面临的挑战。本文将探讨Java并发编程中实现线程安全的几种策略,包括同步机制、volatile关键字的使用、以及java.util.concurrent包提供的工具类,旨在为Java开发者提供一系列实用的方法来应对并发问题。
9 0
|
3天前
|
监控 Java UED
Java并发编程:深入理解线程池的设计与应用
本文旨在通过数据导向和科学严谨的方式,深入探讨Java并发编程中的关键组件——线程池。文章首先概述了线程池的基本概念与重要性,随后详细解读了线程池的核心参数及其对性能的影响,并通过实验数据支持分析结果。此外,文中还将介绍如何根据不同的应用场景选择或设计合适的线程池,以及如何避免常见的并发问题。最后,通过案例研究,展示线程池在实际应用中的优化效果,为开发人员提供实践指导。
9 0
|
4天前
|
监控 安全 算法
如何有效地处理Java中的多线程
如何有效地处理Java中的多线程