【Java】你还在使用单线程处理大量数据么?

简介: 现有一个数据拼装入库的接口,总数据量大约几万条,之前使用单线程同步处理,需要处理几分钟,这接口速度在生产上是不允许的,针对这一问题,需要对此接口进行多线程并发处理。

业务场景

现有一个数据拼装入库的接口,总数据量大约几万条,之前使用单线程同步处理,需要处理几分钟,这接口速度在生产上是不允许的,针对这一问题,需要对此接口进行速度优化。

优化方案

使用多线程异步处理的方式,技能保证接口很快的响应,也能提高数据的拼装入库操作。

多线程的实现--线程池

为什么要使用线程池

1.在Java中,如果每个请求到达就创建一个新线程,开销是相当大的。在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。

2.除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个jvm里创建太多的线程,可能会是系统由于过度消耗内存或“切换过度”而导致系统资源不足。

3.为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁的线程的次数,特别是一个些资源耗费比较大但是线程的创建和销毁尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。

线程池的创建

ThreadPoolExecutor是线程池的核心实现类,在JDK1.5引入,位于java.util.concurrent包

  • Executor线程池相关顶级接口,它将任务的提交与任务的执行分离开来
  • ExecutorService继承并扩展了Executor接口,提供了Runnable、FutureTask等主要线程实现接口扩展
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务

1、Spring配置类

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
    @Bean("myExceutor")
    public Executor myfoExceutor(ThreadPoolConfigProperties pool) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(pool.getCoreSize());
        // 设置最大线程数
        executor.setMaxPoolSize(pool.getMaxSize());
        //配置队列大小
        executor.setQueueCapacity(pool.getQueueSize());
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(pool.getKeepAliveTime());
        // 设置默认线程名称
        executor.setThreadNamePrefix("pageInfoExceutor-");
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //执行初始化
        executor.initialize();
        return executor;
    }
}

2、手动创建

/**
corePoolSize:核心线程池的线程数量
maximumPoolSize:最大的线程池线程数量
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。
unit:线程活动保持时间的单位。
workQueue:指定任务队列所使用的阻塞队列
threadFactory:线程工厂提供线程的创建方式,默认使用Executors.defaultThreadFactory()
handler:当线程池所处理的任务数超过其承载容量或关闭后继续有任务提交时,所调用的拒绝策略
**/
ExecutorService exec = new ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                                RejectedExecutionHandler handler);

Executors是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool方法可以生成一个拥有固定线程数的线程池。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

其本质是初始化一个ThreadPoolExecutor对象。

提交任务

1、execute()

通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。

/**
* 源码 
**/
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         分 3 个步骤进行:
         1. 如果正在运行的线程少于 corePoolSize,请尝试使用给定命令作为其第一个任务来启动新线程。对 addWorker 的调用以原子方式检查 runState 和 workerCount,因此通过返回 false 来防止错误警报,这些警报会在不应该添加线程时添加线程。
         2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有线程自上次检查以来就死了)或者池在进入此方法后关闭了。因此,我们重新检查状态,如有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。
         3.如果我们不能排队任务,那么我们尝试添加一个新线程。如果失败了,我们知道我们已经关闭或饱和,因此拒绝这项任务。

         */
        int c = ctl.get();
    //判断工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //执行addworker,创建一个核心线程,创建失败重新获取ctl
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    //如果工作线程数大于核心线程数,判断线程池的状态是否为running,并且可以添加进队列
//如果线程池不是running状态,则执行拒绝策略,(还是会调用一次addworker)
        if (isRunning(c) && workQueue.offer(command)) {
              //再次获取ctl,进行双重检索
            int recheck = ctl.get();
            //如果线程池是不是处于RUNNING的状态,那么就会将任务从队列中移除, 
            //如果移除失败,则会判断工作线程是否为0 ,如果过为0 就创建一个非核心线程 
            //如果移除成功,就执行拒绝策略,因为线程池已经不可用了;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
      //线程池挂了或者大于最大线程数
        else if (!addWorker(command, false))
            reject(command);
    }
exec.execute(() -> gooodsService.copyData(a,b));

2、submit()

通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。

/**
提交可运行任务以执行,并返回表示该任务的未来。未来的方法get将在成功完成后返回null。
参数:任务 – 要提交的任务
返回:a 未来代表任务完成前
抛出:RejectedExecutionException – 如果无法安排任务执行
     NullPointerException – 如果任务为空
**/
Future<?> submit(Runnable task);
FutureTask<T> ft = new FutureTask<T>(Callable<V> callable);
exec.submit(ft);

案例伪代码

/**
*拼装数据并插表(无返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
    // 创建线程池
        ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    //先对数据进行分割
    List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
    for (List<po> list : divideList){
        //提交任务
        exec.execute(() ->  dataMapper.saveBatchData(list));
  }
    
}

/**
*拼装数据并插表(有返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
       //创建一个集合,来接收返回值
     List<CompletableFuture<ResultDTO>> returnList = new ArrayList<>();
    // 创建线程池
        ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    //先对数据进行分割
    List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
    for (List<po> list : divideList){
        //提交任务
         CompletableFuture<ResultDTO> cf = CompletableFuture.supplyAsync(() -> {
                        try {
                            return dataMapper.saveBatchData(list);
                        } catch (Exception e) {
                            return null;
                        }
                    },exec).exceptionally( ex -> null);
        returnList.add(cf)
                
  }
    /*之后可以对拿到的返回值进行自己的业务操作....................*/
    
    //阻塞主线程-->等待子线程都执行完后执行,(这里我想等所有线程都执行完操作数据库后,删除redis锁)
    CompletableFuture.allOf(returnList.toArray(new CompletableFuture[returnList.size()])).join();
    redisTemplate.delete(key);
}

后续优化

经过多线程处理后,接口响应速度提升了10倍,并且可以做异步处理,接口先给出响应,数据在后台异步处理。

但是这也是处理几万的数据量的。虽然使用了多线程,但是依旧只是一个节点处理,依靠单个Jvm,如果数据量再大,且并发量提高,会造成OOM内存溢出。所以后续在不改变需求的情况下,使用分布式计算,依靠MQ任务分发,进行多节点处理的方式,性能和稳定性会大大提高。

相关文章
|
21天前
|
前端开发 JavaScript Java
java常用数据判空、比较和类型转换
本文介绍了Java开发中常见的数据处理技巧,包括数据判空、数据比较和类型转换。详细讲解了字符串、Integer、对象、List、Map、Set及数组的判空方法,推荐使用工具类如StringUtils、Objects等。同时,讨论了基本数据类型与引用数据类型的比较方法,以及自动类型转换和强制类型转换的规则。最后,提供了数值类型与字符串互相转换的具体示例。
|
2天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
4天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
4天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
4天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
20 3
|
4天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
57 2
|
12天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
43 6
|
21天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
21天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
44 3
|
22天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####