线程通信(CountDownLatch、CyclicBarrier、Semaphore、Exchanger)

简介: 线程通信(CountDownLatch、CyclicBarrier、Semaphore、Exchanger)


一、等待多线程完成的CountDownLatch

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。

CountDownLatch是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件的发生。

闭锁状态包含一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示已经有一个事件已经发生了。而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生。如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待中的线程中断或者超时。 下面,我们以经典的运动员赛跑举例:

我们在这里设置了两个门,一个是开始门,一个是结束门。

  • 开始门: 所有运动员处于准备状态,等待教练的枪声。这时候运动员为n个,枪响只需要一声,等待的这一声枪响到了,开始门也就打开了,所有运动员开始跑。
  • 结束门: 教练等待所有运动员,当最后一个运动员也冲破底线,教练才能宣布所有人到达终点,这时候是教练等待n个运动员,直到n为0。

下面我们根据具体的代码来演示CountDownLatch的用法:

package concurrency;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Runner implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static Random rand= new Random(47);
    private final CountDownLatch start_latch;
    private final CountDownLatch end_latch;
    public Runner(CountDownLatch start_latch, CountDownLatch end_latch) {
        this.start_latch = start_latch;
        this.end_latch = end_latch;
    }
    @Override
    public void run() {
        try {
            start_latch.await();  //所有运动员都在准备状态中,等待教练释放开始门
            try {
                doWork();  //每个人跑步的时间不同
                end_latch.countDow n();  //跑完后,告诉教练跑完了
            } catch (InterruptedException e) {
                System.out.println("Interrupted Runner" + id);
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted Runner" + id);
        }
    }
    public void doWork() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
        System.out.println(this + "completed");
    }
    @Override
    public String toString() {
        return String.format("%1$-3d", id);
    }
}
class Coach implements Runnable {
    private final CountDownLatch start_latch;
    private final CountDownLatch end_latch;
    public Coach(CountDownLatch start_latch, CountDownLatch end_latch) {
        this.start_latch = start_latch;
        this.end_latch = end_latch;
    }
    @Override
    public void run() {
        start_latch.countDown();  //教练释放了开始门,运动员们都开始跑
        System.out.println("Coach say: Ready!!!!  Go!!!!");
        try {
            end_latch.await();  //当结束门的count down减为0时,教练宣布所有人都跑完了。
            System.out.println("All runner passed the end point");
        } catch (InterruptedException ex) {
            System.out.println(this + " interrupted");
        }
    }
}
public class TestRunner {
    private static final int SIZE = 10;
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch endGate = new CountDownLatch(SIZE);
        for (int i = 0; i < SIZE; i++) {
            exec.execute(new Runner(startGate, endGate));
        }
        exec.execute(new Coach(startGate, endGate));
        exec.shutdown();
    }
}
复制代码

CountDownLatch强调的是一个线程(或多个)需要等待另外的n个线程干完某件事情之后才能继续执行。 上述例子,Coach线程是裁判,10个Runner是跑步的。运动员先准备,裁判喊跑,运动员才开始跑(这是第一次同步,对应startGate)。10个人谁跑到终点了,countdown一下,直到10个人全部到达,裁判喊停(这是第二次同步,对应endGate)。 最后运行结果如下:

Coach say: Ready!!!! Go!!!! 7 completed 9 completed 5 completed 8 completed 2 completed 0 completed 6 completed 4 completed 1 completed 3 completed All runner passed the end point

二、同步屏障CyclicBarrier

CyclicBarrier适用于这样的情况:你希望创建一组任务,它们并行地执行工作,然后在下一个步骤之前等待,直到所有任务都完成。栅栏和闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。

闭锁用于等待事件,而栅栏是线程之间彼此等待,等到都到的时候再决定做下一件事。可以参考Java并发工具类(闭锁CountDownLatch)

拿运动员的事情举例,运动员们跑到终点,互相等待所有人都到达终点后,再一起去做喝酒这件事。(运动员也许不能喝酒的,也许大家再跑一轮。)

下面用一个赛马程序来举例:

package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
class Horse implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;
    public Horse(CyclicBarrier b) {barrier = b;}
    public synchronized int getStrides() {return strides;}
    public void run() {
        try {
            while (!Thread.interrupted()) {  //线程内不断循环
                synchronized (this) {
                    strides += rand.nextInt(3);   //每次马可以走0,1或者2步
                }
                barrier.await();  //走完后,就等所有其它马也走完,才能开始下一回合
            }
        } catch (InterruptedException e) {
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public String toString() {
        return "Horse " + id + " ";
    }
    public String tracks() {
        StringBuilder s =new StringBuilder();
        for(int i = 0; i < getStrides();i++)
            s.append("*");   //这里打印每个马走的轨迹
        s.append(id);
        return s.toString();
    }
}
public class HorseRace {
    static final int FINISH_LINE = 75;
    private List<Horse> horses = new ArrayList<Horse>();
    private ExecutorService exec = Executors.newCachedThreadPool();
    private CyclicBarrier barrier;
    public HorseRace(int nHorses, final int pause) {
        barrier = new CyclicBarrier(nHorses, new Runnable() {
            @Override
            public void run() {
                StringBuilder s = new StringBuilder();
                for (int i = 0; i < FINISH_LINE; i++) {
                    s.append("="); //打印赛道
                }
                System.out.println(s);
                for (Horse horse : horses) {
                    System.out.println(horse.tracks());  //打印每匹马的轨迹
                }
                for (Horse horse : horses) {
                  if (horse.getStrides() >= FINISH_LINE) {
                      System.out.println(horse + "won!");   //每次检查,如果哪匹马到终点了,终止所有线程
                      exec.shutdownNow();
                      return;
                  }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(pause); //每走完一轮,暂停一小会输出
                } catch (InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
        });
        for (int i = 0; i < nHorses; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);  //所有马的线程开始执行
        }
    }
    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        new HorseRace(nHorses, pause);
    }
}
复制代码

我们假设赛道长为75,马每次能走0,1或者2步,每次走完一轮后,互相等待。一旦所有马越过栅栏,它就会自动为下一回合的比赛做好准备。读者可以运行我的程序,在控制台上可以展示出一定的动画效果。

上面的例子中,我们向CyclicBarrier提供一个“栅栏动作”,它是一个Runnable,当计数值到达0时自动执行,这是CyclicBarrier和CountDownLatch之间的另一个区别。

public CyclicBarrier(int parties, Runnable barrierAction)
复制代码

除此之外,CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。

三、控制并发线程数Semaphore

定义

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore管理着一组许可(permit),许可的初始数量可以通过构造函数设定,操作时首先要获取到许可,才能进行操作,操作完成后需要释放许可。如果没有获取许可,则阻塞到有许可被释放。如果初始化了一个许可为1Semaphore,那么就相当于一个不可重入的互斥锁(Mutex)。

实例场景

理论的听起来有些绕口,其实假设生活中一个常见的场景:每天早上,大家都热衷于带薪上厕所,但是公司厕所一共只有10个坑位。。那么只能同时10个人用着,后面来的人都得等着(阻塞),如果走了2个人,那么又可以进去2个人。这里面就是Semaphore的应用场景,争夺有限的资源。

代码实战

package concurrency;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
class Employee implements Runnable {
    private String id;
    private Semaphore semaphore;
    private static Random rand= new Random(47);
    public Employee(String id, Semaphore semaphore) {
        this.id = id;
        this.semaphore = semaphore;
    }
    public void run() {
            try {
                semaphore.acquire();
                System.out.println(this.id + "is using the toilet");
                TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
                semaphore.release();
                System.out.println(this.id + "is leaving");
            } catch (InterruptedException e) {
            }
    }
}
public class ToiletRace {
    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Employee(String.valueOf(i), s));
        }
        threadPool.shutdown();
    }
}
复制代码

这里我定义了30个人要上厕所,但是只有10个坑位,每个人消耗随机的时间,直接运行上面这段代码,可以看到一开始进去了10个人,后来就是陆陆续续的有人进,有人出了。但是正在使用的一定不会超过10个的。

Semaphore是很好用的Java并发工具,除了上面这个例子,我们在工作中经常用它管理数据库连接或者保护其它受限资源的并发使用。当然Semaphore还有其它的一些方法,可以查看剩余的许可数,可以查看正在使用许可的线程数,具体使用时可以查看官方文档。

应用场景

Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假 如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这 时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连 接。这个时候,就可以使用Semaphore来做流量控制

public class SemaphoreTest {
    private static final int       THREAD_COUNT = 30;
    private static ExecutorService threadPool   = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore       s            = new Semaphore(10);
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

在代码中,虽然有30个线程在执行,但是只允许10个并发执行。Semaphore的构造方法 Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允 许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用 Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以 用tryAcquire()方法尝试获取许可证。

四、CountDownLatch与CyclicBarrier比较

CountDownLatch 与 CyclicBarrier 都是用于控制并发的工具类,都可以理解成维护的就是一个计数器,但是这两者还是各有不同侧重点的:

  1. CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;而 CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等,等大家都完成,再携手共进。
  2. 调用 CountDownLatch 的 countDown 方法后,当前线程并不会阻塞,会继续往下执行;而调用 CyclicBarrier 的 await 方法,会阻塞当前线程,直到 CyclicBarrier 指定的线程全部都到达了指定点的时候,才能继续往下执行;
  3. CountDownLatch 方法比较少,操作比较简单,而 CyclicBarrier 提供的方法更多,比如能够通过 getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,并且 CyclicBarrier 的构造方法可以传入 barrierAction,指定当所有线程都到达时执行的业务功能;
  4. CountDownLatch 是不能复用的,而 CyclicBarrier 是可以复用的

五、线程间交换数据的Exchanger

定义

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交 换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方。

实际场景

Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需 要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行 录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否 录入一致

代码

public class ExchangerTest {
    private static final Exchanger<String> exgr       = new Exchanger<String>();
    private static ExecutorService         threadPool = Executors.newFixedThreadPool(2);
    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "银行流水A";// A录入银行流水数据
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "银行流水数据B";// B录入银行流水数据
                    String A = exgr.exchange("B");
                    System.out.println("A和B数据是否一致" + A.equals(B) + "A录入的是" + A + "B录入的是" + B);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.shutdown();
    }
}

如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发 生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。














目录
相关文章
|
17天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
1月前
|
Python
如何在Python中使用Semaphore来实现线程同步?
如何在Python中使用Semaphore来实现线程同步?
27 7
|
1月前
|
Python
如何在Python中实现线程之间的同步和通信?
【2月更文挑战第17天】【2月更文挑战第51篇】如何在Python中实现线程之间的同步和通信?
|
3月前
|
Java 测试技术
CountDownLatch、CyclicBarrier让线程听我号令
CountDownLatch、CyclicBarrier让线程听我号令
42 0
|
1月前
|
消息中间件 并行计算 网络协议
探秘高效Linux C/C++项目架构:让进程、线程和通信方式助力你的代码飞跃
探秘高效Linux C/C++项目架构:让进程、线程和通信方式助力你的代码飞跃
34 0
|
1月前
|
安全
多线程通信
多线程通信
|
2月前
|
Go 调度 开发者
Go语言并发基础:轻量级线程与通道通信
【2月更文挑战第6天】本文介绍了Go语言在并发编程方面的基础知识和核心概念。我们将深入探讨goroutine(轻量级线程)的创建与调度,以及如何利用channel进行goroutine间的通信与同步。此外,还将简要提及select语句的使用,并解释其在处理多个channel操作时的优势。
|
2月前
|
并行计算 Java API
深入理解Java多线程编程:创建、状态管理、同步与通信
深入理解Java多线程编程:创建、状态管理、同步与通信
|
3月前
|
前端开发 Java BI
自定义线程池+countdownlatch
自定义线程池+countdownlatch
21 0
|
28天前
|
存储 缓存 NoSQL
Redis单线程已经很快了6.0引入多线程
Redis单线程已经很快了6.0引入多线程
31 3