Java高并发实战:利用线程池和Redis实现高效数据入库
在高并发环境下进行数据入库是一项具有挑战性的任务。为了保证系统的性能和稳定性,可以利用线程池和Redis来实现数据的实时缓存和批量入库处理。本文将介绍一个具体实现,该实现能够根据设定的超时时间和最大批次处理数据入库。
主要思路
- 实时数据缓存:接收到的数据首先追加到本地缓存(CacheService)中当前批次的队列,同时把该设备最新的一条数据写入Redis,保证实时数据随时可查。
- 批量数据入库:当达到设定的超时时间或最大批次数量时,由线程池把该批次的数据从本地缓存中取出并批量入库;入库失败的数据会被转存到失败队列中。
主要组件
- BatchDataStorageService:核心服务类,负责数据的缓存和批量入库。
- CacheService:缓存服务类,使用Java的ConcurrentHashMap实现简易缓存。
- RedisUtils:Redis工具类,用于缓存每台设备最新的实时数据(key为 real_time:设备ID)。
package io.jack.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * <pre> * 数据批量入库服务 * </pre> * Created by RuiXing Hou on 2021-08-05. * * @since 1.0 */ @Component @Slf4j public class BatchDataStorageService implements InitializingBean { /** * 最大批次数量 */ @Value("${app.db.maxBatchCount:800}") private int maxBatchCount; /** * 最大线程数 */ @Value("${app.db.maxBatchThreads:100}") private int maxBatchThreads; /** * 超时时间 */ @Value("${app.db.batchTimeout:3000}") private int batchTimeout; /** * 批次数量 */ private int batchCount = 0; /** * 批次号 */ private static long batchNo = 0; /** * 获取当前机器的核数 */ public static final int cpuNum = Runtime.getRuntime().availableProcessors(); /** * 线程池定义接口 */ private ExecutorService executorService = null; /** * 服务器缓存工具类,下面提供源码 */ @Resource private CacheService cacheService; /** * 业务接口 */ @Resource private DeviceRealTimeService deviceRealTimeService; /** * redis工具类 */ @Resource private RedisUtils redisUtils; @Override public void afterPropertiesSet() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); // 核心线程大小 taskExecutor.setCorePoolSize(cpuNum); // 最大线程大小 taskExecutor.setMaxPoolSize(cpuNum * 2); // 队列最大容量 taskExecutor.setQueueCapacity(500); // 当提交的任务个数大于QueueCapacity,就需要设置该参数,但spring提供的都不太满足业务场景,可以自定义一个,也可以注意不要超过QueueCapacity即可 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); taskExecutor.setThreadFactory(r -> { Thread thread = new Thread(r); if (r instanceof BatchWorker) { 
thread.setName("batch-worker-" + ((BatchWorker) r).batchKey); }); taskExecutor.initialize(); executorService = taskExecutor.getThreadPoolExecutor(); } /** * 需要做高并发处理的类只需要调用该方法 (我用的是rabbitMq) * * @param deviceRealTimeDTO */ public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { final String failedCacheKey = "device:real_time:failed_records"; try { String durationKey = "device:real_time:batchDuration" + batchNo; String batchKey = "device:real_time:batch" + batchNo; if (!cacheService.exists(durationKey)) { cacheService.put(durationKey, System.currentTimeMillis()); new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start(); } cacheService.lPush(batchKey, deviceRealTimeDTO); if (++batchCount >= maxBatchCount) { // 达到最大批次,执行入库逻辑 dataStorage(durationKey, batchKey, failedCacheKey); } } catch (Exception ex) { log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex); cacheService.lPush(failedCacheKey, deviceRealTimeDTO); } finally { updateRealTimeData(deviceRealTimeDTO); } } /** * 更新实时数据 * @param deviceRealTimeDTO 业务POJO */ private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { redisUtils.set("real_time:"+deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO)); } /** * * @param durationKey 持续时间标识 * @param batchKey 批次标识 * @param failedCacheKey 错误标识 */ private void dataStorage(String durationKey, String batchKey, String failedCacheKey) { batchNo++; batchCount = 0; cacheService.del(durationKey); if (batchNo >= Long.MAX_VALUE) { batchNo = 0; } executorService.execute(new BatchWorker(batchKey, failedCacheKey)); } private class BatchWorker implements Runnable { private final String failedCacheKey; private final String batchKey; public BatchWorker(String batchKey, String failedCacheKey) { this.batchKey = batchKey; this.failedCacheKey = failedCacheKey; } @Override public void run() { final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new 
ArrayList<>(); try { DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey); while(deviceRealTimeDTO != null) { deviceRealTimeDTOList.add(deviceRealTimeDTO); deviceRealTimeDTO = cacheService.lPop(batchKey); } long timeMillis = System.currentTimeMillis(); try { List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class); deviceRealTimeService.insertBatch(deviceRealTimeEntityList); } finally { cacheService.del(batchKey); log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存设备上报记录数:" + deviceRealTimeDTOList.size() + ", 耗时:" + (System.currentTimeMillis() - timeMillis) + "ms"); } } catch (Exception e) { log.warn("[DB:FAILED] 设备上报记录批量入库失败:" + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e); for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) { cacheService.lPush(failedCacheKey, deviceRealTimeDTO); } } } } class BatchTimeoutCommitThread extends Thread { private final String batchKey; private final String durationKey; private final String failedCacheKey; public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) { this.batchKey = batchKey; this.durationKey = durationKey; this.failedCacheKey = failedCacheKey; this.setName("batch-thread-" + batchKey); } public void run() { try { Thread.sleep(batchTimeout); } catch (InterruptedException e) { log.error("[DB] 内部错误,直接提交:" + e.getMessage()); } if (cacheService.exists(durationKey)) { // 达到最大批次的超时间,执行入库逻辑 dataStorage(durationKey, batchKey, failedCacheKey); } } } }
package io.jack.service; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @Component @Scope("singleton") public class CacheService implements InitializingBean { private Map<String, Object> objectCache = new ConcurrentHashMap<>(); private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>(); @Override public void afterPropertiesSet() { statCache.put("terminals", new AtomicLong(0)); statCache.put("connections", new AtomicLong(0)); } public long incr(String statName) { if (!statCache.containsKey(statName)) statCache.put(statName, new AtomicLong(0)); return statCache.get(statName).incrementAndGet(); } public long decr(String statName) { if (!statCache.containsKey(statName)) statCache.put(statName, new AtomicLong(0)); return statCache.get(statName).decrementAndGet(); } public long stat(String statName) { if (!statCache.containsKey(statName)) statCache.put(statName, new AtomicLong(0)); return statCache.get(statName).get(); } public <T> void put(String key, T object) { objectCache.put(key, object); } public <T> T get(String key) { return (T) objectCache.get(key); } public void remove(String key) { objectCache.remove(key); } public void hSet(String key, String subkey, Object value) { synchronized (objectCache) { HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key); if (submap == null) { submap = new HashMap<>(); objectCache.put(key, submap); } submap.put(subkey, value); } } public <T> T hGet(String key, String subkey) { synchronized (objectCache) { HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key); if (submap != null) { return (T) submap.get(subkey); } return null; } } public boolean hExists(String key, String subkey) { 
synchronized (objectCache) { HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key); if (submap != null) { return submap.containsKey(subkey); } return false; } } public void lPush(String key, Object value) { synchronized (objectCache) { LinkedList queue = (LinkedList) objectCache.get (key); if (queue == null) { queue = new LinkedList(); objectCache.put(key, queue); } queue.addLast(value); } } public <T> T lPop(String key) { synchronized (objectCache) { LinkedList queue = (LinkedList) objectCache.get (key); if (queue != null) { if (!queue.isEmpty()) { return (T)queue.removeLast(); } objectCache.remove(key); } return null; } } public void del(String key) { objectCache.remove(key); } public boolean exists(String key) { return objectCache.containsKey(key); } public void dump() { } }