【Java】你还在使用单线程处理大量数据么?

简介: 现有一个数据拼装入库的接口,总数据量大约几万条,之前使用单线程同步处理,需要处理几分钟,这接口速度在生产上是不允许的,针对这一问题,需要对此接口进行多线程并发处理。

业务场景

现有一个数据拼装入库的接口,总数据量大约几万条,之前使用单线程同步处理,需要处理几分钟,这接口速度在生产上是不允许的,针对这一问题,需要对此接口进行速度优化。

优化方案

使用多线程异步处理的方式,技能保证接口很快的响应,也能提高数据的拼装入库操作。

多线程的实现--线程池

为什么要使用线程池

1.在Java中,如果每个请求到达就创建一个新线程,开销是相当大的。在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。

2.除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个jvm里创建太多的线程,可能会是系统由于过度消耗内存或“切换过度”而导致系统资源不足。

3.为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁的线程的次数,特别是一个些资源耗费比较大但是线程的创建和销毁尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。

线程池的创建

ThreadPoolExecutor是线程池的核心实现类,在JDK1.5引入,位于java.util.concurrent包

  • Executor线程池相关顶级接口,它将任务的提交与任务的执行分离开来
  • ExecutorService继承并扩展了Executor接口,提供了Runnable、FutureTask等主要线程实现接口扩展
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务

1、Spring配置类

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
    @Bean("myExceutor")
    public Executor myfoExceutor(ThreadPoolConfigProperties pool) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(pool.getCoreSize());
        // 设置最大线程数
        executor.setMaxPoolSize(pool.getMaxSize());
        //配置队列大小
        executor.setQueueCapacity(pool.getQueueSize());
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(pool.getKeepAliveTime());
        // 设置默认线程名称
        executor.setThreadNamePrefix("pageInfoExceutor-");
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //执行初始化
        executor.initialize();
        return executor;
    }
}

2、手动创建

/**
corePoolSize:核心线程池的线程数量
maximumPoolSize:最大的线程池线程数量
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。
unit:线程活动保持时间的单位。
workQueue:指定任务队列所使用的阻塞队列
threadFactory:线程工厂提供线程的创建方式,默认使用Executors.defaultThreadFactory()
handler:当线程池所处理的任务数超过其承载容量或关闭后继续有任务提交时,所调用的拒绝策略
**/
ExecutorService exec = new ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                                RejectedExecutionHandler handler);

Executors是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool方法可以生成一个拥有固定线程数的线程池。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

其本质是初始化一个ThreadPoolExecutor对象。

提交任务

1、execute()

通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。

/**
* 源码 
**/
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         分 3 个步骤进行:
         1. 如果正在运行的线程少于 corePoolSize,请尝试使用给定命令作为其第一个任务来启动新线程。对 addWorker 的调用以原子方式检查 runState 和 workerCount,因此通过返回 false 来防止错误警报,这些警报会在不应该添加线程时添加线程。
         2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有线程自上次检查以来就死了)或者池在进入此方法后关闭了。因此,我们重新检查状态,如有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。
         3.如果我们不能排队任务,那么我们尝试添加一个新线程。如果失败了,我们知道我们已经关闭或饱和,因此拒绝这项任务。

         */
        int c = ctl.get();
    //判断工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //执行addworker,创建一个核心线程,创建失败重新获取ctl
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    //如果工作线程数大于核心线程数,判断线程池的状态是否为running,并且可以添加进队列
//如果线程池不是running状态,则执行拒绝策略,(还是会调用一次addworker)
        if (isRunning(c) && workQueue.offer(command)) {
              //再次获取ctl,进行双重检索
            int recheck = ctl.get();
            //如果线程池是不是处于RUNNING的状态,那么就会将任务从队列中移除, 
            //如果移除失败,则会判断工作线程是否为0 ,如果过为0 就创建一个非核心线程 
            //如果移除成功,就执行拒绝策略,因为线程池已经不可用了;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
      //线程池挂了或者大于最大线程数
        else if (!addWorker(command, false))
            reject(command);
    }
exec.execute(() -> gooodsService.copyData(a,b));

2、submit()

通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。

/**
提交可运行任务以执行,并返回表示该任务的未来。未来的方法get将在成功完成后返回null。
参数:任务 – 要提交的任务
返回:a 未来代表任务完成前
抛出:RejectedExecutionException – 如果无法安排任务执行
     NullPointerException – 如果任务为空
**/
Future<?> submit(Runnable task);
FutureTask<T> ft = new FutureTask<T>(Callable<V> callable);
exec.submit(ft);

案例伪代码

/**
*拼装数据并插表(无返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
    // 创建线程池
        ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    //先对数据进行分割
    List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
    for (List<po> list : divideList){
        //提交任务
        exec.execute(() ->  dataMapper.saveBatchData(list));
  }
    
}

/**
*拼装数据并插表(有返回值)
@param dataList 一个大量数据的集合
**/
public void copyData(List<po> dataList) {
       //创建一个集合,来接收返回值
     List<CompletableFuture<ResultDTO>> returnList = new ArrayList<>();
    // 创建线程池
        ExecutorService exec = new ThreadPoolExecutor(8,20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    //先对数据进行分割
    List<List<po>> divideList = CollectionDataUtils.divide(dataList, 500);
    for (List<po> list : divideList){
        //提交任务
         CompletableFuture<ResultDTO> cf = CompletableFuture.supplyAsync(() -> {
                        try {
                            return dataMapper.saveBatchData(list);
                        } catch (Exception e) {
                            return null;
                        }
                    },exec).exceptionally( ex -> null);
        returnList.add(cf)
                
  }
    /*之后可以对拿到的返回值进行自己的业务操作....................*/
    
    //阻塞主线程-->等待子线程都执行完后执行,(这里我想等所有线程都执行完操作数据库后,删除redis锁)
    CompletableFuture.allOf(returnList.toArray(new CompletableFuture[returnList.size()])).join();
    redisTemplate.delete(key);
}

后续优化

经过多线程处理后,接口响应速度提升了10倍,并且可以做异步处理,接口先给出响应,数据在后台异步处理。

但是这也是处理几万的数据量的。虽然使用了多线程,但是依旧只是一个节点处理,依靠单个Jvm,如果数据量再大,且并发量提高,会造成OOM内存溢出。所以后续在不改变需求的情况下,使用分布式计算,依靠MQ任务分发,进行多节点处理的方式,性能和稳定性会大大提高。

相关文章
|
7天前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
21 2
|
11天前
|
存储 缓存 Java
java线程内存模型底层实现原理
java线程内存模型底层实现原理
java线程内存模型底层实现原理
|
8天前
|
安全 Java 开发者
Java修饰符与封装:理解访问权限、行为控制与数据隐藏的重要性
Java中的修饰符和封装概念是构建健壯、易维护和扩展的Java应用程序的基石。通过合理利用访问权限修饰符和非访问修饰符,开发者能够设计出更加安全、灵活且高效的代码结构。封装不仅是面向对象编程的核心原则之一,也是提高软件项目质量和可维护性的关键策略。
10 1
|
12天前
|
Java 开发者
Java中的多线程基础与应用
【9月更文挑战第22天】在Java的世界中,多线程是一块基石,它支撑着现代并发编程的大厦。本文将深入浅出地介绍Java中多线程的基本概念、创建方法以及常见的应用场景,帮助读者理解并掌握这一核心技术。
|
8天前
|
Java 调度
Java-Thread多线程的使用
这篇文章介绍了Java中Thread类多线程的创建、使用、生命周期、状态以及线程同步和死锁的概念和处理方法。
Java-Thread多线程的使用
|
11天前
|
Java 调度 开发者
Java中的多线程编程:从基础到实践
本文旨在深入探讨Java多线程编程的核心概念和实际应用,通过浅显易懂的语言解释多线程的基本原理,并结合实例展示如何在Java中创建、控制和管理线程。我们将从简单的线程创建开始,逐步深入到线程同步、通信以及死锁问题的解决方案,最终通过具体的代码示例来加深理解。无论您是Java初学者还是希望提升多线程编程技能的开发者,本文都将为您提供有价值的见解和实用的技巧。
15 2
|
13天前
|
Java 数据处理
Java中的多线程编程:从基础到实践
本文旨在深入探讨Java中的多线程编程,涵盖其基本概念、创建方法、同步机制及实际应用。通过对多线程基础知识的介绍和具体示例的演示,希望帮助读者更好地理解和应用Java多线程编程,提高程序的效率和性能。
19 1
|
6天前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
6 0
|
8天前
|
Java 数据处理 调度
Java中的多线程编程:从基础到实践
本文深入探讨了Java中多线程编程的基本概念、实现方式及其在实际项目中的应用。首先,我们将了解什么是线程以及为何需要多线程编程。接着,文章将详细介绍如何在Java中创建和管理线程,包括继承Thread类、实现Runnable接口以及使用Executor框架等方法。此外,我们还将讨论线程同步和通信的问题,如互斥锁、信号量、条件变量等。最后,通过具体的示例展示了如何在实际项目中有效地利用多线程提高程序的性能和响应能力。
|
9天前
|
安全 算法 Java
Java中的多线程编程:从基础到高级应用
本文深入探讨了Java中的多线程编程,从最基础的概念入手,逐步引导读者了解并掌握多线程开发的核心技术。无论是初学者还是有一定经验的开发者,都能从中获益。通过实例和代码示例,本文详细讲解了线程的创建与管理、同步与锁机制、线程间通信以及高级并发工具等主题。此外,还讨论了多线程编程中常见的问题及其解决方案,帮助读者编写出高效、安全的多线程应用程序。
下一篇
无影云桌面