Java高并发实战:利用线程池和Redis实现高效数据入库

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Java高并发实战:利用线程池和Redis实现高效数据入库

Java高并发实战:利用线程池和Redis实现高效数据入库

在高并发环境下进行数据入库是一项具有挑战性的任务。为了保证系统的性能和稳定性,可以利用线程池和Redis来实现数据的实时缓存和批量入库处理。本文将介绍一个具体实现,该实现能够根据设定的超时时间和最大批次处理数据入库。

主要思路

  • 实时数据缓存:接收到的数据首先存入Redis,保证数据的实时性。
  • 批量数据入库:当达到设定的超时时间或最大批次数量时,批量将数据从Redis中取出并入库。


主要组件

  • BatchDataStorageService:核心服务类,负责数据的缓存和批量入库。
  • CacheService:缓存服务类,使用Java的ConcurrentHashMap实现简易缓存。
  • RedisUtils:Redis工具类,用于数据的缓存。
package io.jack.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * <pre>
 *   数据批量入库服务
 * </pre>
 * Created by RuiXing Hou on 2021-08-05.
 *
 * @since 1.0
 */
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean
{
  /**
   * 最大批次数量
   */
  @Value("${app.db.maxBatchCount:800}")
    private int maxBatchCount;

  /**
   * 最大线程数
   */
    @Value("${app.db.maxBatchThreads:100}")
    private int maxBatchThreads;

  /**
   * 超时时间
   */
  @Value("${app.db.batchTimeout:3000}")
    private int batchTimeout;

  /**
   * 批次数量
   */
    private int batchCount = 0;

  /**
   * 批次号
   */
  private static long batchNo = 0;

  /**
  * 获取当前机器的核数
  */
  public static final int cpuNum = Runtime.getRuntime().availableProcessors();

  /**
   * 线程池定义接口
   */
    private ExecutorService executorService = null;

  /**
   * 服务器缓存工具类,下面提供源码
   */
  @Resource
  private CacheService cacheService;

  /**
   * 业务接口
   */
  @Resource
  private DeviceRealTimeService deviceRealTimeService;

  /**
   * redis工具类
   */
  @Resource
  private RedisUtils redisUtils;

  @Override
  public void afterPropertiesSet() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // 核心线程大小
        taskExecutor.setCorePoolSize(cpuNum);
        // 最大线程大小
        taskExecutor.setMaxPoolSize(cpuNum * 2);
        // 队列最大容量
        taskExecutor.setQueueCapacity(500);
        // 当提交的任务个数大于QueueCapacity,就需要设置该参数,但spring提供的都不太满足业务场景,可以自定义一个,也可以注意不要超过QueueCapacity即可
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setThreadFactory(r -> {
            Thread thread = new Thread(r);
            if (r instanceof BatchWorker) {
                thread.setName("batch-worker-" + ((BatchWorker) r).batchKey);
            });
        taskExecutor.initialize();
        executorService = taskExecutor.getThreadPoolExecutor();
  }

  /**
   * 需要做高并发处理的类只需要调用该方法 (我用的是rabbitMq)
   *
   * @param deviceRealTimeDTO
   */
  public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
    final String failedCacheKey = "device:real_time:failed_records";

    try {

      String durationKey = "device:real_time:batchDuration" + batchNo;
      String batchKey = "device:real_time:batch" + batchNo;

      if (!cacheService.exists(durationKey)) {
        cacheService.put(durationKey, System.currentTimeMillis());
        new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
      }

      cacheService.lPush(batchKey, deviceRealTimeDTO);
      if (++batchCount >= maxBatchCount) {
        // 达到最大批次,执行入库逻辑
        dataStorage(durationKey, batchKey, failedCacheKey);
      }

    } catch (Exception ex) {
      log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex);
      cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
    } finally {
      updateRealTimeData(deviceRealTimeDTO);
    }
  }

  /**
   * 更新实时数据
   * @param deviceRealTimeDTO 业务POJO
   */
  private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
    redisUtils.set("real_time:"+deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
  }

  /**
   *
   * @param durationKey     持续时间标识
   * @param batchKey      批次标识
   * @param failedCacheKey  错误标识
   */
  private void dataStorage(String durationKey, String batchKey, String failedCacheKey) {
    batchNo++;
    batchCount = 0;
    cacheService.del(durationKey);
    if (batchNo >= Long.MAX_VALUE) {
      batchNo = 0;
    }
    executorService.execute(new BatchWorker(batchKey, failedCacheKey));
  }

  private class BatchWorker implements Runnable
  {

    private final String failedCacheKey;
    private final String batchKey;

    public BatchWorker(String batchKey, String failedCacheKey) {
      this.batchKey = batchKey;
      this.failedCacheKey = failedCacheKey;
    }
    
    @Override
    public void run() {
      final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
      try {
        DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
        while(deviceRealTimeDTO != null) {
          deviceRealTimeDTOList.add(deviceRealTimeDTO);
          deviceRealTimeDTO = cacheService.lPop(batchKey);
        }

        long timeMillis = System.currentTimeMillis();

        try {
          List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
          deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
        } finally {
          cacheService.del(batchKey);
          log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存设备上报记录数:" + deviceRealTimeDTOList.size() + ", 耗时:" + (System.currentTimeMillis() - timeMillis) + "ms");
        }
      } catch (Exception e) {
        log.warn("[DB:FAILED] 设备上报记录批量入库失败:" + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e);
        for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) {
          cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
        }
      }
    }
    }

  class BatchTimeoutCommitThread extends Thread {

    private final String batchKey;
    private final String durationKey;
    private final String failedCacheKey;

    public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) {
      this.batchKey = batchKey;
      this.durationKey = durationKey;
      this.failedCacheKey = failedCacheKey;
      this.setName("batch-thread-" + batchKey);
    }

    public void run() {
      try {
        Thread.sleep(batchTimeout);
      } catch (InterruptedException e) {
        log.error("[DB] 内部错误,直接提交:" + e.getMessage());
      }

      if (cacheService.exists(durationKey)) {
        // 达到最大批次的超时间,执行入库逻辑
        dataStorage(durationKey, batchKey, failedCacheKey);
      }
    }

  }

}

package io.jack.service;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Scope("singleton")
public class CacheService implements InitializingBean {

    private Map<String, Object> objectCache = new ConcurrentHashMap<>();

    private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();

    @Override
    public void afterPropertiesSet() {
        statCache.put("terminals", new AtomicLong(0));
        statCache.put("connections", new AtomicLong(0));
    }

    public long incr(String statName) {
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).incrementAndGet();
    }

    public long decr(String statName) {
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).decrementAndGet();
    }

    public long stat(String statName) {
        if (!statCache.containsKey(statName))
            statCache.put(statName, new AtomicLong(0));
        return statCache.get(statName).get();
    }

    public <T> void put(String key, T object) {
        objectCache.put(key, object);
    }

    public <T> T get(String key) {
        return (T) objectCache.get(key);
    }

    public void remove(String key) {
        objectCache.remove(key);
    }

    public void hSet(String key, String subkey, Object value) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap == null) {
                submap = new HashMap<>();
                objectCache.put(key, submap);
            }
            submap.put(subkey, value);
        }
    }

    public <T> T hGet(String key, String subkey) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) {
                return (T) submap.get(subkey);
            }
            return null;
        }
    }

    public boolean hExists(String key, String subkey) {
        synchronized (objectCache) {
            HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
            if (submap != null) {
                return submap.containsKey(subkey);
            }
            return false;
        }
    }

    public void lPush(String key, Object value) {
        synchronized (objectCache) {
            LinkedList queue = (LinkedList) objectCache.get (key);
            if (queue == null) {
                queue = new LinkedList();
                objectCache.put(key, queue);
            }
            queue.addLast(value);
        }
    }

    public <T> T lPop(String key) {
        synchronized (objectCache) {
            LinkedList queue = (LinkedList) objectCache.get (key);
            if (queue != null) {
                if (!queue.isEmpty()) {
                    return (T)queue.removeLast();
                }
                objectCache.remove(key);
            }
            return null;
        }
    }

    public void del(String key) {
        objectCache.remove(key);
    }

    public boolean exists(String key) {
        return objectCache.containsKey(key);
    }

    public void dump() {

    }
}

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8天前
|
NoSQL Redis
Redis的数据淘汰策略有哪些 ?
Redis 提供了 8 种数据淘汰策略,分为淘汰易失数据和淘汰全库数据两大类。易失数据淘汰策略包括:volatile-lru、volatile-lfu、volatile-ttl 和 volatile-random;全库数据淘汰策略包括:allkeys-lru、allkeys-lfu 和 allkeys-random。此外,还有 no-eviction 策略,禁止驱逐数据,当内存不足时新写入操作会报错。
42 16
|
28天前
|
监控 NoSQL Java
场景题:百万数据插入Redis有哪些实现方案?
场景题:百万数据插入Redis有哪些实现方案?
38 1
场景题:百万数据插入Redis有哪些实现方案?
|
9天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
27天前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
59 2
|
21天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
100 38
|
8天前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
42 14
|
8天前
|
存储 NoSQL 算法
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用哈希槽分区算法,共有16384个哈希槽,每个槽分配到不同的Redis节点上。数据操作时,通过CRC16算法对key计算并取模,确定其所属的槽和对应的节点,从而实现高效的数据存取。
35 13
|
8天前
|
存储 NoSQL Redis
Redis的数据过期策略有哪些 ?
Redis 采用两种过期键删除策略:惰性删除和定期删除。惰性删除在读取键时检查是否过期并删除,对 CPU 友好但可能积压大量过期键。定期删除则定时抽样检查并删除过期键,对内存更友好。默认每秒扫描 10 次,每次检查 20 个键,若超过 25% 过期则继续检查,单次最大执行时间 25ms。两者结合使用以平衡性能和资源占用。
30 11
|
8天前
|
监控 NoSQL 测试技术
【赵渝强老师】Redis的AOF数据持久化
Redis 是内存数据库,提供数据持久化功能,支持 RDB 和 AOF 两种方式。AOF 以日志形式记录每个写操作,支持定期重写以压缩文件。默认情况下,AOF 功能关闭,需在 `redis.conf` 中启用。通过 `info` 命令可监控 AOF 状态。AOF 重写功能可有效控制文件大小,避免性能下降。
|
8天前
|
存储 监控 NoSQL
【赵渝强老师】Redis的RDB数据持久化
Redis 是内存数据库,提供数据持久化功能以防止服务器进程退出导致数据丢失。Redis 支持 RDB 和 AOF 两种持久化方式,其中 RDB 是默认的持久化方式。RDB 通过在指定时间间隔内将内存中的数据快照写入磁盘,确保数据的安全性和恢复能力。RDB 持久化机制包括创建子进程、将数据写入临时文件并替换旧文件等步骤。优点包括适合大规模数据恢复和低数据完整性要求的场景,但也有数据完整性和一致性较低及备份时占用内存的缺点。