Juc并发编程16——Semaphore,Exchanger,Fork/Join框架

简介: Juc并发编程16——Semaphore,Exchanger,Fork/Join框架

1.Semaphore

还记得我们在操作系统中学习的信号量吗?它在解决进程间的同步问题起着非常大的作用。

java中的信号量也有很大的作用,它可以限制一个代码块可以同时被访问的线程数量(加排他锁锁可以限制只被一个线程访问),相当于流量控制。简单来说,它就是一个可以被N个线程同时占用的排它锁(因此也支持公平锁和非公平锁)。在初始时,可以指定Semaphore的许可证个数,一个线程可以获取一个或者多个许可证,当许可证不足以供其它线程获取时,想要竞争同步资源的其它线程将会被阻塞。

public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); //可以设定参数指定许可证数量
                    System.out.println("get a license...");
                    Thread.sleep(100);
                    semaphore.release();  //可以设定参数指定许可证数量
                    System.out.println("release a license...");
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
            }).start();
        }
    }

输出结果如下。

24157666e4474096aa39f49c063a2c20.png

其它的一些api真的也特别简单,这里写个demo演示下。

public class Demo36 {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
            }).start();
        }
        Thread.sleep(10);
        // 获取剩余许可证数量
        System.out.println(semaphore.availablePermits()); 
        // 是否有等待线程
        System.out.println(semaphore.hasQueuedThreads());
        // 等待线程数量
        System.out.println(semaphore.getQueueLength());
    }
}

结果如下。

ad975c4bf7dd40dc974bc0261fe41699.png

许可证还可以被回收。

    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        new Thread(() -> {
            try {
                semaphore.acquire();
                System.out.println("acquire permit");
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
        }).start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("drain permit number" + semaphore.drainPermits());
        new Thread(() -> {
            try {
                semaphore.acquire();
                System.out.println("acquire permit");
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
        }).start();
    }

结果如下。

0fc7baf908354f059c1a87be0f483122.png

2.数据交换Exchanger

Exchanger可以让两个线程在同一个时间点发生数据交换。

public static void main(String[] args) throws InterruptedException {
        Exchanger<Object> exchanger = new Exchanger<>();
        new Thread(() -> {
            try {
                System.out.println(exchanger.exchange("AAA..."));
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
        }).start();
        exchanger.exchange("BBB");
    }

结果如下。

a4476121176c479498e0c710da5f4986.png

过于简单,不再赘述。

3.Fork/Join框架

在jdk7中,出现了一个新的框架用于并行执行任务,它可以把大任务拆分为多个小任务并行执行,最大程度的利用多核cpu的优势,最后汇总执行的结果。很强大很高效吧。

比如计算:18x7 + 36 x 8 + 9 x 77 + 8 x 53,可以进行如下图的拆分汇总操作。


abf350c6a6c74c52aabecd68aa779e98.png

而且它不仅仅是拆分任务,使用多个线程并行执行任务,还可以工作窃取算法,提高线程的利用率。其原理是,把每个线程的任务进一步拆分为若干子任务,并且每个线程创建一个队列来存放自己的子任务,当某个线程的子任务全部完成,可以从其它线程的队列中获取任务执行。可以参考下图进行理解。

21f7b5beea8b4435bd4434459d9030fe.png

下面结合实例使用下。比如我们需要计算1-1000的和。

   public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            System.out.println(pool.submit(new SubTask(1, 1000)).get());
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    static class SubTask extends RecursiveTask<Integer>{
        private int start;
        private int end;
        public SubTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        protected Integer compute() {
            if((end - start) > 125) {
                SubTask subTask1 = new SubTask(start, (start + end) / 2);
                subTask1.fork();
                SubTask subTask2 = new SubTask((start + end) / 2 + 1, end);
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            } else {
              System.out.println(Thread.currentThread().getName() + "start add from " + start +"end" +end);
                int result = 0;
                for (int i = start; i <= end; i++) {
                    result += i;
                }
                return  result;
            }
        }
    }

其结果如下。

cfdb24dd37ab409e947ca9b798470768.png

其实,arrays中的parallelSort就是使用的Fork/Join框架

    public static void parallelSort(byte[] a) {
        int n = a.length, p, g;
        if (n <= MIN_ARRAY_SORT_GRAN ||
            (p = ForkJoinPool.getCommonPoolParallelism()) == 1)
            DualPivotQuicksort.sort(a, 0, n - 1);
        else
            new ArraysParallelSortHelpers.FJByte.Sorter
                (null, a, new byte[n], 0, n, 0,
                 ((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
                 MIN_ARRAY_SORT_GRAN : g).invoke();
    }

在多核环境下,Fork/Join框架效率会随着运算规模增大而对效能的提升效果更显著,大规模运算推荐使用哦。

相关文章
|
10月前
|
资源调度
JUC并发编程之同步器(Semaphore、CountDownLatch、CyclicBarrier、Exchanger、CompletableFuture)附带相关面试题
1.Semaphore(资源调度) 2.CountDownLatch(子线程优先) 3.CyclicBarrier(栅栏) 4.Exchanger(公共交换区) 5.CompletableFuture(异步编程)
112 0
|
1月前
|
Java
Java一分钟之-并发编程:线程间通信(Phaser, CyclicBarrier, Semaphore)
【5月更文挑战第19天】Java并发编程中,Phaser、CyclicBarrier和Semaphore是三种强大的同步工具。Phaser用于阶段性任务协调,支持动态注册;CyclicBarrier允许线程同步执行,适合循环任务;Semaphore控制资源访问线程数,常用于限流和资源池管理。了解其使用场景、常见问题及避免策略,结合代码示例,能有效提升并发程序效率。注意异常处理和资源管理,以防止并发问题。
42 2
|
1月前
|
Java 数据库
Semaphore(信号量)源码解读与使用
Semaphore(信号量)源码解读与使用
|
分布式计算 算法 Java
【JUC基础】16. Fork Join
“分而治之”一直是一个非常有效的处理大量数据的方法。著名的MapReduce也是采取了分而治之的思想。。简单地说,就是如果你要处理 1000 个数据,但是你并不具备处理 1000个数据的能力,那么你可以只处理其中的 10 个,然后分阶段处理 100 次,将 100 次的结进行合成,就是最终想要的对原始 1000 个数据的处理结果。而这就是Fork Join的基本思想。
|
Java
Java Review - 并发编程_ 信号量Semaphore原理&源码剖析
Java Review - 并发编程_ 信号量Semaphore原理&源码剖析
80 0
|
安全 Java
《JUC并发编程 - 基础篇》JUC概述 | Lock接口 | 线程间通信 | 多线程锁 | 集合线程安全(二)
《JUC并发编程 - 基础篇》JUC概述 | Lock接口 | 线程间通信 | 多线程锁 | 集合线程安全
《JUC并发编程 - 基础篇》JUC概述 | Lock接口 | 线程间通信 | 多线程锁 | 集合线程安全(二)
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(四)
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(四)
|
存储 Java 编译器
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(二)
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(二)
|
缓存 索引
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(三)
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(三)
|
存储 安全 Java
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(一)
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile
《JUC并发编程 - 原理篇》Monitor | synchronized | wait&notify | join | park&unpark | 指令级并行 | volatile(一)