5.7 原子累加器
5.7.1 累加器性能比较
/** * @author lxy * @version 1.0 * @Description * @date 2022/7/22 17:27 */ @Slf4j(topic = "c.Test33") public class Test33 { public static void main(String[] args) { demo( ()->new AtomicLong(0), adder->adder.getAndIncrement() ); demo( ()->new LongAdder(), adder->adder.increment() ); } /* () -> 结果 提供累加器对象 (参数) -> 执行累加操作 */ private static <T> void demo(Supplier <T> adderSupplier, Consumer <T> action) { T adder = adderSupplier.get(); List <Thread> ts = new ArrayList <>(); // 4 个线程,每人累加 50 万 for (int i = 0; i < 4; i++) { ts.add(new Thread(() -> { for (int j = 0; j < 500000; j++) { action.accept(adder); } })); } long start = System.nanoTime(); ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start) / 1000_000); } }
输出为:
2000000 cost:32 2000000 cost:14
**分析:**性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
* 源码之 LongAdder
LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧
LongAdder 类有几个关键域
// 累加单元数组, 懒惰初始化 transient volatile Cell[] cells; // 基础值, 如果没有竞争, 则用 cas 累加这个域 transient volatile long base; // 在 cells 创建或扩容时, 置为 1, 表示加锁 transient volatile int cellsBusy;
注意:volatile保证共享变量的可见性,transient保证序列化下这些属性信息不会被序列化
5.7.2 CAS锁
//使用CAS实现Lock //不推荐使用:因为会造成CPU的空转... @Slf4j(topic = "c.Test42") public class LockCas { // 0 没加锁 // 1 加锁 private AtomicInteger state = new AtomicInteger(0); public void lock() { while (true) { if (state.compareAndSet(0, 1)) { break; } } } public void unlock() { log.debug("unlock..."); state.set(0); } public static void main(String[] args) { LockCas lock = new LockCas(); new Thread(() -> { log.debug("begin..."); lock.lock(); try { log.debug("lock..."); sleep(1); } finally { lock.unlock(); } }).start(); new Thread(() -> { log.debug("begin..."); lock.lock(); try { log.debug("lock..."); } finally { lock.unlock(); } }).start(); } }
输出为:
16:35:41.029 c.Test42 [Thread-1] - begin... 16:35:41.028 c.Test42 [Thread-0] - begin... 16:35:41.032 c.Test42 [Thread-1] - lock... 16:35:41.032 c.Test42 [Thread-1] - unlock... 16:35:41.032 c.Test42 [Thread-0] - lock... 16:35:42.040 c.Test42 [Thread-0] - unlock...
* 原理之伪共享
其中 Cell 即为累加单元
// 防止缓存行伪共享 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值 final boolean cas(long prev, long next) { return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next); } // 省略不重要代码 }
关于缓存行伪共享,得从缓存说起~ ~~
1. 缓存与内存的速度比较
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
比如核心Core1和Core2都从内存中读入数据a = 2 到缓存中。之后Core1把数据改成了2000,此时为了保持数据的一致性,Core2中的数据也需要修改!即把Core2中的缓存行失效,重新去内存中取最新的数据
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象(1行64个字节)。这样问题来了:
Core-0 要修改 Cell[0]
Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要修改为Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效 。
我们把一个缓存行加入了多个cell对象称为伪共享,注解的作用就是防止伪共享
累加主要调用下面的方法
2. add()方法
public void add(long x) { // as 为累加单元数组 // b 为基础值 // x 为累加值 Cell[] as; long b, v; int m; Cell a; // 进入 if 的两个条件 // 1. as 有值, 表示已经发生过竞争, 进入 if // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if if ((as = cells) != null || !casBase(b = base, b + x)) { // uncontended 表示 cell 没有竞争 boolean uncontended = true; if ( // as 还没有创建 as == null || (m = as.length - 1) < 0 || // 当前线程对应的 cell 还没有 (a = as[getProbe() & m]) == null || // cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell ) !(uncontended = a.cas(v = a.value, v + x)) ) { // 进入 cell 数组创建、cell 创建的流程 longAccumulate(x, null, uncontended); } } }
add流程图
3. longAccumulate()
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell if ((h = getProbe()) == 0) { // 初始化 probe ThreadLocalRandom.current(); // h 对应新的 probe 值, 用来对应 cell h = getProbe(); wasUncontended = true; } // collide 为 true 表示需要扩容 boolean collide = false; for (; ; ) { Cell[] as; Cell a; int n; long v; // 已经有了 cells if ((as = cells) != null && (n = as.length) > 0) { // cells存在,但是里面还没有 cell if ((a = as[(n - 1) & h]) == null) { //如果cellsBusy没有加锁,为 cellsBusy 加锁, 创建 cell if (cellsBusy == 0) { // Try to attach new Cell // 创建cell,且cell 的初始累加值为 x Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) {//进行加锁,并修改cellsBusy=1 boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null &&//判断槽位是否为空 (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {//不为空则进行下一次循环尝试 rs[j] = r;//为空,则将元素放到槽位上去 created = true; } } finally { cellsBusy = 0; } if (created)//如果上面cell创建成功,则退出 break; continue; // Slot is now non-empty } // 成功则 break, 否则继续 continue 循环 } // 有竞争, 改变线程对应的 cell 来重试 cas else if (!wasUncontended) wasUncontended = true; // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果 cells 长度已经超过了最大长度(CPU的核数), 或者已经扩容, 改变线程对应的 cell 来重试 cas else if (n >= NCPU || cells != as) collide = false; // 确保 collide 为 false 进入此分支, 就不会进入最后的 else if 进行扩容了 else if (!collide) collide = true; //修改标志位进行加锁 else if (cellsBusy == 0 && casCellsBusy()) { // 加锁成功, 扩容 try { if (cells == as) { // 扩容cells Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; } // 改变线程对应的 cell,再重新循环 h = advanceProbe(h); } // 还没有 cells, 尝试给 cellsBusy 加锁 //cellsBusy == 0表示还没有其他线程为其加锁 //cells == as表示还有没其他线程创建该cells数组(未新建) //casCellsBusy() 采用CAS方法将cellsBusy 从0 改为 1。这样做是为了防止其他线程也对其进行加锁.. else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; // 加锁成功, 创建 cell数组, 最开始长度为 2。 并创建一个累加单元 cell try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; //注意:虽然我们创建的Cell数组大小是2,但是我们只向其加入了一个元素, //不到万不得已不创建另外一个cell,懒惰化的思想~~~ rs[h & 1] = new Cell(x); cells = rs;//赋值给cells init = true; } } finally {//标志位设置为0,也就是解锁 cellsBusy = 0; } // 成功则 break,退出循环; if (init) break; } // 上两种情况失败(如加锁失败), 尝试给 base 累加,如果失败则重新循环 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
longAccumulate 流程图
每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
4. 获取最终结果通过 sum 方法
//对cells数组的元素进行累加 public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
5.8 Unsafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
unsafe 与线程安全性无关。因为比较接近底层,用来操作内存、线程,不建议我们直接使用
5.8.1 Unsafe CAS 操作
/** * @author lxy * @version 1.0 * @Description * @date 2022/7/22 17:47 */ @Slf4j(topic = "c.TestUnsafe") public class TestUnsafe { public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException { //使用反射获取到Unsafe对象 Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); Unsafe unsafe = (Unsafe) theUnsafe.get(null); System.out.println(unsafe); //1. 获取域的偏移地址 long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id")); long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name")); Teacher teacher = new Teacher(); //2. 执行cas操作 unsafe.compareAndSwapInt(teacher,idOffset,0,1); unsafe.compareAndSwapObject(teacher,nameOffset,null,"张三"); //3. 验证 System.out.println(teacher); } } @Data class Teacher{ volatile int id; volatile String name; }
输出为:
sun.misc.Unsafe@490d6c15 Teacher(id=1, name=张三)
6.8.2 unsafe对象模拟实现原子整数
使用自定义的 MyAtomicInteger 实现之前线程安全的原子整数类 AtomicInteger
/** * @author lxy * @version 1.0 * @Description * @date 2022/7/22 18:13 */ public class Test35 { public static void main(String[] args) { Account.demo(new MyAtomicInteger(10000)); } } class MyAtomicInteger implements Account { private volatile int value; private static final long valueOffset; private static final Unsafe UNSAFE; public MyAtomicInteger(int value) { this.value = value; } static { UNSAFE = UnsafeAccessor.getUnsafe(); try { valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value")); } catch (NoSuchFieldException e) { e.printStackTrace(); throw new RuntimeException(); } } private int getValue(){ return value; } public void decrement(int amount){ while (true){ int prev = this.value; int next = prev-amount; if(UNSAFE.compareAndSwapInt(this,valueOffset,prev,next)){ break; } } } @Override public Integer getBalance() { return getValue(); } @Override public void withdraw(Integer amount) { decrement(amount); } } interface Account { // 获取余额 Integer getBalance(); // 取款 void withdraw(Integer amount); /** * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作 * 如果初始余额为 10000 那么正确的结果应当是 0 */ static void demo(Account account) { List <Thread> ts = new ArrayList <>(); for (int i = 0; i < 1000; i++) { ts.add(new Thread(() -> { account.withdraw(10); })); } long start = System.nanoTime(); ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end-start)/1000_000 + " ms"); } }
本章小结
- CAS 与 volatile
- API
- 原子整数
- 原子引用
- 原子数组
- 字段更新器
- 原子累加器
- Unsafe
- 原理方面*
- LongAdder 源码
- 伪共享