吃透JUC框架:从底层原理到实战落地,解决并发编程90%的问题

简介: 本文从底层原理(AQS、CAS)出发,逐步拆解了JUC的核心组件,结合实战示例讲解了各组件的使用场景和最佳实践,最后总结了并发编程的常见问题和解决方案。掌握JUC框架,不仅能夯实并发编程基础,更能解决实际开发中的高并发、线程安全问题,提升系统的吞吐量和稳定性。

一、前言:为什么JUC是Java并发编程的核心?

在Java开发中,“并发”是绕不开的核心议题——从后端接口的高并发请求处理,到分布式系统的多线程协同,再到大数据场景的并行计算,都离不开高效的并发编程工具。而JUC(java.util.concurrent)框架,正是Java官方为解决并发问题提供的“利器集合”,它封装了并发编程的核心难点(线程同步、线程池管理、原子操作、并发集合等),让开发者无需从零实现底层逻辑,就能安全、高效地编写并发代码。

二、JUC核心基础:线程与同步的底层逻辑

在深入JUC组件前,我们必须先搞懂:Java线程的底层实现是什么?为什么需要同步机制?这是理解JUC设计思想的前提。

2.1 线程的底层本质:内核线程与用户线程

Java线程在JDK 1.2之后,采用“一对一”的线程模型——即一个Java线程对应一个操作系统内核线程(Kernel Thread,KT)。其底层逻辑如下:

  • 应用程序通过JVM调用操作系统的线程API创建内核线程;
  • 内核线程由操作系统调度,负责执行具体的任务;
  • 当Java线程阻塞时(如sleep、wait),对应的内核线程也会阻塞,JVM会调度其他就绪线程执行。

流程图(线程模型)

image.png

2.2 并发问题的根源:共享变量与竞态条件

并发编程的核心问题是“共享变量的线程安全”。当多个线程同时操作同一个共享变量时,会出现“竞态条件”(Race Condition),导致结果不符合预期。

核心原因:CPU指令重排序与内存可见性

  1. 指令重排序:CPU为了提高效率,会在不影响单线程执行结果的前提下,重新排序指令的执行顺序;
  2. 内存可见性:多个线程操作共享变量时,各自的工作内存(CPU缓存)与主内存的数据可能不同步;
  3. 原子性:多个操作被视为一个整体,要么全部执行,要么全部不执行(如i++并非原子操作,拆解为读取、加1、写入三步)。

示例:非线程安全的共享变量操作

package com.jam.demo.juc.basic;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

/**
* 共享变量的线程安全问题演示
* @author ken
*/

@Slf4j
public class SharedVariableTest {
   // 共享变量
   private int count = 0;

   /**
    * 自增操作(非原子)
    */

   public void increment() {
       count++;
   }

   @Test
   public void testUnsafe() throws InterruptedException {
       // 启动10个线程,每个线程执行1000次自增
       int threadNum = 10;
       Thread[] threads = new Thread[threadNum];
       for (int i = < threadNum; i++) {
           threads[i] = new Thread(() -> {
               for (int j = 0< 1000; j++) {
                   increment();
               }
           }, "Thread-" + i);
           threads[i].start();
       }

       // 等待所有线程执行完成
       for (Thread thread : threads) {
           thread.join();
       }

       // 预期结果:10*1000=10000,实际结果往往小于10000
       log.info("最终count值:{}", count);
   }
}

运行结果:最终count值可能为9876、9953等,始终小于10000。原因是count++的三步指令被多个线程交替执行,导致数据覆盖。

2.3 JUC的核心解决思路:封装同步与原子性机制

JUC框架通过以下三类核心机制解决并发问题:

  1. 同步机制:通过锁(Lock)、信号量(Semaphore)等组件,保证同一时间只有一个线程执行临界区代码;
  2. 原子操作:通过原子类(AtomicInteger、LongAdder)封装CAS操作,保证单个变量的原子性;
  3. 内存可见性:通过volatile关键字、锁的释放-获取机制,保证共享变量的内存可见性。

三、JUC核心基石:AQS原理与实现

AQS(AbstractQueuedSynchronizer)是JUC框架的“灵魂”——ReentrantLock、CountDownLatch、Semaphore等核心组件,均基于AQS实现。理解AQS,就等于掌握了JUC的核心设计思想。

3.1 AQS的核心设计:状态变量+双向链表+条件队列

AQS的核心目标是“实现一个可重入的同步器”,其底层通过三个核心部分实现:

1. 核心状态变量(state)

  • 用volatile修饰的int变量,代表同步状态(如锁的持有状态、信号量的许可数);
  • 核心操作:getState()、setState()、compareAndSetState()(CAS操作,保证原子性)。

2. 双向同步队列(CLH队列)

  • 用于存放等待获取同步资源的线程;
  • 每个节点(Node)包含:线程引用、前驱节点、后继节点、等待状态(如CANCELLED、SIGNAL);
  • 采用“自旋+park”的方式实现线程阻塞与唤醒,减少CPU空转。

3. 条件队列(Condition Queue)

  • 用于实现“等待-通知”机制(类似Object的wait()/notify());
  • 每个Condition对应一个独立的条件队列,可实现多条件等待。

AQS核心结构流程图

image.png

3.2 AQS的核心流程:获取资源与释放资源

AQS的核心逻辑围绕“获取资源”和“释放资源”展开,子类(如ReentrantLock)只需重写以下方法,即可实现不同的同步语义:

  • tryAcquire(int arg):尝试获取资源(独占式);
  • tryRelease(int arg):尝试释放资源(独占式);
  • tryAcquireShared(int arg):尝试获取资源(共享式);
  • tryReleaseShared(int arg):尝试释放资源(共享式);
  • isHeldExclusively():判断当前线程是否独占资源。

1. 独占式获取资源流程(以ReentrantLock为例)

image.png

2. 独占式释放资源流程(以ReentrantLock为例)

image.png

3.3 AQS的核心特点

  1. 模板方法模式:AQS定义了获取/释放资源的核心流程(模板方法),子类只需重写tryAcquire等方法,无需关注队列管理、线程阻塞/唤醒等底层细节;
  2. 独占式与共享式:支持两种资源获取模式(独占式如锁,共享式如CountDownLatch);
  3. 可重入性:通过state变量记录持有次数,支持线程重入获取资源;
  4. 高效性:采用CAS+park/unpark机制,减少锁的开销,比synchronized更灵活。

四、JUC核心组件实战:从基础到进阶

4.1 独占锁:ReentrantLock(可重入锁)

ReentrantLock是synchronized的增强版,支持可重入、公平锁/非公平锁、条件等待等特性,底层基于AQS实现。

1. 核心特性对比(ReentrantLock vs synchronized)

特性 ReentrantLock synchronized
锁类型 可指定公平/非公平(默认非公平) 非公平锁
可重入性 支持 支持
等待可中断 支持(lockInterruptibly()) 不支持
超时获取锁 支持(tryLock(long, TimeUnit)) 不支持
条件等待 多个Condition,支持多条件等待 仅一个条件队列
性能 高并发场景下更优(减少上下文切换) JDK 1.6后优化,性能接近

2. 实战示例:ReentrantLock的基本使用

package com.jam.demo.juc.lock;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* ReentrantLock实战示例
* 包含:公平锁/非公平锁、可重入、条件等待、超时获取、中断获取
* @author ken
*/

@Slf4j
public class ReentrantLockDemo {
   // 1. 非公平锁(默认)
   private final ReentrantLock nonFairLock = new ReentrantLock();
   // 2. 公平锁
   private final ReentrantLock fairLock = new ReentrantLock(true);
   // 3. 条件队列(用于多条件等待)
   private final Condition condition = nonFairLock.newCondition();
   private int count = 0;

   /**
    * 示例1:基本锁操作(可重入)
    */

   public void reentrantDemo() {
       nonFairLock.lock();
       try {
           count++;
           log.info("第一次获取锁,count:{}", count);
           // 重入获取锁
           reentrantInner();
       } finally {
           // 注意:获取几次锁,就要释放几次
           nonFairLock.unlock();
           log.info("第一次释放锁");
       }
   }

   private void reentrantInner() {
       nonFairLock.lock();
       try {
           count++;
           log.info("重入获取锁,count:{}", count);
       } finally {
           nonFairLock.unlock();
           log.info("重入释放锁");
       }
   }

   /**
    * 示例2:条件等待(await/signal)
    */

   public void conditionDemo() throws InterruptedException {
       new Thread(() -> {
           nonFairLock.lock();
           try {
               log.info("线程A:获取锁,等待条件满足");
               // 等待条件,释放锁并阻塞
               condition.await();
               log.info("线程A:条件满足,继续执行");
               count++;
               log.info("线程A:count:{}", count);
           } catch (InterruptedException e) {
               log.error("线程A被中断", e);
           } finally {
               nonFairLock.unlock();
           }
       }, "Thread-A").start();

       // 主线程休眠1秒,确保线程A先获取锁
       TimeUnit.SECONDS.sleep(1);

       new Thread(() -> {
           nonFairLock.lock();
           try {
               log.info("线程B:获取锁,发送条件通知");
               // 唤醒等待条件的线程
               condition.signal();
               log.info("线程B:通知完成,释放锁");
           } finally {
               nonFairLock.unlock();
           }
       }, "Thread-B").start();
   }

   /**
    * 示例3:超时获取锁(tryLock)
    */

   public void tryLockDemo() throws InterruptedException {
       new Thread(() -> {
           nonFairLock.lock();
           try {
               // 持有锁3秒
               TimeUnit.SECONDS.sleep(3);
               log.info("线程C:持有锁3秒后释放");
           } catch (InterruptedException e) {
               log.error("线程C被中断", e);
           } finally {
               nonFairLock.unlock();
           }
       }, "Thread-C").start();

       TimeUnit.SECONDS.sleep(1);

       new Thread(() -> {
           boolean acquired = false;
           try {
               // 尝试获取锁,超时时间2秒
               acquired = nonFairLock.tryLock(2, TimeUnit.SECONDS);
               if (acquired) {
                   log.info("线程D:成功获取锁");
               } else {
                   log.info("线程D:超时未获取到锁");
               }
           } catch (InterruptedException e) {
               log.error("线程D被中断", e);
           } finally {
               if (acquired) {
                   nonFairLock.unlock();
               }
           }
       }, "Thread-D").start();
   }

   /**
    * 示例4:中断获取锁(lockInterruptibly)
    */

   public void interruptibleLockDemo() {
       Thread threadE = new Thread(() -> {
           try {
               log.info("线程E:尝试获取锁(可中断)");
               nonFairLock.lockInterruptibly();
               try {
                   log.info("线程E:成功获取锁");
               } finally {
                   nonFairLock.unlock();
               }
           } catch (InterruptedException e) {
               log.info("线程E:获取锁过程中被中断");
           }
       }, "Thread-E");

       threadE.start();

       // 主线程休眠1秒后中断线程E
       try {
           TimeUnit.SECONDS.sleep(1);
           threadE.interrupt();
           log.info("主线程:中断线程E");
       } catch (InterruptedException e) {
           log.error("主线程被中断", e);
       }
   }

   @Test
   public void testAll() throws InterruptedException {
       log.info("=== 测试可重入 ===");
       reentrantDemo();

       log.info("\n=== 测试条件等待 ===");
       conditionDemo();
       TimeUnit.SECONDS.sleep(2);

       log.info("\n=== 测试超时获取锁 ===");
       tryLockDemo();
       TimeUnit.SECONDS.sleep(4);

       log.info("\n=== 测试中断获取锁 ===");
       interruptibleLockDemo();
       TimeUnit.SECONDS.sleep(2);
   }
}

3. 底层原理:ReentrantLock如何基于AQS实现?

ReentrantLock通过内部类Sync(继承AQS)实现核心逻辑,分为公平锁(FairSync)和非公平锁(NonfairSync):

  • 非公平锁:线程获取锁时,先尝试CAS修改state为1(不排队直接抢锁),失败后再加入CLH队列;
  • 公平锁:线程获取锁时,直接加入CLH队列,按队列顺序获取锁;
  • 可重入性:获取锁时,若当前线程已持有锁,则state+1;释放锁时,state-1,直到state=0时释放锁。

4.2 共享锁:CountDownLatch与CyclicBarrier

共享锁的核心特点是“多个线程可同时获取资源”,JUC中典型的共享锁组件是CountDownLatch(倒计时门闩)和CyclicBarrier(循环屏障)。

4.2.1 CountDownLatch:等待多线程完成

CountDownLatch基于AQS的共享模式实现,核心功能是“让一个或多个线程等待其他线程完成所有任务后,再继续执行”。

核心原理:
  • 初始化时指定“计数次数”(state=计数次数);
  • 每个线程完成任务后,调用countDown(),state-1(tryReleaseShared);
  • 等待线程调用await(),若state>0则阻塞,直到state=0时,所有等待线程被唤醒(共享模式下,唤醒后会继续唤醒后续线程)。
实战示例:CountDownLatch实现多线程任务协调

package com.jam.demo.juc.share;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* CountDownLatch实战示例
* 场景:主线程等待3个任务线程完成后,统计结果
* @author ken
*/

@Slf4j
public class CountDownLatchDemo {
   // 任务数量
   private static final int TASK_NUM = 3;
   // 用于统计每个任务的结果
   private int[] taskResults = new int[TASK_NUM];

   /**
    * 执行任务
    * @param taskId 任务ID
    * @param latch CountDownLatch
    */

   public void executeTask(int taskId, CountDownLatch latch) {
       try {
           log.info("任务{}:开始执行", taskId);
           // 模拟任务执行(1-3秒随机)
           long sleepTime = 1000 + (long) (Math.random() * 2000);
           TimeUnit.MILLISECONDS.sleep(sleepTime);

           // 模拟任务结果(随机0-100的整数)
           taskResults[taskId] = (int) (Math.random() * 101);
           log.info("任务{}:执行完成,结果:{},耗时:{}ms", taskId, taskResults[taskId], sleepTime);
       } catch (InterruptedException e) {
           log.error("任务{}被中断", taskId, e);
       } finally {
           // 任务完成,计数减1
           if (!ObjectUtils.isEmpty(latch)) {
               latch.countDown();
           }
       }
   }

   /**
    * 示例1:基本使用(主线程等待所有任务完成)
    */

   @Test
   public void testBasic() throws InterruptedException {
       CountDownLatch latch = new CountDownLatch(TASK_NUM);
       ExecutorService executor = Executors.newFixedThreadPool(TASK_NUM);

       for (int i = < TASK_NUM; i++) {
           int taskId = i;
           executor.submit(() -> executeTask(taskId, latch));
       }

       log.info("主线程:等待所有任务完成...");
       // 等待所有任务完成(阻塞)
       latch.await();
       log.info("主线程:所有任务完成,开始统计结果");

       // 统计结果
       int sum = 0;
       for (int result : taskResults) {
           sum += result;
       }
       log.info("主线程:所有任务结果总和:{},平均值:{}", sum, sum / (double) TASK_NUM);

       executor.shutdown();
   }

   /**
    * 示例2:超时等待(主线程等待指定时间后,不再等待)
    */

   @Test
   public void testTimeout() throws InterruptedException {
       CountDownLatch latch = new CountDownLatch(TASK_NUM);
       ExecutorService executor = Executors.newFixedThreadPool(TASK_NUM);

       for (int i = < TASK_NUM; i++) {
           int taskId = i;
           executor.submit(() -> {
               try {
                   // 任务1执行5秒(模拟超时场景)
                   if (taskId == 0) {
                       TimeUnit.SECONDS.sleep(5);
                   } else {
                       TimeUnit.SECONDS.sleep(1);
                   }
                   taskResults[taskId] = (int) (Math.random() * 101);
                   log.info("任务{}:执行完成,结果:{}", taskId, taskResults[taskId]);
               } catch (InterruptedException e) {
                   log.error("任务{}被中断", taskId, e);
               } finally {
                   latch.countDown();
               }
           });
       }

       log.info("主线程:等待所有任务完成(超时时间3秒)...");
       // 等待3秒,若未完成则继续执行
       boolean allCompleted = latch.await(3, TimeUnit.SECONDS);
       if (allCompleted) {
           log.info("主线程:所有任务完成");
       } else {
           log.info("主线程:等待超时,部分任务未完成");
       }

       executor.shutdownNow();
   }
}

4.2.2 CyclicBarrier:多线程同步等待

CyclicBarrier与CountDownLatch类似,但核心区别在于:

  • CountDownLatch:计数只能使用一次,线程完成任务后计数减1,等待线程等待计数为0;
  • CyclicBarrier:计数可循环使用,多个线程到达“屏障”后,才继续执行,可设置“屏障动作”(所有线程到达后执行)。
核心原理:
  • 初始化时指定“参与线程数”和“屏障动作”;
  • 每个线程到达屏障时,调用await(),计数器减1;
  • 当计数器减至0时,执行屏障动作,然后重置计数器,所有线程继续执行;
  • 支持重置(reset()),可循环使用。
实战示例:CyclicBarrier实现多线程同步执行

package com.jam.demo.juc.share;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.StringUtils;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* CyclicBarrier实战示例
* 场景:3个线程同步到达屏障后,执行统计动作,然后循环执行下一轮任务
* @author ken
*/

@Slf4j
public class CyclicBarrierDemo {
   // 参与线程数
   private static final int THREAD_NUM = 3;
   // 循环次数
   private static final int LOOP_NUM = 2;
   // 用于统计每轮任务的结果
   private int roundSum = 0;

   /**
    * 执行任务
    * @param threadId 线程ID
    * @param barrier CyclicBarrier
    */

   public void executeTask(int threadId, CyclicBarrier barrier) {
       for (int round = 0;< LOOP_NUM; round++) {
           try {
               log.info("线程{}:第{}轮任务开始执行", threadId, round + 1);
               // 模拟任务执行(1秒)
               Thread.sleep(1000);

               // 模拟任务结果(随机0-10的整数)
               int result = (int) (Math.random() * 11);
               roundSum += result;
               log.info("线程{}:第{}轮任务完成,结果:{},当前总和:{}", threadId, round + 1, result, roundSum);

               // 到达屏障,等待其他线程
               log.info("线程{}:到达屏障,等待其他线程...", threadId);
               barrier.await();

               // 屏障动作执行完成后,继续下一轮
               log.info("线程{}:第{}轮屏障动作完成,进入下一轮", threadId, round + 1);
           } catch (InterruptedException e) {
               log.error("线程{}被中断", threadId, e);
               return;
           } catch (BrokenBarrierException e) {
               log.error("线程{}:屏障被破坏", threadId, e);
               return;
           }
       }
   }

   @Test
   public void testCyclicBarrier() throws InterruptedException {
       // 初始化CyclicBarrier:3个线程,屏障动作(统计每轮结果)
       CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, () -> {
           log.info("=== 屏障动作:第{}轮所有任务完成,总和:{},平均值:{} ===",
                   (roundSum / THREAD_NUM) > 0 ? (roundSum / THREAD_NUM) : 1,
                   roundSum,
                   roundSum / (double) THREAD_NUM);
           // 重置总和,用于下一轮
           roundSum = 0;
       });

       ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
       for (int i = 0; i< THREAD_NUM; i++) {
           int threadId = i;
           executor.submit(() -> executeTask(threadId, barrier));
       }

       executor.shutdown();
       // 等待所有任务执行完成
       while (!executor.isTerminated()) {
           Thread.sleep(500);
       }
       log.info("所有轮次任务执行完成");
   }

   /**
    * 示例:BrokenBarrierException场景(线程被中断导致屏障破坏)
    */

   @Test
   public void testBrokenBarrier() throws InterruptedException {
       CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, () -> {
           log.info("屏障动作执行");
       });

       ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
       Thread thread0 = new Thread(() -> {
           try {
               log.info("线程0:执行任务");
               Thread.sleep(1000);
               log.info("线程0:到达屏障");
               barrier.await();
           } catch (InterruptedException e) {
               log.error("线程0被中断", e);
           } catch (BrokenBarrierException e) {
               log.error("线程0:屏障被破坏", e);
           }
       });

       executor.submit(thread0);
       executor.submit(() -> {
           try {
               log.info("线程1:执行任务");
               Thread.sleep(2000);
               log.info("线程1:到达屏障");
               barrier.await();
           } catch (InterruptedException e) {
               log.error("线程1被中断", e);
           } catch (BrokenBarrierException e) {
               log.error("线程1:屏障被破坏", e);
           }
       });

       // 主线程休眠1.5秒后中断线程0,导致屏障破坏
       Thread.sleep(1500);
       thread0.interrupt();

       executor.shutdown();
       while (!executor.isTerminated()) {
           Thread.sleep(500);
       }
   }
}

4.2.3 CountDownLatch vs CyclicBarrier 核心区别

特性 CountDownLatch CyclicBarrier
核心功能 等待多线程完成任务 多线程同步到达屏障后继续执行
计数是否可循环 不可循环(一次性) 可循环(reset()重置)
线程角色 分为“等待线程”和“计数线程” 所有线程角色相同
屏障动作 无(等待计数为0后直接唤醒) 支持(所有线程到达后执行)
底层实现 AQS共享模式 基于ReentrantLock和Condition

4.3 信号量:Semaphore(控制并发访问数)

Semaphore(信号量)用于控制“同时访问某个资源的线程数”,核心是“许可数”的分配与释放,底层基于AQS的共享模式实现。

核心原理:

  • 初始化时指定“许可数”(state=许可数);
  • 线程获取资源时,调用acquire(),尝试获取1个许可(state-1<0则阻塞;
  • 线程释放资源时,调用release(),释放1个许可(state+1),唤醒等待线程;
  • 支持公平/非公平模式,可一次性获取/释放多个许可。

实战示例:Semaphore控制接口并发访问

package com.jam.demo.juc.semaphore;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.StringUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* Semaphore实战示例
* 场景:控制接口的并发访问数(最多3个线程同时访问)
* @author ken
*/

@Slf4j
public class SemaphoreDemo {
   // 最大并发数(许可数)
   private static final int MAX_CONCURRENT = 3;
   // 非公平信号量(默认)
   private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT);
   // 公平信号量
   private final Semaphore fairSemaphore = new Semaphore(MAX_CONCURRENT, true);

   /**
    * 模拟接口访问
    * @param userId 用户ID
    */

   public void accessApi(String userId) {
       try {
           // 1. 获取许可(阻塞等待)
           semaphore.acquire();
           log.info("用户{}:成功获取许可,开始访问接口", userId);

           // 2. 模拟接口处理(1-2秒)
           long handleTime = 1000 + (long) (Math.random() * 1000);
           TimeUnit.MILLISECONDS.sleep(handleTime);
           log.info("用户{}:接口访问完成,耗时:{}ms", userId, handleTime);
       } catch (InterruptedException e) {
           log.error("用户{}:访问接口被中断", userId, e);
       } finally {
           // 3. 释放许可
           semaphore.release();
           log.info("用户{}:释放许可", userId);
       }
   }

   /**
    * 示例1:基本使用(控制最大并发数)
    */

   @Test
   public void testBasic() throws InterruptedException {
       // 模拟10个用户并发访问
       int userNum = 10;
       ExecutorService executor = Executors.newFixedThreadPool(userNum);

       for (int i = < userNum; i++) {
           String userId = "user-" + (i + 1);
           executor.submit(() -> accessApi(userId));
       }

       executor.shutdown();
       while (!executor.isTerminated()) {
           TimeUnit.MILLISECONDS.sleep(500);
       }
       log.info("所有用户访问完成");
   }

   /**
    * 示例2:超时获取许可(tryAcquire)
    */

   @Test
   public void testTryAcquire() throws InterruptedException {
       ExecutorService executor = Executors.newFixedThreadPool(5);

       for (int i = < 5; i++) {
           String userId = "user-" + (i + 1);
           executor.submit(() -> {
               boolean acquired = false;
               try {
                   // 尝试获取许可,超时时间1秒
                   acquired = semaphore.tryAcquire(1, TimeUnit.SECONDS);
                   if (acquired) {
                       log.info("用户{}:成功获取许可,访问接口", userId);
                       TimeUnit.MILLISECONDS.sleep(2000);
                       log.info("用户{}:接口访问完成", userId);
                   } else {
                       log.info("用户{}:超时未获取到许可,拒绝访问", userId);
                   }
               } catch (InterruptedException e) {
                   log.error("用户{}:获取许可被中断", userId, e);
               } finally {
                   if (acquired) {
                       semaphore.release();
                   }
               }
           });
       }

       executor.shutdown();
       while (!executor.isTerminated()) {
           TimeUnit.MILLISECONDS.sleep(500);
       }
   }

   /**
    * 示例3:一次性获取/释放多个许可
    */

   @Test
   public void testAcquireReleaseMultiple() throws InterruptedException {
       // 模拟需要2个许可才能访问的资源
       Semaphore multiSemaphore = new Semaphore(MAX_CONCURRENT);
       ExecutorService executor = Executors.newFixedThreadPool(3);

       for (int i = < 3; i++) {
           String userId = "user-" + (i + 1);
           executor.submit(() -> {
               boolean acquired = false;
               try {
                   // 一次性获取2个许可
                   acquired = multiSemaphore.tryAcquire(2, 1, TimeUnit.SECONDS);
                   if (acquired) {
                       log.info("用户{}:成功获取2个许可,访问资源", userId);
                       TimeUnit.MILLISECONDS.sleep(1500);
                       log.info("用户{}:资源访问完成", userId);
                   } else {
                       log.info("用户{}:未获取到2个许可,拒绝访问", userId);
                   }
               } catch (InterruptedException e) {
                   log.error("用户{}:获取许可被中断", userId, e);
               } finally {
                   if (acquired) {
                       // 一次性释放2个许可
                       multiSemaphore.release(2);
                       log.info("用户{}:释放2个许可", userId);
                   }
               }
           });
       }

       executor.shutdown();
       while (!executor.isTerminated()) {
           TimeUnit.MILLISECONDS.sleep(500);
       }
   }
}

4.4 线程池:ThreadPoolExecutor(核心并发工具)

线程池是JUC中最常用的组件之一,用于“管理线程生命周期、复用线程、控制并发数”,避免频繁创建/销毁线程带来的性能开销。JUC中的线程池核心实现是ThreadPoolExecutor,Executors工具类创建的线程池(如newFixedThreadPool)本质上都是ThreadPoolExecutor的封装。

4.4.1 线程池核心参数与工作原理

核心参数(7个):
  1. corePoolSize:核心线程数(常驻线程,即使空闲也不销毁);
  2. maximumPoolSize:最大线程数(核心线程+临时线程的最大数量);
  3. keepAliveTime:临时线程的空闲时间(超过此时间则销毁);
  4. unit:keepAliveTime的时间单位;
  5. workQueue:任务队列(存放等待执行的任务);
  6. threadFactory:线程工厂(用于创建线程,可自定义线程名称、优先级等);
  7. handler:拒绝策略(当线程池满且队列满时,处理新任务的策略)。
核心工作原理:

image.png

拒绝策略(4种默认策略):
  1. AbortPolicy:直接抛出RejectedExecutionException(默认策略);
  2. CallerRunsPolicy:由提交任务的线程自己执行任务;
  3. DiscardPolicy:直接丢弃新任务,不抛出异常;
  4. DiscardOldestPolicy:丢弃队列中最旧的任务,然后将新任务加入队列。

4.4.2 线程池实战:自定义线程池+实战场景

package com.jam.demo.juc.threadpool;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* ThreadPoolExecutor实战示例
* 场景:自定义线程池处理批量任务,结合MyBatis-Plus实现数据库批量插入
* @author ken
*/

@Slf4j
public class ThreadPoolExecutorDemo {
   // 自定义线程工厂(指定线程名称,便于排查问题)
   private static final ThreadFactory CUSTOM_THREAD_FACTORY = new ThreadFactory() {
       private final AtomicInteger threadNum = new AtomicInteger(1);

       @Override
       public Thread newThread(Runnable r) {
           Thread thread = new Thread(r, "demo-thread-" + threadNum.getAndIncrement());
           // 设置线程优先级(默认5)
           thread.setPriority(Thread.NORM_PRIORITY);
           // 设置为非守护线程
           thread.setDaemon(false);
           return thread;
       }
   };

   // 自定义拒绝策略(记录日志+抛出异常)
   private static final RejectedExecutionHandler CUSTOM_REJECT_HANDLER = (r, executor) -> {
       log.error("线程池满,无法处理任务:{}", r.toString());
       throw new RejectedExecutionException("线程池已达最大负载,拒绝执行任务");
   };

   /**
    * 初始化自定义线程池
    * 核心参数说明:
    * - 核心线程数:CPU核心数(CPU密集型任务)
    * - 最大线程数:CPU核心数*2(防止线程过多导致上下文切换)
    * - 空闲时间:30秒(临时线程空闲30秒销毁)
    * - 任务队列:LinkedBlockingQueue,容量100(避免无界队列导致内存溢出)
    * @return 自定义线程池
    */

   public ThreadPoolExecutor createCustomThreadPool() {
       int cpuNum = Runtime.getRuntime().availableProcessors();
       return new ThreadPoolExecutor(
               cpuNum,
               cpuNum * 2,
               30,
               TimeUnit.SECONDS,
               new<>(100),
               CUSTOM_THREAD_FACTORY,
               CUSTOM_REJECT_HANDLER
       );
   }

   /**
    * 模拟任务:处理数据(实际场景可替换为MyBatis-Plus数据库操作)
    * @param data 待处理数据
    * @return 处理结果
    */

   public String processData(String data) {
       try {
           // 模拟任务处理(50-200ms)
           long sleepTime = 50 + (long) (Math.random() * 150);
           TimeUnit.MILLISECONDS.sleep(sleepTime);
           return "processed-" + data;
       } catch (InterruptedException e) {
           log.error("处理数据{}被中断", data, e);
           Thread.currentThread().interrupt();
           return null;
       }
   }

   /**
    * 示例1:批量处理任务(submit提交任务,获取Future结果)
    */

   @Test
   public void testBatchProcess() throws InterruptedException, ExecutionException {
       ThreadPoolExecutor executor = createCustomThreadPool<String> dataList = Lists.newArrayList();
       // 模拟100条数据
       for (int i =< 100; i++) {
           dataList.add("data-" + (i + 1));
      <Future<String>> futureList = Lists.newArrayList();
       for (String data : dataList) {
           // 提交任务,获取Future(用于获取结果)
          <String> future = executor.submit(() -> processData(data));
           futureList.add(future);
       }

       // 关闭线程池(不再接受新任务,等待现有任务完成)
       executor.shutdown();
       // 等待所有任务完成(超时10秒)
       boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS);
       if (!finished) {
           // 超时未完成,中断任务
           executor.shutdownNow();
           log.error("线程池任务执行超时,强制中断");
           return;
       }

       // 处理任务<String> resultList = Lists.newArrayList();
       for (Future<String> future : futureList) {
           if (future.isDone() && !future.isCancelled()) {
               String result = future.get();
               if (!CollectionUtils.isEmpty(result)) {
                   resultList.add(result);
               }
           }
       }

       log.info("批量处理完成,成功处理数据条数:{},总数据条数:{}", resultList.size(), dataList.size());
       log.info("处理结果示例:{}", resultList.subList(0, Math.min(10, resultList.size())));
   }

   /**
    * 示例2:使用CompletableFuture优化任务处理(支持异步回调)
    */

   @Test
   public void testCompletableFuture() throws InterruptedException {
       ThreadPoolExecutor executor = createCustomThreadPool();<String> dataList = Lists.newArrayList();
       for (int i = 0; i < 50; i++) {
           dataList.add("data-" + (i + 1));
       }

       // 用于统计成功/失败数量
       AtomicInteger successCount = new AtomicInteger(0);
       AtomicInteger failCount = new AtomicInteger(0<Comp<Void>> futureList = Lists.newArrayList();
       for (String data : dataList) {
           CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> processData(data), executor)
                   // 任务成功回调
                   .thenAccept(result -> {
                       if (!CollectionUtils.isEmpty(result)) {
                           successCount.incrementAndGet();
                           log.info("任务成功:{} -> {}", data, result);
                       }
                   })
                   // 任务异常回调
                   .exceptionally(ex -> {
                       failCount.incrementAndGet();
                       log.error("任务失败:{}", data, ex);
                       return null;
                   });
           futureList.add(future);
       }

       // 等待所有任务完成
       CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();

       executor.shutdown();
       log.info("CompletableFuture批量处理完成,成功:{},失败:{}", successCount.get(), failCount.get());
   }

   /**
    * 示例3:拒绝策略触发场景
    */

   @Test
   public void testRejectPolicy() {
       // 初始化一个小容量线程池(核心1,最大2,队列2)
       ThreadPoolExecutor executor = new ThreadPoolExecutor(
               1,
               2,
               30,
               TimeUnit.SECONDS,
               new LinkedBlockingQueue<>(2),
               CUSTOM_THREAD_FACTORY,
               CUSTOM_REJECT_HANDLER
       );

       // 提交5个任务(核心1+队列2+最大2,共5个任务会触发拒绝)
       for (int i = 0< 5; i++) {
           int taskId = i;
           try {
               executor.submit(() -> {
                   try {
                       // 模拟任务执行(3秒)
                       TimeUnit.SECONDS.sleep(3);
                       log.info("任务{}执行完成", taskId);
                   } catch (InterruptedException e) {
                       log.error("任务{}被中断", taskId, e);
                   }
               });
               log.info("提交任务{}成功", taskId);
           } catch (RejectedExecutionException e) {
               log.error("提交任务{}失败", taskId, e);
           }
       }

       executor.shutdown();
   }

   /**
    * 示例4:MyBatis-Plus批量插入实战(结合线程池)
    * 注:实际项目中需注入Mapper,此处模拟
    */

   @Test
   public void testMyBatisPlusBatchInsert() throws InterruptedException {
       ThreadPoolExecutor executor = createCustomThreadPool();
       // 模拟1000条待<User> userList = Lists.newArrayList();
       for (int i = 0;< 1000; i++) {
           User user = new User();
           user.setId(Long.valueOf(i + 1));
           user.setName("user-" + (i + 1));
           user.setAge(18 + (i % 20));
           user.setEmail("user-" + (i + 1) + "@demo.com");
           userList.add(user);
       }

       // 分片处理(每100条为一片)
       int batchSize = 100;
       List<User>> batchList = Lists.partition(userList, batchSize);

       List<Void>> futureList = Lists.newArrayList();
       for<User> batch : batchList) {
          <Void> future = CompletableFuture.runAsync(() -> {
               // 实际场景:调用MyBatis-Plus的saveBatch方法
               boolean success = saveBatch(batch);
               if (success) {
                   log.info("批量插入成功,条数:{}", batch.size());
               } else {
                   log.error("批量插入失败,条数:{}", batch.size());
               }
           }, executor);
           futureList.add(future);
       }

       CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
       executor.shutdown();
       log.info("所有批量插入任务完成");
   }

   /**
    * 模拟MyBatis-Plus的saveBatch方法
    */

  <User> userList) {
       try {
           // 模拟数据库插入操作(100ms)
           TimeUnit.MILLISECONDS.sleep(100);
           return true;
       } catch (InterruptedException e) {
           log.error("批量插入失败", e);
           return false;
       }
   }

   /**
    * 模拟User实体类(MyBatis-Plus)
    */

   @lombok.Data
   static class User {
       private Long id;
       private String name;
       private Integer age;
       private String email;
   }
}

4.4.3 线程池使用注意事项

  1. 避免使用Executors创建线程池:
  • newFixedThreadPool:无界队列,可能导致内存溢出;
  • newCachedThreadPool:最大线程数无限制,高并发下可能创建大量线程,导致CPU/内存耗尽;
  • 推荐使用ThreadPoolExecutor自定义线程池,明确核心参数。
  1. 合理设置核心参数:
  • CPU密集型任务(如计算):核心线程数=CPU核心数+1;
  • IO密集型任务(如数据库/网络操作):核心线程数=CPU核心数*2;
  • 任务队列:使用有界队列,避免无界队列导致内存溢出。
  1. 线程池关闭:
  • shutdown():温和关闭,不再接受新任务,等待现有任务完成;
  • shutdownNow():强制关闭,中断正在执行的任务,返回未执行的任务;
  • 建议结合awaitTermination()等待任务完成,避免任务丢失。
  1. 异常处理:
  • submit()提交的任务,异常会被封装在Future中,需通过get()获取异常;
  • execute()提交的任务,异常会直接抛出,需在任务内部捕获;
  • 推荐使用CompletableFuture的exceptionally()处理异步任务异常。
  1. 线程池监控:
  • 自定义线程池时,可重写beforeExecute()、afterExecute()、terminated()方法,实现任务执行前后的监控;
  • 通过线程池的getActiveCount()(活跃线程数)、getQueue().size()(队列任务数)等方法监控线程池状态。

4.5 原子类:AtomicXxx与LongAdder(线程安全的变量操作)

原子类是JUC提供的“线程安全的变量操作工具”,底层基于CAS(Compare And Swap)操作实现,无需加锁即可保证变量操作的原子性,性能优于synchronized和ReentrantLock。

4.5.1 核心原子类分类

  1. 基本类型原子类:AtomicInteger、AtomicLong、AtomicBoolean;
  2. 数组类型原子类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray;
  3. 引用类型原子类:AtomicReference、AtomicStampedReference、AtomicMarkableReference;
  4. 字段更新器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater(用于更新对象的volatile字段);
  5. 累加器:LongAdder、DoubleAdder(高并发场景下累加性能优于AtomicLong)。

4.5.2 CAS原理:无锁编程的核心

CAS是一种“无锁原子操作”,核心逻辑是“比较-替换”:

  • 步骤:先比较变量的当前值是否等于预期值,若等于则将变量更新为目标值,否则不更新;
  • 底层实现:基于CPU的cmpxchg指令(硬件级原子操作),无需操作系统的锁机制,性能高效;
  • 缺点:ABA问题、循环时间长导致CPU开销大、只能保证单个变量的原子性。

CAS流程图

image.png

4.5.3 实战示例:原子类的使用与对比

package com.jam.demo.juc.atomic;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

/**
* 原子类实战示例
* 包含:AtomicInteger基本使用、LongAdder高并发累加、ABA问题解决
* @author ken
*/

@Slf4j
public class AtomicDemo {
   // 线程数
   private static final int THREAD_NUM = 10;
   // 每个线程执行次数
   private static final int EXECUTE_TIMES = 100000;

   /**
    * 示例1:AtomicInteger基本使用(替代非线程安全的int)
    */

   @Test
   public void testAtomicInteger() throws InterruptedException {
       AtomicInteger atomicInteger = new AtomicInteger(0);
       CountDownLatch latch = new CountDownLatch(THREAD_NUM);
       ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);

       for (int i = 0;< THREAD_NUM; i++) {
           executor.submit(() -> {
               for (int j = < EXECUTE_TIMES; j++) {
                   // 原子自增(i++)
                   atomicInteger.incrementAndGet();
                   // 其他原子操作:decrementAndGet()(i--)、getAndIncrement()(++i)、addAndGet()(i+=n)等
               }
               latch.countDown();
           });
       }

       latch.await();
       executor.shutdown();
       // 预期结果:10*100000=1000000
       log.info("AtomicInteger最终值:{}", atomicInteger.get());
   }

   /**
    * 示例2:LongAdder vs AtomicLong(高并发累加性能对比)
    */

   @Test
   public void testLongAdderVsAtomicLong() throws InterruptedException {
       // 测试AtomicLong
       AtomicInteger atomicLongResult = new AtomicInteger(0);
       CountDownLatch atomicLatch = new CountDownLatch(THREAD_NUM);
       ExecutorService atomicExecutor = Executors.newFixedThreadPool(THREAD_NUM);
       long atomicStart = System.currentTimeMillis();

       for (int i =< THREAD_NUM; i++) {
           atomicExecutor.submit(() -> {
               for (int j = 0;< EXECUTE_TIMES; j++) {
                   atomicLongResult.incrementAndGet();
               }
               atomicLatch.countDown();
           });
       }
       atomicLatch.await();
       atomicExecutor.shutdown();
       long atomicCost = System.currentTimeMillis() - atomicStart;

       // 测试LongAdder
       LongAdder longAdder = new LongAdder();
       CountDownLatch adderLatch = new CountDownLatch(THREAD_NUM);
       ExecutorService adderExecutor = Executors.newFixedThreadPool(THREAD_NUM);
       long adderStart = System.currentTimeMillis();

       for (int i = 0;< THREAD_NUM; i++) {
           adderExecutor.submit(() -> {
               for (int j =< EXECUTE_TIMES; j++) {
                   longAdder.increment();
               }
               adderLatch.countDown();
           });
       }
       adderLatch.await();
       adderExecutor.shutdown();
       long adderCost = System.currentTimeMillis() - adderStart;

       log.info("AtomicInteger:结果={},耗时={}ms", atomicLongResult.get(), atomicCost);
       log.info("LongAdder:结果={},耗时={}ms", longAdder.sum(), adderCost);
       // 结论:高并发下LongAdder性能优于AtomicInteger,因为LongAdder采用分段累加(减少CAS竞争)
   }

   /**
    * 示例3:ABA问题及解决(AtomicStampedReference)
    * ABA问题:线程1将A改为B,线程2又将B改为A,线程1再次CAS时会认为值未变,导致错误更新
    */

   @Test
   public void testABAProblem() {
       // 1. 普通AtomicReference存在ABA问题
       AtomicReference<String> atomicRef = new<>("A");
       String oldValue = atomicRef.get();

       // 线程1:尝试将A改为C
       new Thread(() -> {
           log.info("线程1:当前值={}", atomicRef.get());
           try {
               // 休眠1秒,让线程2执行ABA操作
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               log.error("线程1被中断", e);
           }
           // CAS:预期值A,目标值C
           boolean success = atomicRef.compareAndSet(oldValue, "C");
           log.info("线程1:CAS结果={},当前值={}", success, atomicRef.get());
       }, "Thread-1").start();

       // 线程2:执行ABA操作(A->B->A)
       new Thread(() -> {
           log.info("线程2:当前值={}", atomicRef.get());
           // A->B
           atomicRef.compareAndSet("A", "B");
           log.info("线程2:A->B,当前值={}", atomicRef.get());
           // B->A
           atomicRef.compareAndSet("B", "A");
           log.info("线程2:B->A,当前值={}", atomicRef.get());
       }, "Thread-2").start();

       // 等待线程执行完成
       try {
           Thread.sleep(2000);
       } catch (InterruptedException e) {
           log.error("主线程被中断", e);
       }

       // 2. AtomicStampedReference解决ABA问题(增加版本号)
       log.info("\n=== 使用AtomicStampedReference解决ABA问题 ===");
       // 初始化:值A,版本号1
<String> stampedRef = new Atomic<>("A", 1);
       int oldStamp = stampedRef.getStamp();

       // 线程3:尝试将A改为C(带版本号)
       new Thread(() -> {
           log.info("线程3:当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
           try {
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               log.error("线程3被中断", e);
           }
           // CAS:预期值A,目标值C,预期版本号oldStamp
           boolean success = stampedRef.compareAndSet("A", "C", oldStamp, oldStamp + 1);
           log.info("线程3:CAS结果={},当前值={},版本号={}", success, stampedRef.getReference(), stampedRef.getStamp());
       }, "Thread-3").start();

       // 线程4:执行ABA操作
       new Thread(() -> {
           log.info("线程4:当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
           // A->B(版本号1->2)
           stampedRef.compareAndSet("A", "B", 1, 2);
           log.info("线程4:A->B,当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
           // B->A(版本号2->3)
           stampedRef.compareAndSet("B", "A", 2, 3);
           log.info("线程4:B->A,当前值={},版本号={}", stampedRef.getReference(), stampedRef.getStamp());
       }, "Thread-4").start();

       try {
           Thread.sleep(2000);
       } catch (InterruptedException e) {
           log.error("主线程被中断", e);
       }
   }

   /**
    * 示例4:字段更新器(AtomicIntegerFieldUpdater)
    * 用于更新对象的volatile字段(无需修改对象类,动态更新)
    */

   @Test
   public void testAtomicFieldUpdater() {
       // 初始化对象
       User user = new User("ken", 20);
       log.info("更新前:name={},age={}", user.getName(), user.getAge());

       // 1. 创建字段更新器(更新User类的age字段,必须是volatile修饰)
      <User> ageUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

       // 2. 原子更新age字段
       ageUpdater.incrementAndGet(user);
       log.info("更新后(自增1):age={}", user.getAge());

       ageUpdater.addAndGet(user, 5);
       log.info("更新后(加5):age={}", user.getAge());

       // 3. 尝试更新(预期值26,目标值30)
       boolean success = ageUpdater.compareAndSet(user, 26, 30);
       log.info("CAS更新结果:{},最终age={}", success, user.getAge());
   }

   /**
    * 模拟User类(volatile字段)
    */

   @lombok.Data
   static class User {
       private String name;
       // 必须是volatile修饰,否则字段更新器无法使用
       private volatile int age;

       public User(String name, int age) {
           this.name = name;
           this.age = age;
       }
   }
}

4.5.4 原子类核心总结

  1. 适用场景:单个变量的原子性操作(如计数、累加),无需加锁,性能高效;
  2. LongAdder优势:高并发场景下,通过“分段累加”减少CAS竞争,性能优于AtomicLong;
  3. ABA问题解决:使用AtomicStampedReference(版本号)或AtomicMarkableReference(标记位);
  4. 字段更新器:无需修改对象类,即可动态更新其volatile字段,灵活性高。

4.6 并发集合:线程安全的集合框架

Java默认的集合框架(如ArrayList、HashMap)是非线程安全的,在并发场景下使用会出现数据不一致问题。JUC提供了专门的并发集合,解决了线程安全问题,同时性能优于Vector、Hashtable(后者使用synchronized加锁,性能较差)。

4.6.1 并发集合核心分类与对比

并发集合类 对应非并发集合 核心特点/底层实现 适用场景
ConcurrentHashMap HashMap 分段锁(JDK 1.8后改为CAS+ synchronized) 高并发键值对存储
CopyOnWriteArrayList ArrayList 写时复制(修改时复制整个数组,读无锁) 读多写少场景
CopyOnWriteArraySet HashSet 基于CopyOnWriteArrayList实现 读多写少的无序集合
ConcurrentLinkedQueue LinkedList 无锁队列(CAS实现) 高并发队列(FIFO)
ConcurrentLinkedDeque LinkedList 无锁双端队列 高并发双端队列(FIFO/LIFO)
LinkedBlockingQueue LinkedList 基于ReentrantLock的有界/无界队列 生产-消费模型(需控制并发数)
ArrayBlockingQueue ArrayList 基于数组的有界队列(ReentrantLock) 固定容量的生产-消费模型
SynchronousQueue 无缓冲队列(生产-消费直接交互) 线程间直接传递任务
PriorityBlockingQueue PriorityQueue 基于堆的无界优先级队列 高并发优先级任务调度

4.6.2 核心并发集合实战

package com.jam.demo.juc.collection;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 并发集合实战示例
* 包含:ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue、LinkedBlockingQueue
* @author ken
*/

@Slf4j
public class ConcurrentCollectionDemo {
   // 线程数
   private static final int THREAD_NUM = 5;
   // 每个线程操作次数
   private static final int OPERATE_TIMES = 10000;

   /**
    * 示例1:ConcurrentHashMap(高并发键值对存储)
    * 对比HashMap(非线程安全)和ConcurrentHashMap(线程安全)
    */

   @Test
   public void testConcurrentHashMap() throws InterruptedException {
       // 1. 测试HashMap(非线程安全)
      <String, Integer> hash<>();
       CountDownLatch hashMapLatch = new CountDownLatch(THREAD_NUM);
       ExecutorService hashMapExecutor = Executors.newFixedThreadPool(THREAD_NUM);
       AtomicInteger hashMapErrorCount = new AtomicInteger(0);

       for (int i = 0;< THREAD_NUM; i++) {
           int threadId = i;
           hashMapExecutor.submit(() -> {
               for (int j = 0;< OPERATE_TIMES; j++) {
                   String key = "key-" + (j % 10); // 共10个key,模拟竞争
                   try {
                       // 非原子操作:get+put,会出现线程安全问题
                       hashMap.put(key, hashMap.getOrDefault(key, 0) + 1);
                   } catch (Exception e) {
                       hashMapErrorCount.incrementAndGet();
                   }
               }
               hashMapLatch.countDown();
           });
       }

       hashMapLatch.await();
       hashMapExecutor.shutdown();
       int hashMapTotal = hashMap.values().stream().mapToInt(Integer::intValue).sum();
       log.info("HashMap:总计数={}(预期={}),错误数={}", hashMapTotal, THREAD_NUM * OPERATE_TIMES, hashMapErrorCount.get());

       // 2. 测试ConcurrentHashMap(线程安全)<String, Integer> concurrentHashMap = new Concurrent<>();
       CountDownLatch chmLatch = new CountDownLatch(THREAD_NUM);
       ExecutorService chmExecutor = Executors.newFixedThreadPool(THREAD_NUM);

       for (int i = 0;< THREAD_NUM; i++) {
           int threadId = i;
           chmExecutor.submit(() -> {
               for (int j = 0;< OPERATE_TIMES; j++) {
                   String key = "key-" + (j % 10);
                   // 原子操作:computeIfAbsent+getAndIncrement
                   concurrentHashMap.computeIfAbsent(key, k -> new AtomicInteger(0)).getAndIncrement();
               }
               chmLatch.countDown();
           });
       }

       chmLatch.await();
       chmExecutor.shutdown();
       int chmTotal = concurrentHashMap.values().stream().mapToInt(AtomicInteger::intValue).sum();
       log.info("ConcurrentHashMap:总计数={}(预期={})", chmTotal, THREAD_NUM * OPERATE_TIMES);
   }

   /**
    * 示例2:CopyOnWriteArrayList(读多写少场景)
    * 核心特点:写时复制,读无锁,写加锁(性能较差)
    */

   @Test
   public void testCopyOnWriteArrayList() throws InterruptedException {
       List<String> cowList = new<>();
       CountDownLatch latch = new CountDownLatch(THREAD_NUM);
       ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);

       for (int i = 0; i< THREAD_NUM; i++) {
           int threadId = i;
           executor.submit(() -> {
               // 写操作:add(写时复制)
               for (int j = 0; j < OPERATE_TIMES / 10; j++) {
                   cowList.add("data-" + threadId + "-" + j);
               }
               log.info("线程{}:写操作完成,当前列表大小={}", threadId, cowList.size());

               // 读操作:遍历(无锁)
               for (int j = 0;< 10; j++) {
                   String data = cowList.get(j % cowList.size());
                   if (j == 0) {
                       log.info("线程{}:读操作示例,data={}", threadId, data);
                   }
               }

               latch.countDown();
           });
       }

       latch.await();
       executor.shutdown();
       log.info("CopyOnWriteArrayList最终大小:{}", cowList.size());

       // 注意:CopyOnWriteArrayList的迭代器是快照迭代器,不支持remove操作
       try {
           for (String data : cowList) {
               cowList.remove(data);
           }
       } catch (UnsupportedOperationException e) {
           log.error("CopyOnWriteArrayList迭代器不支持remove操作", e);
       }
   }

   /**
    * 示例3:ConcurrentLinkedQueue(高并发无锁队列)
    * 适用场景:高并发下的FIFO队列,无锁设计,性能优于LinkedBlockingQueue
    */

   @Test
   public void testConcurrentLinkedQueue() throws InterruptedException {<String> queue<>();
       CountDownLatch producerLatch = new CountDownLatch(3); // 3个生产者
       CountDownLatch consumerLatch = new CountDownLatch(2); // 2个消费者
       ExecutorService executor = Executors.newFixedThreadPool(5);

       // 生产者线程:向队列添加任务
       for (int i = 0; i< 3; i++) {
           int producerId = i;
           executor.submit(() -> {
               for (int j = < 1000; j++) {
                   String task = "task-" + producerId + "-" + j;
                   queue.offer(task);
                   if (j % 100 == 0) {
                       log.info("生产者{}:添加任务{},队列大小={}", producerId, task, queue.size());
                   }
                   // 模拟生产耗时
                   try {
                       Thread.sleep(1);
                   } catch (InterruptedException e) {
                       log.error("生产者{}被中断", producerId, e);
                   }
               }
               log.info("生产者{}:生产完成", producerId);
               producerLatch.countDown();
           });
       }

       // 消费者线程:从队列获取任务
       for (int i = < 2; i++) {
           int consumerId = i;
           executor.submit(() -> {
               while (true) {
                   // 非阻塞获取任务(无任务返回null)
                   String task = queue.poll();
                   if (!CollectionUtils.isEmpty(task)) {
                       log.info("消费者{}:处理任务{},队列剩余大小={}", consumerId, task, queue.size());
                       // 模拟消费耗时
                       try {
                           Thread.sleep(2);
                       } catch (InterruptedException e) {
                           log.error("消费者{}被中断", consumerId, e);
                       }
                   } else {
                       // 检查生产者是否完成,若完成则退出
                       if (producerLatch.getCount() == 0) {
                           break;
                       }
                   }
               }
               log.info("消费者{}:消费完成", consumerId);
               consumerLatch.countDown();
           });
       }

       producerLatch.await();
       consumerLatch.await();
       executor.shutdown();
       log.info("所有任务处理完成,队列剩余大小={}", queue.size());
   }

   /**
    * 示例4:LinkedBlockingQueue(有界队列,生产-消费模型)
    * 核心特点:基于ReentrantLock,支持有界容量,可用于流量控制
    */

   @Test
   public void testLinkedBlockingQueue() throws InterruptedException {
       // 初始化有界队列,容量100
       Queue<Integer> queue = new Linked<>(100);
       CountDownLatch producerLatch = new CountDownLatch(2);
       CountDownLatch consumerLatch = new CountDownLatch(3);
       ExecutorService executor = Executors.newFixedThreadPool(5);

       // 生产者:向队列添加数据,队列满时阻塞
       for (int i = 0; i< 2; i++) {
           int producerId = i;
           executor.submit(() -> {
               for (int j = < 500; j++) {
                   try {
                       // 阻塞添加(队列满时等待)
                       queue.put(j);
                       if (j % 100 == 0) {
                           log.info("生产者{}:添加数据{},队列大小={}", producerId, j, queue.size());
                       }
                   } catch (InterruptedException e) {
                       log.error("生产者{}被中断", producerId, e);
                       return;
                   }
               }
               log.info("生产者{}:生产完成", producerId);
               producerLatch.countDown();
           });
       }

       // 消费者:从队列获取数据,队列空时阻塞
       for (int i = 0< 3; i++) {
           int consumerId = i;
           executor.submit(() -> {
               while (true) {
                   try {
                       // 阻塞获取(队列空时等待)
                       Integer data = queue.take();
                       log.info("消费者{}:处理数据{},队列剩余大小={}", consumerId, data, queue.size());
                       // 模拟消费耗时
                       Thread.sleep(1);
                   } catch (InterruptedException e) {
                       log.error("消费者{}被中断", consumerId, e);
                       return;
                   }

                   // 检查生产者是否完成且队列空,若满足则退出
                   if (producerLatch.getCount() == 0 && queue.isEmpty()) {
                       break;
                   }
               }
               log.info("消费者{}:消费完成", consumerId);
               consumerLatch.countDown();
           });
       }

       producerLatch.await();
       consumerLatch.await();
       executor.shutdown();
       log.info("所有任务处理完成,队列剩余大小={}", queue.size());
   }
}

4.7 CompletableFuture:异步编程神器

CompletableFuture是JDK 8引入的异步编程工具,基于Future和回调机制,支持链式调用、异步组合、异常处理,解决了传统Future需要阻塞获取结果的问题,是JUC中异步编程的核心组件。

4.7.1 CompletableFuture核心特点

  1. 异步执行:支持异步提交任务,无需阻塞等待结果;
  2. 链式调用:通过thenApply、thenAccept、thenRun等方法实现链式处理;
  3. 任务组合:支持多个CompletableFuture的组合(allOf、anyOf);
  4. 异常处理:通过exceptionally、whenComplete等方法处理异步任务异常;
  5. 自定义线程池:可指定线程池执行任务,避免使用默认的ForkJoinPool。

4.7.2 CompletableFuture核心方法分类

方法类型 核心方法 功能描述
异步执行 runAsync、supplyAsync 无返回值/有返回值的异步任务
链式处理 thenApply、thenAccept、thenRun 任务完成后执行下一个任务(有输入/无输入)
异步组合 thenCompose、thenCombine、allOf、anyOf 多个任务的依赖组合/并行组合
异常处理 exceptionally、whenComplete、handle 任务异常时的处理/任务完成后的统一处理
结果获取 get、join、getNow 阻塞获取/非阻塞获取/立即获取(有默认值)

4.7.3 CompletableFuture实战:异步任务链与组合

package com.jam.demo.juc.future;

import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* CompletableFuture实战示例
* 包含:异步任务链、任务组合、异常处理、实战场景(异步查询+数据聚合)
* @author ken
*/

@Slf4j
public class CompletableFutureDemo {
   // 自定义线程池(避免使用默认ForkJoinPool)
   private static final ExecutorService CUSTOM_EXECUTOR = Executors.newFixedThreadPool(5);

   /**
    * 模拟服务调用:查询用户信息
    * @param userId 用户ID
    * @return 用户信息
    */

   public<User> queryUser(Long userId) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("查询用户信息:userId={}", userId);
           // 模拟服务调用耗时(500ms)
           try {
               TimeUnit.MILLISECONDS.sleep(500);
           } catch (InterruptedException e) {
               log.error("查询用户信息被中断", e);
               throw new RuntimeException("查询用户信息失败", e);
           }
           // 模拟返回用户信息
           return new User(userId, "用户" + userId, 20 + (int) (userId % 10));
       }, CUSTOM_EXECUTOR);
   }

   /**
    * 模拟服务调用:查询用户订单
    * @param userId 用户ID
    * @return 订单列表
    */

  <List<Order>> queryOrders(Long userId) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("查询用户订单:userId={}", userId);
           // 模拟服务调用耗时(800ms)
           try {
               TimeUnit.MILLISECONDS.sleep(800);
           } catch (InterruptedException e) {
               log.error("查询用户订单被中断", e);
               throw new RuntimeException("查询用户订单失败", e);
           }
           // 模拟返回订单列表<Order> orders = Lists.newArrayList();
           for (int i = 0; i < 2; i++) {
               orders.add(new Order(Long.valueOf(i + 1), userId, "订单" + userId + "-" + i, 100.0 * (i + 1)));
           }
           return orders;
       }, CUSTOM_EXECUTOR);
   }

   /**
    * 模拟服务调用:查询用户地址
    * @param userId 用户ID
    * @return 地址信息
    */

   public CompletableFuture<Address> queryAddress(Long userId) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("查询用户地址:userId={}", userId);
           // 模拟服务调用耗时(600ms)
           try {
               TimeUnit.MILLISECONDS.sleep(600);
           } catch (InterruptedException e) {
               log.error("查询用户地址被中断", e);
               throw new RuntimeException("查询用户地址失败", e);
           }
           // 模拟返回地址信息
           return new Address(Long.valueOf(userId), "省份" + userId, "城市" + userId, "详细地址" + userId);
       }, CUSTOM_EXECUTOR);
   }

   /**
    * 示例1:异步任务链(thenApply/thenAccept/thenRun)
    * 场景:查询用户信息 -> 处理用户信息 -> 记录日志
    */

   @Test
   public void testTaskChain() throws InterruptedException {
       Comp<Void> future = queryUser(1L)
               // 1. thenApply:有输入有输出,处理用户信息(异步执行)
               .thenApply(user -> {
                   log.info("处理用户信息:{}", user);
                   user.setAge(user.getAge() + 1); // 模拟处理
                   return user;
               })
               // 2. thenAccept:有输入无输出,消费处理结果(异步执行)
               .thenAccept(user -> {
                   log.info("消费处理后的用户信息:{}", user);
               })
               // 3. thenRun:无输入无输出,执行后续操作(异步执行)
               .thenRun(() -> {
                   log.info("任务链执行完成,记录日志");
               })
               // 4. 异常处理:所有环节的异常都会走到这里
               .exceptionally(ex -> {
                   log.error("任务链执行失败", ex);
                   return null;
               });

       // 等待任务完成
       future.join();
       CUSTOM_EXECUTOR.shutdown();
   }

   /**
    * 示例2:任务组合(thenCompose/thenCombine/allOf/anyOf)
    * 场景1:thenCompose(依赖任务,先查用户再查订单)
    * 场景2:thenCombine(并行任务,查用户和地址后聚合)
    * 场景3:allOf(多个并行任务,等待所有完成)
    * 场景4:anyOf(多个并行任务,等待任意一个完成)
    */

   @Test
   public void testTaskCombine() throws InterruptedException {
       // 场景1:thenCompose(依赖任务,先查用户再查订单)
       log.info("=== 场景1:thenCompose(依赖任务) ===");
       CompletableFuture<List<Order>> dependentFuture = queryUser(1L)
               .thenCompose(user -> {
                   log.info("用户查询完成,开始查询订单:{}", user);
                   return queryOrders(user.getId());
               })
               .exceptionally(ex -> {
                   log.error("依赖任务执行失败", ex);
                   return Lists.newArrayList();
               });<Order> orders = dependentFuture.join();
       log.info("依赖任务结果:订单列表={}", orders);

       // 场景2:thenCombine(并行任务,查用户和地址后聚合)
       log.info("\n=== 场景2:thenCombine(并行任务) ===");
       Completable<UserDetail> combineFuture = queryUser(2L)
               .thenCombine(queryAddress(2L), (user, address) -> {
                   log.info("用户和地址查询完成,开始聚合");
                   return new UserDetail(user, address);
               })
               .exceptionally(ex -> {
                   log.error("并行任务执行失败", ex);
                   return null;
               });
       UserDetail userDetail = combineFuture.join();
       log.info("并行任务结果:用户详情={}", userDetail);

       // 场景3:allOf(多个并行任务,等待所有完成)
       log.info("\n=== 场景3:allOf(多任务并行) ===");
       Complet<User> userFuture = queryUser(3L);
       Completable<Order>> orderFuture = queryOrders(3L);
       Complet<Address> addressFuture = queryAddress(3L);

       // 等待所有任务完成
      <Void> allOfFuture = CompletableFuture.allOf(userFuture, orderFuture, addressFuture)
               .thenRun(() -> {
                   log.info("所有任务完成,开始聚合数据");
                   User user = userFuture.join();
                   List<Order> orderList = orderFuture.join();
                   Address address = addressFuture.join();
                   UserDetail detail = new UserDetail(user, address, orderList);
                   log.info("多任务聚合结果:{}", detail);
               })
               .exceptionally(ex -> {
                   log.error("多任务执行失败", ex);
                   return null;
               });
       allOfFuture.join();

       // 场景4:anyOf(多个并行任务,等待任意一个完成)
       log.info("\n=== 场景4:anyOf(任意任务完成) ===");
       CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
           try {
               TimeUnit.MILLISECONDS.sleep(300);
           } catch (InterruptedException e) {
               log.error("task1被中断", e);
           }
           return "task1完成";
       }, CUSTOM_EXECUTOR);

       Comp<String> task2 = CompletableFuture.supplyAsync(() -> {
           try {
               TimeUnit.MILLISECONDS.sleep(500);
           } catch (InterruptedException e) {
               log.error("task2被中断", e);
           }
           return "task2完成";
       }, CUSTOM_EXECUTOR);

       CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(task1, task2)
               .thenAccept(result -> {
                   log.info("任意任务完成,结果:{}", result);
               })
               .exceptionally(ex -> {
                   log.error("任务执行失败", ex);
                   return null;
               });
       anyOfFuture.join();

       CUSTOM_EXECUTOR.shutdown();
   }

   /**
    * 示例3:异常处理(exceptionally/whenComplete/handle)
    */

   @Test
   public void testExceptionHandle() throws InterruptedException {
       // 1. exceptionally:异常时返回默认值
       log.info("=== 测试exceptionally ===");
       Completable<User> exceptionallyFuture = queryUser(999L)
               .exceptionally(ex -> {
                   log.error("查询用户失败,返回默认用户", ex);
                   return new User(0L, "默认用户", 0); // 异常时返回默认值
               });
       log.info("exceptionally结果:{}", exceptionallyFuture.join());

       // 2. whenComplete:完成/异常时都执行,不改变结果
       log.info("\n=== 测试whenComplete ===");
       Comp<User> whenCompleteFuture = queryUser(888L)
               .whenComplete((user, ex) -> {
                   if (ex == null) {
                       log.info("查询成功,用户:{}", user);
                   } else {
                       log.error("查询失败", ex);
                   }
               });
       try {
           whenCompleteFuture.join();
       } catch (Exception e) {
           log.error("join时捕获异常", e);
       }

       // 3. handle:完成/异常时都执行,可改变结果
       log.info("\n=== 测试handle ===");
       Completable<User> handleFuture = queryUser(777L)
               .handle((user, ex) -> {
                   if (ex == null) {
                       user.setAge(user.getAge() + 5);
                       return user;
                   } else {
                       log.error("查询失败,返回默认用户", ex);
                       return new User(0L, "handle默认用户", 0);
                   }
               });
       log.info("handle结果:{}", handleFuture.join());

       CUSTOM_EXECUTOR.shutdown();
   }

   /**
    * 示例4:实战场景:异步批量查询用户详情并聚合
    * 场景:批量查询10个用户的详情(用户信息+订单+地址),然后聚合返回
    */

   @Test
   public void testBatchQuery() throws InterruptedException {
       // 待查询的用户ID列表
      <Long> userIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);

       // 批量异步查询每个用户的详情
       List<UserDetail>> futureList = userIds.stream()
               .map(userId -> {
                   // 每个用户的详情查询:并行查询用户、订单、地址
                   Complet<User> userFuture = queryUser(userId);
<List<Order>> orderFuture = queryOrders(userId);
                  <Address> addressFuture = queryAddress(userId);

                   // 聚合单个用户的详情
                   return CompletableFuture.allOf(userFuture, orderFuture, addressFuture)
                           .thenApply(v -> {
                               User user = userFuture.join();
                              <Order> orders = orderFuture.join();
                               Address address = addressFuture.join();
                               return new UserDetail(user, address, orders);
                           })
                           .exceptionally(ex -> {
                               log.error("查询用户{}详情失败", userId, ex);
                               return new UserDetail(new User(userId, "查询失败", 0), null, null);
                           });
               })
               .collect(Collectors.toList());

       // 等待所有用户详情查询完成
       Completable<Void> allOfFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
               .thenRun(() -> {
                   log.info("所有用户详情查询完成,开始聚合<UserDetail> userDetails = futureList.stream()
                           .map(CompletableFuture::join)
                           .collect(Collectors.toList());
                   log.info("
批量查询结果:共{}个用户详情", userDetails.size());
                   // 过滤查询成功的用户
                  <UserDetail> successDetails = userDetails.stream()
                           .filter(detail -> !"
查询失败".equals(detail.getUser().getName()))
                           .collect(Collectors.toList());
                   log.info("
查询成功的用户数:{}", successDetails.size());
               })
               .exceptionally(ex -> {
                   log.error("
批量查询失败", ex);
                   return null;
               });

       allOfFuture.join();
       CUSTOM_EXECUTOR.shutdown();
   }

   /**
    * 模拟用户类
    */
   @Data
   static class User {
       private Long id;
       private String name;
       private Integer age;

       public User(Long id, String name, Integer age) {
           this.id = id;
           this.name = name;
           this.age = age;
       }
   }

   /**
    * 模拟订单类
    */
   @Data
   static class Order {
       private Long id;
       private Long userId;
       private String orderNo;
       private Double amount;

       public Order(Long id, Long userId, String orderNo, Double amount) {
           this.id = id;
           this.userId = userId;
           this.orderNo = orderNo;
           this.amount = amount;
       }
   }

   /**
    * 模拟地址类
    */
   @Data
   static class Address {
       private Long userId;
       private String province;
       private String city;
       private String detail;

       public Address(Long userId, String province, String city, String detail) {
           this.userId = userId;
           this.province = province;
           this.city = city;
           this.detail = detail;
       }
   }

   /**
    * 模拟用户详情类(聚合用户、地址、订单)
    */
   @Data
   static class UserDetail {
       private User user;
       private Address address;
<Order> orders;

       public UserDetail(User user, Address address) {
           this.user = user;
           this.address = address;
       }

       public UserDetail(User user, Address address,<Order> orders) {
           this.user = user;
           this.address = address;
           this.orders = orders;
       }
   }
}

五、JUC高级应用:并发编程设计模式与实战

5.1 生产者-消费者模式(基于阻塞队列)

生产者-消费者模式是并发编程中最经典的模式之一,用于解耦生产者和消费者,平衡两者的处理速度。JUC中的LinkedBlockingQueue、ArrayBlockingQueue等阻塞队列是实现该模式的理想工具。

核心实现思路:

  1. 生产者:向阻塞队列添加任务,队列满时阻塞;
  2. 消费者:从阻塞队列获取任务,队列空时阻塞;
  3. 队列:作为缓冲区,平衡生产者和消费者的速度;
  4. 终止条件:生产者完成任务后,向队列添加“终止信号”,消费者获取到信号后退出。

实战示例:基于LinkedBlockingQueue的生产者-消费者模式

package com.jam.demo.juc.pattern;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.CollectionUtils;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* 生产者-消费者模式实战(基于阻塞队列)
* 场景:3个生产者生产任务,2个消费者消费任务,任务完成后优雅终止
* @author ken
*/

@Slf4j
public class ProducerConsumerPattern {
   // 阻塞队列(缓冲区),容量10
   private final LinkedBlocking<Task> queue =<>(10);
   // 终止信号(用于通知消费者退出)
   private static final Task TERMINATE_TASK = new Task(-1, "终止任务");
   // 生产者数量
   private static final int PRODUCER_NUM = 3;
   // 消费者数量
   private static final int CONSUMER_NUM = 2;
   // 每个生产者生产的任务数
   private static final int TASK_PER_PRODUCER = 5;

   /**
    * 任务类
    */

   @lombok.Data
   static class Task {
       private int id;
       private String content;

       public Task(int id, String content) {
           this.id = id;
           this.content = content;
       }
   }

   /**
    * 生产者线程
    */

   class Producer implements Runnable {
       private int producerId;

       public Producer(int producerId) {
           this.producerId = producerId;
       }

       @Override
       public void run() {
           try {
               for (int i =< TASK_PER_PRODUCER; i++) {
                   // 生成任务
                   Task task = new Task(i, "任务-" + producerId + "-" + i);
                   // 向队列添加任务(队列满时阻塞)
                   queue.put(task);
                   log.info("生产者{}:生产任务{},队列大小={}", producerId, task, queue.size());
                   // 模拟生产耗时
                   TimeUnit.MILLISECONDS.sleep(100);
               }
               log.info("生产者{}:生产完成", producerId);
           } catch (InterruptedException e) {
               log.error("生产者{}被中断", producerId, e);
               Thread.currentThread().interrupt();
           }
       }
   }

   /**
    * 消费者线程
    */

   class Consumer implements Runnable {
       private int consumerId;

       public Consumer(int consumerId) {
           this.consumerId = consumerId;
       }

       @Override
       public void run() {
           try {
               while (true) {
                   // 从队列获取任务(队列空时阻塞)
                   Task task = queue.take();
                   // 检查是否为终止信号
                   if (task == TERMINATE_TASK) {
                       log.info("消费者{}:收到终止信号,退出", consumerId);
                       // 将终止信号放回队列,让其他消费者也能收到
                       queue.put(TERMINATE_TASK);
                       break;
                   }
                   // 处理任务
                   processTask(task);
               }
           } catch (InterruptedException e) {
               log.error("消费者{}被中断", consumerId, e);
               Thread.currentThread().interrupt();
           }
       }

       /**
        * 处理任务
        */

       private void processTask(Task task) {
           log.info("消费者{}:开始处理任务{}", consumerId, task);
           // 模拟处理耗时
           try {
               TimeUnit.MILLISECONDS.sleep(200);
           } catch (InterruptedException e) {
               log.error("消费者{}处理任务{}被中断", consumerId, task.getId(), e);
               Thread.currentThread().interrupt();
           }
           log.info("消费者{}:完成处理任务{}", consumerId, task);
       }
   }

   /**
    * 启动生产者-消费者模式
    */

   public void start() throws InterruptedException {
       // 启动生产者线程
       Thread[] producers = new Thread[PRODUCER_NUM];
       for (int i = 0; i< PRODUCER_NUM; i++) {
           producers[i] = new Thread(new Producer(i), "Producer-" + i);
           producers[i].start();
       }

       // 启动消费者线程
       Thread[] consumers = new Thread[CONSUMER_NUM];
       for (int i =< CONSUMER_NUM; i++) {
           consumers[i] = new Thread(new Consumer(i), "Consumer-" + i);
           consumers[i].start();
       }

       // 等待所有生产者完成生产
       for (Thread producer : producers) {
           producer.join();
       }
       log.info("所有生产者生产完成,添加终止信号");

       // 向队列添加终止信号(消费者收到后退出)
       queue.put(TERMINATE_TASK);

       // 等待所有消费者完成消费
       for (Thread consumer : consumers) {
           consumer.join();
       }
       log.info("所有消费者消费完成,生产者-消费者模式结束");
   }

   @Test
   public void testProducerConsumer() throws InterruptedException {
       start();
   }
}

5.2 线程池 + CompletableFuture:异步任务编排实战

在实际开发中,经常需要编排多个异步任务(如并行查询多个服务、串行处理依赖任务),结合线程池和CompletableFuture可以高效实现复杂的任务编排。

实战示例:电商订单提交的异步任务编排

package com.jam.demo.juc.pattern;

import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* 异步任务编排实战(电商订单提交场景)
* 场景:订单提交需要执行以下任务:
* 1. 校验用户信息(串行,必须先执行)
* 2. 并行执行:校验库存、查询用户地址、查询优惠券
* 3. 串行执行:扣减库存、扣减优惠券、创建订单
* 4. 异步执行:发送订单通知、记录订单日志
* @author ken
*/

@Slf4j
public class AsyncTaskOrchestration {
   // 自定义线程池(按任务类型拆分线程池,提高隔离性)
   private static final ExecutorService VALIDATE_EXECUTOR = Executors.newFixedThreadPool(2);
   private static final ExecutorService BUSINESS_EXECUTOR = Executors.newFixedThreadPool(5);
   private static final ExecutorService NOTIFY_EXECUTOR = Executors.newFixedThreadPool(2);

   /**
    * 订单请求类
    */

   @Data
   static class OrderRequest {
       private Long userId;
       private Long productId;
       private Integer quantity;
       private Long couponId;
   }

   /**
    * 订单响应类
    */

   @Data
   static class OrderResponse {
       private Boolean success;
       private String orderNo;
       private String message;
   }

   /**
    * 步骤1:校验用户信息(串行,必须先执行)
    */

   public CompletableFuture<Boolean> validateUser(Long userId) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("校验用户信息:userId={}", userId);
           try {
               TimeUnit.MILLISECONDS.sleep(300);
           } catch (InterruptedException e) {
               log.error("校验用户信息被中断", e);
               return false;
           }
           // 模拟校验逻辑:userId>0则校验通过
           boolean valid = !ObjectUtils.isEmpty(userId) && userId > 0;
           log.info("用户校验结果:{}", valid);
           return valid;
       }, VALIDATE_EXECUTOR);
   }

   /**
    * 步骤2-1:校验库存
    */

   public CompletableFuture<Boolean> validateStock(Long productId, Integer quantity) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("校验库存:productId={},quantity={}", productId, quantity);
           try {
               TimeUnit.MILLISECONDS.sleep(500);
           } catch (InterruptedException e) {
               log.error("校验库存被中断", e);
               return false;
           }
           // 模拟库存校验:productId有效且quantity<=10则通过
           boolean valid = !ObjectUtils.isEmpty(productId)
                   && !ObjectUtils.isEmpty(quantity)
                   && quantity <= 10;
           log.info("库存校验结果:{}", valid);
           return valid;
       }, BUSINESS_EXECUTOR);
   }

   /**
    * 步骤2-2:查询用户地址
    */

   public CompletableFuture<String> queryAddress(Long userId) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("查询用户地址:userId={}", userId);
           try {
               TimeUnit.MILLISECONDS.sleep(400);
           } catch (InterruptedException e) {
               log.error("查询用户地址被中断", e);
               return null;
           }
           // 模拟查询结果:userId有效则返回地址
           if (ObjectUtils.isEmpty(userId) || userId <= 0) {
               return null;
           }
           String address = "省份A-城市B-详细地址" + userId;
           log.info("用户地址查询结果:{}", address);
           return address;
       }, BUSINESS_EXECUTOR);
   }

   /**
    * 步骤2-3:查询优惠券(可选,couponId为空则跳过)
    */

   public CompletableFuture<Boolean> queryCoupon(Long userId, Long couponId) {
       return CompletableFuture.supplyAsync(() -> {
           if (ObjectUtils.isEmpty(couponId)) {
               log.info("无优惠券信息,跳过查询");
               return true;
           }
           log.info("查询优惠券:userId={},couponId={}", userId, couponId);
           try {
               TimeUnit.MILLISECONDS.sleep(350);
           } catch (InterruptedException e) {
               log.error("查询优惠券被中断", e);
               return false;
           }
           // 模拟优惠券校验:userId和couponId匹配则有效
           boolean valid = !ObjectUtils.isEmpty(userId) && userId > 0;
           log.info("优惠券校验结果:{}", valid);
           return valid;
       }, BUSINESS_EXECUTOR);
   }

   /**
    * 步骤3-1:扣减库存
    */

   public CompletableFuture<Boolean> deductStock(Long productId, Integer quantity) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("扣减库存:productId={},quantity={}", productId, quantity);
           try {
               TimeUnit.MILLISECONDS.sleep(450);
           } catch (InterruptedException e) {
               log.error("扣减库存被中断", e);
               return false;
           }
           // 模拟扣减逻辑:productId和quantity有效则成功
           boolean success = !ObjectUtils.isEmpty(productId) && !ObjectUtils.isEmpty(quantity);
           log.info("库存扣减结果:{}", success);
           return success;
       }, BUSINESS_EXECUTOR);
   }

   /**
    * 步骤3-2:扣减优惠券(可选)
    */

   public CompletableFuture<Boolean> deductCoupon(Long userId, Long couponId) {
       return CompletableFuture.supplyAsync(() -> {
           if (ObjectUtils.isEmpty(couponId)) {
               log.info("无优惠券,跳过扣减");
               return true;
           }
           log.info("扣减优惠券:userId={},couponId={}", userId, couponId);
           try {
               TimeUnit.MILLISECONDS.sleep(300);
           } catch (InterruptedException e) {
               log.error("扣减优惠券被中断", e);
               return false;
           }
           boolean success = !ObjectUtils.isEmpty(userId) && userId > 0;
           log.info("优惠券扣减结果:{}", success);
           return success;
       }, BUSINESS_EXECUTOR);
   }

   /**
    * 步骤3-3:创建订单
    */

   public CompletableFuture<String> createOrder(OrderRequest request, String address) {
       return CompletableFuture.supplyAsync(() -> {
           log.info("创建订单:request={},address={}", request, address);
           try {
               TimeUnit.MILLISECONDS.sleep(500);
           } catch (InterruptedException e) {
               log.error("创建订单被中断", e);
               return null;
           }
           // 模拟创建订单:生成UUID作为订单号
           String orderNo = UUID.randomUUID().toString().replace("-", "");
           log.info("订单创建成功:orderNo={}", orderNo);
           return orderNo;
       }, BUSINESS_EXECUTOR);
   }

   /**
    * 步骤4-1:发送订单通知(异步,不阻塞主流程)
    */

   public CompletableFuture<Void> sendNotify(String orderNo, Long userId) {
       return CompletableFuture.runAsync(() -> {
           log.info("发送订单通知:orderNo={},userId={}", orderNo, userId);
           try {
               TimeUnit.MILLISECONDS.sleep(600);
           } catch (InterruptedException e) {
               log.error("发送订单通知被中断", e);
           }
           log.info("订单通知发送完成:orderNo={}", orderNo);
       }, NOTIFY_EXECUTOR);
   }

   /**
    * 步骤4-2:记录订单日志(异步,不阻塞主流程)
    */

   public CompletableFuture<Void> recordLog(String orderNo, OrderRequest request) {
       return CompletableFuture.runAsync(() -> {
           log.info("记录订单日志:orderNo={},request={}", orderNo, request);
           try {
               TimeUnit.MILLISECONDS.sleep(200);
           } catch (InterruptedException e) {
               log.error("记录订单日志被中断", e);
           }
           log.info("订单日志记录完成:orderNo={}", orderNo);
       }, NOTIFY_EXECUTOR);
   }

   /**
    * 核心:订单提交任务编排
    */

   public CompletableFuture<OrderResponse> submitOrder(OrderRequest request) {
       // 参数校验
       if (ObjectUtils.isEmpty(request)
               || ObjectUtils.isEmpty(request.getUserId())
               || ObjectUtils.isEmpty(request.getProductId())
               || ObjectUtils.isEmpty(request.getQuantity())) {
           OrderResponse response = new OrderResponse();
           response.setSuccess(false);
           response.setMessage("订单参数不完整");
           return CompletableFuture.completedFuture(response);
       }

       // 任务编排流程
       return validateUser(request.getUserId())
               // 步骤1:用户校验失败,直接返回
               .thenCompose(userValid -> {
                   if (!userValid) {
                       OrderResponse response = new OrderResponse();
                       response.setSuccess(false);
                       response.setMessage("用户信息校验失败");
                       return CompletableFuture.completedFuture(response);
                   }

                   // 步骤2:并行执行3个任务
                   CompletableFuture<Boolean> stockValidFuture = validateStock(request.getProductId(), request.getQuantity());
                   CompletableFuture<String> addressFuture = queryAddress(request.getUserId());
                   CompletableFuture<Boolean> couponValidFuture = queryCoupon(request.getUserId(), request.getCouponId());

                   // 等待并行任务全部完成
                   return CompletableFuture.allOf(stockValidFuture, addressFuture, couponValidFuture)
                           .thenCompose(v -> {
                               // 获取并行任务结果
                               Boolean stockValid = stockValidFuture.join();
                               String address = addressFuture.join();
                               Boolean couponValid = couponValidFuture.join();

                               // 校验并行任务结果
                               if (!stockValid) {
                                   OrderResponse response = new OrderResponse();
                                   response.setSuccess(false);
                                   response.setMessage("库存不足或商品无效");
                                   return CompletableFuture.completedFuture(response);
                               }
                               if (StringUtils.isEmpty(address)) {
                                   OrderResponse response = new OrderResponse();
                                   response.setSuccess(false);
                                   response.setMessage("用户地址查询失败");
                                   return CompletableFuture.completedFuture(response);
                               }
                               if (!couponValid) {
                                   OrderResponse response = new OrderResponse();
                                   response.setSuccess(false);
                                   response.setMessage("优惠券无效");
                                   return CompletableFuture.completedFuture(response);
                               }

                               // 步骤3:串行执行3个核心任务(扣减库存->扣减优惠券->创建订单)
                               return deductStock(request.getProductId(), request.getQuantity())
                                       .thenCompose(stockDeductSuccess -> {
                                           if (!stockDeductSuccess) {
                                               OrderResponse response = new OrderResponse();
                                               response.setSuccess(false);
                                               response.setMessage("库存扣减失败");
                                               return CompletableFuture.completedFuture(response);
                                           }
                                           return deductCoupon(request.getUserId(), request.getCouponId())
                                                   .thenCompose(couponDeductSuccess -> {
                                                       if (!couponDeductSuccess) {
                                                           OrderResponse response = new OrderResponse();
                                                           response.setSuccess(false);
                                                           response.setMessage("优惠券扣减失败");
                                                           return CompletableFuture.completedFuture(response);
                                                       }
                                                       return createOrder(request, address)
                                                               .thenCompose(orderNo -> {
                                                                   if (StringUtils.isEmpty(orderNo)) {
                                                                       OrderResponse response = new OrderResponse();
                                                                       response.setSuccess(false);
                                                                       response.setMessage("订单创建失败");
                                                                       return CompletableFuture.completedFuture(response);
                                                                   }

                                                                   // 步骤4:异步执行通知和日志(不阻塞主流程)
                                                                   sendNotify(orderNo, request.getUserId());
                                                                   recordLog(orderNo, request);

                                                                   // 返回成功结果
                                                                   OrderResponse response = new OrderResponse();
                                                                   response.setSuccess(true);
                                                                   response.setOrderNo(orderNo);
                                                                   response.setMessage("订单提交成功");
                                                                   return CompletableFuture.completedFuture(response);
                                                               });
                                                   });
                                       });
                           });
               });
       })
               // 全局异常处理
               .exceptionally(ex -> {
                   log.error("订单提交异常", ex);
                   OrderResponse response = new OrderResponse();
                   response.setSuccess(false);
                   response.setMessage("订单提交失败:" + ex.getMessage());
                   return response;
               });
   }

   /**
    * 测试订单提交
    */

   @Test
   public void testSubmitOrder() throws InterruptedException {
       // 构造有效订单请求
       OrderRequest validRequest = new OrderRequest();
       validRequest.setUserId(1L);
       validRequest.setProductId(1001L);
       validRequest.setQuantity(2);
       validRequest.setCouponId(2001L);

       // 构造无效订单请求(库存不足)
       OrderRequest invalidStockRequest = new OrderRequest();
       invalidStockRequest.setUserId(1L);
       invalidStockRequest.setProductId(1001L);
       invalidStockRequest.setQuantity(20); // 超过最大库存10
       invalidStockRequest.setCouponId(2001L);

       // 测试有效请求
       log.info("=== 测试有效订单请求 ===");
       CompletableFuture<OrderResponse> validFuture = submitOrder(validRequest);
       OrderResponse validResponse = validFuture.join();
       log.info("有效订单请求结果:{}", validResponse);

       // 测试无效请求
       log.info("\n=== 测试无效订单请求(库存不足) ===");
       CompletableFuture<OrderResponse> invalidFuture = submitOrder(invalidStockRequest);
       OrderResponse invalidResponse = invalidFuture.join();
       log.info("无效订单请求结果:{}", invalidResponse);

       // 关闭线程池
       VALIDATE_EXECUTOR.shutdown();
       BUSINESS_EXECUTOR.shutdown();
       NOTIFY_EXECUTOR.shutdown();
       // 等待线程池关闭
       VALIDATE_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
       BUSINESS_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
       NOTIFY_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
   }
}

5.3 并发编程常见问题与解决方案

5.3.1 死锁问题

死锁是并发编程中最严重的问题之一,指两个或多个线程互相持有对方需要的资源,导致所有线程都无法继续执行。

死锁产生的4个必要条件:
  1. 互斥条件:资源只能被一个线程持有;
  2. 持有并等待条件:线程持有一个资源的同时,等待另一个资源;
  3. 不可剥夺条件:资源不能被强制从线程手中剥夺;
  4. 循环等待条件:线程之间形成资源请求的循环链。
实战示例:死锁模拟与排查

package com.jam.demo.juc.problem;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
* 死锁问题模拟与排查
* @author ken
*/

@Slf4j
public class DeadLockDemo {
   // 资源A
   private final Object resourceA = new Object();
   // 资源B
   private final Object resourceB = new Object();

   /**
    * 模拟死锁:线程1持有A等待B,线程2持有B等待A
    */

   public void simulateDeadLock() {
       // 线程1
       new Thread(() -> {
           synchronized (resourceA) {
               log.info("线程1:持有资源A,等待资源B");
               try {
                   // 持有A资源,休眠1秒,让线程2有时间持有B资源
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   log.error("线程1被中断", e);
               }
               // 尝试获取B资源
               synchronized (resourceB) {
                   log.info("线程1:成功获取资源B");
               }
           }
       }, "Thread-1").start();

       // 线程2
       new Thread(() -> {
           synchronized (resourceB) {
               log.info("线程2:持有资源B,等待资源A");
               try {
                   // 持有B资源,休眠1秒,让线程1有时间持有A资源
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   log.error("线程2被中断", e);
               }
               // 尝试获取A资源
               synchronized (resourceA) {
                   log.info("线程2:成功获取资源A");
               }
           }
       }, "Thread-2").start();
   }

   /**
    * 解决死锁:破坏循环等待条件(统一资源获取顺序)
    */

   public void solveDeadLock() {
       // 线程1:按A->B顺序获取资源
       new Thread(() -> {
           synchronized (resourceA) {
               log.info("线程1:持有资源A,等待资源B");
               try {
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   log.error("线程1被中断", e);
               }
               synchronized (resourceB) {
                   log.info("线程1:成功获取资源B");
               }
           }
       }, "Thread-1").start();

       // 线程2:同样按A->B顺序获取资源(破坏循环等待)
       new Thread(() -> {
           synchronized (resourceA) {
               log.info("线程2:持有资源A,等待资源B");
               try {
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   log.error("线程2被中断", e);
               }
               synchronized (resourceB) {
                   log.info("线程2:成功获取资源B");
               }
           }
       }, "Thread-2").start();
   }

   @Test
   public void testDeadLock() throws InterruptedException {
       log.info("=== 模拟死锁 ===");
       simulateDeadLock();
       // 休眠10秒,观察死锁状态
       TimeUnit.SECONDS.sleep(10);

       // 死锁排查方法:
       // 1. jps:获取进程ID(如1234)
       // 2. jstack 1234:查看线程堆栈,会显示死锁信息
       // 3. 结果示例:
       // Found one Java-level deadlock:
       // =============================
       // "Thread-2":
       //   waiting for ownable synchronizer 0x000000076b6e20c8, (a java.lang.Object)
       //   which is held by "Thread-1"
       // "Thread-1":
       //   waiting for ownable synchronizer 0x000000076b6e20d8, (a java.lang.Object)
       //   which is held by "Thread-2"

       log.info("\n=== 解决死锁 ===");
       solveDeadLock();
       TimeUnit.SECONDS.sleep(10);
   }
}

死锁解决方案:
  1. 破坏循环等待条件:统一资源获取顺序(如所有线程都按A->B->C的顺序获取资源);
  2. 破坏持有并等待条件:一次性获取所有需要的资源,获取失败则释放已持有资源;
  3. 破坏不可剥夺条件:使用可中断的锁(如ReentrantLock的lockInterruptibly()),超时则释放资源;
  4. 使用工具排查:jstack、jconsole、VisualVM等工具定位死锁。

5.3.2 线程安全问题(常见场景与解决方案)

1. 常见线程安全问题场景:
  • 共享可变变量未加锁;
  • 集合类(ArrayList、HashMap)在并发环境下使用;
  • 单例模式的懒加载未加锁;
  • 事务管理中的并发问题(如并发修改同一数据)。
2. 解决方案:
  • 使用线程安全的类(ConcurrentHashMap、CopyOnWriteArrayList);
  • 对共享资源加锁(ReentrantLock、synchronized);
  • 使用原子类(AtomicXxx、LongAdder)保证原子操作;
  • 单例模式:使用饿汉式、枚举单例,或双重检查锁(volatile+锁)。
实战示例:线程安全的单例模式

package com.jam.demo.juc.problem;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 线程安全的单例模式实战
* @author ken
*/

@Slf4j
public class SingletonDemo {
   /**
    * 双重检查锁单例(线程安全)
    * 核心:volatile防止指令重排序,双重检查减少锁开销
    */

   public static class DoubleCheckSingleton {
       // volatile修饰:防止instance = new DoubleCheckSingleton()的指令重排序
       private static volatile DoubleCheckSingleton instance;

       // 私有构造方法,防止外部实例化
       private DoubleCheckSingleton() {}

       /**
        * 获取单例实例
        * @return 单例实例
        */

       public static DoubleCheckSingleton getInstance() {
           // 第一次检查:未初始化则进入同步块(减少锁开销)
           if (instance == null) {
               synchronized (DoubleCheckSingleton.class) {
                   // 第二次检查:防止多线程同时进入同步块后重复初始化
                   if (instance == null) {
                       // 指令重排序问题:new操作分为3步(分配内存、初始化、赋值)
                       // volatile可禁止重排序,确保初始化完成后再赋值
                       instance = new DoubleCheckSingleton();
                   }
               }
           }
           return instance;
       }
   }

   /**
    * 枚举单例(线程安全,推荐)
    * 核心:JVM保证枚举类的实例唯一且线程安全
    */

   public enum EnumSingleton {
       INSTANCE;

       // 枚举类的方法
       public void doSomething() {
           log.info("枚举单例执行方法");
       }
   }

   /**
    * 测试单例模式的线程安全性
    */

   @Test
   public void testSingletonThreadSafe() throws InterruptedException {
       int threadNum = 100;
       CountDownLatch latch = new CountDownLatch(threadNum);
       ExecutorService executor = Executors.newFixedThreadPool(threadNum);
       AtomicReference<DoubleCheckSingleton> singletonRef = new AtomicReference<>();
       boolean[] threadSafe = {true};

       // 测试双重检查锁单例
       for (int i = 0; i < threadNum; i++) {
           executor.submit(() -> {
               DoubleCheckSingleton instance = DoubleCheckSingleton.getInstance();
               if (singletonRef.get() == null) {
                   singletonRef.set(instance);
               } else if (singletonRef.get() != instance) {
                   threadSafe[0] = false;
               }
               latch.countDown();
           });
       }

       latch.await();
       executor.shutdown();
       log.info("双重检查锁单例是否线程安全:{}", threadSafe[0]);

       // 测试枚举单例
       log.info("\n=== 测试枚举单例 ===");
       EnumSingleton instance1 = EnumSingleton.INSTANCE;
       EnumSingleton instance2 = EnumSingleton.INSTANCE;
       log.info("枚举单例实例是否相同:{}", instance1 == instance2);
       instance1.doSomething();
   }
}

六、JUC框架核心总结与最佳实践

6.1 核心组件总结

组件类型 核心组件 核心功能 底层实现/设计模式
同步锁 ReentrantLock、ReadWriteLock 独占锁、读写分离锁 AQS、模板方法模式
共享同步工具 CountDownLatch、CyclicBarrier、Semaphore 多线程协调、并发数控制 AQS(CountDownLatch、Semaphore)、ReentrantLock(CyclicBarrier)
线程池 ThreadPoolExecutor 线程复用、任务管理、并发控制 核心参数+任务队列+拒绝策略
原子类 AtomicXxx、LongAdder 线程安全的变量操作 CAS操作、分段累加(LongAdder)
并发集合 ConcurrentHashMap、CopyOnWriteArrayList 线程安全的集合存储 分段锁/CAS(ConcurrentHashMap)、写时复制(CopyOnWriteArrayList)
异步编程 CompletableFuture 异步任务执行、链式调用、任务组合 Future+回调机制、线程池

6.2 最佳实践

1. 锁的选择

  • 简单场景:使用synchronized(JDK 1.6后优化,性能接近ReentrantLock);
  • 复杂场景(可中断、超时、多条件):使用ReentrantLock;
  • 读写分离场景(读多写少):使用ReadWriteLock(ReentrantReadWriteLock);
  • 原子变量操作:使用AtomicXxx或LongAdder(高并发累加)。

2. 线程池的使用

  • 禁止使用Executors创建线程池,自定义ThreadPoolExecutor,明确核心参数;
  • 按任务类型拆分线程池(如IO密集型、CPU密集型、定时任务),提高隔离性;
  • 合理设置队列容量(有界队列),避免内存溢出;
  • 结合CompletableFuture实现异步任务编排,提高吞吐量。

3. 并发集合的选择

  • 高并发键值对存储:ConcurrentHashMap;
  • 读多写少的列表:CopyOnWriteArrayList;
  • 高并发队列(FIFO):ConcurrentLinkedQueue;
  • 生产-消费模型:LinkedBlockingQueue、ArrayBlockingQueue;
  • 无缓冲队列(直接交互):SynchronousQueue。

4. 异步编程

  • 使用CompletableFuture替代传统Future,支持链式调用和异常处理;
  • 自定义线程池执行异步任务,避免使用默认的ForkJoinPool;
  • 复杂任务编排:使用thenCompose(依赖任务)、thenCombine(并行任务)、allOf(多任务并行);
  • 必须处理异常:使用exceptionally、whenComplete、handle方法捕获异步任务异常。

5. 性能优化

  • 减少锁粒度:使用分段锁(ConcurrentHashMap)、锁剥离;
  • 避免锁竞争:使用无锁编程(CAS、原子类);
  • 读写分离:使用ReadWriteLock,提高读操作吞吐量;
  • 批量处理:结合线程池和分片处理大量任务(如批量数据库操作)。

6. 问题排查

  • 死锁:使用jstack、VisualVM排查;
  • 线程泄漏:检查线程池是否正确关闭,避免创建过多线程;
  • 性能问题:使用JProfiler、Arthas排查线程阻塞、锁竞争问题;
  • 线程安全问题:使用线程安全的类,加锁时注意锁的范围和顺序。

七、总结

JUC框架是Java并发编程的核心,涵盖了同步锁、线程池、原子类、并发集合、异步编程等核心组件,其底层都基于AQS的设计思想,通过“状态变量+队列”的模式实现高效的并发控制。

目录
相关文章
|
2天前
|
云安全 监控 安全
|
7天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
932 5
|
13天前
|
人工智能 Java API
Java 正式进入 Agentic AI 时代:Spring AI Alibaba 1.1 发布背后的技术演进
Spring AI Alibaba 1.1 正式发布,提供极简方式构建企业级AI智能体。基于ReactAgent核心,支持多智能体协作、上下文工程与生产级管控,助力开发者快速打造可靠、可扩展的智能应用。
1097 41
|
9天前
|
机器学习/深度学习 人工智能 数据可视化
1秒生图!6B参数如何“以小博大”生成超真实图像?
Z-Image是6B参数开源图像生成模型,仅需16GB显存即可生成媲美百亿级模型的超真实图像,支持中英双语文本渲染与智能编辑,登顶Hugging Face趋势榜,首日下载破50万。
664 38
|
13天前
|
人工智能 前端开发 算法
大厂CIO独家分享:AI如何重塑开发者未来十年
在 AI 时代,若你还在紧盯代码量、执着于全栈工程师的招聘,或者仅凭技术贡献率来评判价值,执着于业务提效的比例而忽略产研价值,你很可能已经被所谓的“常识”困住了脚步。
758 67
大厂CIO独家分享:AI如何重塑开发者未来十年
|
9天前
|
存储 自然语言处理 测试技术
一行代码,让 Elasticsearch 集群瞬间雪崩——5000W 数据压测下的性能避坑全攻略
本文深入剖析 Elasticsearch 中模糊查询的三大陷阱及性能优化方案。通过5000 万级数据量下做了高压测试,用真实数据复刻事故现场,助力开发者规避“查询雪崩”,为您的业务保驾护航。
473 30
|
16天前
|
数据采集 人工智能 自然语言处理
Meta SAM3开源:让图像分割,听懂你的话
Meta发布并开源SAM 3,首个支持文本或视觉提示的统一图像视频分割模型,可精准分割“红色条纹伞”等开放词汇概念,覆盖400万独特概念,性能达人类水平75%–80%,推动视觉分割新突破。
937 59
Meta SAM3开源:让图像分割,听懂你的话
|
5天前
|
弹性计算 网络协议 Linux
阿里云ECS云服务器详细新手购买流程步骤(图文详解)
新手怎么购买阿里云服务器ECS?今天出一期阿里云服务器ECS自定义购买流程:图文全解析,阿里云服务器ECS购买流程图解,自定义购买ECS的设置选项是最复杂的,以自定义购买云服务器ECS为例,包括付费类型、地域、网络及可用区、实例、镜像、系统盘、数据盘、公网IP、安全组及登录凭证详细设置教程:
204 114