一、前言:为什么JUC是Java并发编程的核心?
在Java开发中,“并发”是绕不开的核心议题——从后端接口的高并发请求处理,到分布式系统的多线程协同,再到大数据场景的并行计算,都离不开高效的并发编程工具。而JUC(java.util.concurrent)框架,正是Java官方为解决并发问题提供的“利器集合”,它封装了并发编程的核心难点(线程同步、线程池管理、原子操作、并发集合等),让开发者无需从零实现底层逻辑,就能安全、高效地编写并发代码。
二、JUC核心基础:线程与同步的底层逻辑
在深入JUC组件前,我们必须先搞懂:Java线程的底层实现是什么?为什么需要同步机制?这是理解JUC设计思想的前提。
2.1 线程的底层本质:内核线程与用户线程
Java线程在JDK 1.2之后,采用“一对一”的线程模型——即一个Java线程对应一个操作系统内核线程(Kernel Thread,KT)。其底层逻辑如下:
- 应用程序通过JVM调用操作系统的线程API创建内核线程;
- 内核线程由操作系统调度,负责执行具体的任务;
- 当Java线程阻塞时(如sleep、wait),对应的内核线程也会阻塞,JVM会调度其他就绪线程执行。
流程图(线程模型):
2.2 并发问题的根源:共享变量与竞态条件
并发编程的核心问题是“共享变量的线程安全”。当多个线程同时操作同一个共享变量时,会出现“竞态条件”(Race Condition),导致结果不符合预期。
核心原因:CPU指令重排序与内存可见性
- 指令重排序:CPU为了提高效率,会在不影响单线程执行结果的前提下,重新排序指令的执行顺序;
- 内存可见性:多个线程操作共享变量时,各自的工作内存(CPU缓存)与主内存的数据可能不同步;
- 原子性:多个操作被视为一个整体,要么全部执行,要么全部不执行(如i++并非原子操作,拆解为读取、加1、写入三步)。
示例:非线程安全的共享变量操作
package com.jam.demo.juc.basic;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
/**
* 共享变量的线程安全问题演示
* @author ken
*/
@Slf4j
public class SharedVariableTest {
// 共享变量
private int count = 0;
/**
* 自增操作(非原子)
*/
public void increment() {
count++;
}
@Test
public void testUnsafe() throws InterruptedException {
// 启动10个线程,每个线程执行1000次自增
int threadNum = 10;
Thread[] threads = new Thread[threadNum];
for (int i = < threadNum; i++) {
threads[i] = new Thread(() -> {
for (int j = 0< 1000; j++) {
increment();
}
}, "Thread-" + i);
threads[i].start();
}
// 等待所有线程执行完成
for (Thread thread : threads) {
thread.join();
}
// 预期结果:10*1000=10000,实际结果往往小于10000
log.info("最终count值:{}", count);
}
}
运行结果:最终count值可能为9876、9953等,始终小于10000。原因是count++的三步指令被多个线程交替执行,导致数据覆盖。
2.3 JUC的核心解决思路:封装同步与原子性机制
JUC框架通过以下三类核心机制解决并发问题:
- 同步机制:通过锁(Lock)、信号量(Semaphore)等组件,保证同一时间只有一个线程执行临界区代码;
- 原子操作:通过原子类(AtomicInteger、LongAdder)封装CAS操作,保证单个变量的原子性;
- 内存可见性:通过volatile关键字、锁的释放-获取机制,保证共享变量的内存可见性。
三、JUC核心基石:AQS原理与实现
AQS(AbstractQueuedSynchronizer)是JUC框架的“灵魂”——ReentrantLock、CountDownLatch、Semaphore等核心组件,均基于AQS实现。理解AQS,就等于掌握了JUC的核心设计思想。
3.1 AQS的核心设计:状态变量+双向链表+条件队列
AQS的核心目标是“实现一个可重入的同步器”,其底层通过三个核心部分实现:
1. 核心状态变量(state)
- 用volatile修饰的int变量,代表同步状态(如锁的持有状态、信号量的许可数);
- 核心操作:getState()、setState()、compareAndSetState()(CAS操作,保证原子性)。
2. 双向同步队列(CLH队列)
- 用于存放等待获取同步资源的线程;
- 每个节点(Node)包含:线程引用、前驱节点、后继节点、等待状态(如CANCELLED、SIGNAL);
- 采用“自旋+park”的方式实现线程阻塞与唤醒,减少CPU空转。
3. 条件队列(Condition Queue)
- 用于实现“等待-通知”机制(类似Object的wait()/notify());
- 每个Condition对应一个独立的条件队列,可实现多条件等待。
AQS核心结构流程图:
3.2 AQS的核心流程:获取资源与释放资源
AQS的核心逻辑围绕“获取资源”和“释放资源”展开,子类(如ReentrantLock)只需重写以下方法,即可实现不同的同步语义:
- tryAcquire(int arg):尝试获取资源(独占式);
- tryRelease(int arg):尝试释放资源(独占式);
- tryAcquireShared(int arg):尝试获取资源(共享式);
- tryReleaseShared(int arg):尝试释放资源(共享式);
- isHeldExclusively():判断当前线程是否独占资源。
1. 独占式获取资源流程(以ReentrantLock为例)
2. 独占式释放资源流程(以ReentrantLock为例)
3.3 AQS的核心特点
- 模板方法模式:AQS定义了获取/释放资源的核心流程(模板方法),子类只需重写tryAcquire等方法,无需关注队列管理、线程阻塞/唤醒等底层细节;
- 独占式与共享式:支持两种资源获取模式(独占式如锁,共享式如CountDownLatch);
- 可重入性:通过state变量记录持有次数,支持线程重入获取资源;
- 高效性:采用CAS+park/unpark机制,减少锁的开销,比synchronized更灵活。
四、JUC核心组件实战:从基础到进阶
4.1 独占锁:ReentrantLock(可重入锁)
ReentrantLock是synchronized的增强版,支持可重入、公平锁/非公平锁、条件等待等特性,底层基于AQS实现。
1. 核心特性对比(ReentrantLock vs synchronized)
| 特性 | ReentrantLock | synchronized |
| 锁类型 | 可指定公平/非公平(默认非公平) | 非公平锁 |
| 可重入性 | 支持 | 支持 |
| 等待可中断 | 支持(lockInterruptibly()) | 不支持 |
| 超时获取锁 | 支持(tryLock(long, TimeUnit)) | 不支持 |
| 条件等待 | 多个Condition,支持多条件等待 | 仅一个条件队列 |
| 性能 | 高并发场景下更优(减少上下文切换) | JDK 1.6后优化,性能接近 |
2. 实战示例:ReentrantLock的基本使用
package com.jam.demo.juc.lock;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* ReentrantLock实战示例
* 包含:公平锁/非公平锁、可重入、条件等待、超时获取、中断获取
* @author ken
*/
@Slf4j
public class ReentrantLockDemo {
// 1. 非公平锁(默认)
private final ReentrantLock nonFairLock = new ReentrantLock();
// 2. 公平锁
private final ReentrantLock fairLock = new ReentrantLock(true);
// 3. 条件队列(用于多条件等待)
private final Condition condition = nonFairLock.newCondition();
private int count = 0;
/**
* 示例1:基本锁操作(可重入)
*/
public void reentrantDemo() {
nonFairLock.lock();
try {
count++;
log.info("第一次获取锁,count:{}", count);
// 重入获取锁
reentrantInner();
} finally {
// 注意:获取几次锁,就要释放几次
nonFairLock.unlock();
log.info("第一次释放锁");
}
}
private void reentrantInner() {
nonFairLock.lock();
try {
count++;
log.info("重入获取锁,count:{}", count);
} finally {
nonFairLock.unlock();
log.info("重入释放锁");
}
}
/**
* 示例2:条件等待(await/signal)
*/
public void conditionDemo() throws InterruptedException {
new Thread(() -> {
nonFairLock.lock();
try {
log.info("线程A:获取锁,等待条件满足");
// 等待条件,释放锁并阻塞
condition.await();
log.info("线程A:条件满足,继续执行");
count++;
log.info("线程A:count:{}", count);
} catch (InterruptedException e) {
log.error("线程A被中断", e);
} finally {
nonFairLock.unlock();
}
}, "Thread-A").start();
// 主线程休眠1秒,确保线程A先获取锁
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
nonFairLock.lock();
try {
log.info("线程B:获取锁,发送条件通知");
// 唤醒等待条件的线程
condition.signal();
log.info("线程B:通知完成,释放锁");
} finally {
nonFairLock.unlock();
}
}, "Thread-B").start();
}
/**
* 示例3:超时获取锁(tryLock)
*/
public void tryLockDemo() throws InterruptedException {
new Thread(() -> {
nonFairLock.lock();
try {
// 持有锁3秒
TimeUnit.SECONDS.sleep(3);
log.info("线程C:持有锁3秒后释放");
} catch (InterruptedException e) {
log.error("线程C被中断", e);
} finally {
nonFairLock.unlock();
}
}, "Thread-C").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
boolean acquired = false;
try {
// 尝试获取锁,超时时间2秒
acquired = nonFairLock.tryLock(2, TimeUnit.SECONDS);
if (acquired) {
log.info("线程D:成功获取锁");
} else {
log.info("线程D:超时未获取到锁");
}
} catch (InterruptedException e) {
log.error("线程D被中断", e);
} finally {
if (acquired) {
nonFairLock.unlock();
}
}
}, "Thread-D").start();
}
/**
* 示例4:中断获取锁(lockInterruptibly)
*/
public void interruptibleLockDemo() {
Thread threadE = new Thread(() -> {
try {
log.info("线程E:尝试获取锁(可中断)");
nonFairLock.lockInterruptibly();
try {
log.info("线程E:成功获取锁");
} finally {
nonFairLock.unlock();
}
} catch (InterruptedException e) {
log.info("线程E:获取锁过程中被中断");
}
}, "Thread-E");
threadE.start();
// 主线程休眠1秒后中断线程E
try {
TimeUnit.SECONDS.sleep(1);
threadE.interrupt();
log.info("主线程:中断线程E");
} catch (InterruptedException e) {
log.error("主线程被中断", e);
}
}
@Test
public void testAll() throws InterruptedException {
log.info("=== 测试可重入 ===");
reentrantDemo();
log.info("\n=== 测试条件等待 ===");
conditionDemo();
TimeUnit.SECONDS.sleep(2);
log.info("\n=== 测试超时获取锁 ===");
tryLockDemo();
TimeUnit.SECONDS.sleep(4);
log.info("\n=== 测试中断获取锁 ===");
interruptibleLockDemo();
TimeUnit.SECONDS.sleep(2);
}
}
3. 底层原理:ReentrantLock如何基于AQS实现?
ReentrantLock通过内部类Sync(继承AQS)实现核心逻辑,分为公平锁(FairSync)和非公平锁(NonfairSync):
- 非公平锁:线程获取锁时,先尝试CAS修改state为1(不排队直接抢锁),失败后再加入CLH队列;
- 公平锁:线程获取锁时,直接加入CLH队列,按队列顺序获取锁;
- 可重入性:获取锁时,若当前线程已持有锁,则state+1;释放锁时,state-1,直到state=0时释放锁。
4.2 共享锁:CountDownLatch与CyclicBarrier
共享锁的核心特点是“多个线程可同时获取资源”,JUC中典型的共享锁组件是CountDownLatch(倒计时门闩)和CyclicBarrier(循环屏障)。
4.2.1 CountDownLatch:等待多线程完成
CountDownLatch基于AQS的共享模式实现,核心功能是“让一个或多个线程等待其他线程完成所有任务后,再继续执行”。
核心原理:
- 初始化时指定“计数次数”(state=计数次数);
- 每个线程完成任务后,调用countDown(),state-1(tryReleaseShared);
- 等待线程调用await(),若state>0则阻塞,直到state=0时,所有等待线程被唤醒(共享模式下,唤醒后会继续唤醒后续线程)。
实战示例:CountDownLatch实现多线程任务协调
package com.jam.demo.juc.share;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* CountDownLatch实战示例
* 场景:主线程等待3个任务线程完成后,统计结果
* @author ken
*/
@Slf4j
public class CountDownLatchDemo {
// 任务数量
private static final int TASK_NUM = 3;
// 用于统计每个任务的结果
private int[] taskResults = new int[TASK_NUM];
/**
* 执行任务
* @param taskId 任务ID
* @param latch CountDownLatch
*/
public void executeTask(int taskId, CountDownLatch latch) {
try {
log.info("任务{}:开始执行", taskId);
// 模拟任务执行(1-3秒随机)
long sleepTime = 1000 + (long) (Math.random() * 2000);
TimeUnit.MILLISECONDS.sleep(sleepTime);
// 模拟任务结果(随机0-100的整数)
taskResults[taskId] = (int) (Math.random() * 101);
log.info("任务{}:执行完成,结果:{},耗时:{}ms", taskId, taskResults[taskId], sleepTime);
} catch (InterruptedException e) {
log.error("任务{}被中断", taskId, e);
} finally {
// 任务完成,计数减1
if (!ObjectUtils.isEmpty(latch)) {
latch.countDown();
}
}
}
/**
* 示例1:基本使用(主线程等待所有任务完成)
*/
@Test
public void testBasic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(TASK_NUM);
ExecutorService executor = Executors.newFixedThreadPool(TASK_NUM);
for (int i = < TASK_NUM; i++) {
int taskId = i;
executor.submit(() -> executeTask(taskId, latch));
}
log.info("主线程:等待所有任务完成...");
// 等待所有任务完成(阻塞)
latch.await();
log.info("主线程:所有任务完成,开始统计结果");
// 统计结果
int sum = 0;
for (int result : taskResults) {
sum += result;
}
log.info("主线程:所有任务结果总和:{},平均值:{}", sum, sum / (double) TASK_NUM);
executor.shutdown();
}
/**
* 示例2:超时等待(主线程等待指定时间后,不再等待)
*/
@Test
public void testTimeout() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(TASK_NUM);
ExecutorService executor = Executors.newFixedThreadPool(TASK_NUM);
for (int i = < TASK_NUM; i++) {
int taskId = i;
executor.submit(() -> {
try {
// 任务1执行5秒(模拟超时场景)
if (taskId == 0) {
TimeUnit.SECONDS.sleep(5);
} else {
TimeUnit.SECONDS.sleep(1);
}
taskResults[taskId] = (int) (Math.random() * 101);
log.info("任务{}:执行完成,结果:{}", taskId, taskResults[taskId]);
} catch (InterruptedException e) {
log.error("任务{}被中断", taskId, e);
} finally {
latch.countDown();
}
});
}
log.info("主线程:等待所有任务完成(超时时间3秒)...");
// 等待3秒,若未完成则继续执行
boolean allCompleted = latch.await(3, TimeUnit.SECONDS);
if (allCompleted) {
log.info("主线程:所有任务完成");
} else {
log.info("主线程:等待超时,部分任务未完成");
}
executor.shutdownNow();
}
}
4.2.2 CyclicBarrier:多线程同步等待
CyclicBarrier与CountDownLatch类似,但核心区别在于:
- CountDownLatch:计数只能使用一次,线程完成任务后计数减1,等待线程等待计数为0;
- CyclicBarrier:计数可循环使用,多个线程到达“屏障”后,才继续执行,可设置“屏障动作”(所有线程到达后执行)。
核心原理:
- 初始化时指定“参与线程数”和“屏障动作”;
- 每个线程到达屏障时,调用await(),计数器减1;
- 当计数器减至0时,执行屏障动作,然后重置计数器,所有线程继续执行;
- 支持重置(reset()),可循环使用。
实战示例:CyclicBarrier实现多线程同步执行
package com.jam.demo.juc.share;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.StringUtils;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* CyclicBarrier实战示例
* 场景:3个线程同步到达屏障后,执行统计动作,然后循环执行下一轮任务
* @author ken
*/
@Slf4j
public class CyclicBarrierDemo {
// 参与线程数
private static final int THREAD_NUM = 3;
// 循环次数
private static final int LOOP_NUM = 2;
// 用于统计每轮任务的结果
private int roundSum = 0;
/**
* 执行任务
* @param threadId 线程ID
* @param barrier CyclicBarrier
*/
public void executeTask(int threadId, CyclicBarrier barrier) {
for (int round = 0;< LOOP_NUM; round++) {
try {
log.info("线程{}:第{}轮任务开始执行", threadId, round + 1);
// 模拟任务执行(1秒)
Thread.sleep(1000);
// 模拟任务结果(随机0-10的整数)
int result = (int) (Math.random() * 11);
roundSum += result;
log.info("线程{}:第{}轮任务完成,结果:{},当前总和:{}", threadId, round + 1, result, roundSum);
// 到达屏障,等待其他线程
log.info("线程{}:到达屏障,等待其他线程...", threadId);
barrier.await();
// 屏障动作执行完成后,继续下一轮
log.info("线程{}:第{}轮屏障动作完成,进入下一轮", threadId, round + 1);
} catch (InterruptedException e) {
log.error("线程{}被中断", threadId, e);
return;
} catch (BrokenBarrierException e) {
log.error("线程{}:屏障被破坏", threadId, e);
return;
}
}
}
@Test
public void testCyclicBarrier() throws InterruptedException {
// 初始化CyclicBarrier:3个线程,屏障动作(统计每轮结果)
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, () -> {
log.info("=== 屏障动作:第{}轮所有任务完成,总和:{},平均值:{} ===",
(roundSum / THREAD_NUM) > 0 ? (roundSum / THREAD_NUM) : 1,
roundSum,
roundSum / (double) THREAD_NUM);
// 重置总和,用于下一轮
roundSum = 0;
});
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
for (int i = 0; i< THREAD_NUM; i++) {
int threadId = i;
executor.submit(() -> executeTask(threadId, barrier));
}
executor.shutdown();
// 等待所有任务执行完成
while (!executor.isTerminated()) {
Thread.sleep(500);
}
log.info("所有轮次任务执行完成");
}
/**
* 示例:BrokenBarrierException场景(线程被中断导致屏障破坏)
*/
@Test
public void testBrokenBarrier() throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, () -> {
log.info("屏障动作执行");
});
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
Thread thread0 = new Thread(() -> {
try {
log.info("线程0:执行任务");
Thread.sleep(1000);
log.info("线程0:到达屏障");
barrier.await();
} catch (InterruptedException e) {
log.error("线程0被中断", e);
} catch (BrokenBarrierException e) {
log.error("线程0:屏障被破坏", e);
}
});
executor.submit(thread0);
executor.submit(() -> {
try {
log.info("线程1:执行任务");
Thread.sleep(2000);
log.info("线程1:到达屏障");
barrier.await();
} catch (InterruptedException e) {
log.error("线程1被中断", e);
} catch (BrokenBarrierException e) {
log.error("线程1:屏障被破坏", e);
}
});
// 主线程休眠1.5秒后中断线程0,导致屏障破坏
Thread.sleep(1500);
thread0.interrupt();
executor.shutdown();
while (!executor.isTerminated()) {
Thread.sleep(500);
}
}
}
4.2.3 CountDownLatch vs CyclicBarrier 核心区别
| 特性 | CountDownLatch | CyclicBarrier |
| 核心功能 | 等待多线程完成任务 | 多线程同步到达屏障后继续执行 |
| 计数是否可循环 | 不可循环(一次性) | 可循环(reset()重置) |
| 线程角色 | 分为“等待线程”和“计数线程” | 所有线程角色相同 |
| 屏障动作 | 无(等待计数为0后直接唤醒) | 支持(所有线程到达后执行) |
| 底层实现 | AQS共享模式 | 基于ReentrantLock和Condition |
4.3 信号量:Semaphore(控制并发访问数)
Semaphore(信号量)用于控制“同时访问某个资源的线程数”,核心是“许可数”的分配与释放,底层基于AQS的共享模式实现。
核心原理:
- 初始化时指定“许可数”(state=许可数);
- 线程获取资源时,调用acquire(),尝试获取1个许可(state-1<0则阻塞;
- 线程释放资源时,调用release(),释放1个许可(state+1),唤醒等待线程;
- 支持公平/非公平模式,可一次性获取/释放多个许可。
实战示例:Semaphore控制接口并发访问
package com.jam.demo.juc.semaphore;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.StringUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore实战示例
* 场景:控制接口的并发访问数(最多3个线程同时访问)
* @author ken
*/
@Slf4j
public class SemaphoreDemo {
// 最大并发数(许可数)
private static final int MAX_CONCURRENT = 3;
// 非公平信号量(默认)
private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT);
// 公平信号量
private final Semaphore fairSemaphore = new Semaphore(MAX_CONCURRENT, true);
/**
* 模拟接口访问
* @param userId 用户ID
*/
public void accessApi(String userId) {
try {
// 1. 获取许可(阻塞等待)
semaphore.acquire();
log.info("用户{}:成功获取许可,开始访问接口", userId);
// 2. 模拟接口处理(1-2秒)
long handleTime = 1000 + (long) (Math.random() * 1000);
TimeUnit.MILLISECONDS.sleep(handleTime);
log.info("用户{}:接口访问完成,耗时:{}ms", userId, handleTime);
} catch (InterruptedException e) {
log.error("用户{}:访问接口被中断", userId, e);
} finally {
// 3. 释放许可
semaphore.release();
log.info("用户{}:释放许可", userId);
}
}
/**
* 示例1:基本使用(控制最大并发数)
*/
@Test
public void testBasic() throws InterruptedException {
// 模拟10个用户并发访问
int userNum = 10;
ExecutorService executor = Executors.newFixedThreadPool(userNum);
for (int i = < userNum; i++) {
String userId = "user-" + (i + 1);
executor.submit(() -> accessApi(userId));
}
executor.shutdown();
while (!executor.isTerminated()) {
TimeUnit.MILLISECONDS.sleep(500);
}
log.info("所有用户访问完成");
}
/**
* 示例2:超时获取许可(tryAcquire)
*/
@Test
public void testTryAcquire() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = < 5; i++) {
String userId = "user-" + (i + 1);
executor.submit(() -> {
boolean acquired = false;
try {
// 尝试获取许可,超时时间1秒
acquired = semaphore.tryAcquire(1, TimeUnit.SECONDS);
if (acquired) {
log.info("用户{}:成功获取许可,访问接口", userId);
TimeUnit.MILLISECONDS.sleep(2000);
log.info("用户{}:接口访问完成", userId);
} else {
log.info("用户{}:超时未获取到许可,拒绝访问", userId);
}
} catch (InterruptedException e) {
log.error("用户{}:获取许可被中断", userId, e);
} finally {
if (acquired) {
semaphore.release();
}
}
});
}
executor.shutdown();
while (!executor.isTerminated()) {
TimeUnit.MILLISECONDS.sleep(500);
}
}
/**
* 示例3:一次性获取/释放多个许可
*/
@Test
public void testAcquireReleaseMultiple() throws InterruptedException {
// 模拟需要2个许可才能访问的资源
Semaphore multiSemaphore = new Semaphore(MAX_CONCURRENT);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = < 3; i++) {
String userId = "user-" + (i + 1);
executor.submit(() -> {
boolean acquired = false;
try {
// 一次性获取2个许可
acquired = multiSemaphore.tryAcquire(2, 1, TimeUnit.SECONDS);
if (acquired) {
log.info("用户{}:成功获取2个许可,访问资源", userId);
TimeUnit.MILLISECONDS.sleep(1500);
log.info("用户{}:资源访问完成", userId);
} else {
log.info("用户{}:未获取到2个许可,拒绝访问", userId);
}
} catch (InterruptedException e) {
log.error("用户{}:获取许可被中断", userId, e);
} finally {
if (acquired) {
// 一次性释放2个许可
multiSemaphore.release(2);
log.info("用户{}:释放2个许可", userId);
}
}
});
}
executor.shutdown();
while (!executor.isTerminated()) {
TimeUnit.MILLISECONDS.sleep(500);
}
}
}
4.4 线程池:ThreadPoolExecutor(核心并发工具)
线程池是JUC中最常用的组件之一,用于“管理线程生命周期、复用线程、控制并发数”,避免频繁创建/销毁线程带来的性能开销。JUC中的线程池核心实现是ThreadPoolExecutor,Executors工具类创建的线程池(如newFixedThreadPool)本质上都是ThreadPoolExecutor的封装。
4.4.1 线程池核心参数与工作原理
核心参数(7个):
- corePoolSize:核心线程数(常驻线程,即使空闲也不销毁);
- maximumPoolSize:最大线程数(核心线程+临时线程的最大数量);
- keepAliveTime:临时线程的空闲时间(超过此时间则销毁);
- unit:keepAliveTime的时间单位;
- workQueue:任务队列(存放等待执行的任务);
- threadFactory:线程工厂(用于创建线程,可自定义线程名称、优先级等);
- handler:拒绝策略(当线程池满且队列满时,处理新任务的策略)。
核心工作原理:
拒绝策略(4种默认策略):
- AbortPolicy:直接抛出RejectedExecutionException(默认策略);
- CallerRunsPolicy:由提交任务的线程自己执行任务;
- DiscardPolicy:直接丢弃新任务,不抛出异常;
- DiscardOldestPolicy:丢弃队列中最旧的任务,然后将新任务加入队列。
4.4.2 线程池实战:自定义线程池+实战场景
package com.jam.demo.juc.threadpool;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* ThreadPoolExecutor实战示例
* 场景:自定义线程池处理批量任务,结合MyBatis-Plus实现数据库批量插入
* @author ken
*/
@Slf4j
public class ThreadPoolExecutorDemo {
// 自定义线程工厂(指定线程名称,便于排查问题)
private static final ThreadFactory CUSTOM_THREAD_FACTORY = new ThreadFactory() {
private final AtomicInteger threadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "demo-thread-" + threadNum.getAndIncrement());
// 设置线程优先级(默认5)
thread.setPriority(Thread.NORM_PRIORITY);
// 设置为非守护线程
thread.setDaemon(false);
return thread;
}
};
// 自定义拒绝策略(记录日志+抛出异常)
private static final RejectedExecutionHandler CUSTOM_REJECT_HANDLER = (r, executor) -> {
log.error("线程池满,无法处理任务:{}", r.toString());
throw new RejectedExecutionException("线程池已达最大负载,拒绝执行任务");
};
/**
* 初始化自定义线程池
* 核心参数说明:
* - 核心线程数:CPU核心数(CPU密集型任务)
* - 最大线程数:CPU核心数*2(防止线程过多导致上下文切换)
* - 空闲时间:30秒(临时线程空闲30秒销毁)
* - 任务队列:LinkedBlockingQueue,容量100(避免无界队列导致内存溢出)
* @return 自定义线程池
*/
public ThreadPoolExecutor createCustomThreadPool() {
int cpuNum = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cpuNum,
cpuNum * 2,
30,
TimeUnit.SECONDS,
new<>(100),
CUSTOM_THREAD_FACTORY,
CUSTOM_REJECT_HANDLER
);
}
/**
* 模拟任务:处理数据(实际场景可替换为MyBatis-Plus数据库操作)
* @param data 待处理数据
* @return 处理结果
*/
public String processData(String data) {
try {
// 模拟任务处理(50-200ms)
long sleepTime = 50 + (long) (Math.random() * 150);
TimeUnit.MILLISECONDS.sleep(sleepTime);
return "processed-" + data;
} catch (InterruptedException e) {
log.error("处理数据{}被中断", data, e);
Thread.currentThread().interrupt();
return null;
}
}
/**
* 示例1:批量处理任务(submit提交任务,获取Future结果)
*/
@Test
public void testBatchProcess() throws InterruptedException, ExecutionException {
ThreadPoolExecutor executor = createCustomThreadPool<String> dataList = Lists.newArrayList();
// 模拟100条数据
for (int i =< 100; i++) {
dataList.add("data-" + (i + 1));
<Future<String>> futureList = Lists.newArrayList();
for (String data : dataList) {
// 提交任务,获取Future(用于获取结果)
<String> future = executor.submit(() -> processData(data));
futureList.add(future);
}
// 关闭线程池(不再接受新任务,等待现有任务完成)
executor.shutdown();
// 等待所有任务完成(超时10秒)
boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS);
if (!finished) {
// 超时未完成,中断任务
executor.shutdownNow();
log.error("线程池任务执行超时,强制中断");
return;
}
// 处理任务<String> resultList = Lists.newArrayList();
for (Future<String> future : futureList) {
if (future.isDone() && !future.isCancelled()) {
String result = future.get();
if (!CollectionUtils.isEmpty(result)) {
resultList.add(result);
}
}
}
log.info("批量处理完成,成功处理数据条数:{},总数据条数:{}", resultList.size(), dataList.size());
log.info("处理结果示例:{}", resultList.subList(0, Math.min(10, resultList.size())));
}
/**
* 示例2:使用CompletableFuture优化任务处理(支持异步回调)
*/
@Test
public void testCompletableFuture() throws InterruptedException {
ThreadPoolExecutor executor = createCustomThreadPool();<String> dataList = Lists.newArrayList();
for (int i = 0; i < 50; i++) {
dataList.add("data-" + (i + 1));
}
// 用于统计成功/失败数量
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0<Comp<Void>> futureList = Lists.newArrayList();
for (String data : dataList) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> processData(data), executor)
// 任务成功回调
.thenAccept(result -> {
if (!CollectionUtils.isEmpty(result)) {
successCount.incrementAndGet();
log.info("任务成功:{} -> {}", data, result);
}
})
// 任务异常回调
.exceptionally(ex -> {
failCount.incrementAndGet();
log.error("任务失败:{}", data, ex);
return null;
});
futureList.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
executor.shutdown();
log.info("CompletableFuture批量处理完成,成功:{},失败:{}", successCount.get(), failCount.get());
}
/**
* 示例3:拒绝策略触发场景
*/
@Test
public void testRejectPolicy() {
// 初始化一个小容量线程池(核心1,最大2,队列2)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
2,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
CUSTOM_THREAD_FACTORY,
CUSTOM_REJECT_HANDLER
);
// 提交5个任务(核心1+队列2+最大2,共5个任务会触发拒绝)
for (int i = 0< 5; i++) {
int taskId = i;
try {
executor.submit(() -> {
try {
// 模拟任务执行(3秒)
TimeUnit.SECONDS.sleep(3);
log.info("任务{}执行完成", taskId);
} catch (InterruptedException e) {
log.error("任务{}被中断", taskId, e);
}
});
log.info("提交任务{}成功", taskId);
} catch (RejectedExecutionException e) {
log.error("提交任务{}失败", taskId, e);
}
}
executor.shutdown();
}
/**
* 示例4:MyBatis-Plus批量插入实战(结合线程池)
* 注:实际项目中需注入Mapper,此处模拟
*/
@Test
public void testMyBatisPlusBatchInsert() throws InterruptedException {
ThreadPoolExecutor executor = createCustomThreadPool();
// 模拟1000条待<User> userList = Lists.newArrayList();
for (int i = 0;< 1000; i++) {
User user = new User();
user.setId(Long.valueOf(i + 1));
user.setName("user-" + (i + 1));
user.setAge(18 + (i % 20));
user.setEmail("user-" + (i + 1) + "@demo.com");
userList.add(user);
}
// 分片处理(每100条为一片)
int batchSize = 100;
List<User>> batchList = Lists.partition(userList, batchSize);
List<Void>> futureList = Lists.newArrayList();
for<User> batch : batchList) {
<Void> future = CompletableFuture.runAsync(() -> {
// 实际场景:调用MyBatis-Plus的saveBatch方法
boolean success = saveBatch(batch);
if (success) {
log.info("批量插入成功,条数:{}", batch.size());
} else {
log.error("批量插入失败,条数:{}", batch.size());
}
}, executor);
futureList.add(future);
}
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
executor.shutdown();
log.info("所有批量插入任务完成");
}
/**
* 模拟MyBatis-Plus的saveBatch方法
*/
<User> userList) {
try {
// 模拟数据库插入操作(100ms)
TimeUnit.MILLISECONDS.sleep(100);
return true;
} catch (InterruptedException e) {
log.error("批量插入失败", e);
return false;
}
}
/**
* 模拟User实体类(MyBatis-Plus)
*/
@lombok.Data
static class User {
private Long id;
private String name;
private Integer age;
private String email;
}
}
4.4.3 线程池使用注意事项
- 避免使用Executors创建线程池:
- newFixedThreadPool:无界队列,可能导致内存溢出;
- newCachedThreadPool:最大线程数无限制,高并发下可能创建大量线程,导致CPU/内存耗尽;
- 推荐使用ThreadPoolExecutor自定义线程池,明确核心参数。
- 合理设置核心参数:
- CPU密集型任务(如计算):核心线程数=CPU核心数+1;
- IO密集型任务(如数据库/网络操作):核心线程数=CPU核心数*2;
- 任务队列:使用有界队列,避免无界队列导致内存溢出。
- 线程池关闭:
- shutdown():温和关闭,不再接受新任务,等待现有任务完成;
- shutdownNow():强制关闭,中断正在执行的任务,返回未执行的任务;
- 建议结合awaitTermination()等待任务完成,避免任务丢失。
- 异常处理:
- submit()提交的任务,异常会被封装在Future中,需通过get()获取异常;
- execute()提交的任务,异常会直接抛出,需在任务内部捕获;
- 推荐使用CompletableFuture的exceptionally()处理异步任务异常。
- 线程池监控:
- 自定义线程池时,可重写beforeExecute()、afterExecute()、terminated()方法,实现任务执行前后的监控;
- 通过线程池的getActiveCount()(活跃线程数)、getQueue().size()(队列任务数)等方法监控线程池状态。
4.5 原子类:AtomicXxx与LongAdder(线程安全的变量操作)
原子类是JUC提供的“线程安全的变量操作工具”,底层基于CAS(Compare And Swap)操作实现,无需加锁即可保证变量操作的原子性,性能优于synchronized和ReentrantLock。
4.5.1 核心原子类分类
- 基本类型原子类:AtomicInteger、AtomicLong、AtomicBoolean;
- 数组类型原子类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray;
- 引用类型原子类:AtomicReference、AtomicStampedReference、AtomicMarkableReference;
- 字段更新器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater(用于更新对象的volatile字段);
- 累加器:LongAdder、DoubleAdder(高并发场景下累加性能优于AtomicLong)。
4.5.2 CAS原理:无锁编程的核心
CAS是一种“无锁原子操作”,核心逻辑是“比较-替换”:
- 步骤:先比较变量的当前值是否等于预期值,若等于则将变量更新为目标值,否则不更新;
- 底层实现:基于CPU的cmpxchg指令(硬件级原子操作),无需操作系统的锁机制,性能高效;
- 缺点:ABA问题、循环时间长导致CPU开销大、只能保证单个变量的原子性。
CAS流程图:
4.5.3 实战示例:原子类的使用与对比
package com.jam.demo.juc.atomic;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
/**
* 原子类实战示例
* 包含:AtomicInteger基本使用、LongAdder高并发累加、ABA问题解决
* @author ken
*/
@Slf4j
public class AtomicDemo {
// 线程数
private static final int THREAD_NUM = 10;
// 每个线程执行次数
private static final int EXECUTE_TIMES = 100000;
/**
* 示例1:AtomicInteger基本使用(替代非线程安全的int)
*/
@Test
public void testAtomicInteger() throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(THREAD_NUM);
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
for (int i = 0;< THREAD_NUM; i++) {
executor.submit(() -> {
for (int j = < EXECUTE_TIMES; j++) {
// 原子自增(i++)
atomicInteger.incrementAndGet();
// 其他原子操作:decrementAndGet()(i--)、getAndIncrement()(++i)、addAndGet()(i+=n)等
}
latch.countDown();
});
}
latch.await();
executor.shutdown();
// 预期结果:10*100000=1000000
log.info("AtomicInteger最终值:{}", atomicInteger.get());
}
/**
* 示例2:LongAdder vs AtomicLong(高并发累加性能对比)
*/
@Test
public void testLongAdderVsAtomicLong() throws InterruptedException {
// 测试AtomicLong
AtomicInteger atomicLongResult = new AtomicInteger(0);
CountDownLatch atomicLatch = new CountDownLatch(THREAD_NUM);
ExecutorService atomicExecutor = Executors.newFixedThreadPool(THREAD_NUM);
long atomicStart = System.currentTimeMillis();
for (int i =< THREAD_NUM; i++) {
atomicExecutor.submit(() -> {
for (int j = 0;< EXECUTE_TIMES; j++) {
atomicLongResult.incrementAndGet();
}
atomicLatch.countDown();
});
}
atomicLatch.await();
atomicExecutor.shutdown();
long atomicCost = System.currentTimeMillis() - atomicStart;
// 测试LongAdder
LongAdder longAdder = new LongAdder();
CountDownLatch adderLatch = new CountDownLatch(THREAD_NUM);
ExecutorService adderExecutor = Executors.newFixedThreadPool(THREAD_NUM);
long adderStart = System.currentTimeMillis();
for (int i = 0;< THREAD_NUM; i++) {
adderExecutor.submit(() -> {
for (int j =< EXECUTE_TIMES; j++) {
longAdder.increment();
}
adderLatch.countDown();
});
}
adderLatch.await();
adderExecutor.shutdown();
long adderCost = System.currentTimeMillis() - adderStart;
log.info("AtomicInteger:结果={},耗时={}ms", atomicLongResult.get(), atomicCost);
log.info("LongAdder:结果={},耗时={}ms", longAdder.sum(), adderCost);
// 结论:高并发下LongAdder性能优于AtomicInteger,因为LongAdder采用分段累加(减少CAS竞争)
}
/**
* 示例3:ABA问题及解决(AtomicStampedReference)
* ABA问题:线程1将A改为B,线程2又将B改为A,线程1再次CAS时会认为值未变,导致错误更新
*/
@Test
public void testABAProblem() {
// 1. 普通AtomicReference存在ABA问题
AtomicReference<String> atomicRef = new<>("A");
String oldValue = atomicRef.get();
// 线程1:尝试将A改为C
new Thread(() -> {
log.info("线程1:当前值={}", atomicRef.get());
try {
// 休眠1秒,让线程2执行ABA操作
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("线程1被中断", e);
}
// CAS:预期值A,目标值C
boolean success = atomicRef.compareAndSet(oldValue, "C");
log.info("线程1:CAS结果={},当前值={}", success, atomicRef.get());
}, "Thread-1").start();
// 线程2:执行ABA操作(A->B->A)
new Thread(() -> {
log.info("线程2:当前值={}", atomicRef.get());
// A->B
atomicRef.compareAndSet("A", "B");
log.info("线程2:A->B,当前值={}", atomicRef.get());
// B->A
atomicRef.compareAndSet("B", "A");
log.info("线程2:B->A,当前值={}", atomicRef.get());
}, "Thread-2").start();
// 等待线程执行完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("主线程被中断", e);
}
// 2. AtomicStampedReference解决ABA问题(增加版本号)
log.info("\n=== 使用AtomicStampedReference解决ABA问题 ===");
// 初始化:值A,版本号1
<String> stampedRef = new Atomic<>("A", 1);
int oldStamp = stampedRef.getStamp();
// 线程3:尝试将A改为C(带版本号)
new Thread(() -> {
log.info("线程3:当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("线程3被中断", e);
}
// CAS:预期值A,目标值C,预期版本号oldStamp
boolean success = stampedRef.compareAndSet("A", "C", oldStamp, oldStamp + 1);
log.info("线程3:CAS结果={},当前值={},版本号={}", success, stampedRef.getReference(), stampedRef.getStamp());
}, "Thread-3").start();
// 线程4:执行ABA操作
new Thread(() -> {
log.info("线程4:当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
// A->B(版本号1->2)
stampedRef.compareAndSet("A", "B", 1, 2);
log.info("线程4:A->B,当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
// B->A(版本号2->3)
stampedRef.compareAndSet("B", "A", 2, 3);
log.info("线程4:B->A,当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
}, "Thread-4").start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("主线程被中断", e);
}
}
/**
* 示例4:字段更新器(AtomicIntegerFieldUpdater)
* 用于更新对象的volatile字段(无需修改对象类,动态更新)
*/
@Test
public void testAtomicFieldUpdater() {
// 初始化对象
User user = new User("ken", 20);
log.info("更新前:name={},age={}", user.getName(), user.getAge());
// 1. 创建字段更新器(更新User类的age字段,必须是volatile修饰)
<User> ageUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
// 2. 原子更新age字段
ageUpdater.incrementAndGet(user);
log.info("更新后(自增1):age={}", user.getAge());
ageUpdater.addAndGet(user, 5);
log.info("更新后(加5):age={}", user.getAge());
// 3. 尝试更新(预期值26,目标值30)
boolean success = ageUpdater.compareAndSet(user, 26, 30);
log.info("CAS更新结果:{},最终age={}", success, user.getAge());
}
/**
* 模拟User类(volatile字段)
*/
@lombok.Data
static class User {
private String name;
// 必须是volatile修饰,否则字段更新器无法使用
private volatile int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
}
}
4.5.4 原子类核心总结
- 适用场景:单个变量的原子性操作(如计数、累加),无需加锁,性能高效;
- LongAdder优势:高并发场景下,通过“分段累加”减少CAS竞争,性能优于AtomicLong;
- ABA问题解决:使用AtomicStampedReference(版本号)或AtomicMarkableReference(标记位);
- 字段更新器:无需修改对象类,即可动态更新其volatile字段,灵活性高。
4.6 并发集合:线程安全的集合框架
Java默认的集合框架(如ArrayList、HashMap)是非线程安全的,在并发场景下使用会出现数据不一致问题。JUC提供了专门的并发集合,解决了线程安全问题,同时性能优于Vector、Hashtable(后者使用synchronized加锁,性能较差)。
4.6.1 并发集合核心分类与对比
| 并发集合类 | 对应非并发集合 | 核心特点/底层实现 | 适用场景 |
| ConcurrentHashMap | HashMap | 分段锁(JDK 1.8后改为CAS+ synchronized) | 高并发键值对存储 |
| CopyOnWriteArrayList | ArrayList | 写时复制(修改时复制整个数组,读无锁) | 读多写少场景 |
| CopyOnWriteArraySet | HashSet | 基于CopyOnWriteArrayList实现 | 读多写少的无序集合 |
| ConcurrentLinkedQueue | LinkedList | 无锁队列(CAS实现) | 高并发队列(FIFO) |
| ConcurrentLinkedDeque | LinkedList | 无锁双端队列 | 高并发双端队列(FIFO/LIFO) |
| LinkedBlockingQueue | LinkedList | 基于ReentrantLock的有界/无界队列 | 生产-消费模型(需控制并发数) |
| ArrayBlockingQueue | ArrayList | 基于数组的有界队列(ReentrantLock) | 固定容量的生产-消费模型 |
| SynchronousQueue | 无 | 无缓冲队列(生产-消费直接交互) | 线程间直接传递任务 |
| PriorityBlockingQueue | PriorityQueue | 基于堆的无界优先级队列 | 高并发优先级任务调度 |
4.6.2 核心并发集合实战
package com.jam.demo.juc.collection;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 并发集合实战示例
* 包含:ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue、LinkedBlockingQueue
* @author ken
*/
@Slf4j
public class ConcurrentCollectionDemo {
// 线程数
private static final int THREAD_NUM = 5;
// 每个线程操作次数
private static final int OPERATE_TIMES = 10000;
/**
* 示例1:ConcurrentHashMap(高并发键值对存储)
* 对比HashMap(非线程安全)和ConcurrentHashMap(线程安全)
*/
@Test
public void testConcurrentHashMap() throws InterruptedException {
// 1. 测试HashMap(非线程安全)
<String, Integer> hash<>();
CountDownLatch hashMapLatch = new CountDownLatch(THREAD_NUM);
ExecutorService hashMapExecutor = Executors.newFixedThreadPool(THREAD_NUM);
AtomicInteger hashMapErrorCount = new AtomicInteger(0);
for (int i = 0;< THREAD_NUM; i++) {
int threadId = i;
hashMapExecutor.submit(() -> {
for (int j = 0;< OPERATE_TIMES; j++) {
String key = "key-" + (j % 10); // 共10个key,模拟竞争
try {
// 非原子操作:get+put,会出现线程安全问题
hashMap.put(key, hashMap.getOrDefault(key, 0) + 1);
} catch (Exception e) {
hashMapErrorCount.incrementAndGet();
}
}
hashMapLatch.countDown();
});
}
hashMapLatch.await();
hashMapExecutor.shutdown();
int hashMapTotal = hashMap.values().stream().mapToInt(Integer::intValue).sum();
log.info("HashMap:总计数={}(预期={}),错误数={}", hashMapTotal, THREAD_NUM * OPERATE_TIMES, hashMapErrorCount.get());
// 2. 测试ConcurrentHashMap(线程安全)<String, Integer> concurrentHashMap = new Concurrent<>();
CountDownLatch chmLatch = new CountDownLatch(THREAD_NUM);
ExecutorService chmExecutor = Executors.newFixedThreadPool(THREAD_NUM);
for (int i = 0;< THREAD_NUM; i++) {
int threadId = i;
chmExecutor.submit(() -> {
for (int j = 0;< OPERATE_TIMES; j++) {
String key = "key-" + (j % 10);
// 原子操作:computeIfAbsent+getAndIncrement
concurrentHashMap.computeIfAbsent(key, k -> new AtomicInteger(0)).getAndIncrement();
}
chmLatch.countDown();
});
}
chmLatch.await();
chmExecutor.shutdown();
int chmTotal = concurrentHashMap.values().stream().mapToInt(AtomicInteger::intValue).sum();
log.info("ConcurrentHashMap:总计数={}(预期={})", chmTotal, THREAD_NUM * OPERATE_TIMES);
}
/**
* 示例2:CopyOnWriteArrayList(读多写少场景)
* 核心特点:写时复制,读无锁,写加锁(性能较差)
*/
@Test
public void testCopyOnWriteArrayList() throws InterruptedException {
List<String> cowList = new<>();
CountDownLatch latch = new CountDownLatch(THREAD_NUM);
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
for (int i = 0; i< THREAD_NUM; i++) {
int threadId = i;
executor.submit(() -> {
// 写操作:add(写时复制)
for (int j = 0; j < OPERATE_TIMES / 10; j++) {
cowList.add("data-" + threadId + "-" + j);
}
log.info("线程{}:写操作完成,当前列表大小={}", threadId, cowList.size());
// 读操作:遍历(无锁)
for (int j = 0;< 10; j++) {
String data = cowList.get(j % cowList.size());
if (j == 0) {
log.info("线程{}:读操作示例,data={}", threadId, data);
}
}
latch.countDown();
});
}
latch.await();
executor.shutdown();
log.info("CopyOnWriteArrayList最终大小:{}", cowList.size());
// 注意:CopyOnWriteArrayList的迭代器是快照迭代器,不支持remove操作
try {
for (String data : cowList) {
cowList.remove(data);
}
} catch (UnsupportedOperationException e) {
log.error("CopyOnWriteArrayList迭代器不支持remove操作", e);
}
}
/**
* 示例3:ConcurrentLinkedQueue(高并发无锁队列)
* 适用场景:高并发下的FIFO队列,无锁设计,性能优于LinkedBlockingQueue
*/
@Test
public void testConcurrentLinkedQueue() throws InterruptedException {<String> queue<>();
CountDownLatch producerLatch = new CountDownLatch(3); // 3个生产者
CountDownLatch consumerLatch = new CountDownLatch(2); // 2个消费者
ExecutorService executor = Executors.newFixedThreadPool(5);
// 生产者线程:向队列添加任务
for (int i = 0; i< 3; i++) {
int producerId = i;
executor.submit(() -> {
for (int j = < 1000; j++) {
String task = "task-" + producerId + "-" + j;
queue.offer(task);
if (j % 100 == 0) {
log.info("生产者{}:添加任务{},队列大小={}", producerId, task, queue.size());
}
// 模拟生产耗时
try {
Thread.sleep(1);
} catch (InterruptedException e) {
log.error("生产者{}被中断", producerId, e);
}
}
log.info("生产者{}:生产完成", producerId);
producerLatch.countDown();
});
}
// 消费者线程:从队列获取任务
for (int i = < 2; i++) {
int consumerId = i;
executor.submit(() -> {
while (true) {
// 非阻塞获取任务(无任务返回null)
String task = queue.poll();
if (!CollectionUtils.isEmpty(task)) {
log.info("消费者{}:处理任务{},队列剩余大小={}", consumerId, task, queue.size());
// 模拟消费耗时
try {
Thread.sleep(2);
} catch (InterruptedException e) {
log.error("消费者{}被中断", consumerId, e);
}
} else {
// 检查生产者是否完成,若完成则退出
if (producerLatch.getCount() == 0) {
break;
}
}
}
log.info("消费者{}:消费完成", consumerId);
consumerLatch.countDown();
});
}
producerLatch.await();
consumerLatch.await();
executor.shutdown();
log.info("所有任务处理完成,队列剩余大小={}", queue.size());
}
/**
* 示例4:LinkedBlockingQueue(有界队列,生产-消费模型)
* 核心特点:基于ReentrantLock,支持有界容量,可用于流量控制
*/
@Test
public void testLinkedBlockingQueue() throws InterruptedException {
// 初始化有界队列,容量100
Queue<Integer> queue = new Linked<>(100);
CountDownLatch producerLatch = new CountDownLatch(2);
CountDownLatch consumerLatch = new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(5);
// 生产者:向队列添加数据,队列满时阻塞
for (int i = 0; i< 2; i++) {
int producerId = i;
executor.submit(() -> {
for (int j = < 500; j++) {
try {
// 阻塞添加(队列满时等待)
queue.put(j);
if (j % 100 == 0) {
log.info("生产者{}:添加数据{},队列大小={}", producerId, j, queue.size());
}
} catch (InterruptedException e) {
log.error("生产者{}被中断", producerId, e);
return;
}
}
log.info("生产者{}:生产完成", producerId);
producerLatch.countDown();
});
}
// 消费者:从队列获取数据,队列空时阻塞
for (int i = 0< 3; i++) {
int consumerId = i;
executor.submit(() -> {
while (true) {
try {
// 阻塞获取(队列空时等待)
Integer data = queue.take();
log.info("消费者{}:处理数据{},队列剩余大小={}", consumerId, data, queue.size());
// 模拟消费耗时
Thread.sleep(1);
} catch (InterruptedException e) {
log.error("消费者{}被中断", consumerId, e);
return;
}
// 检查生产者是否完成且队列空,若满足则退出
if (producerLatch.getCount() == 0 && queue.isEmpty()) {
break;
}
}
log.info("消费者{}:消费完成", consumerId);
consumerLatch.countDown();
});
}
producerLatch.await();
consumerLatch.await();
executor.shutdown();
log.info("所有任务处理完成,队列剩余大小={}", queue.size());
}
}
4.7 CompletableFuture:异步编程神器
CompletableFuture是JDK 8引入的异步编程工具,基于Future和回调机制,支持链式调用、异步组合、异常处理,解决了传统Future需要阻塞获取结果的问题,是JUC中异步编程的核心组件。
4.7.1 CompletableFuture核心特点
- 异步执行:支持异步提交任务,无需阻塞等待结果;
- 链式调用:通过thenApply、thenAccept、thenRun等方法实现链式处理;
- 任务组合:支持多个CompletableFuture的组合(allOf、anyOf);
- 异常处理:通过exceptionally、whenComplete等方法处理异步任务异常;
- 自定义线程池:可指定线程池执行任务,避免使用默认的ForkJoinPool。
4.7.2 CompletableFuture核心方法分类
| 方法类型 | 核心方法 | 功能描述 |
| 异步执行 | runAsync、supplyAsync | 无返回值/有返回值的异步任务 |
| 链式处理 | thenApply、thenAccept、thenRun | 任务完成后执行下一个任务(有输入/无输入) |
| 异步组合 | thenCompose、thenCombine、allOf、anyOf | 多个任务的依赖组合/并行组合 |
| 异常处理 | exceptionally、whenComplete、handle | 任务异常时的处理/任务完成后的统一处理 |
| 结果获取 | get、join、getNow | 阻塞获取/非阻塞获取/立即获取(有默认值) |
4.7.3 CompletableFuture实战:异步任务链与组合
package com.jam.demo.juc.future;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* CompletableFuture实战示例
* 包含:异步任务链、任务组合、异常处理、实战场景(异步查询+数据聚合)
* @author ken
*/
@Slf4j
public class CompletableFutureDemo {
// 自定义线程池(避免使用默认ForkJoinPool)
private static final ExecutorService CUSTOM_EXECUTOR = Executors.newFixedThreadPool(5);
/**
* 模拟服务调用:查询用户信息
* @param userId 用户ID
* @return 用户信息
*/
public<User> queryUser(Long userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("查询用户信息:userId={}", userId);
// 模拟服务调用耗时(500ms)
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
log.error("查询用户信息被中断", e);
throw new RuntimeException("查询用户信息失败", e);
}
// 模拟返回用户信息
return new User(userId, "用户" + userId, 20 + (int) (userId % 10));
}, CUSTOM_EXECUTOR);
}
/**
* 模拟服务调用:查询用户订单
* @param userId 用户ID
* @return 订单列表
*/
<List<Order>> queryOrders(Long userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("查询用户订单:userId={}", userId);
// 模拟服务调用耗时(800ms)
try {
TimeUnit.MILLISECONDS.sleep(800);
} catch (InterruptedException e) {
log.error("查询用户订单被中断", e);
throw new RuntimeException("查询用户订单失败", e);
}
// 模拟返回订单列表<Order> orders = Lists.newArrayList();
for (int i = 0; i < 2; i++) {
orders.add(new Order(Long.valueOf(i + 1), userId, "订单" + userId + "-" + i, 100.0 * (i + 1)));
}
return orders;
}, CUSTOM_EXECUTOR);
}
/**
* 模拟服务调用:查询用户地址
* @param userId 用户ID
* @return 地址信息
*/
public CompletableFuture<Address> queryAddress(Long userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("查询用户地址:userId={}", userId);
// 模拟服务调用耗时(600ms)
try {
TimeUnit.MILLISECONDS.sleep(600);
} catch (InterruptedException e) {
log.error("查询用户地址被中断", e);
throw new RuntimeException("查询用户地址失败", e);
}
// 模拟返回地址信息
return new Address(Long.valueOf(userId), "省份" + userId, "城市" + userId, "详细地址" + userId);
}, CUSTOM_EXECUTOR);
}
/**
* 示例1:异步任务链(thenApply/thenAccept/thenRun)
* 场景:查询用户信息 -> 处理用户信息 -> 记录日志
*/
@Test
public void testTaskChain() throws InterruptedException {
Comp<Void> future = queryUser(1L)
// 1. thenApply:有输入有输出,处理用户信息(异步执行)
.thenApply(user -> {
log.info("处理用户信息:{}", user);
user.setAge(user.getAge() + 1); // 模拟处理
return user;
})
// 2. thenAccept:有输入无输出,消费处理结果(异步执行)
.thenAccept(user -> {
log.info("消费处理后的用户信息:{}", user);
})
// 3. thenRun:无输入无输出,执行后续操作(异步执行)
.thenRun(() -> {
log.info("任务链执行完成,记录日志");
})
// 4. 异常处理:所有环节的异常都会走到这里
.exceptionally(ex -> {
log.error("任务链执行失败", ex);
return null;
});
// 等待任务完成
future.join();
CUSTOM_EXECUTOR.shutdown();
}
/**
* 示例2:任务组合(thenCompose/thenCombine/allOf/anyOf)
* 场景1:thenCompose(依赖任务,先查用户再查订单)
* 场景2:thenCombine(并行任务,查用户和地址后聚合)
* 场景3:allOf(多个并行任务,等待所有完成)
* 场景4:anyOf(多个并行任务,等待任意一个完成)
*/
@Test
public void testTaskCombine() throws InterruptedException {
// 场景1:thenCompose(依赖任务,先查用户再查订单)
log.info("=== 场景1:thenCompose(依赖任务) ===");
CompletableFuture<List<Order>> dependentFuture = queryUser(1L)
.thenCompose(user -> {
log.info("用户查询完成,开始查询订单:{}", user);
return queryOrders(user.getId());
})
.exceptionally(ex -> {
log.error("依赖任务执行失败", ex);
return Lists.newArrayList();
});<Order> orders = dependentFuture.join();
log.info("依赖任务结果:订单列表={}", orders);
// 场景2:thenCombine(并行任务,查用户和地址后聚合)
log.info("\n=== 场景2:thenCombine(并行任务) ===");
Completable<UserDetail> combineFuture = queryUser(2L)
.thenCombine(queryAddress(2L), (user, address) -> {
log.info("用户和地址查询完成,开始聚合");
return new UserDetail(user, address);
})
.exceptionally(ex -> {
log.error("并行任务执行失败", ex);
return null;
});
UserDetail userDetail = combineFuture.join();
log.info("并行任务结果:用户详情={}", userDetail);
// 场景3:allOf(多个并行任务,等待所有完成)
log.info("\n=== 场景3:allOf(多任务并行) ===");
Complet<User> userFuture = queryUser(3L);
Completable<Order>> orderFuture = queryOrders(3L);
Complet<Address> addressFuture = queryAddress(3L);
// 等待所有任务完成
<Void> allOfFuture = CompletableFuture.allOf(userFuture, orderFuture, addressFuture)
.thenRun(() -> {
log.info("所有任务完成,开始聚合数据");
User user = userFuture.join();
List<Order> orderList = orderFuture.join();
Address address = addressFuture.join();
UserDetail detail = new UserDetail(user, address, orderList);
log.info("多任务聚合结果:{}", detail);
})
.exceptionally(ex -> {
log.error("多任务执行失败", ex);
return null;
});
allOfFuture.join();
// 场景4:anyOf(多个并行任务,等待任意一个完成)
log.info("\n=== 场景4:anyOf(任意任务完成) ===");
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
log.error("task1被中断", e);
}
return "task1完成";
}, CUSTOM_EXECUTOR);
Comp<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
log.error("task2被中断", e);
}
return "task2完成";
}, CUSTOM_EXECUTOR);
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(task1, task2)
.thenAccept(result -> {
log.info("任意任务完成,结果:{}", result);
})
.exceptionally(ex -> {
log.error("任务执行失败", ex);
return null;
});
anyOfFuture.join();
CUSTOM_EXECUTOR.shutdown();
}
/**
* 示例3:异常处理(exceptionally/whenComplete/handle)
*/
@Test
public void testExceptionHandle() throws InterruptedException {
// 1. exceptionally:异常时返回默认值
log.info("=== 测试exceptionally ===");
Completable<User> exceptionallyFuture = queryUser(999L)
.exceptionally(ex -> {
log.error("查询用户失败,返回默认用户", ex);
return new User(0L, "默认用户", 0); // 异常时返回默认值
});
log.info("exceptionally结果:{}", exceptionallyFuture.join());
// 2. whenComplete:完成/异常时都执行,不改变结果
log.info("\n=== 测试whenComplete ===");
Comp<User> whenCompleteFuture = queryUser(888L)
.whenComplete((user, ex) -> {
if (ex == null) {
log.info("查询成功,用户:{}", user);
} else {
log.error("查询失败", ex);
}
});
try {
whenCompleteFuture.join();
} catch (Exception e) {
log.error("join时捕获异常", e);
}
// 3. handle:完成/异常时都执行,可改变结果
log.info("\n=== 测试handle ===");
Completable<User> handleFuture = queryUser(777L)
.handle((user, ex) -> {
if (ex == null) {
user.setAge(user.getAge() + 5);
return user;
} else {
log.error("查询失败,返回默认用户", ex);
return new User(0L, "handle默认用户", 0);
}
});
log.info("handle结果:{}", handleFuture.join());
CUSTOM_EXECUTOR.shutdown();
}
/**
* 示例4:实战场景:异步批量查询用户详情并聚合
* 场景:批量查询10个用户的详情(用户信息+订单+地址),然后聚合返回
*/
@Test
public void testBatchQuery() throws InterruptedException {
// 待查询的用户ID列表
<Long> userIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
// 批量异步查询每个用户的详情
List<UserDetail>> futureList = userIds.stream()
.map(userId -> {
// 每个用户的详情查询:并行查询用户、订单、地址
Complet<User> userFuture = queryUser(userId);
<List<Order>> orderFuture = queryOrders(userId);
<Address> addressFuture = queryAddress(userId);
// 聚合单个用户的详情
return CompletableFuture.allOf(userFuture, orderFuture, addressFuture)
.thenApply(v -> {
User user = userFuture.join();
<Order> orders = orderFuture.join();
Address address = addressFuture.join();
return new UserDetail(user, address, orders);
})
.exceptionally(ex -> {
log.error("查询用户{}详情失败", userId, ex);
return new UserDetail(new User(userId, "查询失败", 0), null, null);
});
})
.collect(Collectors.toList());
// 等待所有用户详情查询完成
Completable<Void> allOfFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenRun(() -> {
log.info("所有用户详情查询完成,开始聚合<UserDetail> userDetails = futureList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
log.info("批量查询结果:共{}个用户详情", userDetails.size());
// 过滤查询成功的用户
<UserDetail> successDetails = userDetails.stream()
.filter(detail -> !"查询失败".equals(detail.getUser().getName()))
.collect(Collectors.toList());
log.info("查询成功的用户数:{}", successDetails.size());
})
.exceptionally(ex -> {
log.error("批量查询失败", ex);
return null;
});
allOfFuture.join();
CUSTOM_EXECUTOR.shutdown();
}
/**
* 模拟用户类
*/
@Data
static class User {
private Long id;
private String name;
private Integer age;
public User(Long id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
}
/**
* 模拟订单类
*/
@Data
static class Order {
private Long id;
private Long userId;
private String orderNo;
private Double amount;
public Order(Long id, Long userId, String orderNo, Double amount) {
this.id = id;
this.userId = userId;
this.orderNo = orderNo;
this.amount = amount;
}
}
/**
* 模拟地址类
*/
@Data
static class Address {
private Long userId;
private String province;
private String city;
private String detail;
public Address(Long userId, String province, String city, String detail) {
this.userId = userId;
this.province = province;
this.city = city;
this.detail = detail;
}
}
/**
* 模拟用户详情类(聚合用户、地址、订单)
*/
@Data
static class UserDetail {
private User user;
private Address address;
<Order> orders;
public UserDetail(User user, Address address) {
this.user = user;
this.address = address;
}
public UserDetail(User user, Address address,<Order> orders) {
this.user = user;
this.address = address;
this.orders = orders;
}
}
}
五、JUC高级应用:并发编程设计模式与实战
5.1 生产者-消费者模式(基于阻塞队列)
生产者-消费者模式是并发编程中最经典的模式之一,用于解耦生产者和消费者,平衡两者的处理速度。JUC中的LinkedBlockingQueue、ArrayBlockingQueue等阻塞队列是实现该模式的理想工具。
核心实现思路:
- 生产者:向阻塞队列添加任务,队列满时阻塞;
- 消费者:从阻塞队列获取任务,队列空时阻塞;
- 队列:作为缓冲区,平衡生产者和消费者的速度;
- 终止条件:生产者完成任务后,向队列添加“终止信号”,消费者获取到信号后退出。
实战示例:基于LinkedBlockingQueue的生产者-消费者模式
package com.jam.demo.juc.pattern;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.CollectionUtils;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 生产者-消费者模式实战(基于阻塞队列)
* 场景:3个生产者生产任务,2个消费者消费任务,任务完成后优雅终止
* @author ken
*/
@Slf4j
public class ProducerConsumerPattern {
// 阻塞队列(缓冲区),容量10
private final LinkedBlocking<Task> queue =<>(10);
// 终止信号(用于通知消费者退出)
private static final Task TERMINATE_TASK = new Task(-1, "终止任务");
// 生产者数量
private static final int PRODUCER_NUM = 3;
// 消费者数量
private static final int CONSUMER_NUM = 2;
// 每个生产者生产的任务数
private static final int TASK_PER_PRODUCER = 5;
/**
* 任务类
*/
@lombok.Data
static class Task {
private int id;
private String content;
public Task(int id, String content) {
this.id = id;
this.content = content;
}
}
/**
* 生产者线程
*/
class Producer implements Runnable {
private int producerId;
public Producer(int producerId) {
this.producerId = producerId;
}
@Override
public void run() {
try {
for (int i =< TASK_PER_PRODUCER; i++) {
// 生成任务
Task task = new Task(i, "任务-" + producerId + "-" + i);
// 向队列添加任务(队列满时阻塞)
queue.put(task);
log.info("生产者{}:生产任务{},队列大小={}", producerId, task, queue.size());
// 模拟生产耗时
TimeUnit.MILLISECONDS.sleep(100);
}
log.info("生产者{}:生产完成", producerId);
} catch (InterruptedException e) {
log.error("生产者{}被中断", producerId, e);
Thread.currentThread().interrupt();
}
}
}
/**
* 消费者线程
*/
class Consumer implements Runnable {
private int consumerId;
public Consumer(int consumerId) {
this.consumerId = consumerId;
}
@Override
public void run() {
try {
while (true) {
// 从队列获取任务(队列空时阻塞)
Task task = queue.take();
// 检查是否为终止信号
if (task == TERMINATE_TASK) {
log.info("消费者{}:收到终止信号,退出", consumerId);
// 将终止信号放回队列,让其他消费者也能收到
queue.put(TERMINATE_TASK);
break;
}
// 处理任务
processTask(task);
}
} catch (InterruptedException e) {
log.error("消费者{}被中断", consumerId, e);
Thread.currentThread().interrupt();
}
}
/**
* 处理任务
*/
private void processTask(Task task) {
log.info("消费者{}:开始处理任务{}", consumerId, task);
// 模拟处理耗时
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
log.error("消费者{}处理任务{}被中断", consumerId, task.getId(), e);
Thread.currentThread().interrupt();
}
log.info("消费者{}:完成处理任务{}", consumerId, task);
}
}
/**
* 启动生产者-消费者模式
*/
public void start() throws InterruptedException {
// 启动生产者线程
Thread[] producers = new Thread[PRODUCER_NUM];
for (int i = 0; i< PRODUCER_NUM; i++) {
producers[i] = new Thread(new Producer(i), "Producer-" + i);
producers[i].start();
}
// 启动消费者线程
Thread[] consumers = new Thread[CONSUMER_NUM];
for (int i =< CONSUMER_NUM; i++) {
consumers[i] = new Thread(new Consumer(i), "Consumer-" + i);
consumers[i].start();
}
// 等待所有生产者完成生产
for (Thread producer : producers) {
producer.join();
}
log.info("所有生产者生产完成,添加终止信号");
// 向队列添加终止信号(消费者收到后退出)
queue.put(TERMINATE_TASK);
// 等待所有消费者完成消费
for (Thread consumer : consumers) {
consumer.join();
}
log.info("所有消费者消费完成,生产者-消费者模式结束");
}
@Test
public void testProducerConsumer() throws InterruptedException {
start();
}
}
5.2 线程池 + CompletableFuture:异步任务编排实战
在实际开发中,经常需要编排多个异步任务(如并行查询多个服务、串行处理依赖任务),结合线程池和CompletableFuture可以高效实现复杂的任务编排。
实战示例:电商订单提交的异步任务编排
package com.jam.demo.juc.pattern;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 异步任务编排实战(电商订单提交场景)
* 场景:订单提交需要执行以下任务:
* 1. 校验用户信息(串行,必须先执行)
* 2. 并行执行:校验库存、查询用户地址、查询优惠券
* 3. 串行执行:扣减库存、扣减优惠券、创建订单
* 4. 异步执行:发送订单通知、记录订单日志
* @author ken
*/
@Slf4j
public class AsyncTaskOrchestration {
// 自定义线程池(按任务类型拆分线程池,提高隔离性)
private static final ExecutorService VALIDATE_EXECUTOR = Executors.newFixedThreadPool(2);
private static final ExecutorService BUSINESS_EXECUTOR = Executors.newFixedThreadPool(5);
private static final ExecutorService NOTIFY_EXECUTOR = Executors.newFixedThreadPool(2);
/**
* 订单请求类
*/
@Data
static class OrderRequest {
private Long userId;
private Long productId;
private Integer quantity;
private Long couponId;
}
/**
* 订单响应类
*/
@Data
static class OrderResponse {
private Boolean success;
private String orderNo;
private String message;
}
/**
* 步骤1:校验用户信息(串行,必须先执行)
*/
public CompletableFuture<Boolean> validateUser(Long userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("校验用户信息:userId={}", userId);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
log.error("校验用户信息被中断", e);
return false;
}
// 模拟校验逻辑:userId>0则校验通过
boolean valid = !ObjectUtils.isEmpty(userId) && userId > 0;
log.info("用户校验结果:{}", valid);
return valid;
}, VALIDATE_EXECUTOR);
}
/**
* 步骤2-1:校验库存
*/
public CompletableFuture<Boolean> validateStock(Long productId, Integer quantity) {
return CompletableFuture.supplyAsync(() -> {
log.info("校验库存:productId={},quantity={}", productId, quantity);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
log.error("校验库存被中断", e);
return false;
}
// 模拟库存校验:productId有效且quantity<=10则通过
boolean valid = !ObjectUtils.isEmpty(productId)
&& !ObjectUtils.isEmpty(quantity)
&& quantity <= 10;
log.info("库存校验结果:{}", valid);
return valid;
}, BUSINESS_EXECUTOR);
}
/**
* 步骤2-2:查询用户地址
*/
public CompletableFuture<String> queryAddress(Long userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("查询用户地址:userId={}", userId);
try {
TimeUnit.MILLISECONDS.sleep(400);
} catch (InterruptedException e) {
log.error("查询用户地址被中断", e);
return null;
}
// 模拟查询结果:userId有效则返回地址
if (ObjectUtils.isEmpty(userId) || userId <= 0) {
return null;
}
String address = "省份A-城市B-详细地址" + userId;
log.info("用户地址查询结果:{}", address);
return address;
}, BUSINESS_EXECUTOR);
}
/**
* 步骤2-3:查询优惠券(可选,couponId为空则跳过)
*/
public CompletableFuture<Boolean> queryCoupon(Long userId, Long couponId) {
return CompletableFuture.supplyAsync(() -> {
if (ObjectUtils.isEmpty(couponId)) {
log.info("无优惠券信息,跳过查询");
return true;
}
log.info("查询优惠券:userId={},couponId={}", userId, couponId);
try {
TimeUnit.MILLISECONDS.sleep(350);
} catch (InterruptedException e) {
log.error("查询优惠券被中断", e);
return false;
}
// 模拟优惠券校验:userId和couponId匹配则有效
boolean valid = !ObjectUtils.isEmpty(userId) && userId > 0;
log.info("优惠券校验结果:{}", valid);
return valid;
}, BUSINESS_EXECUTOR);
}
/**
* 步骤3-1:扣减库存
*/
public CompletableFuture<Boolean> deductStock(Long productId, Integer quantity) {
return CompletableFuture.supplyAsync(() -> {
log.info("扣减库存:productId={},quantity={}", productId, quantity);
try {
TimeUnit.MILLISECONDS.sleep(450);
} catch (InterruptedException e) {
log.error("扣减库存被中断", e);
return false;
}
// 模拟扣减逻辑:productId和quantity有效则成功
boolean success = !ObjectUtils.isEmpty(productId) && !ObjectUtils.isEmpty(quantity);
log.info("库存扣减结果:{}", success);
return success;
}, BUSINESS_EXECUTOR);
}
/**
* 步骤3-2:扣减优惠券(可选)
*/
public CompletableFuture<Boolean> deductCoupon(Long userId, Long couponId) {
return CompletableFuture.supplyAsync(() -> {
if (ObjectUtils.isEmpty(couponId)) {
log.info("无优惠券,跳过扣减");
return true;
}
log.info("扣减优惠券:userId={},couponId={}", userId, couponId);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
log.error("扣减优惠券被中断", e);
return false;
}
boolean success = !ObjectUtils.isEmpty(userId) && userId > 0;
log.info("优惠券扣减结果:{}", success);
return success;
}, BUSINESS_EXECUTOR);
}
/**
* 步骤3-3:创建订单
*/
public CompletableFuture<String> createOrder(OrderRequest request, String address) {
return CompletableFuture.supplyAsync(() -> {
log.info("创建订单:request={},address={}", request, address);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
log.error("创建订单被中断", e);
return null;
}
// 模拟创建订单:生成UUID作为订单号
String orderNo = UUID.randomUUID().toString().replace("-", "");
log.info("订单创建成功:orderNo={}", orderNo);
return orderNo;
}, BUSINESS_EXECUTOR);
}
/**
* 步骤4-1:发送订单通知(异步,不阻塞主流程)
*/
public CompletableFuture<Void> sendNotify(String orderNo, Long userId) {
return CompletableFuture.runAsync(() -> {
log.info("发送订单通知:orderNo={},userId={}", orderNo, userId);
try {
TimeUnit.MILLISECONDS.sleep(600);
} catch (InterruptedException e) {
log.error("发送订单通知被中断", e);
}
log.info("订单通知发送完成:orderNo={}", orderNo);
}, NOTIFY_EXECUTOR);
}
/**
* 步骤4-2:记录订单日志(异步,不阻塞主流程)
*/
public CompletableFuture<Void> recordLog(String orderNo, OrderRequest request) {
return CompletableFuture.runAsync(() -> {
log.info("记录订单日志:orderNo={},request={}", orderNo, request);
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
log.error("记录订单日志被中断", e);
}
log.info("订单日志记录完成:orderNo={}", orderNo);
}, NOTIFY_EXECUTOR);
}
/**
* 核心:订单提交任务编排
*/
public CompletableFuture<OrderResponse> submitOrder(OrderRequest request) {
// 参数校验
if (ObjectUtils.isEmpty(request)
|| ObjectUtils.isEmpty(request.getUserId())
|| ObjectUtils.isEmpty(request.getProductId())
|| ObjectUtils.isEmpty(request.getQuantity())) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("订单参数不完整");
return CompletableFuture.completedFuture(response);
}
// 任务编排流程
return validateUser(request.getUserId())
// 步骤1:用户校验失败,直接返回
.thenCompose(userValid -> {
if (!userValid) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("用户信息校验失败");
return CompletableFuture.completedFuture(response);
}
// 步骤2:并行执行3个任务
CompletableFuture<Boolean> stockValidFuture = validateStock(request.getProductId(), request.getQuantity());
CompletableFuture<String> addressFuture = queryAddress(request.getUserId());
CompletableFuture<Boolean> couponValidFuture = queryCoupon(request.getUserId(), request.getCouponId());
// 等待并行任务全部完成
return CompletableFuture.allOf(stockValidFuture, addressFuture, couponValidFuture)
.thenCompose(v -> {
// 获取并行任务结果
Boolean stockValid = stockValidFuture.join();
String address = addressFuture.join();
Boolean couponValid = couponValidFuture.join();
// 校验并行任务结果
if (!stockValid) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("库存不足或商品无效");
return CompletableFuture.completedFuture(response);
}
if (StringUtils.isEmpty(address)) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("用户地址查询失败");
return CompletableFuture.completedFuture(response);
}
if (!couponValid) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("优惠券无效");
return CompletableFuture.completedFuture(response);
}
// 步骤3:串行执行3个核心任务(扣减库存->扣减优惠券->创建订单)
return deductStock(request.getProductId(), request.getQuantity())
.thenCompose(stockDeductSuccess -> {
if (!stockDeductSuccess) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("库存扣减失败");
return CompletableFuture.completedFuture(response);
}
return deductCoupon(request.getUserId(), request.getCouponId())
.thenCompose(couponDeductSuccess -> {
if (!couponDeductSuccess) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("优惠券扣减失败");
return CompletableFuture.completedFuture(response);
}
return createOrder(request, address)
.thenCompose(orderNo -> {
if (StringUtils.isEmpty(orderNo)) {
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("订单创建失败");
return CompletableFuture.completedFuture(response);
}
// 步骤4:异步执行通知和日志(不阻塞主流程)
sendNotify(orderNo, request.getUserId());
recordLog(orderNo, request);
// 返回成功结果
OrderResponse response = new OrderResponse();
response.setSuccess(true);
response.setOrderNo(orderNo);
response.setMessage("订单提交成功");
return CompletableFuture.completedFuture(response);
});
});
});
});
});
})
// 全局异常处理
.exceptionally(ex -> {
log.error("订单提交异常", ex);
OrderResponse response = new OrderResponse();
response.setSuccess(false);
response.setMessage("订单提交失败:" + ex.getMessage());
return response;
});
}
/**
* 测试订单提交
*/
@Test
public void testSubmitOrder() throws InterruptedException {
// 构造有效订单请求
OrderRequest validRequest = new OrderRequest();
validRequest.setUserId(1L);
validRequest.setProductId(1001L);
validRequest.setQuantity(2);
validRequest.setCouponId(2001L);
// 构造无效订单请求(库存不足)
OrderRequest invalidStockRequest = new OrderRequest();
invalidStockRequest.setUserId(1L);
invalidStockRequest.setProductId(1001L);
invalidStockRequest.setQuantity(20); // 超过最大库存10
invalidStockRequest.setCouponId(2001L);
// 测试有效请求
log.info("=== 测试有效订单请求 ===");
CompletableFuture<OrderResponse> validFuture = submitOrder(validRequest);
OrderResponse validResponse = validFuture.join();
log.info("有效订单请求结果:{}", validResponse);
// 测试无效请求
log.info("\n=== 测试无效订单请求(库存不足) ===");
CompletableFuture<OrderResponse> invalidFuture = submitOrder(invalidStockRequest);
OrderResponse invalidResponse = invalidFuture.join();
log.info("无效订单请求结果:{}", invalidResponse);
// 关闭线程池
VALIDATE_EXECUTOR.shutdown();
BUSINESS_EXECUTOR.shutdown();
NOTIFY_EXECUTOR.shutdown();
// 等待线程池关闭
VALIDATE_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
BUSINESS_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
NOTIFY_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
}
}
5.3 并发编程常见问题与解决方案
5.3.1 死锁问题
死锁是并发编程中最严重的问题之一,指两个或多个线程互相持有对方需要的资源,导致所有线程都无法继续执行。
死锁产生的4个必要条件:
- 互斥条件:资源只能被一个线程持有;
- 持有并等待条件:线程持有一个资源的同时,等待另一个资源;
- 不可剥夺条件:资源不能被强制从线程手中剥夺;
- 循环等待条件:线程之间形成资源请求的循环链。
实战示例:死锁模拟与排查
package com.jam.demo.juc.problem;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
/**
* 死锁问题模拟与排查
* @author ken
*/
@Slf4j
public class DeadLockDemo {
// 资源A
private final Object resourceA = new Object();
// 资源B
private final Object resourceB = new Object();
/**
* 模拟死锁:线程1持有A等待B,线程2持有B等待A
*/
public void simulateDeadLock() {
// 线程1
new Thread(() -> {
synchronized (resourceA) {
log.info("线程1:持有资源A,等待资源B");
try {
// 持有A资源,休眠1秒,让线程2有时间持有B资源
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("线程1被中断", e);
}
// 尝试获取B资源
synchronized (resourceB) {
log.info("线程1:成功获取资源B");
}
}
}, "Thread-1").start();
// 线程2
new Thread(() -> {
synchronized (resourceB) {
log.info("线程2:持有资源B,等待资源A");
try {
// 持有B资源,休眠1秒,让线程1有时间持有A资源
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("线程2被中断", e);
}
// 尝试获取A资源
synchronized (resourceA) {
log.info("线程2:成功获取资源A");
}
}
}, "Thread-2").start();
}
/**
* 解决死锁:破坏循环等待条件(统一资源获取顺序)
*/
public void solveDeadLock() {
// 线程1:按A->B顺序获取资源
new Thread(() -> {
synchronized (resourceA) {
log.info("线程1:持有资源A,等待资源B");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("线程1被中断", e);
}
synchronized (resourceB) {
log.info("线程1:成功获取资源B");
}
}
}, "Thread-1").start();
// 线程2:同样按A->B顺序获取资源(破坏循环等待)
new Thread(() -> {
synchronized (resourceA) {
log.info("线程2:持有资源A,等待资源B");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("线程2被中断", e);
}
synchronized (resourceB) {
log.info("线程2:成功获取资源B");
}
}
}, "Thread-2").start();
}
@Test
public void testDeadLock() throws InterruptedException {
log.info("=== 模拟死锁 ===");
simulateDeadLock();
// 休眠10秒,观察死锁状态
TimeUnit.SECONDS.sleep(10);
// 死锁排查方法:
// 1. jps:获取进程ID(如1234)
// 2. jstack 1234:查看线程堆栈,会显示死锁信息
// 3. 结果示例:
// Found one Java-level deadlock:
// =============================
// "Thread-2":
// waiting for ownable synchronizer 0x000000076b6e20c8, (a java.lang.Object)
// which is held by "Thread-1"
// "Thread-1":
// waiting for ownable synchronizer 0x000000076b6e20d8, (a java.lang.Object)
// which is held by "Thread-2"
log.info("\n=== 解决死锁 ===");
solveDeadLock();
TimeUnit.SECONDS.sleep(10);
}
}
死锁解决方案:
- 破坏循环等待条件:统一资源获取顺序(如所有线程都按A->B->C的顺序获取资源);
- 破坏持有并等待条件:一次性获取所有需要的资源,获取失败则释放已持有资源;
- 破坏不可剥夺条件:使用可中断的锁(如ReentrantLock的lockInterruptibly()),超时则释放资源;
- 使用工具排查:jstack、jconsole、VisualVM等工具定位死锁。
5.3.2 线程安全问题(常见场景与解决方案)
1. 常见线程安全问题场景:
- 共享可变变量未加锁;
- 集合类(ArrayList、HashMap)在并发环境下使用;
- 单例模式的懒加载未加锁;
- 事务管理中的并发问题(如并发修改同一数据)。
2. 解决方案:
- 使用线程安全的类(ConcurrentHashMap、CopyOnWriteArrayList);
- 对共享资源加锁(ReentrantLock、synchronized);
- 使用原子类(AtomicXxx、LongAdder)保证原子操作;
- 单例模式:使用饿汉式、枚举单例,或双重检查锁(volatile+锁)。
实战示例:线程安全的单例模式
package com.jam.demo.juc.problem;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程安全的单例模式实战
* @author ken
*/
@Slf4j
public class SingletonDemo {
/**
* 双重检查锁单例(线程安全)
* 核心:volatile防止指令重排序,双重检查减少锁开销
*/
public static class DoubleCheckSingleton {
// volatile修饰:防止instance = new DoubleCheckSingleton()的指令重排序
private static volatile DoubleCheckSingleton instance;
// 私有构造方法,防止外部实例化
private DoubleCheckSingleton() {}
/**
* 获取单例实例
* @return 单例实例
*/
public static DoubleCheckSingleton getInstance() {
// 第一次检查:未初始化则进入同步块(减少锁开销)
if (instance == null) {
synchronized (DoubleCheckSingleton.class) {
// 第二次检查:防止多线程同时进入同步块后重复初始化
if (instance == null) {
// 指令重排序问题:new操作分为3步(分配内存、初始化、赋值)
// volatile可禁止重排序,确保初始化完成后再赋值
instance = new DoubleCheckSingleton();
}
}
}
return instance;
}
}
/**
* 枚举单例(线程安全,推荐)
* 核心:JVM保证枚举类的实例唯一且线程安全
*/
public enum EnumSingleton {
INSTANCE;
// 枚举类的方法
public void doSomething() {
log.info("枚举单例执行方法");
}
}
/**
* 测试单例模式的线程安全性
*/
@Test
public void testSingletonThreadSafe() throws InterruptedException {
int threadNum = 100;
CountDownLatch latch = new CountDownLatch(threadNum);
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
AtomicReference<DoubleCheckSingleton> singletonRef = new AtomicReference<>();
boolean[] threadSafe = {true};
// 测试双重检查锁单例
for (int i = 0; i < threadNum; i++) {
executor.submit(() -> {
DoubleCheckSingleton instance = DoubleCheckSingleton.getInstance();
if (singletonRef.get() == null) {
singletonRef.set(instance);
} else if (singletonRef.get() != instance) {
threadSafe[0] = false;
}
latch.countDown();
});
}
latch.await();
executor.shutdown();
log.info("双重检查锁单例是否线程安全:{}", threadSafe[0]);
// 测试枚举单例
log.info("\n=== 测试枚举单例 ===");
EnumSingleton instance1 = EnumSingleton.INSTANCE;
EnumSingleton instance2 = EnumSingleton.INSTANCE;
log.info("枚举单例实例是否相同:{}", instance1 == instance2);
instance1.doSomething();
}
}
六、JUC框架核心总结与最佳实践
6.1 核心组件总结
| 组件类型 | 核心组件 | 核心功能 | 底层实现/设计模式 |
| 同步锁 | ReentrantLock、ReadWriteLock | 独占锁、读写分离锁 | AQS、模板方法模式 |
| 共享同步工具 | CountDownLatch、CyclicBarrier、Semaphore | 多线程协调、并发数控制 | AQS(CountDownLatch、Semaphore)、ReentrantLock(CyclicBarrier) |
| 线程池 | ThreadPoolExecutor | 线程复用、任务管理、并发控制 | 核心参数+任务队列+拒绝策略 |
| 原子类 | AtomicXxx、LongAdder | 线程安全的变量操作 | CAS操作、分段累加(LongAdder) |
| 并发集合 | ConcurrentHashMap、CopyOnWriteArrayList | 线程安全的集合存储 | 分段锁/CAS(ConcurrentHashMap)、写时复制(CopyOnWriteArrayList) |
| 异步编程 | CompletableFuture | 异步任务执行、链式调用、任务组合 | Future+回调机制、线程池 |
6.2 最佳实践
1. 锁的选择
- 简单场景:使用synchronized(JDK 1.6后优化,性能接近ReentrantLock);
- 复杂场景(可中断、超时、多条件):使用ReentrantLock;
- 读写分离场景(读多写少):使用ReadWriteLock(ReentrantReadWriteLock);
- 原子变量操作:使用AtomicXxx或LongAdder(高并发累加)。
2. 线程池的使用
- 禁止使用Executors创建线程池,自定义ThreadPoolExecutor,明确核心参数;
- 按任务类型拆分线程池(如IO密集型、CPU密集型、定时任务),提高隔离性;
- 合理设置队列容量(有界队列),避免内存溢出;
- 结合CompletableFuture实现异步任务编排,提高吞吐量。
3. 并发集合的选择
- 高并发键值对存储:ConcurrentHashMap;
- 读多写少的列表:CopyOnWriteArrayList;
- 高并发队列(FIFO):ConcurrentLinkedQueue;
- 生产-消费模型:LinkedBlockingQueue、ArrayBlockingQueue;
- 无缓冲队列(直接交互):SynchronousQueue。
4. 异步编程
- 使用CompletableFuture替代传统Future,支持链式调用和异常处理;
- 自定义线程池执行异步任务,避免使用默认的ForkJoinPool;
- 复杂任务编排:使用thenCompose(依赖任务)、thenCombine(并行任务)、allOf(多任务并行);
- 必须处理异常:使用exceptionally、whenComplete、handle方法捕获异步任务异常。
5. 性能优化
- 减少锁粒度:使用分段锁(ConcurrentHashMap)、锁剥离;
- 避免锁竞争:使用无锁编程(CAS、原子类);
- 读写分离:使用ReadWriteLock,提高读操作吞吐量;
- 批量处理:结合线程池和分片处理大量任务(如批量数据库操作)。
6. 问题排查
- 死锁:使用jstack、VisualVM排查;
- 线程泄漏:检查线程池是否正确关闭,避免创建过多线程;
- 性能问题:使用JProfiler、Arthas排查线程阻塞、锁竞争问题;
- 线程安全问题:使用线程安全的类,加锁时注意锁的范围和顺序。
七、总结
JUC框架是Java并发编程的核心,涵盖了同步锁、线程池、原子类、并发集合、异步编程等核心组件,其底层都基于AQS的设计思想,通过“状态变量+队列”的模式实现高效的并发控制。