程序员必备的十大技能(进阶版)之网络与高并发原理(三)

简介: 教程来源 https://zlpow.cn/ Java并发编程深度剖析涵盖JMM内存模型、volatile内存屏障、synchronized锁升级优化、AQS源码机制、线程池调优及伪共享问题,系统揭示多线程安全与高性能底层原理。

四、Java并发编程深度剖析

4.1 Java内存模型(JMM)

│                          Java内存模型(JMM)                             │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌──────────────────────┐         ┌──────────────────────┐            │
│   │      线程A            │         │      线程B            │            │
│   │   ┌──────────────┐   │         │   ┌──────────────┐   │            │
│   │   │  工作内存    │   │         │   │  工作内存    │   │            │
│   │   │ (本地缓存)   │   │         │   │ (本地缓存)   │   │            │
│   │   └──────┬───────┘   │         │   └──────┬───────┘   │            │
│   │          │           │         │          │           │            │
│   │          │ read/write│         │          │ read/write│            │
│   │          ▼           │         │          ▼           │            │
│   └──────────┼───────────┘         └──────────┼───────────┘            │
│              │                                 │                        │
│              └───────────────┬─────────────────┘                        │
│                              │                                          │
│                              ▼                                          │
│                   ┌─────────────────────┐                              │
│                   │      主内存          │                              │
│                   │   (堆内存/方法区)    │                              │
│                   └─────────────────────┘                              │
│                                                                         │
│   JMM规则:                                                             │
│   1. 所有变量存储在主内存                                                │
│   2. 每个线程有自己的工作内存(缓存+寄存器)                              │
│   3. 线程不能直接操作主内存,必须通过工作内存中转                         │
│   4.  volatile:写操作立即刷新到主内存,读操作从主内存读取                 │
│   5.  synchronized:解锁前刷新,加锁时读取                               │
└─────────────────────────────────────────────────────────────────────────┘

4.2 volatile与内存屏障

public class VolatileExample {
    // 保证可见性 + 禁止指令重排序
    private volatile boolean flag = false;
    private int value = 0;

    public void writer() {
        value = 42;           // 普通写
        flag = true;          // volatile写(内存屏障:StoreStore | StoreLoad)
    }

    public void reader() {
        if (flag) {           // volatile读(内存屏障:LoadLoad | LoadStore)
            // 保证能读到value=42,不会出现指令重排
            System.out.println(value);
        }
    }

    /*
     * volatile底层实现:lock前缀指令
     * 汇编代码:lock addl $0x0,(%rsp)
     * 作用:
     * 1. 将当前处理器缓存行写回到主内存
     * 2. 使其他处理器缓存行失效(MESI缓存一致性协议)
     * 3. 禁止指令重排序优化
     */
}

// DCL单例中的volatile
public class Singleton {
    private static volatile Singleton instance;  // volatile必不可少

    public static Singleton getInstance() {
        if (instance == null) {           // 第一次检查
            synchronized (Singleton.class) {
                if (instance == null) {   // 第二次检查
                    instance = new Singleton();  // 分解为3步
                    // 1. 分配内存空间
                    // 2. 初始化对象
                    // 3. 将instance指向内存地址
                    // volatile禁止2和3重排序
                }
            }
        }
        return instance;
    }
}

4.3 锁的升级与优化(JDK 1.6+)

/*
 * synchronized锁的四个状态:
 * 无锁 → 偏向锁 → 轻量级锁 → 重量级锁(单向升级)
 */

public class SynchronizedUpgradeDemo {

    private Object lock = new Object();

    // 1. 偏向锁:单线程重复获取,消除同步(Mark Word存储线程ID)
    public void biasedLock() {
        // JVM参数:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
        synchronized (lock) {
            // 第一次获取,偏向锁记录当前线程ID
            // 同一线程再次获取,无需CAS操作
        }
    }

    // 2. 轻量级锁:多线程交替执行,CAS自旋
    public void lightweightLock() {
        // 线程A获取锁,Mark Word指向栈帧中的Lock Record
        // 线程B自旋等待,不阻塞
        synchronized (lock) {
            // 执行
        }
    }

    // 3. 重量级锁:竞争激烈,阻塞线程
    public void heavyweightLock() {
        // 自旋超过阈值(JDK1.6自适应自旋)
        // 升级为重量级锁,操作系统的互斥量(mutex)
        // 线程阻塞,上下文切换开销大
        synchronized (lock) {
            // 执行
        }
    }
}

// 锁优化技术
public class LockOptimization {

    // 1. 锁消除(Escape Analysis)
    // JVM发现锁对象是线程局部变量,自动消除synchronized
    public void lockElimination() {
        // StringBuffer是线程安全的,但这里只在本方法使用
        StringBuffer sb = new StringBuffer();
        sb.append("a").append("b");  // JVM自动消除append方法的锁
    }

    // 2. 锁粗化
    public void lockCoarsening() {
        // 原始代码(频繁加锁解锁)
        for (int i = 0; i < 100; i++) {
            synchronized (this) {
                // 操作
            }
        }

        // JVM优化后(一次加锁)
        synchronized (this) {
            for (int i = 0; i < 100; i++) {
                // 操作
            }
        }
    }
}

4.4 AQS(AbstractQueuedSynchronizer)源码剖析

// AQS核心结构
public abstract class AbstractQueuedSynchronizer {

    // 同步状态(volatile保证可见性)
    private volatile int state;

    // CLH队列(FIFO双向链表)
    private transient volatile Node head;
    private transient volatile Node tail;

    static final class Node {
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        volatile int waitStatus;  // CANCELLED(1), SIGNAL(-1), CONDITION(-2), PROPAGATE(-3)
    }

    // 独占模式获取(ReentrantLock)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&  // 尝试获取(子类实现)
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {  // 加入队列
            selfInterrupt();
        }
    }

    // 加入等待队列
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);  // 自旋CAS入队
        return node;
    }

    // 自旋获取锁
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 检查是否可以park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

// ReentrantLock实现
public class ReentrantLock {
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;  // 重入
                if (nextc < 0) throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    // 公平锁
    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 公平:检查队列中是否有等待更久的线程
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

4.5 线程池深度解析

public class ThreadPoolAnalysis {

    /*
     * ThreadPoolExecutor核心参数
     * corePoolSize: 核心线程数
     * maximumPoolSize: 最大线程数
     * keepAliveTime: 空闲线程存活时间
     * unit: 时间单位
     * workQueue: 工作队列
     * threadFactory: 线程工厂
     * handler: 拒绝策略
     */

    // 工作队列类型对比
    // 1. ArrayBlockingQueue:有界队列,需要指定容量
    // 2. LinkedBlockingQueue:有界/无界,默认Integer.MAX_VALUE
    // 3. SynchronousQueue:无容量,每个put必须等待take
    // 4. PriorityBlockingQueue:优先级队列

    // 拒绝策略
    // 1. AbortPolicy:抛出RejectedExecutionException(默认)
    // 2. CallerRunsPolicy:交给调用者线程执行
    // 3. DiscardPolicy:直接丢弃
    // 4. DiscardOldestPolicy:丢弃队列头部任务

    // 线程数配置公式(IO密集型 vs CPU密集型)
    // CPU密集型:线程数 = CPU核心数 + 1
    // IO密集型:线程数 = CPU核心数 * (1 + 平均等待时间/平均计算时间)
    // 实际:线程数 = CPU核心数 * 2 ~ 4

    // 自定义线程池
    public static ExecutorService createOptimizedThreadPool() {
        int coreCount = Runtime.getRuntime().availableProcessors();

        return new ThreadPoolExecutor(
            coreCount,                        // corePoolSize
            coreCount * 4,                    // maximumPoolSize
            60L, TimeUnit.SECONDS,            // keepAliveTime
            new ArrayBlockingQueue<>(1000),   // 有界队列,防止OOM
            new NamedThreadFactory("worker"), // 自定义线程名
            new ThreadPoolExecutor.CallerRunsPolicy()  // 背压策略
        );
    }

    // 监控线程池
    public static class MonitoredThreadPool extends ThreadPoolExecutor {

        private final AtomicLong submittedTasks = new AtomicLong();
        private final AtomicLong completedTasks = new AtomicLong();
        private final AtomicLong rejectedTasks = new AtomicLong();

        @Override
        public void execute(Runnable command) {
            submittedTasks.incrementAndGet();
            super.execute(command);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            completedTasks.incrementAndGet();
            super.afterExecute(r, t);
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejectedTasks.incrementAndGet();
            super.rejectedExecution(r, executor);
        }

        // 定期输出监控信息
        @Scheduled(fixedDelay = 60000)
        public void report() {
            logger.info("ThreadPool Status: " +
                "Active={}, PoolSize={}, QueueSize={}, " +
                "Submitted={}, Completed={}, Rejected={}",
                getActiveCount(), getPoolSize(), getQueue().size(),
                submittedTasks.get(), completedTasks.get(), rejectedTasks.get());
        }
    }
}

// CompletableFuture异步编程
public class CompletableFutureDemo {

    public void asyncProcessing() {
        // 异步任务
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> fetchUserInfo())      // 异步获取用户信息
            .thenApply(user -> enrichUser(user))      // 串行处理
            .thenCompose(user -> fetchOrders(user))   // 依赖另一个异步任务
            .thenCombine(                            // 组合两个异步任务
                CompletableFuture.supplyAsync(() -> fetchProductInfo()),
                (orders, products) -> buildResponse(orders, products)
            )
            .exceptionally(throwable -> {              // 异常处理
                logger.error("Async processing failed", throwable);
                return "default response";
            });

        // 阻塞等待
        String result = future.join();

        // 多个异步任务并行执行
        List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> callServiceA()),
            CompletableFuture.supplyAsync(() -> callServiceB()),
            CompletableFuture.supplyAsync(() -> callServiceC())
        );

        // 等待所有完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> System.out.println("All completed"));

        // 等待任意一个完成
        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
            .thenAccept(result -> System.out.println("First completed: " + result));
    }
}

4.6 伪共享(False Sharing)问题

public class FalseSharingDemo {

    /*
     * CPU缓存行(Cache Line):通常64字节
     * 伪共享:多个线程修改不同变量,但位于同一缓存行,导致缓存行失效
     */

    // 问题代码
    public static class Counter {
        volatile long x;  // 线程A频繁修改
        volatile long y;  // 线程B频繁修改
        // x和y可能在同一个缓存行(64字节,两个long = 16字节)
    }

    // 解决方案1:填充(Padding)
    public static class PaddedCounter {
        volatile long x;
        long p1, p2, p3, p4, p5, p6, p7;  // 填充56字节,确保x独占一个缓存行
        volatile long y;
    }

    // 解决方案2:@Contended(Java 8+)
    // JVM参数:-XX:-RestrictContended
    public static class ContendedCounter {
        @sun.misc.Contended
        volatile long x;

        @sun.misc.Contended
        volatile long y;
    }

    // 性能测试
    public static void benchmark() throws InterruptedException {
        int iterations = 100_000_000;
        Counter counter = new Counter();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                counter.x++;  // 频繁修改x
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                counter.y++;  // 频繁修改y
            }
        });

        long start = System.nanoTime();
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        long duration = (System.nanoTime() - start) / 1_000_000;

        System.out.println("Duration: " + duration + "ms");
        // 使用Counter: ~5000ms
        // 使用PaddedCounter: ~1000ms(性能提升5倍)
    }
}

来源:
https://rvtst.cn/

相关文章
|
22小时前
|
运维 Java 程序员
程序员必备的十大技能(进阶版)之性能调优与故障排查(一)
教程来源 https://qeext.cn/ 本文系统讲解性能调优与故障排查核心技能,涵盖故障方法论、CPU/内存/I/O/网络/数据库问题定位、Java诊断工具(Arthas/JVM)、全链路压测及混沌工程,辅以实战案例与黄金排查原则,助开发者从“重启党”进阶为问题终结者。
|
22小时前
|
设计模式 算法 程序员
程序员必备的十大技能(进阶版)之设计模式与架构思维(三)
教程来源 https://wkmsa.cn/ 行为型模式聚焦对象间职责分配与通信协作。含策略(算法可插拔)、观察者(事件自动通知)、责任链(请求逐级处理)、模板方法(算法骨架复用)、状态(行为随状态切换)等核心模式,提升系统灵活性与可维护性。
|
22小时前
|
Kubernetes Cloud Native 程序员
程序员必备的十大技能(进阶版)之云原生与容器化(三)
教程来源 https://unbgv.cn/ Kubernetes核心资源模型详解:Pod(最小调度单元,含Init容器、多探针、卷挂载等);Deployment(无状态应用滚动更新与回滚);Service(服务发现与负载均衡);Ingress(七层HTTP路由);HPA(基于CPU/内存/自定义指标的自动扩缩容)
|
2天前
|
消息中间件 NoSQL 程序员
程序员必备的十大技能(进阶版)之分布式核心技术(四)
教程来源 http://rvtst.cn/ 本节详解分布式锁与消息中间件核心实践:Redis红锁(RedLock)通过多节点多数派机制保障高可用与互斥性;ZooKeeper锁基于临时顺序节点实现强一致性;消息队列则覆盖可靠性(同步发送+手动确认)、幂等性(Redis/DB去重)及顺序性(有序发送与消费)。
|
22小时前
|
Cloud Native 安全 程序员
程序员必备的十大技能(进阶版)之云原生与容器化(五)
教程来源 https://vbzcj.cn/ 本文系统梳理云原生安全加固(镜像、运行时、网络、RBAC)、Helm包管理、GitOps持续交付(ArgoCD+CI/CD)、Service Mesh、eBPF及Serverless容器等核心实践,涵盖配置示例与最佳实践清单,助力构建安全、可靠、可观测的现代化K8s应用体系。
|
3天前
|
机器学习/深度学习 自然语言处理 C++
大模型应用:大模型实测对比:1.8B vs 6B,本地部署的极限拉扯与真实体感.119
本文对比Qwen1.5-1.8B与ChatGLM2-6B两大中文大模型:前者轻量易部署,CPU即可运行,代码简洁,但易幻觉、指令遵循弱;后者参数量大,中文理解与逻辑更强,但需GPU、加载复杂。二者代表“小而美”与“大而全”的典型路径。
大模型应用:大模型实测对比:1.8B vs 6B,本地部署的极限拉扯与真实体感.119
|
1天前
|
人工智能 缓存 安全
【AI 尝鲜实验室】5.22 号上新 | DeepSeek-TUI:终端里 DeepSeek 版的 Claude Code
DeepSeek-TUI是专为DeepSeek V4大模型打造的终端AI编程助手,支持100万Token超长上下文、实时推理可视化、文件/Shell/Git/网页等全栈操作,提供Plan/Agent/YOLO三种安全模式,无需图形界面,开箱即用。(239字)
|
2天前
|
人工智能 API 开发者
NVIDIA 免费 API 从申请到 Claude Code 接入全攻略:CLIProxyAPI 与 CCR 代理实战
JeecgBoot AI专题研究 NVIDIA 免费 API 申请与 Claude Code 第三方模型接入实战指南 前言Claude Code 已经成了很多开发者日常编程的首选工具,但它的付费门槛让不少人望而却步——尤其是以人民币结算的用户,每月开支不算小。好消息是,NVIDIA 提供了免费
131 0
NVIDIA 免费 API 从申请到 Claude Code 接入全攻略:CLIProxyAPI 与 CCR 代理实战
|
2天前
|
并行计算 API 开发者
万字详解:普通开发者如何用Ollama、llama.cpp把大模型无缝跑在本地消费级显卡上?
本文详解普通开发者如何用Ollama与llama.cpp,将7B–14B大模型高效部署于本地消费级显卡(如RTX 4060 8GB)。涵盖显存评估、量化原理(Q4_K_M等)、一键运行与精细调优、避坑指南及跨平台(CUDA/ROCm/Metal)实测数据,助你零成本、高隐私、离线可用。
|
2天前
|
Java Windows
【主流版本】JDK安装版下载地址和环境配置方法
本页提供主流JDK版本(6u45至21)的百度网盘与夸克网盘下载链接,含提取码、文件大小等信息;并详细指导Windows系统下JAVA_HOME与PATH环境变量配置及验证方法,助力Java开发环境快速搭建。