同步工具(未完待更新)

简介: 在JDK1.7中,同步工具主要包括CountDownLatch(一次性栅栏)、Semaphore(信号量)、CyclicBarrier(循环同步栅栏)、Exchanger(线程间交换器)和Phaser。下面的篇幅中,将依次讲述每种同步工具的概念、用法和原理。

CountDownLatch一次性栅栏

1 概念与用法

CountDownLatch是一个用来同步多个线程的并发工具,n个线程启动后,分别调用CountDownLatch的await方法来等待其m个条件满足(m在初始化时指定);

每当有条件满足时,当前线程调用CountDownLatch的countDown方法,使得其m值减1;

直至m值为0时,所有等待的线程被唤醒,继续执行。

注意,CountDownLatch是一次性的,当条件满足后,它不能再回到初始状态,也不能阻止后续线程了。

若要循环的阻塞多个线程,则考虑使用CyclicBarrier。

例如5匹马参加赛马比赛,需等待3个裁判到位后才能启动,代码如下:

public class CountDownLatchExam {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            service.submit(new Horse("horse" + i, latch));
        }
        for (int i = 0; i < 3; i++) {
            service.submit(new Judge("judge" + i, latch));
        }
        service.shutdown();
    }
    private static class Horse implements Runnable {
        private final String name;
        private final CountDownLatch latch;
        Horse(String name, CountDownLatch latch) {
            this.name = name;
            this.latch = latch;
        }
        @Override
        public void run() {
            try {
                System.out.println(name + " is ready,wait for all judges.");
                latch.await();
                System.out.println(name + " is running.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    private static class Judge implements Runnable {
        private final String name;
        private final CountDownLatch latch;
        private static Random random = new Random(System.currentTimeMillis());
        Judge(String name, CountDownLatch latch) {
            this.name = name;
            this.latch = latch;
        }
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(name + " is ready.");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
horse0 is ready,wait for all judges.
horse1 is ready,wait for all judges.
horse2 is ready,wait for all judges.
horse3 is ready,wait for all judges.
horse4 is ready,wait for all judges.
judge2 is ready.
judge1 is ready.
judge0 is ready.
horse0 is running.
horse1 is running.
horse2 is running.
horse3 is running.
horse4 is running.

CountDownLatch的原理在上一篇的4.7节“一次唤醒所有阻塞线程的共享锁”中已经详细阐述了。简要复述一下,CountDownLatch使用AQS的子类Sync作为内部的同步器,并由Sync复写了AQS的tryAcquireShared和tryReleaseShared方法。它将AQS中的state当做需要满足的条件个数,生成了一个共享锁。

每当调用await方法时,内部调用了tryAcquireShared方法,由于state>0,因此调用的线程会阻塞在共享锁的循环框架中。

每当调用countDown方法时,内部调用了releaseShared方法,而此方法将会把state的值减1,当state的值为0时,tryAcquireShared中的循环将会唤醒所有等待线程,使之继续运行。由于tryAcquireShared方法中没有修改state值,因此CountDownLatch只能一次性使用,不能循环使用。

若需知道更多细节,请直接阅读CountDownLatch和AQS的源代码。提醒一句,CountDownLatch的源代码是所有AQS的应用中最简单的,应当从它读起。

Semaphore信号量

1 概念与用法

Semaphore信号量,在多个任务争夺几个有限的共享资源时使用。

调用acquire方法获取一个许可,成功获取的线程继续执行,否则就阻塞;

调用release方法释放一个许可。每当有空余的许可时,阻塞的线程和其他线程可竞争许可。

下面的例子中,10辆车竞争3个许可证,有了许可证的车就可以入内访问资源,访问完成后释放许可证:

作者:Alex Wang
链接:https://zhuanlan.zhihu.com/p/27829595
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public class SemaphoreExam {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        ExecutorService service = Executors.newCachedThreadPool();
        // 10 cars wait for 3 semaphore
        for (int i = 0; i < 10; i++) {
            service.submit(new Car("Car" + i, semaphore));
        }
        service.shutdown();
    }
    private static class Car implements Runnable {
        private final String name;
        private final Semaphore semaphore;
        private static Random random = new Random(System.currentTimeMillis());
        Car(String name, Semaphore semaphore) {
            this.name = name;
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                System.out.println(name + " is waiting for a permit");
                semaphore.acquire();
                System.out.println(name+" get a permit to access, available permits:"+semaphore.availablePermits());
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(name + " release a permit, available permits:"+semaphore.availablePermits());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

注意,运行时semaphore.availablePermits()方法会返回当前空余的许可证数量。但由于线程获取许可证的速度往往快于IO的速度,因此很多时刻看到这个数字都是0。

2 原理

Semaphore的原理在上一篇的4.8节“拥有多个许可证的共享锁”中已经详细阐述了。

简要复述一下,Semaphore使用AQS的子类Sync作为内部的同步器,并由Sync复写了AQS的tryAcquireShared和tryReleaseShared方法。

它将AQS中的state当做许可证的个数,生成了一个共享锁。state的值在Semaphore的构造函数中指定,必须大于0。

1.每当调用acquire方法时,内部调用了tryAcquireShared方法,此方法检测state的值是否>0,若是则将state减1,并继续运行,否则线程就阻塞在共享锁的循环框架中。

2.每当调用release方法时,内部调用了releaseShared方法,而此方法将会把state的值加1,当state的值大于0时,tryAcquireShared中的循环将会唤醒所有等待线程,使之继续运行,重新竞争许可证。

若需知道更多细节,请直接阅读Semaphore和AQS的源代码。

CyclicBarrier循环同步栅栏

1 概念与用法

CyclicBarrier可用来在某些栅栏点处同步多个线程,且可以多次使用,每次在栅栏点同步后,还可以激发一个事件。

例如三个旅游者(线程)各自出发,依次到达三个城市,必须每个人都到达某个城市后(栅栏点),才能再次出发去向下一个城市,当他们每同步一次时,激发一个事件,输出一段文字。代码如下:

public class CyclicBarrierExam {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("======== all threads have arrived at the checkpoint ===========");
            }
        });
        ExecutorService service = Executors.newFixedThreadPool(3);
        service.submit(new Traveler("Traveler1", barrier));
        service.submit(new Traveler("Traveler2", barrier));
        service.submit(new Traveler("Traveler3", barrier));
        service.shutdown();
    }
    private static class Traveler implements Runnable {
        private final String name;
        private final CyclicBarrier barrier;
        private static Random rand = new Random(47);
        Traveler(String name, CyclicBarrier barrier) {
            this.name = name;
            this.barrier = barrier;
        }
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                System.out.println(name + " arrived at Beijing.");
                barrier.await();
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                System.out.println(name + " arrived at Shanghai.");
                barrier.await();
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                System.out.println(name + " arrived at Guangzhou.");
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

2 原理

CyclicBarrier是依赖一个可重入锁ReentrantLock和它的一个Condition实现的,

在构造时,CyclicBarrier得到了一个parties数值,它代表参与的线程数量,以及一个Runnable的实例,它代表被激发的事件。

每当有线程调用await时,parties减1。若此时parties大于0,线程就在Condition处阻塞,若parties等于0,则此Condition会调用signalAll释放所有等待线程,并触发事件,同时将parties复原。因此所有的线程又进入下一轮循环。

CyclicBarrier代码非常简单,复杂之处在于它还要处理线程中断、超时等情况。

Exchange线程间变量交换

1 概念与用法

Exchange专门用于成对的线程间同步的交换一个同类型的变量,这种交换是线程安全且高效的。直接来看一个例子:

public class ExchangerExam {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new StringHolder("LeftHand", "LeftValue", exchanger));
        service.submit(new StringHolder("RightHand", "RightValue", exchanger));
        service.shutdown();
    }
    private static class StringHolder implements Runnable {
        private final String name;
        private final String val;
        private final Exchanger<String> exchanger;
        private static Random rand = new Random(System.currentTimeMillis());
        StringHolder(String name, String val, Exchanger<String> exchanger) {
            this.name = name;
            this.val = val;
            this.exchanger = exchanger;
        }
        @Override
        public void run() {
            try {
                System.out.println(name + " hold the val:" + val);
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                String str = exchanger.exchange(val);
                System.out.println(name + " get the val:" + str);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

可以看到,代码中两个线程同步的交换了一个String。先执行exchange方法的线程会阻塞直到后一个线程也执行了exchange方法,然后同步的完成数据的交换。再看一个例子:

public class ExchangerExam2 {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService service = Executors.newCachedThreadPool();
        long start = System.currentTimeMillis();
        service.submit(new StringHolder("LeftHand", "LeftValue", exchanger));
        service.submit(new StringHolder("RightHand", "RightValue", exchanger));
        service.shutdown();
        service.awaitTermination(1, TimeUnit.DAYS);
        long end = System.currentTimeMillis();
        System.out.println("time span is " + (end - start) + " milliseconds");
    }
    private static class StringHolder implements Runnable {
        private final String name;
        private final String val;
        private final Exchanger<String> exchanger;
        private static Random rand = new Random(System.currentTimeMillis());
        StringHolder(String name, String val, Exchanger<String> exchanger) {
            this.name = name;
            this.val = val;
            this.exchanger = exchanger;
        }
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10000; i++) {
//                    System.out.println(name + "-" + i + ": hold the val:" + val + i);
//                    TimeUnit.NANOSECONDS.sleep(rand.nextInt(5));
                    String str = exchanger.exchange(val + i);
//                    System.out.println(name + "-" + i + ": get the val:" + str);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

代码中,两个线程交换了10000组数据,用时仅41ms,这说明Exchanger的同步效率是非常高的。

再看一段代码:

作者:Alex Wang
链接:https://zhuanlan.zhihu.com/p/27829595
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public class ExchangerExam3 {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new StringHolder("North", "NorthValue", exchanger));
        service.submit(new StringHolder("East", "EastValue", exchanger));
        service.submit(new StringHolder("West", "WestValue", exchanger));
        service.submit(new StringHolder("South", "SouthValue", exchanger));
        service.shutdown();
    }
    private static class StringHolder implements Runnable {
        private final String name;
        private final String val;
        private final Exchanger<String> exchanger;
        private static Random rand = new Random(System.currentTimeMillis());
        StringHolder(String name, String val, Exchanger<String> exchanger) {
            this.name = name;
            this.val = val;
            this.exchanger = exchanger;
        }
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10000; i++) {
                    System.out.println(name + "-" + i + ": hold the val:" + val + i);
                    TimeUnit.NANOSECONDS.sleep(rand.nextInt(5));
                    String str = exchanger.exchange(val + i);
                    System.out.println(name + "-" + i + ": get the val:" + str);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

这段代码在运行时有很大的概率会死锁,原因就是Exchanger是用来在“成对”的线程之间交换数据的,像上面这样在四个线程之间交换数据,Exchanger很有可能将多个线程互相阻塞在其Slot中,造成死锁。

2 原理

Exchanger这个类初看非常简单,其公开的接口仅有一个无参构造函数,两个重载的泛型exchange方法:

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

第一个方法用来持续阻塞的交换数据;第二个方法用来在一个时间范围内交换数据,若超时则抛出TimeoutException后返回,同时唤醒另一个阻塞线程。

Exchanger的基本原理是维持一个槽(Slot),这个Slot中存储一个Node的引用,这个Node中保存了一个用来交换的Item和一个用来获取对象的洞Hole。如果一个来“占有”的线程看见Slot为null,则调用CAS方法使一个Node对象占据这个Slot,并等待另一个线程前来交换。如果第二个来“填充”的线程看见Slot不为null,则调用CAS方法将其设置为null,同时使用CAS与Hole交换Item,然后唤醒等待的线程。注意所有的CAS操作都有可能失败,因此CAS必须是循环调用的。

看看JDK1.7中Exchanger的数据结构相关源代码:


// AtomicReference中存储的是Hole对象
private static final class Node extends AtomicReference<Object> {
    /** 用来交换的对象. */
    public final Object item;
    /** The Thread waiting to be signalled; null until waiting. */
    public volatile Thread waiter;
    /**
     * Creates node with given item and empty hole.
     * @param item the item
     */
    public Node(Object item) {
        this.item = item;
    }
}
//Slot中存储的是Node
private static final class Slot extends AtomicReference<Object> {
    //这一行是为了防止伪共享而加入的缓冲行,与具体算法无关
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
//一个Slot数组,数组中有32个Slot,只在必要时才创建
private volatile Slot[] arena = new Slot[CAPACITY];

下面是进行交换操作的核心算法:/

private Object doExchange(Object item, boolean timed, long nanos) {
    Node me = new Node(item);               // 创建一个Node,预备在“占用”时使用
    int index = hashIndex();                   // 当前Slot的哈希值
    int fails = 0;                            // CAS失败次数
    for (;;) {
        Object y;                          // 当前Slot中的内容
        Slot slot = arena[index];              //得到当前的Slot
        if (slot == null)                     // 延迟加载slots
            createSlot(index);                // 创建Slot并重入循环
        else if ((y = slot.get()) != null &&  // 如果Hole不为null,准备“填充”
                 slot.compareAndSet(y, null)) {
            Node you = (Node)y;               // 从这里开始交换数据
            if (you.compareAndSet(null, item)) {
                LockSupport.unpark(you.waiter);  //唤醒等待线程
                return you.item;                //“填充”线程从这里返回值
            }                                // 上面条件不满足,重入循环
        }
        else if (y == null &&                 // 如果Hole为null,准备“占有”
                 slot.compareAndSet(null, me)) {
            if (index == 0)                   // 在slot 0上等待交换
                return timed ?
                    awaitNanos(me, slot, nanos) :
                    await(me, slot);
            Object v = spinWait(me, slot);    // Slot位置不为0时,自旋等待交换
            if (v != CANCEL)
                return v;                 //“占有”线程从这里返回值
            me = new Node(item);              // 抛弃被取消的Node,创建新Node
            int m = max.get();
            if (m > (index >>>= 1))           // index右移1位,相当于arena中slot向右1位
                max.compareAndSet(m, m - 1);  // 缩表
        }
        else if (++fails > 1) {               // 在第一个Slot上运行两次失败
            int m = max.get();
            if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                index = m + 1;                // 第三次失败时index增加
            else if (--index < 0)
                index = m;                    // 当index小于0时,赋值为m
        }
    }
}


目录
相关文章
|
4月前
|
存储
Qt更新组件出现(“要继续此操作,至少需要一个有效且已启用的储存库”)
Qt更新组件出现(“要继续此操作,至少需要一个有效且已启用的储存库”)
213 0
Qt更新组件出现(“要继续此操作,至少需要一个有效且已启用的储存库”)
uniapp bug记录(后续更新)
uniapp bug记录(后续更新)
117 0
|
程序员 测试技术 数据库
实战! 项目单据确认状态未更新排查
实战! 项目单据确认状态未更新排查
|
Web App开发 缓存 iOS开发
CleanMyMacX4.12.2有哪些新的功能更新
任何一部电子设备在使用多年之后都会出现性能下降的问题,苹果的Mac计算机自然也不例外。当你发现Mac运行缓慢,因为有太多文件或缓存垃圾将Mac的运行速度拖了下来。 要想提高生活和工作效率,必须对Mac进行优化,提升一下Mac 的使用性能。那么以下三种提升Mac使用性能的方法对你的帮助将会是巨大的。
118 0
|
缓存 索引
ES的删除和更新,旧数据到低是如何处理的?
根据ES的读写入原理,大家都知道ES写入时每秒从内存缓冲区(memory buffer)生成小的segment,将其递交给系统缓存(OS filesystem cache)中,后台会定期的对这些小的segment 合并成一个大的segment段
330 0
ES的删除和更新,旧数据到低是如何处理的?
|
存储 测试技术 开发工具
BSTestRunner增加历史执行记录展示和重试功能
之前对于用例的失败重试,和用例的历史测试记录存储展示做了很多的描述呢,但是都是基于各个项目呢,不方便使用,为了更好的使用,我们对这里进行抽离,抽离出来一个单独的模块,集成到BSTestRunner中,以后我们使用BSTestRunner直接就可以使用里面的失败重试和展示历史记录了。
BSTestRunner增加历史执行记录展示和重试功能
|
开发者 数据格式 智能硬件
【重要通知】文档更新(10.18—10.24)
2021.10.18-2021.10.24文档更新内容
【重要通知】文档更新(10.18—10.24)
|
关系型数据库 MySQL
有数据进行更新 没有进行新增 怎么操作
有数据进行更新 没有进行新增
298 0
|
搜索推荐 SEO
网站内容更新频率如何控制?网站内容更新注意事项
网站更新是每个站长必做的功课,当一个网站创建完成后,开始更新网站内容。 更新内容并不是说一个月时间内,把挖掘出来的内容、关键词全部一次性用上,这样的做法是错误的,搜索引擎是非常反感的。正确的做法是循序渐进的更新网站,不是像一台机器在采集,举个例子:个人博客今天发送3篇文稿,明天也3篇,后天还是一样3篇,天天坚持持续一段时间,是被搜索引擎认可的。
227 0