Java 数据分批调用接口的正确姿势

简介: Java 数据分批调用接口的正确姿势


一、背景
 

现实业务开发中,通常为了避免超时、对方接口限制等原因需要对支持批量的接口的数据分批调用。

比如List参数的size可能为 几十个甚至上百个,但是假如对方dubbo接口比较慢,传入50个以上会超时,那么可以每次传入20个,分批执行。

通常很多人会写 for 循环或者 while 循环,非常不优雅,无法复用,而且容易出错。

下面结合 Java8 的 Stream ,Function ,Consumer 等特性实现分批调用的工具类封装和自测。

并给出 CompletableFuture 的异步改进方案。

二、实现
工具类:

package com.chujianyun.common.java8.function;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;

/**

  • 执行工具类

*

  • @author 明明如月

*/
public class ExecuteUtil {

public static <T> void partitionRun(List<T> dataList, int size, Consumer<List<T>> consumer) {
    if (CollectionUtils.isEmpty(dataList)) {
        return;
    }
    Preconditions.checkArgument(size > 0, "size must not be a minus");
    Lists.partition(dataList, size).forEach(consumer);
}

public static <T, V> List<V> partitionCall2List(List<T> dataList, int size, Function<List<T>, List<V>> function) {

    if (CollectionUtils.isEmpty(dataList)) {
        return new ArrayList<>(0);
    }
    Preconditions.checkArgument(size > 0, "size must not be a minus");

    return Lists.partition(dataList, size)
            .stream()
            .map(function)
            .filter(Objects::nonNull)
            .reduce(new ArrayList<>(),
                    (resultList1, resultList2) -> {
                        resultList1.addAll(resultList2);
                        return resultList1;
                    });


}

public static <T, V> Map<T, V> partitionCall2Map(List<T> dataList, int size, Function<List<T>, Map<T, V>> function) {
    if (CollectionUtils.isEmpty(dataList)) {
        return new HashMap<>(0);
    }
    Preconditions.checkArgument(size > 0, "size must not be a minus");
    return Lists.partition(dataList, size)
            .stream()
            .map(function)
            .filter(Objects::nonNull)
            .reduce(new HashMap<>(),
                    (resultMap1, resultMap2) -> {
                        resultMap1.putAll(resultMap2);
                        return resultMap1;
                    });


}
AI 代码解读

}

待调用的服务(模拟)

package com.chujianyun.common.java8.function;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SomeManager {

public void aRun(Long id, List<String> data) {

}

public List<Integer> aListMethod(Long id, List<String> data) {
    return new ArrayList<>(0);
}

public Map<String, Integer> aMapMethod(Long id, List<String> data) {
    return new HashMap<>(0);
}
AI 代码解读

}
单元测试:

package com.chujianyun.common.java8.function;

import org.apache.commons.lang3.RandomUtils;
import org.jeasy.random.EasyRandom;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.internal.verification.Times;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;

@RunWith(PowerMockRunner.class)
public class ExecuteUtilTest {

private EasyRandom easyRandom = new EasyRandom();

@Mock
private SomeManager someManager;

// 测试数据
private List<String> mockDataList;

private int total = 30;

@Before
public void init() {
    // 构造30条数据
    mockDataList = easyRandom.objects(String.class, 30).collect(Collectors.toList());

}

@Test
public void test_a_run_partition() {
    // mock aRun
    PowerMockito.doNothing().when(someManager).aRun(anyLong(), any());

    // 每批 10 个
    ExecuteUtil.partitionRun(mockDataList, 10, (eachList) -> someManager.aRun(1L, eachList));

    //验证执行了 3 次
    Mockito.verify(someManager, new Times(3)).aRun(anyLong(), any());
}


@Test
public void test_call_return_list_partition() {
    // mock  每次调用返回条数(注意每次调用都是这2个)
    int eachReturnSize = 2;
    PowerMockito
            .doReturn(easyRandom.objects(String.class, eachReturnSize).collect(Collectors.toList()))
            .when(someManager)
            .aListMethod(anyLong(), any());

    // 分批执行
    int size = 4;
    List<Integer> resultList = ExecuteUtil.partitionCall2List(mockDataList, size, (eachList) -> someManager.aListMethod(2L, eachList));

    //验证执行次数
    int invocations = 8;
    Mockito.verify(someManager, new Times(invocations)).aListMethod(anyLong(), any());

    // 正好几轮
    int turns;
    if (total % size == 0) {
        turns = total / size;
    } else {
        turns = total / size + 1;
    }
    Assert.assertEquals(turns * eachReturnSize, resultList.size());
}


@Test
public void test_call_return_map_partition() {
    // mock  每次调用返回条数
    // 注意:
    // 如果仅调用doReturn一次,那么每次返回都是key相同的Map,
    // 如果需要不覆盖,则doReturn次数和 invocations 相同)
    int eachReturnSize = 3;
    PowerMockito
            .doReturn(mockMap(eachReturnSize))
            .doReturn(mockMap(eachReturnSize))
            .when(someManager).aMapMethod(anyLong(), any());

    // 每批
    int size = 16;
    Map<String, Integer> resultMap = ExecuteUtil.partitionCall2Map(mockDataList, size, (eachList) -> someManager.aMapMethod(2L, eachList));

    //验证执行次数
    int invocations = 2;
    Mockito.verify(someManager, new Times(invocations)).aMapMethod(anyLong(), any());

    // 正好几轮
    int turns;
    if (total % size == 0) {
        turns = total / size;
    } else {
        turns = total / size + 1;
    }
    Assert.assertEquals(turns * eachReturnSize, resultMap.size());
}

private Map<String, Integer> mockMap(int size) {
    Map<String, Integer> result = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
AI 代码解读

// 极力保证key不重复

        result.put(easyRandom.nextObject(String.class) + RandomUtils.nextInt(), easyRandom.nextInt());
    }
    return result;
}

AI 代码解读

}
注意:

1 判空

.filter(Objects::nonNull)
这里非常重要,避免又一次调用返回 null,而导致空指针异常。

2 实际使用时可以结合apollo配置, 灵活设置每批执行的数量,如果超时随时调整

3 用到的类库

集合工具类: commons-collections4、guava (可以不用)

这里的list划分子list也可以使用stream的 skip ,limit特性自己去做,集合判空也可以不借助collectionutils.

构造数据:easy-random

单元测试框架: Junit4 、 powermockito、mockito

4 大家可以加一些更强大的功能,如允许设置每次调用的时间间隔、并行或并发调用等。

三、改进
以上面的List接口为例,将其改为异步版本:

public static <T, V> List<V> partitionCall2ListAsync(List<T> dataList,
                                                     int size,
                                                     ExecutorService executorService,
                                                     Function<List<T>, List<V>> function) {

    if (CollectionUtils.isEmpty(dataList)) {
        return new ArrayList<>(0);
    }
    Preconditions.checkArgument(size > 0, "size must not be a minus");

    List<CompletableFuture<List<V>>> completableFutures = Lists.partition(dataList, size)
            .stream()
            .map(eachList -> {
                if (executorService == null) {
                    return CompletableFuture.supplyAsync(() -> function.apply(eachList));
                } else {
                    return CompletableFuture.supplyAsync(() -> function.apply(eachList), executorService);
                }

            })
            .collect(Collectors.toList());


    CompletableFuture<Void> allFinished = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
    try {
        allFinished.get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return completableFutures.stream()
            .map(CompletableFuture::join)
            .filter(CollectionUtils::isNotEmpty)
            .reduce(new ArrayList<V>(), ((list1, list2) -> {
                List<V> resultList = new ArrayList<>();
                if(CollectionUtils.isNotEmpty(list1)){
                   resultList.addAll(list1);
                   }

                if(CollectionUtils.isNotEmpty(list2)){
                     resultList.addAll(list2);
                   }
                return resultList;
            }));
}
AI 代码解读

测试代码:

// 测试数据

private List<String> mockDataList;

private int total = 300;

private AtomicInteger atomicInteger;

@Before
public void init() {
    // 构造total条数据
    mockDataList = easyRandom.objects(String.class, total).collect(Collectors.toList());

}


AI 代码解读

@Test
public void test_call_return_list_partition_async() {

    ExecutorService executorService = Executors.newFixedThreadPool(10);

    atomicInteger = new AtomicInteger(0);
    Stopwatch stopwatch = Stopwatch.createStarted();
    // 分批执行
    int size = 2;
    List<Integer> resultList = ExecuteUtil.partitionCall2ListAsync(mockDataList, size, executorService, (eachList) -> someCall(2L, eachList));

    Stopwatch stop = stopwatch.stop();
    log.info("执行时间: {} 秒", stop.elapsed(TimeUnit.SECONDS));

    Assert.assertEquals(total, resultList.size());
    // 正好几轮
    int turns;
    if (total % size == 0) {
        turns = total / size;
    } else {
        turns = total / size + 1;
    }
    log.info("共调用了{}次", turns);
    Assert.assertEquals(turns, atomicInteger.get());

  // 顺序也一致
    for(int i =0; i< mockDataList.size();i++){
        Assert.assertEquals((Integer) mockDataList.get(i).length(), resultList.get(i));
    }
}

AI 代码解读

/**

 * 模拟一次调用
 */
private List<Integer> someCall(Long id, List<String> strList) {

    log.info("当前-->{},strList.size:{}", atomicInteger.incrementAndGet(), strList.size());
    try {
        TimeUnit.SECONDS.sleep(2L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return strList.stream()
            .map(String::length)
            .collect(Collectors.toList());
}
AI 代码解读

通过异步可以尽可能快得拿到执行结果。

四、总结
1 要灵活运用Java 8 的 特性简化代码

2 要注意代码的封装来使代码更加优雅,复用性更强

3 要利用来构造单元测试的数据框架如 java-faker和easy-random来提高构造数据的效率

4 要了解性能改进的常见思路:合并请求、并发、并行、缓存等。

目录
打赏
0
0
0
0
212
分享
相关文章
java常用数据判空、比较和类型转换
本文介绍了Java开发中常见的数据处理技巧,包括数据判空、数据比较和类型转换。详细讲解了字符串、Integer、对象、List、Map、Set及数组的判空方法,推荐使用工具类如StringUtils、Objects等。同时,讨论了基本数据类型与引用数据类型的比较方法,以及自动类型转换和强制类型转换的规则。最后,提供了数值类型与字符串互相转换的具体示例。
185 3
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
随着大模型的越来越盛行,现在很多企业开始接入大模型的接口,今天我从java开发角度来写一个demo的示例,用于接入DeepSeek大模型,国内的大模型有很多的接入渠道,今天主要介绍下阿里云的百炼模型,因为这个模型是免费的,只要注册一个账户,就会免费送百万的token进行学习,今天就从一个简单的可以执行的示例开始进行介绍,希望可以分享给各位正在学习的同学们。
139 3
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
Java爬虫获取微店快递费用item_fee API接口数据实现
本文介绍如何使用Java开发爬虫程序,通过微店API接口获取商品快递费用(item_fee)数据。主要内容包括:微店API接口的使用方法、Java爬虫技术背景、需求分析和技术选型。具体实现步骤为:发送HTTP请求获取数据、解析JSON格式的响应并提取快递费用信息,最后将结果存储到本地文件中。文中还提供了完整的代码示例,并提醒开发者注意授权令牌、接口频率限制及数据合法性等问题。
深潜数据海洋:Java文件读写全面解析与实战指南
通过本文的详细解析与实战示例,您可以系统地掌握Java中各种文件读写操作,从基本的读写到高效的NIO操作,再到文件复制、移动和删除。希望这些内容能够帮助您在实际项目中处理文件数据,提高开发效率和代码质量。
37 4
|
3月前
|
使用Java和Spring Data构建数据访问层
本文介绍了如何使用 Java 和 Spring Data 构建数据访问层的完整过程。通过创建实体类、存储库接口、服务类和控制器类,实现了对数据库的基本操作。这种方法不仅简化了数据访问层的开发,还提高了代码的可维护性和可读性。通过合理使用 Spring Data 提供的功能,可以大幅提升开发效率。
90 21
java语言后台管理若依框架-登录提示404-接口异常-系统接口404异常如何处理-登录验证码不显示prod-api/captchaImage 404 (Not Found) 如何处理-解决方案优雅草卓伊凡
java语言后台管理若依框架-登录提示404-接口异常-系统接口404异常如何处理-登录验证码不显示prod-api/captchaImage 404 (Not Found) 如何处理-解决方案优雅草卓伊凡
263 5
|
5月前
|
在Java中如何实现接口?
实现接口是 Java 编程中的一个重要环节,它有助于提高代码的规范性、可扩展性和复用性。通过正确地实现接口,可以使代码更加灵活、易于维护和扩展。
291 64
|
5月前
|
在Java中,接口之间可以继承吗?
接口继承是一种重要的机制,它允许一个接口从另一个或多个接口继承方法和常量。
424 60
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
93 7