一、Exchanger概述
Exchanger是一个用于线程间数据交换的同步点。它允许两个线程在一个点上交换彼此的数据。每个线程在调用exchange方法时提供一个对象,然后等待另一个线程到达该点进行交换。一旦两个线程都到达交换点,它们就可以安全地交换数据并继续执行。
二、工作原理
Exchanger的工作原理基于等待-通知机制。当第一个线程调用exchange方法时,它会将自己提供的数据存储起来,并进入等待状态。此时,该线程将阻塞,直到另一个线程也调用exchange方法。当第二个线程到达并调用exchange方法时,它会将自己的数据与第一个线程的数据进行交换,并唤醒第一个线程。这样,两个线程就可以各自获得对方的数据并继续执行。
值得注意的是,Exchanger
只能用于两个线程之间的数据交换。如果有多个线程需要交换数据,那么需要使用其他同步工具或设计更复杂的交换逻辑。
三、Exchanger源码分析
首先,Exchanger 的内部实现通常依赖于内部的等待队列,这个队列用于存储等待交换数据的线程。当第一个线程到达交换点时,它会被放入等待队列中并阻塞;当第二个线程到达时,它会从队列中取出第一个线程,并进行数据交换,然后两个线程都可以继续执行。
下面是 Exchanger
类的一个简版的源码,用于说明其实现原理:
public class Exchanger<V> { // 用于等待交换的线程队列,这里简化为一个节点 private volatile Node<V> slot; // ... 其他字段和方法 ... public Exchanger() { slot = null; } public V exchange(V x) throws InterruptedException { // 创建一个新的节点,包含当前线程和要交换的数据 Node<V> newNode = new Node<V>(Thread.currentThread(), x); // 自旋,直到交换成功 for (;;) { Node<V> oldNode = slot; if (oldNode == null) { // 如果slot为空,则尝试将新节点设置为slot if (compareAndSetSlot(null, newNode)) { // 等待另一个线程来交换数据 waitForMatch(newNode); return newNode.item; // 返回交换后的数据 } } else if (oldNode.isWaiting()) { // 如果slot中的节点正在等待交换,则尝试进行交换 if (compareAndSetSlot(oldNode, null)) { // 设置新节点的匹配线程和数据 newNode.match = oldNode; // 唤醒等待的线程 oldNode.thread.interrupt(); return oldNode.item; // 返回交换后的数据 } } else { // 如果slot中的节点已经匹配完成,则需要等待直到它变为null或正在等待状态 // 这里可以加入自旋等待的逻辑,但为了简化,我们省略了这部分代码 } // 如果发生竞争失败或其他情况,这里会有重试的逻辑,同样为了简化而省略 } } // 内部节点类,用于存储线程和要交换的数据 private static final class Node<V> { final Thread thread; // 存储线程的引用 V item; // 存储要交换的数据 Node<V> match; // 存储匹配的节点,即交换数据的对方节点 // ... 构造方法和其他方法 ... boolean isWaiting() { return match == null; } } // 使用CAS操作设置slot的值,这里简化为一个方法调用,实际实现会使用Unsafe类进行操作 private boolean compareAndSetSlot(Node<V> expect, Node<V> update) { // ... CAS操作的实现 ... return true; // 这里为了简化而直接返回true,实际实现会有比较和设置的逻辑 } // 等待匹配线程的方法,这里简化为一个方法调用,实际实现会有等待和唤醒的逻辑处理 private void waitForMatch(Node<V> newNode) throws InterruptedException { // ... 等待和唤醒的逻辑处理 ... } }
在实际的 Exchanger 实现中,还会使用到内部类 Node 的 wait 和 notify 方法来实现线程的等待和唤醒。当第一个线程到达时,它会将自己的节点放入 slot 并调用 wait 方法等待;当第二个线程到达时,它会从 slot 中取出第一个线程的节点,然后调用其 notify 方法唤醒它,并完成数据交换。这个过程是通过原子操作(如 CAS)来保证线程安全的。此外,实际的 Exchanger 还支持中断响应和超时等待等高级功能。
四、Exchanger的应用
4.1 使用场景
Exchanger在多种并发编程场景中都非常有用。例如,在遗传算法中,可以使用Exchanger来实现个体之间的信息交换;在管道设计中,可以使用Exchanger来传递数据块或任务;在游戏中,可以使用Exchanger来实现玩家之间的物品交易等。
此外,Exchanger还可以用于实现一些特定的同步模式。例如,可以使用Exchanger来实现两个线程之间的交替执行,或者实现一种类似于"生产者-消费者"模式的数据交换逻辑。
4.2 Exchanger的使用
用Exchanger实现两个线程将交换彼此持有的字符串数据:
import java.util.concurrent.Exchanger; public class ExchangerExample { public static void main(String[] args) { // 创建一个Exchanger对象 Exchanger<String> exchanger = new Exchanger<>(); // 创建一个线程,它将使用"Hello"与另一个线程交换数据 Thread producer = new Thread(() -> { try { String producedData = "Hello"; String consumerData = exchanger.exchange(producedData); System.out.println("生产者线程交换后得到的数据: " + consumerData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, "生产者线程"); // 创建一个线程,它将使用"World"与另一个线程交换数据 Thread consumer = new Thread(() -> { try { String consumerData = "World"; String producedData = exchanger.exchange(consumerData); System.out.println("消费者线程交换后得到的数据: " + producedData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, "消费者线程"); // 启动线程 producer.start(); consumer.start(); } }
代码中我们创建了一个 Exchanger 对象,并且定义了两个线程:一个生产者线程和一个消费者线程。生产者线程持有一个字符串 "Hello",而消费者线程持有一个字符串 "World"。两个线程都通过调用 exchanger.exchange() 方法来等待交换数据。
当两个线程都到达交换点时(即都调用了 exchange() 方法),Exchanger 会确保它们安全地交换数据。交换完成后,每个线程都会得到对方原本持有的数据,并打印出来。
请注意,由于线程调度的不确定性,我们不能保证哪个线程会先执行完成。但是,不论顺序如何,Exchanger 都会确保在两个线程都准备好交换数据之前,它们都会阻塞在 exchange() 方法上。一旦两个线程都到达交换点,交换就会立即发生,并且两个线程都可以继续执行。
运行上述代码,你应该会看到类似下面的输出:
生产者线程交换后得到的数据: World 消费者线程交换后得到的数据: Hello
五、与其他同步工具的比较
与其他同步工具相比,Exchanger具有其独特的特点和优势。例如,与CyclicBarrier相比,Exchanger更注重于线程间的数据交换而非简单的线程同步;与Semaphore相比,Exchanger提供了一种更直接的数据交换方式而非资源访问控制;与SynchronousQueue相比,Exchanger可以看作是其双向形式的简化版。
总结
Exchanger是Java并发库中的一个强大且有用的工具类。它提供了一种简单而高效的方式来实现线程间的数据交换和同步。通过深入理解其工作原理和应用场景,我们可以更好地利用它来编写高效且可靠的并发程序。