编辑
🌟 大家好,我是摘星! 🌟
今天为大家带来中的并发编程的强力并发工具-线程池,废话不多说让我们直接开始。
目录
9. 并发工具
9.1. 线程池
9.1.1. 高并发问题
- 面对高并发场景时,手动的为每一个任务开启线程是一个极其消耗资源的方式
- 线程也不是创建得越多越好,从CPU也没有这么多时间片分配,那么就会有线程进入阻塞状态,造成线程上下文切换的问题,频繁的线程上下文切换会导致性能降低
线程池就是创建了一批线程并重复利用这些线程完成任务
- 减少了线程的创建,提高了性能
9.1.2. 自定义线程池
编辑
自定义线程池使用的是生产者消费者模式
- Thread Pool:存放线程,充当消费者;存放任务队列,充当生产者.
- Blocking Queue:中和消费者和生产者速率的阻塞队列
- 当没有任务供线程池中的线程执行时,线程就会进去阻塞队列中等待
- 当有大量任务但线程池中的线程完成速率跟不上时,多余的任务就进入阻塞队列中等待
@FunctionalInterface interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); } @Slf4j public class CustomThreadPool { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{ // queue.getTask();//死等 // queue.getTask(1500,TimeUnit.MILLISECONDS);//超时等待 // log.debug("放弃{}",task);//放弃 // throw new RuntimeException("任务执行失败"+task);//抛出异常 task.run();//自己执行 }); for (int i = 0; i < 3; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } log.debug("任务{}", j+1); }); } } } @Slf4j class ThreadPool { //任务队列 private BlockingQueue<Runnable> taskQueue; //线程集合 private HashSet<Worker> workers = new HashSet<>(); //核心线程数 private int coreSize; //任务超时时间 private int timeout; //时间单位 private TimeUnit timeUnit; //拒绝策略 private RejectPolicy<Runnable> rejectPolicy; public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit, int capacity,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(capacity); this.rejectPolicy = rejectPolicy; } public void execute(Runnable task) { //如果线程数小于coreSize,直接执行 if (workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); log.debug("新增worker:{}", worker); worker.start(); } else { //如果线程数大于coreSIze 选择权有很多 //1.死等任务 // taskQueue.addTask(task); //2.超时等待任务 //3.调用者放弃任务 //4.调用者抛出异常 //5.调用者自己执行 //使用策略模式将具体的交给调用者 taskQueue.tryAddTask(rejectPolicy, task); } } //工作线程 class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //如果当前任务不为空,直接执行 //如果当前任务为空,去任务队列中获取任务并执行 while (task != null || (task = taskQueue.getTask(timeout, timeUnit)) != null) { try { task.run(); log.debug("正在执行{}", task); } catch (Exception e) { throw new RuntimeException(e); } finally { task = null; } } synchronized (workers) { log.debug("任务结束,移除线程{}", this); workers.remove(this); } } } } @Slf4j class BlockingQueue<T> { //任务队列:使用Deque双向链表,有两种实现ArrayDeque和LinkedList,这里选用性能更优ArrayDeque private Deque<T> queue = new ArrayDeque<>(); //锁:保证每个任务只能被一个线程执行 private ReentrantLock lock = new ReentrantLock(); //消费者条件变量 private Condition fullWaitSet = lock.newCondition(); //生产者条件变量 private Condition emptyWaitSet = lock.newCondition(); //队列容量 private int capacity; public BlockingQueue(int capacity) { this.capacity = capacity; } //获取任务 public T getTask(long timeout, TimeUnit unit) { try { lock.lock(); long nanos = unit.toNanos(timeout); //队列空时阻塞 while (queue.isEmpty()) { try { if (nanos <= 0) { return null; } nanos = emptyWaitSet.awaitNanos(nanos); //返回的是剩余等待时间 } catch (InterruptedException e) { e.printStackTrace(); } } //唤醒添加任务的线程 fullWaitSet.signalAll(); //获取第一个任务并移除 return queue.removeFirst(); } finally { lock.unlock(); } } //获取任务 public T getTask() { try { lock.lock(); //队列空时阻塞 while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //唤醒添加任务的线程 fullWaitSet.signalAll(); //获取第一个任务并移除 return queue.removeFirst(); } finally { lock.unlock(); } } public void tryAddTask(RejectPolicy<T> rejectPolicy, T task) { try { lock.lock(); //判断队列是否已满 if (queue.size() == capacity){ rejectPolicy.reject(this,task); }else {//队列未满 emptyWaitSet.signalAll(); queue.addLast(task); log.debug("加入任务队列:{}", task); } } finally { lock.unlock(); } } //添加任务 public void addTask(T task) { try { lock.lock(); //队列满时阻塞 while (queue.size() == capacity) { try { log.debug("等待加入任务队列:{}.......", task); fullWaitSet.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } } //唤醒获取任务的线程 emptyWaitSet.signalAll(); //添加任务到队列尾部 queue.addLast(task); log.debug("加入任务队列:{}", task); } finally { lock.unlock(); } } //添加任务 public boolean addTask(T task, long timeout, TimeUnit unit) { try { lock.lock(); long nanos = unit.toNanos(timeout); //队列满时阻塞 while (queue.size() == capacity) { try { log.debug("等待加入任务队列:{}.......", task); if (nanos <= 0) { return false; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { throw new RuntimeException(e); } } //唤醒获取任务的线程 emptyWaitSet.signalAll(); //添加任务到队列尾部 queue.addLast(task); log.debug("加入任务队列:{}", task); return true; } finally { lock.unlock(); } } //获取当前队列大小 public int size() { try { lock.lock(); return queue.size(); } finally { lock.unlock(); } } }
9.1.3. Tomcat连接池
Tomcat分为两个部分:Connector
(对外沟通)和Container
(实现Servlet规范)
浏览器向Tomcat服务器发送请求后的流程如下:
编辑
LimitLatch
:用于限流,控制最大连接数,类似于JUC中的Semaphore
。Acceptor
:负责接受客户端的连接请求,并将连接交给Poller
处理。Poller
:负责管理连接的IO事件,例如读取,写入操作,将IO事件封装成一个任务对象提交给Executor
处理Executor
:负责处理请求的业务逻辑Acceptor
、Poller
、Executor
本质上都是ThreadPoolExecutor
线程池,Tomcat将不同任务分配给这些线程池,保证工作效率,实现高并发。
- Tomcat线程池对
ThreadPoolExecutor
进行了扩展,当总线程数达到最大线程数时,拒绝策略不会立即抛出异常,而是尝试将任务放入队列,如果失败,才会抛出异常。
Connector
和Container
的配置
编辑
编辑
9.1.4. Fork-Join
Fork-Join
是Java7后引入的一个并行处理框架,体现的是一种分治思想,其核心算法是工作窃取算法,通过将一个大任务拆分成在算法上相同的小算法,直至不能拆分可以直接求解,适用于CPU密集型运算,Fork-Join
默认创建于CPU核数相同的线程池
Fork-Join
的主要实现类是ForkJoinPool
和RecursiveTask
RecursiveAction
。
ForkJoinPool
是一个线程池,用于管理工作线程并执行任务,可以根据CPU核心数动态调整线程数量RecursiveTask
是一个抽象类,用于表示可以并行的有执行结果的任务。RecursiveAction
是一个抽象类,用于表示可以并行的无执行结果的任务。通过实现RecursiveTask
接口,实现compute()
方法来定义任务。compute()
会将任务拆分成小任务交给工作线程执行,并将执行结果合并交给父任务。
public class ForkJoinTest { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); Integer result = pool.invoke(new myTask(5)); System.out.println(result); } } @Slf4j(topic = "task") //1-n之间整数的和 class myTask extends RecursiveTask<Integer> { private int n; public myTask(int n) { this.n = n; } @Override public String toString() { return "{" + n + '}'; } @Override protected Integer compute() { if (n == 1) { log.debug("join():{}", n); return 1; } //任务拆分 myTask myTask = new myTask(n - 1); //将拆分的任务交给其他线程处理 myTask.fork(); log.debug("fork():{}+{}", n, myTask); //获取执行的结果 Integer join = myTask.join(); //将结果合并并返回给父任务 int result = n + join; log.debug("join():{}+{}={}", n, myTask, result); return result; } }
@Slf4j(topic = "task2") class Task2 extends RecursiveTask<Integer> { private int begin; private int end; public Task2(int begin, int end) { this.begin = begin; this.end = end; } @Override public String toString() { return "{" + begin + ", " + end + '}'; } @Override protected Integer compute() { if (begin == end){ log.debug("join():{}",begin); return begin; } if (end - begin == 1){ log.debug("join():{}+{}={}",begin,end,begin+end); return begin + end; } int mid = (begin + end) / 2; Task2 task1 = new Task2(begin, mid); task1.fork(); Task2 task2 = new Task2(mid + 1, end); task2.fork(); log.debug("fork():{}+{}=?",task1,task2); int result = task1.join() + task2.join(); log.debug("join():{}+{}={}",task1,task2,result); return result; } }