剑指JUC原理-9.Java无锁模型(上):https://developer.aliyun.com/article/1413634
原子引用
为什么需要原子引用类型?
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
有如下方法
public interface DecimalAccount { // 获取余额 BigDecimal getBalance(); // 取款 void withdraw(BigDecimal amount); /** * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作 * 如果初始余额为 10000 那么正确的结果应当是 0 */ static void demo(DecimalAccount account) { List<Thread> ts = new ArrayList<>(); for (int i = 0; i < 1000; i++) { ts.add(new Thread(() -> { account.withdraw(BigDecimal.TEN); })); } ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(account.getBalance()); } }
试着提供不同的 DecimalAccount 实现,实现安全的取款操作
不安全实现
class DecimalAccountUnsafe implements DecimalAccount { BigDecimal balance; public DecimalAccountUnsafe(BigDecimal balance) { this.balance = balance; } @Override public BigDecimal getBalance() { return balance; } @Override public void withdraw(BigDecimal amount) { BigDecimal balance = this.getBalance(); this.balance = balance.subtract(amount); } }
安全实现-使用锁
class DecimalAccountSafeLock implements DecimalAccount { private final Object lock = new Object(); BigDecimal balance; public DecimalAccountSafeLock(BigDecimal balance) { this.balance = balance; } @Override public BigDecimal getBalance() { return balance; } @Override public void withdraw(BigDecimal amount) { synchronized (lock) { BigDecimal balance = this.getBalance(); this.balance = balance.subtract(amount); } } }
安全实现-使用 CAS
class DecimalAccountSafeCas implements DecimalAccount { AtomicReference<BigDecimal> ref; public DecimalAccountSafeCas(BigDecimal balance) { ref = new AtomicReference<>(balance); } @Override public BigDecimal getBalance() { return ref.get(); } @Override public void withdraw(BigDecimal amount) { while (true) { BigDecimal prev = ref.get(); BigDecimal next = prev.subtract(amount); if (ref.compareAndSet(prev, next)) { break; } } } }
测试代码
DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal("10000"))); DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal("10000"))); DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal("10000")));
运行结果
4310 cost: 425 ms 0 cost: 285 ms 0 cost: 274 ms
ABA 问题及解决
ABA 问题
static AtomicReference<String> ref = new AtomicReference<>("A"); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // 获取值 A // 这个共享变量被它线程修改过? String prev = ref.get(); other(); sleep(1); // 尝试改为 C log.debug("change A->C {}", ref.compareAndSet(prev, "C")); } private static void other() { new Thread(() -> { log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B")); }, "t1").start(); sleep(0.5); new Thread(() -> { log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A")); }, "t2").start(); }
输出
11:29:52.325 c.Test36 [main] - main start... 11:29:52.379 c.Test36 [t1] - change A->B true 11:29:52.879 c.Test36 [t2] - change B->A true 11:29:53.880 c.Test36 [main] - change A->C true
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程
希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
AtomicStampedReference
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // 获取值 A String prev = ref.getReference(); // 获取版本号 int stamp = ref.getStamp(); log.debug("版本 {}", stamp); // 如果中间有其它线程干扰,发生了 ABA 现象 other(); sleep(1); // 尝试改为 C log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1)); } private static void other() { new Thread(() -> { log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1)); log.debug("更新版本为 {}", ref.getStamp()); }, "t1").start(); sleep(0.5); new Thread(() -> { log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1)); log.debug("更新版本为 {}", ref.getStamp()); }, "t2").start(); }
输出为
15:41:34.891 c.Test36 [main] - main start... 15:41:34.894 c.Test36 [main] - 版本 0 15:41:34.956 c.Test36 [t1] - change A->B true 15:41:34.956 c.Test36 [t1] - 更新版本为 1 15:41:35.457 c.Test36 [t2] - change B->A true 15:41:35.457 c.Test36 [t2] - 更新版本为 2 15:41:36.457 c.Test36 [main] - change A->C false
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A ->C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
AtomicMarkableReference
class GarbageBag { String desc; public GarbageBag(String desc) { this.desc = desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return super.toString() + " " + desc; } }
@Slf4j public class TestABAAtomicMarkableReference { public static void main(String[] args) throws InterruptedException { GarbageBag bag = new GarbageBag("装满了垃圾"); // 参数2 mark 可以看作一个标记,表示垃圾袋满了 AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true); log.debug("主线程 start..."); GarbageBag prev = ref.getReference(); log.debug(prev.toString()); new Thread(() -> { log.debug("打扫卫生的线程 start..."); bag.setDesc("空垃圾袋"); while (!ref.compareAndSet(bag, bag, true, false)) {} // 如果状态被改成了false,那么下次compareAndSet就不会成功了 log.debug(bag.toString()); }).start(); Thread.sleep(1000); log.debug("主线程想换一只新垃圾袋?"); boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false); log.debug("换了么?" + success); log.debug(ref.getReference().toString()); } }
输出
2023-10-13 15:30:09.264 [main] 主线程 start... 2023-10-13 15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾 2023-10-13 15:30:09.293 [Thread-1] 打扫卫生的线程 start... 2023-10-13 15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋 2023-10-13 15:30:10.294 [main] 主线程想换一只新垃圾袋? 2023-10-13 15:30:10.294 [main] 换了么?false 2023-10-13 15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
/** 参数1,提供数组、可以是线程不安全数组或线程安全数组 参数2,获取数组长度的方法 参数3,自增方法,回传 array, index 参数4,打印数组的方法 */ // supplier 提供者 无中生有 ()->结果 // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果 // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)-> private static <T> void demo( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer ) { List<Thread> ts = new ArrayList<>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0; i < length; i++) { // 每个线程对数组作 10000 次操作 ts.add(new Thread(() -> { for (int j = 0; j < 10000; j++) { putConsumer.accept(array, j%length); } })); } ts.forEach(t -> t.start()); // 启动所有线程 ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); // 等所有线程结束 printConsumer.accept(array); }
不安全的数组
demo( ()->new int[10], (array)->array.length, (array, index) -> array[index]++, array-> System.out.println(Arrays.toString(array)) );
结果
[9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698]
其实本质上是这样的:
其执行的是,多个线程,在对应数组里面进行 ++操作,那么就会有这么一种情况,两个线程 刚好读到了同样的位置,然后 都对同一个数进行了 ++ 操作,此时,理论上 两个线程的++操作 最后加了2,实际上确加了1
安全的数组
demo( ()-> new AtomicIntegerArray(10), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), array -> System.out.println(array) );
结果
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
字段更新器
- AtomicReferenceFieldUpdater // 域 字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
public class Test5 { private volatile int field; public static void main(String[] args) { AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field"); Test5 test5 = new Test5(); fieldUpdater.compareAndSet(test5, 0, 10); // 修改成功 field = 10 System.out.println(test5.field); // 修改成功 field = 20 fieldUpdater.compareAndSet(test5, 10, 20); System.out.println(test5.field); // 修改失败 field = 20 fieldUpdater.compareAndSet(test5, 10, 30); System.out.println(test5.field); } }
原子累加器(并发编程大师 - 与人类优秀的灵魂对话版)
剑指JUC原理-10.并发编程大师的原子累加器底层优化原理(与人类的优秀灵魂对话)-CSDN博客
Unsafe
cas 底层是调用的 unsafe
概述
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
public class UnsafeAccessor { static Unsafe unsafe; static { try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); unsafe = (Unsafe) theUnsafe.get(null); } catch (NoSuchFieldException | IllegalAccessException e) { throw new Error(e); } } static Unsafe getUnsafe() { return unsafe; } }
不要被名字所迷惑,名字虽然叫 Unsafe,但是这里并不是指什么线程安全的方面的不安全,而是指这个类比较底层,操作的都是内存,线程,不建议我们编程人员直接对它使用
Unsafe CAS 操作
其底层是通过内存偏移量来定位到这个属性,定位到属性以后,再对里面的属性值做一个比较并交换的动作。
@Data class Student { volatile int id; volatile String name; }
Unsafe unsafe = UnsafeAccessor.getUnsafe(); Field id = Student.class.getDeclaredField("id"); Field name = Student.class.getDeclaredField("name"); // 获得成员变量的偏移量 long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id); long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name); Student student = new Student(); // 使用 cas 方法替换成员变量的值 UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true System.out.println(student);
输出
Student(id=20, name=张三)
使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现
class AtomicData { private volatile int data; static final Unsafe unsafe; static final long DATA_OFFSET; static { unsafe = UnsafeAccessor.getUnsafe(); try { // data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性 DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data")); } catch (NoSuchFieldException e) { throw new Error(e); } } public AtomicData(int data) { this.data = data; } public void decrease(int amount) { int oldValue; while(true) { // 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解 oldValue = data; // cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) { return; } } } public int getData() { return data; } }
Account 实现
Account.demo(new Account() { AtomicData atomicData = new AtomicData(10000); @Override public Integer getBalance() { return atomicData.getData(); } @Override public void withdraw(Integer amount) { atomicData.decrease(amount); } });