四、Java并发编程深度剖析
4.1 Java内存模型(JMM)
│ Java内存模型(JMM) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ 线程A │ │ 线程B │ │
│ │ ┌──────────────┐ │ │ ┌──────────────┐ │ │
│ │ │ 工作内存 │ │ │ │ 工作内存 │ │ │
│ │ │ (本地缓存) │ │ │ │ (本地缓存) │ │ │
│ │ └──────┬───────┘ │ │ └──────┬───────┘ │ │
│ │ │ │ │ │ │ │
│ │ │ read/write│ │ │ read/write│ │
│ │ ▼ │ │ ▼ │ │
│ └──────────┼───────────┘ └──────────┼───────────┘ │
│ │ │ │
│ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ 主内存 │ │
│ │ (堆内存/方法区) │ │
│ └─────────────────────┘ │
│ │
│ JMM规则: │
│ 1. 所有变量存储在主内存 │
│ 2. 每个线程有自己的工作内存(缓存+寄存器) │
│ 3. 线程不能直接操作主内存,必须通过工作内存中转 │
│ 4. volatile:写操作立即刷新到主内存,读操作从主内存读取 │
│ 5. synchronized:解锁前刷新,加锁时读取 │
└─────────────────────────────────────────────────────────────────────────┘
4.2 volatile与内存屏障
public class VolatileExample {
// 保证可见性 + 禁止指令重排序
private volatile boolean flag = false;
private int value = 0;
public void writer() {
value = 42; // 普通写
flag = true; // volatile写(内存屏障:StoreStore | StoreLoad)
}
public void reader() {
if (flag) { // volatile读(内存屏障:LoadLoad | LoadStore)
// 保证能读到value=42,不会出现指令重排
System.out.println(value);
}
}
/*
* volatile底层实现:lock前缀指令
* 汇编代码:lock addl $0x0,(%rsp)
* 作用:
* 1. 将当前处理器缓存行写回到主内存
* 2. 使其他处理器缓存行失效(MESI缓存一致性协议)
* 3. 禁止指令重排序优化
*/
}
// DCL单例中的volatile
public class Singleton {
private static volatile Singleton instance; // volatile必不可少
public static Singleton getInstance() {
if (instance == null) { // 第一次检查
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查
instance = new Singleton(); // 分解为3步
// 1. 分配内存空间
// 2. 初始化对象
// 3. 将instance指向内存地址
// volatile禁止2和3重排序
}
}
}
return instance;
}
}
4.3 锁的升级与优化(JDK 1.6+)
/*
* synchronized锁的四个状态:
* 无锁 → 偏向锁 → 轻量级锁 → 重量级锁(单向升级)
*/
public class SynchronizedUpgradeDemo {
private Object lock = new Object();
// 1. 偏向锁:单线程重复获取,消除同步(Mark Word存储线程ID)
public void biasedLock() {
// JVM参数:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
synchronized (lock) {
// 第一次获取,偏向锁记录当前线程ID
// 同一线程再次获取,无需CAS操作
}
}
// 2. 轻量级锁:多线程交替执行,CAS自旋
public void lightweightLock() {
// 线程A获取锁,Mark Word指向栈帧中的Lock Record
// 线程B自旋等待,不阻塞
synchronized (lock) {
// 执行
}
}
// 3. 重量级锁:竞争激烈,阻塞线程
public void heavyweightLock() {
// 自旋超过阈值(JDK1.6自适应自旋)
// 升级为重量级锁,操作系统的互斥量(mutex)
// 线程阻塞,上下文切换开销大
synchronized (lock) {
// 执行
}
}
}
// 锁优化技术
public class LockOptimization {
// 1. 锁消除(Escape Analysis)
// JVM发现锁对象是线程局部变量,自动消除synchronized
public void lockElimination() {
// StringBuffer是线程安全的,但这里只在本方法使用
StringBuffer sb = new StringBuffer();
sb.append("a").append("b"); // JVM自动消除append方法的锁
}
// 2. 锁粗化
public void lockCoarsening() {
// 原始代码(频繁加锁解锁)
for (int i = 0; i < 100; i++) {
synchronized (this) {
// 操作
}
}
// JVM优化后(一次加锁)
synchronized (this) {
for (int i = 0; i < 100; i++) {
// 操作
}
}
}
}
4.4 AQS(AbstractQueuedSynchronizer)源码剖析
// AQS核心结构
public abstract class AbstractQueuedSynchronizer {
// 同步状态(volatile保证可见性)
private volatile int state;
// CLH队列(FIFO双向链表)
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
volatile int waitStatus; // CANCELLED(1), SIGNAL(-1), CONDITION(-2), PROPAGATE(-3)
}
// 独占模式获取(ReentrantLock)
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取(子类实现)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 加入队列
selfInterrupt();
}
}
// 加入等待队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 自旋CAS入队
return node;
}
// 自旋获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查是否可以park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
// ReentrantLock实现
public class ReentrantLock {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 重入
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// 公平锁
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平:检查队列中是否有等待更久的线程
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
4.5 线程池深度解析
public class ThreadPoolAnalysis {
/*
* ThreadPoolExecutor核心参数
* corePoolSize: 核心线程数
* maximumPoolSize: 最大线程数
* keepAliveTime: 空闲线程存活时间
* unit: 时间单位
* workQueue: 工作队列
* threadFactory: 线程工厂
* handler: 拒绝策略
*/
// 工作队列类型对比
// 1. ArrayBlockingQueue:有界队列,需要指定容量
// 2. LinkedBlockingQueue:有界/无界,默认Integer.MAX_VALUE
// 3. SynchronousQueue:无容量,每个put必须等待take
// 4. PriorityBlockingQueue:优先级队列
// 拒绝策略
// 1. AbortPolicy:抛出RejectedExecutionException(默认)
// 2. CallerRunsPolicy:交给调用者线程执行
// 3. DiscardPolicy:直接丢弃
// 4. DiscardOldestPolicy:丢弃队列头部任务
// 线程数配置公式(IO密集型 vs CPU密集型)
// CPU密集型:线程数 = CPU核心数 + 1
// IO密集型:线程数 = CPU核心数 * (1 + 平均等待时间/平均计算时间)
// 实际:线程数 = CPU核心数 * 2 ~ 4
// 自定义线程池
public static ExecutorService createOptimizedThreadPool() {
int coreCount = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
coreCount, // corePoolSize
coreCount * 4, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAliveTime
new ArrayBlockingQueue<>(1000), // 有界队列,防止OOM
new NamedThreadFactory("worker"), // 自定义线程名
new ThreadPoolExecutor.CallerRunsPolicy() // 背压策略
);
}
// 监控线程池
public static class MonitoredThreadPool extends ThreadPoolExecutor {
private final AtomicLong submittedTasks = new AtomicLong();
private final AtomicLong completedTasks = new AtomicLong();
private final AtomicLong rejectedTasks = new AtomicLong();
@Override
public void execute(Runnable command) {
submittedTasks.incrementAndGet();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
completedTasks.incrementAndGet();
super.afterExecute(r, t);
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejectedTasks.incrementAndGet();
super.rejectedExecution(r, executor);
}
// 定期输出监控信息
@Scheduled(fixedDelay = 60000)
public void report() {
logger.info("ThreadPool Status: " +
"Active={}, PoolSize={}, QueueSize={}, " +
"Submitted={}, Completed={}, Rejected={}",
getActiveCount(), getPoolSize(), getQueue().size(),
submittedTasks.get(), completedTasks.get(), rejectedTasks.get());
}
}
}
// CompletableFuture异步编程
public class CompletableFutureDemo {
public void asyncProcessing() {
// 异步任务
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> fetchUserInfo()) // 异步获取用户信息
.thenApply(user -> enrichUser(user)) // 串行处理
.thenCompose(user -> fetchOrders(user)) // 依赖另一个异步任务
.thenCombine( // 组合两个异步任务
CompletableFuture.supplyAsync(() -> fetchProductInfo()),
(orders, products) -> buildResponse(orders, products)
)
.exceptionally(throwable -> { // 异常处理
logger.error("Async processing failed", throwable);
return "default response";
});
// 阻塞等待
String result = future.join();
// 多个异步任务并行执行
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(() -> callServiceA()),
CompletableFuture.supplyAsync(() -> callServiceB()),
CompletableFuture.supplyAsync(() -> callServiceC())
);
// 等待所有完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> System.out.println("All completed"));
// 等待任意一个完成
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
.thenAccept(result -> System.out.println("First completed: " + result));
}
}
4.6 伪共享(False Sharing)问题
public class FalseSharingDemo {
/*
* CPU缓存行(Cache Line):通常64字节
* 伪共享:多个线程修改不同变量,但位于同一缓存行,导致缓存行失效
*/
// 问题代码
public static class Counter {
volatile long x; // 线程A频繁修改
volatile long y; // 线程B频繁修改
// x和y可能在同一个缓存行(64字节,两个long = 16字节)
}
// 解决方案1:填充(Padding)
public static class PaddedCounter {
volatile long x;
long p1, p2, p3, p4, p5, p6, p7; // 填充56字节,确保x独占一个缓存行
volatile long y;
}
// 解决方案2:@Contended(Java 8+)
// JVM参数:-XX:-RestrictContended
public static class ContendedCounter {
@sun.misc.Contended
volatile long x;
@sun.misc.Contended
volatile long y;
}
// 性能测试
public static void benchmark() throws InterruptedException {
int iterations = 100_000_000;
Counter counter = new Counter();
Thread t1 = new Thread(() -> {
for (int i = 0; i < iterations; i++) {
counter.x++; // 频繁修改x
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < iterations; i++) {
counter.y++; // 频繁修改y
}
});
long start = System.nanoTime();
t1.start();
t2.start();
t1.join();
t2.join();
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Duration: " + duration + "ms");
// 使用Counter: ~5000ms
// 使用PaddedCounter: ~1000ms(性能提升5倍)
}
}