并发等待执行的几种方法

简介: ### 一、说明 工作中常有这样的场景,并发执行一些任务,并等待所有的任务执行完成,进行后续处理,这里总结了几种方法用于满足这种业务场景。 ### 二、闭锁方式 闭锁是Java早期提供的一种并发锁,其特点是每个任务颁发一个令牌,任务执行完成释放令牌,主进程可以一直阻塞等待所有的令牌被释放,当所有令牌都被释放后,主进程可以继续执行。依据闭锁的这种特效可以满足上面的任务场景。 ```jav

一、说明

工作中常有这样的场景,并发执行一些任务,并等待所有的任务执行完成,进行后续处理,这里总结了几种方法用于满足这种业务场景。

二、闭锁方式

闭锁是Java早期提供的一种并发锁,其特点是每个任务颁发一个令牌,任务执行完成释放令牌,主进程可以一直阻塞等待所有的令牌被释放,当所有令牌都被释放后,主进程可以继续执行。依据闭锁的这种特效可以满足上面的任务场景。

public class CountDownLatchService {

    private final CountDownLatch lock;
    private final ExecutorService executorService;
    private List<Long> aList;

    public CountDownLatchService(ExecutorService executorService, List<Long> aList) {
        this.executorService = executorService;
        this.lock = new CountDownLatch(aList.size());
        this.aList = aList;
    }

    public void process() throws InterruptedException {
        aList.forEach(s -> {
            executorService.submit(new Task(s));
        });

        lock.await(10, TimeUnit.SECONDS);
    }

    class Task implements Runnable {

        private Long job;

        public Task(Long job) {
            this.job = job;
        }

        @Override
        public void run() {
            try {
                System.out.println(job);
            } finally {
                lock.countDown();
            }
        }
    }
}

三、Future方式

通过for循环提交异步任务执行,返回的Future列表,再通过for循环获取每个Future中的结果。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class FutureService {

    private final ExecutorService executorService;

    public FutureService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public void process() {
        List<Long> aList = LongStream.rangeClosed(0, 1000).boxed().collect(Collectors.toList());

        List<Future> futures = new ArrayList<>(aList.size());
        aList.forEach(s -> {
            Future future = executorService.submit(() -> System.out.println(s));
            futures.add(future);
        });

        futures.forEach(s -> {
            try {
                s.get(10, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
        });
    }
}

四、CompletableFuture方式

通过Java8新提供的CompletableFuture类,可以通过allOf方法构建一批异步任务对象,然后通过get方法阻塞等待所有任务的完成。

import com.google.common.collect.Lists;
import com.taobao.eagleeye.EagleEye;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

public class ParallelService {

    public <T> void parallelExecute(ExecutorService executorService, List<T> list, Consumer<T> func) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        final Object rpcContext = EagleEye.currentRpcContext();
        CompletableFuture[] futureList = list.stream().map(s -> CompletableFuture.runAsync(() -> {
            try {
                EagleEye.setRpcContext(rpcContext);
                func.accept(s);
            } finally {
                EagleEye.clearRpcContext();
            }
        }, executorService)).toArray(CompletableFuture[]::new);

        wait(futureList);
    }

    public  <T, R> List<R> parallelGet(ExecutorService executorService, List<T> list, Function<T, R> func) {
        if (CollectionUtils.isEmpty(list)) {
            return Lists.newArrayList();
        }

        final Object rpcContext = EagleEye.currentRpcContext();
        CompletableFuture[] futureList = list.stream().map(s -> CompletableFuture.supplyAsync(() -> {
            R r;
            try {
                EagleEye.setRpcContext(rpcContext);
                r = func.apply(s);
            } finally {
                EagleEye.clearRpcContext();
            }
            return r;
            }, executorService)).toArray(CompletableFuture[]::new);
        wait(futureList);

        List<R> result = new ArrayList<>(list.size());
        for (CompletableFuture future : futureList) {
            result.add((R) future.getNow(null));
        }

        return result;
    }

    private void wait(CompletableFuture[] futureList) {
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureList);
        try {
            combinedFuture.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

五、Stream方式

通过向线程池提交一个parallelStream的foreach任务,然后通过get阻塞等待所有任务的完成。需要注意的是线程池必须是ForkJoinPool,因为parallelStream内部实现就是使用的ForkJoinPool。

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ParallelStreamService {

    public void process() throws ExecutionException, InterruptedException {
        List<Long> aList = LongStream.rangeClosed(0, 1000).boxed().collect(Collectors.toList());

        ForkJoinPool customThreadPool = ForkJoinPool.commonPool();
        try {
            ForkJoinTask task = customThreadPool.submit(() -> aList.parallelStream().forEach(s -> {
                System.out.println(Thread.currentThread().getName() + ":" + s);
            }));
            task.get();
        } finally {
            customThreadPool.shutdown();
        }
    }
}
目录
相关文章
|
Java 数据库连接 Maven
使用mybatis插件generator生成实体类,dao层和mapper映射
使用mybatis插件generator生成实体类,dao层和mapper映射
1306 0
|
搜索推荐 IDE 开发工具
IDEA自定义右键菜单
IDEA自定义右键菜单
2951 1
|
安全 Java 应用服务中间件
使用OkHttp工具时Authorization请求头丢失问题
记一次联调三方接口时&quot;Authorization&quot;请求头丢失问题, 使用工具OkHttp
使用OkHttp工具时Authorization请求头丢失问题
|
存储 Go API
使用GoFrame连接和操作TDengine时序数据库
通过使用GoFrame框架和TDengine Go驱动,我们可以方便地连接和操作TDengine时序数据库。无论是插入、查询还是分析时序数据,都可以通过简单的API调用来实现。GoFrame提供了强大的Web开发功能,结合TDengine的高性能时序数据存储和查询能力,可以构建高效、可扩展的时序数据应用。
315 5
|
9月前
|
机器学习/深度学习 人工智能 监控
阿里通义开源全模态大语言模型 R1-Omni:情感分析成绩新标杆!推理过程全程透明,准确率飙升200%
R1-Omni 是阿里通义开源的全模态大语言模型,专注于情感识别任务,结合视觉和音频信息,提供可解释的推理过程,显著提升情感识别的准确性和泛化能力。
1226 10
阿里通义开源全模态大语言模型 R1-Omni:情感分析成绩新标杆!推理过程全程透明,准确率飙升200%
|
Java 关系型数据库 MySQL
基于SpringBoot+Vue医院管理系统(源码+部署说明+演示视频+源码介绍+lw)(1)
基于SpringBoot+Vue医院管理系统(源码+部署说明+演示视频+源码介绍+lw)
293 2
|
存储 Java 关系型数据库
(透彻)java String.getBytes()编码问题
String.getBytes()的问题String 的getBytes()方法是得到一个字串的字节数组,这是众所周知的。但特别要注意的是,本方法将返回该操作系统默认的编码格式的字节数组。
2052 0
|
存储 开发框架 .NET
C# 面试题及答案整理,最新面试题
C# 面试题及答案整理,最新面试题
520 0
|
存储 缓存 监控
Flink性能优化小结
Flink性能优化小结
|
监控 关系型数据库 PostgreSQL
PostgreSQL bgwriter,walwriter,backend process 写磁盘的实时监控
标签 PostgreSQL , 背景 数据库有两大块buffer,wal buffer和shared buffer。 wal buffer是预写日志缓冲区。 shared buffer是数据页缓冲区。
2920 0

热门文章

最新文章