前言
目前正在出一个Java多线程专题
长期系列教程,从入门到进阶含源码解读
, 篇幅会较多, 喜欢的话,给个关注❤️ ~
Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解Exchanger
,一起来看下吧~
Exchanger
Exchanger
类用于两个线程交换数据,支持泛型,下面我们通过例子感受一下:
public class ExchangerTest { public static void main(String[] args) throws InterruptedException { Exchanger<String> exchanger = new Exchanger<>(); new Thread(() -> { try { System.out.println("1:" + exchanger.exchange("这是来自线程2的数据")); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); Thread.sleep(1000); new Thread(() -> { try { System.out.println("2:" + exchanger.exchange("这是来自线程1的数据")); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } 复制代码
实际输出:
1:这是来自线程1的数据 2:这是来自线程2的数据 复制代码
可以看出两个线程的数据进行了交换
原理分析
先看下构造函数:
public Exchanger() { participant = new Participant(); } static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } } 复制代码
Participant本质上是一个ThreadLocal
,用来保存本地变量Node
, 可以理解是交换的信息
static final class Node { int index; // Arena index int bound; // Last recorded value of Exchanger.bound int collides; // Number of CAS failures at current bound int hash; // Pseudo-random for spins Object item; // This thread's current item volatile Object match; // Item provided by releasing thread volatile Thread parked; // Set to this thread when parked, else null } 复制代码
下面重点看下exchange
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; } 复制代码
从源码看,数据的交换方式有两种,slotExchange
和arenaExchange
private volatile Node[] arena; 复制代码
可以看到arena
是一个Node数组,arena
为空就会进行slotExchange
,可以称为单槽交换
, arenaExchange
可以称为多槽交换
。exchange其实主要做了判断处理,在对应情况下使用不同的交换方式
slotExchange
private final Object slotExchange(Object item, boolean timed, long ns) { // 获取当前线程的交换节点 Node p = participant.get(); Thread t = Thread.currentThread(); if (t.isInterrupted()) // preserve interrupt status so caller can recheck return null; // 常规自旋操作 for (Node q;;) { // 如果 q不为空 说明q已经被占了 if ((q = slot) != null) { if (U.compareAndSwapObject(this, SLOT, q, null)) { // 获取交换值 Object v = q.item; // 设置交换值 q.match = item; Thread w = q.parked; // 唤醒等待线程 if (w != null) U.unpark(w); // 交换成功返回结果 return v; } // CPU核数数多于1个, 且bound为0时创建arena数组,并将bound设置为SEQ大小 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } // 重定向到多槽交换 arenaExchange else if (arena != null) return null; // caller must reroute to arenaExchange else { // 占用slot p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) break; // 失败设置为null 进入下一次自旋 p.item = null; } } // 下面的操作主要是匹配等待接收的线程 int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; // 自旋的次数 int spins = (NCPU > 1) ? SPINS : 1; Object v; // 匹配线程未到 就进入自旋 while ((v = p.match) == null) { if (spins > 0) { // 优化操作 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } // 说明匹配线程已找到 但是还未完全准备好 else if (slot != p) spins = SPINS; // 自选时间过长 还未匹配到进入阻塞,常规操作 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } // 超时 让出给其它线程 else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; } 复制代码
arenaExchange
操作与slotExchange
类似,但是它更复杂一点,需要通过index
来命中,这里就不带大家看了,有兴趣的同学可以自己去看下
结束语
下节给大家讲下CountDownLatch
~