并发工具类Phaser、Exchanger使用

简介: Phaser 是一个更加复杂和强大的同步辅助类,对 CountDownLatch 与 CyclicBarrier 的全面升级,是一个 java 并发 api 的一个重量级类。Exchanger,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。

0x01:Phaser


Phaser 是一个更加复杂和强大的同步辅助类,对 CountDownLatch 与 CyclicBarrier 的全面升级,是一个 java 并发 api 的一个重量级类。


常用api:


  • arriveAndAwaitAdvance() 每凑齐指定人数就报团执行一次,同一个线程可以执行多次arriveAndAwaitAdvance(),表示不同阶段的报团


  • arriveAndDeregister() 退出当前团,且当前团规则人数减1(报完当前团后,不再报下阶段的团)


  • getArrivedParties() 当前团凑足了多少人


  • getRegisteredParties() 获取注册的团规定人数


arrive() 使getArrivedParties()数量加1,即用一个虚拟线程占据一个线程的位置, 此虚拟线程不阻塞


  • register() 动态增加一个团的规定人数


  • bulkRegister(int parties) 动态的增加规定报团人数,是register()的多次调用版


  • forceTermination() 取消报团,线程执行各自代码,不再有Phaser阻塞等待情况


  • getUnarrivedParties() 当前还差多少线程开团,是getArrivedParties()方法的补集


  • isTerminated() 判断Phaser对象是否已为销毁状态


使用案例


作CountDownLatch使用


import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.ThreadLocalRandom.current;
import static java.lang.Thread.currentThread;
/**
 * 将 Phaser当作 CountDownLatch来用
 * 
 **/
public class PhaserForCountDownLatch {
public static void main(String[] args) throws InterruptedException {
// 定义一个 Phaser , 并未指定“分片数量 parties”,此时在 Phaser 内部分片的数量 parties 默认为 0 ,
// 后面可以通过 register() 方法来动态增加
final Phaser phaser = new Phaser();
// 定义 5 个线程
for (int i = 0; i < 5; i++) {
new Thread(() -> {
// 调用 Phaser 的 register() 方法使得 phaser 内部的 parties 加一
                phaser.register();
try {
// 采用随机休眠的方式模拟线程的运行时间开销
                    TimeUnit.SECONDS.sleep(current().nextInt(20));
// 线程任务结束,执行 arrive()
/**
                     * 补充:arrive() 方法类似于 CountDownLatch 的 countdown()
                     * 方法,代表着“当前线程已经到达屏障”,
                     * 但是它不需要等待其他的线程也到达屏障。因此该方法“不是阻塞的方法”,执行之后会立即返回,
                     * 同时该方法会返回一个整数类型的数字,代表着已经到达的 Phase(阶段)编号
                     */
                    phaser.arrive();
                    System.out.println(new Date() + ":" + currentThread() + " completed the work.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "T-" + i).start();
        }
/**
         * 这里让线程休眠的目的: 为了保证在主线程 register() 之前,所有的子线程都能顺利 register , 否则就会出现
         * phaser 只注册一个 parties , 并且很快 arrive 的情况。
         */
        TimeUnit.SECONDS.sleep(current().nextInt(10));
// 主线程也调用注册方法
        phaser.register();
// 主线程也 arrive() , 但是它要等待下一个阶段,等待下一个阶段的前提“所有的线程都 arrive ,
// 也就是 phaser 内部当前 phase 的 unarrived 数量为 0 ”
        phaser.arriveAndAwaitAdvance();
        System.out.println(new Date() + ": all of sub task completed work.");
    }
}


作CyclicBarrier使用


/**
 * 将 Phaser 当作 CyclicBarrier 来使用
 *
 *  phaser.arriveAndAwaitAdvance(): 该方法会等待当前 Phaser 中所有的 part(子线程)都完成了
 *                                  任务才能使线程退出阻塞状态
 **/
public class PhaserForCyclicBarrier
{
public static void main(String[] args) throws InterruptedException
   {
// 定义一个分片 parties 为0 的 Phaser
      final Phaser phaser = new Phaser();
for (int i = 0; i < 5; i++)
      {
new Thread(()->
         {
// 子线程调用注册方法
            phaser.register();
try
            {
               TimeUnit.SECONDS.sleep(current().nextInt(20));
// 调用 arriveAndAwaitAdvance() 等待所有线程 arrive 然后继续前行
               phaser.arriveAndAwaitAdvance();
               System.out.println(new Date() + ":" + currentThread() + " completed the work.");
            }
catch (InterruptedException e)
            {
               e.printStackTrace();
            }
         } , "T-"+i).start();
      }
// 休眠以确保其他子线程顺利调用 register()
      TimeUnit.SECONDS.sleep(10);
// 主线程调用 register()
      phaser.register();
      phaser.arriveAndAwaitAdvance();
      System.out.println(new Date() + ": all of sub task completed work.");
   }
}


0x02:Exchanger


Exchanger是什么?


它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这个两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的中断时成对的线程使用exchange()方法,当有一对线程到达了同步点,就会进行交换数据,因此该工具类的线程对象是成对的。


线程可以在成对内配对和交换元素的同步点。每个线程在输入exchange方法时提供一些对象,与合作者线程匹配,并在返回时接收其合作伙伴的对象。交换器可以被视为一个的双向形式的SynchroniuzedQueue。交换器在诸如遗传算法和管道设计的应用中可能是有用的。


一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定事物后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。图片


常用方法:


  • Exchanger 泛型类型,其中V表示可交换的数据类型


  • V exchanger(V v):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送该线程,并接收该线程的对象。


  • V exchanger(V v, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或超出类指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。


import java.util.concurrent.Exchanger;
public class ExechangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread() {
@Override
public void run() {
String data1 = "data1";
try {
System.out.println(Thread.currentThread().getName() + "交换前的数据:" + data1);
String data2 = exchanger.exchange(data1);
System.out.println(Thread.currentThread().getName() + "交换后的数据:" + data2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
new Thread() {
@Override
public void run() {
String data2 = "data2";
try {
System.out.println(Thread.currentThread().getName() + "交换前的数据:" + data2);
String data1 = exchanger.exchange(data2);
System.out.println(Thread.currentThread().getName() + "交换后的数据:" + data1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}
相关文章
|
Kubernetes Linux 网络安全
kubernetes(k8s)篇(一)(2022年最新)使用KubeKey安装k8s集群及k8sUI界面KubeSphere
kubernetes(k8s)篇(一)(2022年最新)使用KubeKey安装k8s集群及k8sUI界面KubeSphere
3714 0
kubernetes(k8s)篇(一)(2022年最新)使用KubeKey安装k8s集群及k8sUI界面KubeSphere
|
Kubernetes Java 容器
部署 Spring Boot 应用到 K8S 教程
部署 Spring Boot 应用到 K8S 教程
655 0
|
8月前
|
SQL 关系型数据库 PostgreSQL
CTE vs 子查询:深入拆解PostgreSQL复杂SQL的隐藏性能差异
本文深入探讨了PostgreSQL中CTE(公共表表达式)与子查询的选择对SQL性能的影响。通过分析两者底层机制,揭示CTE的物化特性及子查询的优化融合优势,并结合多场景案例对比执行效率。最终给出决策指南,帮助开发者根据数据量、引用次数和复杂度选择最优方案,同时提供高级优化技巧和版本演进建议,助力SQL性能调优。
890 1
|
8月前
|
NoSQL 网络安全 Redis
RedisDesktopManager免费软件下载,Redis桌面管理器(又名RDM),redis管理器
Redis桌面管理器(RDM)是一款支持Windows、Linux和MacOS的开源Redis数据库管理工具,提供易于使用的GUI界面,支持SSL/TLS加密、SSH隧道及云服务集成。本文档介绍了RDM的安装方法,并详细列出了Redis常用命令分类教程,包括键操作、字符串、哈希、列表、集合、有序集合、发布/订阅、事务及服务器管理等核心功能,帮助用户快速上手Redis开发与管理。
3197 1
|
监控 供应链 定位技术
ERP系统中的销售订单处理与订单跟踪
【7月更文挑战第25天】 ERP系统中的销售订单处理与订单跟踪
1110 0
|
安全 Linux 网络安全
【工具使用】几款优秀的SSH连接客户端软件工具推荐FinalShell、Xshell、MobaXterm、OpenSSH、PUTTY、Terminus、mRemoteNG、Terminals等
【工具使用】几款优秀的SSH连接客户端软件工具推荐FinalShell、Xshell、MobaXterm、OpenSSH、PUTTY、Terminus、mRemoteNG、Terminals等
134957 0
|
关系型数据库 MySQL 应用服务中间件
Nginx__高级进阶篇之LNMP动态网站环境部署
Nginx__高级进阶篇之LNMP动态网站环境部署
510 0
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
1666 0
|
SQL 关系型数据库 MySQL
MySQL大数据量分页查询方法及其优化
MySQL大数据量分页查询方法及其优化
754 4
|
运维 Linux 网络安全
跨平台SSH文件传输:Linux与Windows环境下的实践指南
本文介绍了在Linux和Windows之间使用SCP、SecureCRT及PuTTY工具集进行文件传输的方法。在Linux中,利用SCP命令进行文件下载、上传及目录传输。在Windows环境下,PSFTP和PSCP提供类似功能,而SecureCRT作为SSH客户端,支持设置上传下载目录并进行文件传输。掌握这些工具的使用可提升跨平台运维效率。