Java 多线程 & 锁原理 | Java Debug 笔记

简介: Java 多线程 & 锁原理 | Java Debug 笔记

后端讲求的是高并发、高性能、高可用(3H),但是要实现 3H,通常是通过优化架构(横向分层,纵向分割)、使用缓存、分布式化和集群化等手段来实现。很少会自己写线程代码,日常开发在需要用到多线程的地方也大多都交给框架处理,对多线程和锁原理的理解一直不够深入。


趁着周末有时间,从简单的线程创建方式开始讲起,逐步深入了解关于线程的几种状态和锁原理。针对一些以前没写过独立文章的锁实现,还会展开来讲。


Java 创建线程的几种方式


先简单介绍一下创建线程的几种方式:


  • 继承 Thread class SubThread extends Thread { @Override public void run() { System.out.println(getName()); }


public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            SubThread subThread = new SubThread();
            subThread.start();
        }
    }
}
复制代码


  • 实现 Runnable 接口 class RunnableImpl implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()); }


public static void main(String[] args) {
        RunnableImpl runnable = new RunnableImpl();
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
    }
}
复制代码


  • 由于 Runnable 接口只有一个 run 方法需要我们实现,所以也可以在创建 Thread 实例的时候传入 lambda 表达式,这里不再赘述。
  • 线程池 class RunnableImpl implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()); }


public static void main(String[] args) {
        final int count = 10;
        ExecutorService threadPool = Executors.newFixedThreadPool(count);
        RunnableImpl runnable = new RunnableImpl();
        for (int i = 0; i < count; i++) {
            threadPool.submit(runnable);
        }
    }
}
复制代码


  • Executors 作为线程池工具类提供了诸多快速创建线程池的 API 。一部分底层使用的是 ThreadPoolExecutor ,一部分底层使用的是 ForkJoinPool 。 使用线程池将不再需要显式创建线程,而只需要将任务提交到线程池(线程池内部使用一个 BlockingQueue 存放),线程池内部会自动创建/销毁线程来完成任务。 更多与线程池相关的内容可以看:深入理解 Java ThreadPool


线程的几种状态


首先 Java 中的线程和操作系统的线程是一一对应的关系。线程状态可以通过调用线程的 getState() 方法获取,返回值是一个状态枚举:


public enum State {
    NEW,
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED;
}
复制代码


前面也说了创建线程的几种方式,无论是通过何种方式创建的线程,总离不开以下 6 种线程状态:


  1. New :一个线程被 New 出来之后,调用 start() 方法之前,就是 New 状态
  2. Runnable :一个被创建出来的线程调用 start() 方法,线程开始进入 Runnable 状态
  1. Ready :就绪状态,指线程可以被 CPU 执行,所有的处于 Ready 状态的线程会被存放在一个等待队列里。当线程被调度器选中之后,会从 Ready 状态转换为 Running 状态
  2. Running :运行状态,指线程正在被 CPU 执行。当线程被挂起或调用 Thread.yleid() ,会从 Runniing 切换为 Ready 状态
  1. TimedWaiting :线程在 Running 状态调用以下方法会进入 TimedWaiting 状态,等待时间到了之后,线程会重新变为 Ready 状态(回到就绪队列当中)
  • Thread.sleep(millis);
  • o.wait(timeout);
  • thread.join(millis);
  • LockSupport.parkNanos(nanos);
  • LockSupport.parkUntil(deadline);
  1. Waiting :线程在 Runnable 里的 Running 状态调用以下 ① 方法会进入 Waiting 状态,直到调用以下 ② 方法回到 Ready 状态
  1. 从 Running 变为 Waiting 状态的方法:
  • o.wait();
  • thread.join();
  • LockSupport.park();
  1. 从 Waiting 状态回到 Ready 状态的方法:
  • o.notify();
  • o.notifyAll();
  • LockSupport.unpark(currentThread);
  1. Blocked :一个处于 Running 状态的线程试图获取进入同步代码块的锁失败的时候,会进入 Blocked 状态。直到获取到进入同步代码块的锁,回到 Ready 状态
  2. Teminated :当线程任务正常完成后的线程状态


注意:当一个线程的处于 Teminated 状态时,不能通过调用 start() 重新回到 Runnable 状态。


AQS


在讲具体的锁之前,先来了解一下 AQS ( java.util.concurrent.locks.AbstractQueuedSynchronizer )。


AQS 为 Java 中的各种 CAS 锁提供了上层抽象,AQS 中最为核心的四部分内容:


  1. Node 内部类。由于 AQS 中使用双向链表存储想要获取锁的线程,Node 作为双向链表中的节点类,与线程进行绑定,同时记录前一位和后一位节点,同时设立了 Node 一些状态属性。
    static final class Node { /** Marker to indicate a node is waiting in shared mode / static final Node SHARED = new Node(); /* Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;


static final int CANCELLED =  1;
 static final int SIGNAL    = -1;
 static final int CONDITION = -2;
 static final int PROPAGATE = -3;
 volatile int waitStatus;
 volatile Node prev;
 volatile Node next;
 volatile Thread thread;
 Node nextWaiter;
 final boolean isShared() {
     return nextWaiter == SHARED;
 }
 final Node predecessor() throws NullPointerException {
     Node p = prev;
     if (p == null)
         throw new NullPointerException();
     else
         return p;
 }
 Node() { }
 Node(Thread thread, Node mode) {     // Used by addWaiter
     this.nextWaiter = mode;
     this.thread = thread;
 }
 Node(Thread thread, int waitStatus) { // Used by Condition
     this.waitStatus = waitStatus;
     this.thread = thread;
 }
复制代码


  1. }
  2. state 属性值。一个被 volatile 修饰的 int 类型属性(保证了线程可见性)。提供了基本的 Setter & Getter。至于 state 代表的含义是什么要看具体的子类实现。
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private volatile int state;


protected final int getState() {
     return state;
 }
 protected final void setState(int newState) {
     state = newState;
 }
复制代码


  1. ... }
  2. 各种 CAS 操作。
    ... protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
    private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }
    private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } ...
  3. 骨架逻辑。不难发现,AQS 封装了不少骨架逻辑,使得子类只需要实现部分方法即可以完成自定义锁。
    ... final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
    public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
    public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ...




有了以上的基础认知,现在可以来看看 Java 的几种锁类型。


synchronized


synchronized 是最常用的实现线程安全的通用方案,synchronized 本质是锁升级过程。

使用 synchronized 的方式有两种。一种是直接使用同步代码块;另一种是将 synchronized  加在方法上,如果修饰的方法是实例方法,则使用 this 作为锁对象,如果修饰的方法是静态方法,则使用所在类的类对象作为锁对象。整个锁升级过程为:偏向锁 -> 轻量级锁 -> OS 锁 。


关于 synchronized 更详细的内容可以查看:深入理解 synchronized


ReentrantLock


ReentrantLock : 基于 CAS ,相比于 synchronized 能够更好的控制锁的状态。


synchronized 只要进入代码块就代表上锁,离开代码块就是释放锁;而 ReentranLock 需要手动的 lock 和 unlock,同时还提供 tryLock 方法 ,能够让我们在尝试获取锁失败后进行自定义操作。


ReentrantLock 还支持在初始化的实时指定 fair 参数,代表是否使用公平策略。公平锁使用的是 ReentrantLock 内部的 FairSync ;非公平锁使用的是 NonfairSync 。两者都间接继承自 AQS ,对 AQS 中的 state 的运用是用作记录是否上锁以及当前重入次数。


而 tryLock 方法,能够指定一个 timeout 参数,会在指定时间内进行尝试加锁,并返回加锁结果,不会像 lock 方法那样一直阻塞直到获取成功为止。


关于 ReentrantLock 更详细的内容可以查看:深入理解 ReentrantLock


ReadWriteLock


ReadWriteLock : 读写锁,其实是包含读锁(共享锁)和写锁(排它锁)。


读锁(共享锁): 当添加的是读锁,允许其他的线程同样使用读锁进入(其他的读线程),不允许使用写锁进入(写线程)。即读读并发,读写不并发。


写锁(排它锁): 当添加的是写锁的时候,其他使用读锁或者写锁的线程都不能进入。

关于 synchronized 更详细的内容可以查看:深入理解 ReadWriteLock


LockSupport


LockSupport :


  • synchronized 的实现原理 & 锁升级问题


关于 LockSupport 更详细的内容可以查看:深入理解 LockSupport


CountDownLatch


CountDownLatch : 倒计时,初始化的时候指定一个倒计时数值,当倒计时结束后,调用 wait 的线程会往下执行:


public static void main(String[] args) throws InterruptedException {
    int count = 100;
    // 设置倒计时数为 100
    final CountDownLatch countDownLatch = new CountDownLatch(count); 
    for (int i = 0; i < count; i++) {
        new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
                // 每个线程调用一次 countDown 代表倒计时减 1
                countDownLatch.countDown(); 
            }
        }).start();
    }
    System.out.println("countdown start");
    // 在倒计时结束前(count 数为 0)一直阻塞,直到倒计时结束
    countDownLatch.await(); 
    System.out.println("countdown end");
}
复制代码


没写过独立分析 CountDownLatch 的文章。这里简单分析下 CountDownLatch 的实现。CountDownLatch 的源码也十分简单,本质就是将 AQS 中的 state 作为计数:


public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        /**
        对当前 state 值固定减一,没有使用 release 参数
        也就是说只能每次调用 countDown 来进行倒计时减一操作
        而且只有 state 减为 0 ,才算是真正的锁释放,AQS 中的 doReleaseShared 方法才被执行,对线程执行 unpark 操作
        */
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    private final Sync sync;
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    /**
    尝试获取锁,如果获取失败,对线程进行 park 操作
    */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public long getCount() {
        return sync.getCount();
    }
}
复制代码


所以 CountDownLatch 顾名思义,就是将 AQS 中的 state 作为计数器使用,每次调用 countDown 则对 state 进行减一操作,只有 state 减到 0 才算释放锁。


CyclicBarrier


CyclicBarrier : 循环障碍。对调用了 await 方法的线程进行等待,直到有达到数量的等待线程之后再集体释放。


设定一个循环阈值和 Runnable 对象,每达到一次循环阈值,执行一次 Runnable 对象的 run 方法。


public static void main(String[] args) {
    int batchNumber = 10;
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(batchNumber, new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": " + "数量达到规定,集体释放等待队列中的线程。");
        }
    });
    for (int i = 0; i < batchNumber; i++) {
        new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                for (int j = 0; j < 100; j++) {
                    System.out.println(Thread.currentThread().getName() + ": " + String.valueOf(j));
                    // 调用该方法会将当前线程放入 trip 等待队列中,直到队列中的数量达到阈值,再集体唤醒队列中的所有线程
                    cyclicBarrier.await();
                }
            }
        }).start();
    }
}
复制代码


上述实验的打印结果是:


Thread-0: 0
Thread-4: 0
Thread-3: 0
Thread-2: 0
Thread-1: 0
Thread-6: 0
Thread-5: 0
Thread-7: 0
Thread-8: 0
Thread-9: 0
Thread-9: 数量达到规定,集体释放等待队列中的线程。
Thread-9: 1
Thread-0: 1
Thread-3: 1
Thread-6: 1
Thread-8: 1
Thread-1: 1
Thread-2: 1
Thread-4: 1
Thread-7: 1
Thread-5: 1
Thread-5: 数量达到规定,集体释放等待队列中的线程。
...
复制代码


每次线程打印一次当前计数之后被加入 trip 等待队列,等待其他线程同样的值打印完了相同的值之后(这时候 trip 达到规定数量),才会继续执行(达到规定数量的 trip 队列会对所有线程集体释放)。


阅读 CyclicBarrier 相关源码:


public class CyclicBarrier {
    // 一个帮助记录是否打断控制的内部类,主要用作跳出死循环
    private static class Generation {
        boolean broken = false;
    }
    // 是否打断控制的变量
    private Generation generation = new Generation();
    // 使用 ReentrantLock 作为锁对象
    private final ReentrantLock lock = new ReentrantLock();
    // 使用 lock.newCondition() 来创建一个 trip 队列
    private final Condition trip = lock.newCondition();
    // 保存调用构建函数时指定的循环数值
    private final int parties;
    // 保存调用构建函数时指定的 Runnable
    private final Runnable barrierCommand;
    // 当前的实际计数数值,当 count = parties,代表需要执行 Runnable 的 run 方法了
    private int count;
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
...
    // 每次调用 await 方法(指定超时或者不指定超时),都会执行该方法
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 每次都会对 count 进行减一操作
            int index = --count;
            // 达到阈值(trip 队列中线程数量达到 parties 个)
            if (index == 0) {  
                boolean ranAction = false;
                try {
                    // 执行 Runnable 的 run 方法,通常在最后一位添加到 trip 的线程中执行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 该方法的作用是集体释放 trip 等待队列队列中的线程,重置 count 值和 generation 控制变量
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 如果 trip 队列中线程还没达到阈值,使用死循环一直对等待队列执行 await
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                // 当数量满足阈值,generation 会被重置,跳出死循环
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
复制代码


总结一下,CyclicBarrier 的底层使用 ReentrantLock 作为锁,并调用 newCondition 方法来创建 trip 等待队列,在等待队列的线程数没达到阈值时,调用 await 方法让队列中的线程集体等待,直到等待队列中线程数达到阈值,调用 signalAll 来对队列中的线程集体唤醒,同时重置计数器进行下一次的循环计数控制。


Phaser


Phaser : 相位器,也称为阶段器。可以看作是一个分段的 CyclicBarrier 。


当一个线程调用 arriveAndAwaitAdvance 的时候,代表该线程到达当前阶段,进入等待队列,直到所有线程都到达当前阶段(等待队列满了),再集体释放,进入下一阶段或结束。


public static void main(String[] args) {
    int workerNum = 3;
    int phases = 4;
    Phaser phaser = new Phaser(workerNum) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println(String.format("phase: %s - parties: %s", phase, registeredParties));
            return registeredParties == 0;
        }
    };
    for (int i = 0; i < workerNum; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < phases; j++) {
                    System.out.println(Thread.currentThread().getName());
                    // 到达并等待前进,是指该线程到达这个阶段,等待其他线程到达这个阶段再一同前进。
                    phaser.arriveAndAwaitAdvance();
                }
            }
        }).start();
    }
}
复制代码


当任务涉及多个阶段,并需要某一个阶段的任务全部完成后才能开始下一阶段的任务的时候,可以考虑使用 Phaser 。


Semaphore


Semaphore :  信号量,指定允许多少个线程同时执行(获得锁)。使用上与 ThreadPoolExecutor 的 maximumPoolSize 参数类似。最多允许有多少任务同时执行。

与 ReentrantLock 类似,支持公平锁和非公平锁。通过构造函数的 fair 参数进行指定。


public static void main(String[] args) {
    int count = 5;
    Semaphore semaphore = new Semaphore(count);
    for (int i = 0; i < count * 2; i++) {
        new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                try {
                    // 阻塞直到获取锁
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(5000);
                } finally {
                    // 释放锁
                    semaphore.release();
                }
            }
        }).start();
    }
}
复制代码


Semaphore 在实现上也 ReentrantLock 大致相同,内部有一个继承自 AQS 的内部类 Sync 。将 AQS 中的 state 作为计数器使用,记录当前剩下多少个可运行名额。


...
abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits);
    }
    final int getPermits() {
        return getState();
    }
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // remaining >= 0 代表还有可用名额,使用 CAS 尝试获取
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 释放锁就是归还可用名额
            if (compareAndSetState(current, next))
                return true;
        }
    }
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 相比只要有可用名额,调用 acquire 就有机会调用 CAS 尝试获取锁的非公平版本不同
            // 公平版本会在多一步检查,检查是否已经有别的线程正在排队
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
...
复制代码


Exchanger


Exchanger : 用作线程数据交换。


调用 exchange 方法后当前线程会阻塞,直到别的线程也调用 exchange 完成交换,才会往下执行。


public static void main(String[] args) {
    Exchanger<Object> exchanger = new Exchanger<>();
    new Thread(new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            String s = (String) exchanger.exchange("String from T1");
            System.out.println(Thread.currentThread().getName() + ": " + s);
        }
    }, "T1").start();
    new Thread(new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            String s = (String) exchanger.exchange("String from T2");
            System.out.println(Thread.currentThread().getName() + ": " + s);
        }
    }, "T2").start();
}
复制代码


查阅 Exchanger 的源码:


...
// 用作交换数据的 Node 类,还用了 @sun.misc.Contended 注解保证一个对象能够在一个独立的缓存行里
@sun.misc.Contended static final class Node {
    int index;              // Arena index
    int bound;              // Last recorded value of Exchanger.bound
    int collides;           // Number of CAS failures at current bound
    int hash;               // Pseudo-random for spins
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}
/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}
...
复制代码


CAS


  • ABA 问题


上面说到的几种锁,除了 synchronized ,其余都是基于 CAS 实现的。也就是其实现线程同步的方式都是:


  1. 取出当前的值,赋值给临时变量;或者直接指定希望的初始值
  2. 计算要设置的值;或者直接指定希望的目标值
  3. 将当前最新值和之前赋值的临时变量进行比较
  1. 如果相同,说明期间没有被其他线程修改过,将目标资源更新为要设置的值
  2. 如果不同,说明期间有其他线程修改过,重新从步骤一开始执行


以 ReentrantLock 的获取锁过程为例进行说明:无论是初始化是否指定 fair 参数的公平锁或者非公平锁(公平锁时 sync 为内部类 FairSync 实例;非公平锁时 sync 为内部类的 NonfairSync 实例),获取锁的时候都是调用 compareAndSetState(0, 1) (意思为期望当前状态为无锁状态 0 ,期望设置成有锁状态 1)。


但这流程当中有个问题:在高并发环境下,如果在计算要设置的值期间,有其他线程将目标资源从 A 修改为 B,再从 B 重新修改为 A ,是能够通过步骤三的判断的。但资源又确实是被其他线程修改过的。


这时候如果目标资源是基本数据类型,其实并不影响。例如我要 compareAndSetInt(0,1) ,那么代表我只关心初始值为 0 ,设置为 1 的条件,至于在我获取初始值(步骤一)和进行比较(步骤三)过程中发生了什么。并不需要关心。


如果是引用类型的话呢?引用对象没改变,但是对象中的某个属性发生了改变又该如何处理?当然是重写 equals 和 hashCode 方法,在步骤三中调用 equals 进行比较我们关心的属性值。


还有一个问题,如果我们确实需要保证在步骤一和步骤三之间没有被修改过,彻底避免 ABA 问题,能怎么处理?两种方案,一是为目标资源绑定一个 version,在步骤三中对 version 也进行比较;二是使用修改时间戳,同样在步骤三中进行比较。


  • 高并发场景


记得在我第一次梳理 CAS 流程的时候,就在想这难道就没有并发问题了吗?其实单纯从字面意思去理解步骤三的话是会出现并发问题的,也就是在对比了初始值和最新值之后,设置目标资源的目标值之前,是有可能被其他线程修改的?那为什么 CAS 这么普遍被使用呢?


其实在这两个步骤之间操作系统是有一个上锁的动作的,为的就是解决这个并发问题。在比较值和设置值这个操作,也就是 compareAndSet 这个动作发生时,系统会给总线或者缓存上锁,确保在多核环境下,不会被其他线程并发修改。但 CAS 无法解决一个问题是,只能保证保证一个共享变量是线程安全的。


  • 循环时间过长


我们知道 CAS 本质其实就是在一个 for 循环里将当前值和原来值做对比并尝试修改值。但当长时间 CAS 不成功的时候,将会为 CPU 带来巨大压力,特别是多个线程都尝试对一个共享变量进行 CAS 操作。如果 CAS 搭配处理器的 pause 指定一定程度能够缓解这个问题。pause 指令有两个作用:


  1. 可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。
  2. 可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。
相关文章
|
7天前
|
Java
Java中ReentrantLock释放锁代码解析
Java中ReentrantLock释放锁代码解析
24 8
|
7天前
|
Java
Java基础—笔记—static篇
`static`关键字用于声明静态变量和方法,在类加载时初始化,只有一份共享内存。静态变量可通过类名或对象访问,但推荐使用类名。静态方法无`this`,不能访问实例成员,常用于工具类。静态代码块在类加载时执行一次,用于初始化静态成员。
9 0
|
7天前
|
Java API 索引
Java基础—笔记—String篇
本文介绍了Java中的`String`类、包的管理和API文档的使用。包用于分类管理Java程序,同包下类无需导包,不同包需导入。使用API时,可按类名搜索、查看包、介绍、构造器和方法。方法命名能暗示其功能,注意参数和返回值。`String`创建有两种方式:双引号创建(常量池,共享)和构造器`new`(每次新建对象)。此外,列举了`String`的常用方法,如`length()`、`charAt()`、`equals()`、`substring()`等。
13 0
|
7天前
|
Java 调度
Java中常见锁的分类及概念分析
Java中常见锁的分类及概念分析
13 0
|
6天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
1天前
|
Java
浅谈Java的synchronized 锁以及synchronized 的锁升级
浅谈Java的synchronized 锁以及synchronized 的锁升级
5 0
|
2天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。
|
3天前
|
Java 程序员 编译器
Java中的线程同步与锁优化策略
【4月更文挑战第14天】在多线程编程中,线程同步是确保数据一致性和程序正确性的关键。Java提供了多种机制来实现线程同步,其中最常用的是synchronized关键字和Lock接口。本文将深入探讨Java中的线程同步问题,并分析如何通过锁优化策略提高程序性能。我们将首先介绍线程同步的基本概念,然后详细讨论synchronized和Lock的使用及优缺点,最后探讨一些锁优化技巧,如锁粗化、锁消除和读写锁等。
|
4天前
|
Java 编译器
Java并发编程中的锁优化策略
【4月更文挑战第13天】 在Java并发编程中,锁是一种常见的同步机制,用于保证多个线程之间的数据一致性。然而,不当的锁使用可能导致性能下降,甚至死锁。本文将探讨Java并发编程中的锁优化策略,包括锁粗化、锁消除、锁降级等方法,以提高程序的执行效率。
11 4
|
5天前
|
Java
探秘jstack:解决Java应用线程问题的利器
探秘jstack:解决Java应用线程问题的利器
14 1
探秘jstack:解决Java应用线程问题的利器