Dating Java8系列之并行数据处理

简介: Dating Java8系列之并行数据处理



分支合并框架

分支合并框架介绍

分支/合并框架的目的是以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型。

要定义RecursiveTask,只需实现它唯一的抽象方法compute:

protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:

if (任务足够小或不可分) {   顺序计算该任务} else {     将任务分成两个子任务     递归调用本方法,拆分每个子任务,等待所有子任务完成     合并每个子任务的结果}

使用分支合并框架的例子

执行递增求和任务

public class CalculatorSumTask extends RecursiveTask<Long> {
    // 开始的值    private Long start;
    // 结束的值    private Long end;
    // 阈值,即结束fork的条件    private static final Long THRESHOLD = 10000L;
    public CalculatorSumTask(Long start, Long end) {        this.start = start;        this.end = end;    }
    /**     * 求 start - end 之间的所有数的和     *     * @return     */    @Override    protected Long compute() {
        // 数的间隔        long length = end - start;
        // 任务足够小或不可分        if (length <= THRESHOLD) {            long sum = 0L;            // 顺序计算该任务            for (long i = start; i <= end; i++) {                sum += i;            }            return sum;        } else {            long middle = (start + end) / 2;            // 将任务分成两个子任务            CalculatorSumTask leftTask = new CalculatorSumTask(start, middle);            leftTask.fork();            CalculatorSumTask rightTask = new CalculatorSumTask(middle + 1, end);            rightTask.fork();
            // 合并每个子任务的结果            return leftTask.join() + rightTask.join();        }    }}

当把CalculatorSumTask任务传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务compute方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把,要求和的数组分成两半,分给两个新的CalculatorSumTask,而它们也由ForkJoinPool安排执行。

因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步细分的条件(本例中是求和的项目数小于等于10000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。

模拟发送短信

public class SendSmsTask extends RecursiveAction {
    private List<String> phoneList;
    private String smsText;
    private static final Integer THRESHOLD = 1000;
    public SendSmsTask(List<String> phoneList, String smsText) {        this.phoneList = phoneList;        this.smsText = smsText;    }
    private void send(String phoneNumber, String smsText) {        try {            Thread.sleep(10);            System.out.println("发送:" + phoneNumber + " 内容:" + smsText);        } catch (InterruptedException e) {            e.printStackTrace();        }    }
    /**     * 对一批手机号进行发短信处理     */    @Override    protected void compute() {        int batchSize = phoneList.size();        if (batchSize <= 1000) {            for (String number : phoneList) {                send(number, smsText);            }        } else {            // 拆的逻辑和如何拆分由自己来定义            int middle = phoneList.size() / 2;
            List<String> leftList = phoneList.subList(0, middle);            List<String> rightList = phoneList.subList(middle, phoneList.size());
            SendSmsTask leftTask = new SendSmsTask(leftList,smsText);            SendSmsTask rightTask = new SendSmsTask(rightList,smsText);
            leftTask.fork();            rightTask.fork();
            // 下面这两步骤可以省略            leftTask.join();            rightTask.join();        }    }}

工作窃取算法

算法介绍

分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每个子任务所花的时间可能天差地别。

分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。

  • 每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。
  • 基于前面所述的原因,某个线程可能已经完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。
  • 这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。
  • 这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。
  • 这就是为什么要划分成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。

相关代码

public class ParallelMain {
    public static void main(String[] args) {        Long start = Instant.now().toEpochMilli();        // 求0-10000000所有数据的和        Long sum = 0L;        for (long i = 0; i <= 10000000000L; i++) {            sum += i;        }        System.out.println(sum);        Long end = Instant.now().toEpochMilli();        System.out.println("for循环单线程执行 耗费时间:" + (end - start));
        // 获取Jvm最大的可用线程数        System.out.println(Runtime.getRuntime().availableProcessors());
        Long start1 = Instant.now().toEpochMilli();
        // 要处理的数据量的太小        ForkJoinPool forkJoinPool = new ForkJoinPool();        Long sum1 = forkJoinPool.invoke(new CalculatorSumTask(0L,10000000000L));        System.out.println(sum1);
        Long end1 = Instant.now().toEpochMilli();        System.out.println("fork join 并行执行 耗费时间:" + (end1 - start1));
        Long start2 = Instant.now().toEpochMilli();
        Long sum2 = LongStream.rangeClosed(0L,10000000000L).parallel().reduce(0L,Long::sum);        System.out.println(sum2);
        Long end2 = Instant.now().toEpochMilli();        System.out.println("java8 parallel 并行执行 耗费时间:" + (end2 - start2));    }}

小结

  • 分支/合并框架使用递归的方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。
  • 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
  • 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是反直觉的,因此一定要测量,确保你并没有把程序变得更慢。
  • 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。

作者:翎野君

本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。

目录
相关文章
|
25天前
|
存储 Java 数据挖掘
Java 8 新特性之 Stream API:函数式编程风格的数据处理范式
Java 8 引入的 Stream API 提供了一种新的数据处理方式,支持函数式编程风格,能够高效、简洁地处理集合数据,实现过滤、映射、聚合等操作。
41 6
|
5月前
|
监控 Java 大数据
如何在Java中实现批量数据处理
如何在Java中实现批量数据处理
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
5月前
|
消息中间件 存储 Java
使用Java构建实时数据处理流程
使用Java构建实时数据处理流程
|
4月前
|
存储 Java API
探索Java中的Stream API: 提升数据处理的效率与优雅
在Java的海洋中,Stream API如同一股清流,为数据处理注入了新的活力。本文将深入探讨Stream API的核心概念、操作以及它如何改变我们编写和理解代码的方式。通过实际案例,我们将揭示这一现代编程范式如何简化集合处理,提高代码的可读性与性能。
|
5月前
|
并行计算 Java 大数据
Java中的高效并行计算与多线程编程技术
Java中的高效并行计算与多线程编程技术
|
5月前
|
存储 Java 调度
线程操纵术并行策略问题之Java的并行编程优势问题如何解决
线程操纵术并行策略问题之Java的并行编程优势问题如何解决
|
5月前
|
设计模式 安全 Java
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
79 0
|
5月前
|
设计模式 并行计算 安全
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
56 0
|
5月前
|
Java 调度 Windows
Java面试之程序、进程、线程、管程和并发、并行的概念
Java面试之程序、进程、线程、管程和并发、并行的概念
33 0