Java开发异步批处理教程

简介: Java开发异步批处理教程

1687007783668.png

开启掘金成长之旅!这是我参与「掘金日新计划 · 12 月更文挑战」的第4天,点击查看活动详情

书接上回 大数据量、高并发业务怎么优化?(一) 文章中介绍了异步批处理的三种方式,本文继续深入针对前两种进行讲解,并给出代码示例:

image.png

一 普通版本,采用阻塞队列 ArrayBlockingQueue

使用普通方式能够直接基于JDK中现成的并发包 ArrayBlockingQueue 提供的 offer(E e, long timeout, TimeUnit unit)(添加元素到队列尾部,如果队列已满则等待参数指定时间后返回false)方法 和  poll(long timeout, TimeUnit unit)(从队列头部获取元素,如果队列为空则等待参数指定时间后返回null)方法,来达到异步批处理效果

生产者代码:由于采用内存队列,最好在创建 ArrayBlockingQueue 时指定队列大小,防止队列无界,导致内存溢出

/**
 * 生产者
 */
@Component
@Slf4j
public class MonitorQueue {
    private BlockingQueue<List<NodeCollectDTO>> queue = new ArrayBlockingQueue<>(10000000);
    public void put(List<NodeCollectDTO> list) {
        try {
            queue.put(list);
        } catch (InterruptedException e) {
            log.error(String.format("队列put异常:%s", e.getMessage()), e);
        }
    }
    public void offer(List<NodeCollectDTO> list, long timeout, TimeUnit unit) throws InterruptedException {
        queue.offer(list, timeout, unit);
    }
    public List<NodeCollectDTO> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }
}

消费者代码:在创建消费者时开启一个子线程在死循环中一直读取队列元素,直到队列元素超过我们的 maxNum 时,将临时列表元素插入数据库中

/**
 * 消费者
 */
@Slf4j
@Component
public class MonitorConsumer implements Runnable {
    @Autowired
    private MonitorQueue queue;
    @Autowired
    private MonitorService monitorService;
    @PostConstruct
    public void init() {
        new Thread(this, "monitor-collect").start();
    }
    // 临时列表大小限制
    private int maxNum = 2000;
    @SuppressWarnings("InfiniteLoopStatement")
    @Override
    public void run() {
        while (true) {
            handler();
        }
    }
    private void handler() {
        try {
            List<NodeCollectDTO> temp = new ArrayList<>(maxNum);
            while (temp.size() <= maxNum) {
                List<NodeCollectDTO> list = queue.poll(20, TimeUnit.SECONDS);
                if (CollectionUtil.isNotEmpty(list)) {
                    temp.addAll(list);
                } else {
                    break;
                }
            }
            if (CollectionUtil.isEmpty(temp)) {
                return;
            }
            int i = monitorService.batchSave(temp);
            log.debug("----------------------------batchSave num:{}, collect.size:{}", i, collect.size());
        } catch (Exception e) {
            log.error(String.format("消费者异常: %s", e.getMessage()), e);
        }
    }
}    

可以看到采用该种方式实现的异步批量入库代码比较简单,便于理解,在性能上,基本都能够满足日常普通业务存在的批量入库场景

二 进阶版,采用 Disruptor 队列,本文基于 Disruptor 最新4.0版本

先给出 Disruptor 官网简介

Disruptor 是一个提供并发环形缓冲区数据结构的库。它旨在在异步事件处理架构中提供低延迟、高吞吐量的工作队列。 为了理解 Disruptor 的好处,我们可以将它与一些很好理解且目的非常相似的东西进行比较。在 Disruptor 的情况下,这将是 Java 的 BlockingQueue。与队列一样,Disruptor 的目的是在同一进程内的线程之间移动数据(例如消息或事件)。然而,Disruptor 提供的一些关键特性使其有别于队列。他们是:

  1. 向消费者多播事件,带有消费者依赖图。
  2. 为事件预分配内存。
  3. 可选无锁

Disruptor 给我们在项目中实现异步批处理提供了另一种方式,一种无锁、延迟更低、吞吐量更高、提供消费者多播等等的内存队列

下面介绍如何使用

2.1 依赖安装

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>4.0.0.RC1</version>
</dependency>

2.2 Disruptor 使用代码如下:

public class LongEvent{
    private long value;
    public void set(long value){
        this.value = value;
    }
    @Override
    public String toString(){
        return "LongEvent{" + "value=" + value + '}';
    }
}
@Slf4j
public class LongEventMain {
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
        log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);
    }
    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
        event.set(buffer.getLong(0));
    }
    public static void main(String[] args) throws Exception {
        int bufferSize = 128;
        // 1. 创建Disruptor对象
        Disruptor<LongEvent> disruptor =
                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
        // 2. 添加事件处理类(消费者)
        disruptor.handleEventsWith(LongEventMain::handleEvent);
        // 3. 开启事件处理线程
        disruptor.start();
        // 4. 获取ringBuffer
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            // 5. 发布事件(生产者)
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1);
        }
    }
}

2.3 上面代码完成了一个事件发布后,事件处理类就能够收到对应事件信息的功能,但是我们想要的是能在消费者线程中批量处理生产者数据的逻辑,还得再修改一下事件处理类代码,如下:

@Slf4j
public class LongEventBatch implements EventHandler<LongEvent> {
    private static final int MAX_BATCH_SIZE = 20;
    private final List<LongEvent> batch = new ArrayList<>();
    public LongEventBatch() {
        // 虚拟机关闭处理
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("------------------ShutdownHook-DataEventHandler,上报tempList");
            if (batch.size() > 0) {
                // 批量入库伪代码
                int i = xxxService.batchSave(temp);
            }
        }));
    }
    @Override
    public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) {
        log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);
        batch.add(event);
        if (batch.size() >= MAX_BATCH_SIZE) {
            processBatch(batch);
        }
    }
    private void processBatch(final List<LongEvent> batch) {
        // 批量入库伪代码
        int i = xxxService.batchSave(temp);
        // 记得清空batch列表
        batch.clear();
    }
}

由此,我们就实现了基于 Disruptor 的异步批处理逻辑,该方式会比普通版本性能高出一个数量级,大家在工作中可以尝试使用一番

最后

附博主 github 地址 github.com/wayn111

目录
相关文章
|
9天前
|
SQL 安全 Java
安全问题已经成为软件开发中不可忽视的重要议题。对于使用Java语言开发的应用程序来说,安全性更是至关重要
在当今网络环境下,Java应用的安全性至关重要。本文深入探讨了Java安全编程的最佳实践,包括代码审查、输入验证、输出编码、访问控制和加密技术等,帮助开发者构建安全可靠的应用。通过掌握相关技术和工具,开发者可以有效防范安全威胁,确保应用的安全性。
23 4
|
11天前
|
缓存 监控 Java
如何运用JAVA开发API接口?
本文详细介绍了如何使用Java开发API接口,涵盖创建、实现、测试和部署接口的关键步骤。同时,讨论了接口的安全性设计和设计原则,帮助开发者构建高效、安全、易于维护的API接口。
35 4
|
15天前
|
安全 Java 编译器
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
|
15天前
|
Java 开发工具 Android开发
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
|
15天前
|
Java 编译器 Android开发
Kotlin教程笔记(28) -Kotlin 与 Java 混编
Kotlin教程笔记(28) -Kotlin 与 Java 混编
|
16天前
|
SQL Java 程序员
倍增 Java 程序员的开发效率
应用计算困境:Java 作为主流开发语言,在数据处理方面存在复杂度高的问题,而 SQL 虽然简洁但受限于数据库架构。SPL(Structured Process Language)是一种纯 Java 开发的数据处理语言,结合了 Java 的架构灵活性和 SQL 的简洁性。SPL 提供简洁的语法、完善的计算能力、高效的 IDE、大数据支持、与 Java 应用无缝集成以及开放性和热切换特性,能够大幅提升开发效率和性能。
|
17天前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
34 2
|
17天前
|
监控 Java 数据库连接
在Java开发中,数据库连接管理是关键问题之一
在Java开发中,数据库连接管理是关键问题之一。本文介绍了连接池技术如何通过预创建和管理数据库连接,提高数据库操作的性能和稳定性,减少资源消耗,并简化连接管理。通过示例代码展示了HikariCP连接池的实际应用。
17 1
|
10天前
|
安全 Java 测试技术
Java开发必读,谈谈对Spring IOC与AOP的理解
Spring的IOC和AOP机制通过依赖注入和横切关注点的分离,大大提高了代码的模块化和可维护性。IOC使得对象的创建和管理变得灵活可控,降低了对象之间的耦合度;AOP则通过动态代理机制实现了横切关注点的集中管理,减少了重复代码。理解和掌握这两个核心概念,是高效使用Spring框架的关键。希望本文对你深入理解Spring的IOC和AOP有所帮助。
23 0
|
11天前
|
Java API Android开发
kotlin和java开发优缺点
kotlin和java开发优缺点
25 0