最近遇到一个问题,就是如何批量的进行数据迁移和计算,既要保证快,又要保证准,那么快怎么保证呢?分布式及并发是必然要使用的,怎么保证准呢,并发的同步(在共享内存并发模型里,同步是显式进行的。程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。执行操作有序性)是必然要使用的。
单机的实现方式
单机实现主要目的是用来判断批量运行时会报什么错,所以实现起来比较简单,直接开启一个线程池进行处理:
线程池设置
线程池的设置如下,核心线程只使用一个,因为准确性对于同步计算逻辑更重要,更多关于线程池的介绍:【Java并发编程 十二】JUC并发包下线程池
/** * 线程池设置 */ ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("syncStoreInfo-pool-%d") .setUncaughtExceptionHandler((t, e) -> LOGGER.warn("syncStoreInfo failed", e)) .get(); private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
核心逻辑调用
核心逻辑调用如下,可以看到是通过线程池并发的对全部coopNos进行了一个消费。
/** * 同步全部合作下的全部门店 * * @param coopNos the coop nos */ public void syncBatchCooperation(List<String> coopNos) { if (CollectionUtils.isEmpty(coopNos)) { return; } for (String coopNo : coopNos) { executor.submit(() -> { try { LOGGER.info("开始同步单个合作下的门店数据,coopNo = " + coopNo); // 核心业务逻辑 this.syncOneCooperation(coopNo); } catch (Exception e) { LOGGER.warn(MsgBuilder.build("合作下门店同步错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e); } }); } }
分布式实现方式
分布式的实现方式是为了让多台机器同时消费coopNos,增加速度,同时使用RedissonClient来保证分布式的一致性处理
核心逻辑调用
请求到来时先将代处理的消息都塞到分布式的redis队列中,具体的消费由分布式集群分组处理
/** * 同步全部合作下的全部门店 * * @param coopNos the coop nos */ public void syncBatchCooperation(List<String> coopNos) { if (CollectionUtils.isEmpty(coopNos)) { return; } for (String coopNo : coopNos) { // 线上运行时用redis队列+多线程,多台机器同时消费 addCoopToAsyncWork(coopNo); } }
RedissonClient
引入RedissonClient,并且设置一个sortedSet队列,按照加入时间打分。将所有待处理的数据放到一个分布式消息队列中,然后由下游从队列中取数据进行处理。RedissonClient可以保证对多个计算节点保证同步。之前用C#做的时候搞的过于复杂了,参见这篇Blog:【Redis原理机制 六】Redis分布式锁深入、改进策略及RedLock算法
/** * 引入RedissonClient */ @Resource private RedissonClient redissonClient; /** * 队列设置 */ private RScoredSortedSet<String> coopNoQueue; /** * 是否运行 */ private boolean isRunning = false; /** * 队列初始化设置 */ @PostConstruct public void init() { coopNoQueue = redissonClient.getScoredSortedSet("COOP:STORE:SYNC:", StringCodec.INSTANCE); } /** * 队列增加元素 */ private void addCoopToAsyncWork(String data) { coopNoQueue.add(Double.longBitsToDouble(System.currentTimeMillis()), JsonUtils.obj2Str(data)); }
SmartLifecycle
有些场景我们需要在Spring 所有的bean 完成初始化后紧接着执行一些任务或者启动需要的异步服务,首先实现接口SmartLifecycle并重写实现方法,通过SmartLifecycle进行容器级的生命周期管理,常见有几种解决方案:
- j2ee 注解 启动前
@PostConstruct
销毁前@PreDestroy
基于j2ee 规范 - springboot 的
org.springframework.boot.CommandLineRunner
spring org.springframework.context.SmartLifecycle
SmartLifecycle 不仅仅能在初始化后执行一个逻辑,还能在关闭前执行一个逻辑,这里我们就在开始时执行并发消费逻辑,结束时结束消费逻辑、关闭线程池
/** * 是否运行 */ private boolean isRunning = false; /** * 容器启动时调用:运行中 */ @Override public void start() { isRunning = true; executor.execute(() -> { while (isRunning) { try { Collection<String> coopNos = coopNoQueue.pollFirst(10); if (CollectionUtils.isEmpty(coopNos)) { TimeUnit.SECONDS.sleep(1); continue; } LOGGER.info("开始同步一组合作下门店,合作数量: {}, coopNos : {}", coopNos.size(), JsonUtils.obj2Str(coopNos)); CountDownLatch count = new CountDownLatch(coopNos.size()); coopNos.forEach(coopNo -> { try { LOGGER.info("开始同步单个合作下门店,coopNo = " + coopNo); syncOneCooperation(coopNo); } catch (Exception e) { LOGGER.warn(MsgBuilder.build("同步单个合作门店错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e); } finally { count.countDown(); } }); count.await(); LOGGER.info("同步本组合作下门店结束,数量:{}", coopNos.size()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { LOGGER.warn("合作下门店同步结束", e); } } }); } /** * 容器停止时调用:结束方法调用、关闭线程池 */ @Override @SneakyThrows public void stop() { isRunning = false; executor.shutdown(); executor.awaitTermination(30L, TimeUnit.SECONDS); } /** * 检测bean容器是否运行 */ @Override public boolean isRunning() { return isRunning; }
CountDownLatch
我们使用CountDownLatch
来进行线程同步,保证每组10个coopNos执行完成后再进行下一组执行,一定程度上保证分组数据的同步处理【Java并发编程 十】JUC并发包下的工具类,防止本地的coopNos数量不停变化,导致foreach里的执行逻辑有误。
CountDownLatch count = new CountDownLatch(coopNos.size()); coopNos.forEach(coopNo -> { try { LOGGER.info("开始同步单个合作下门店,coopNo = " + coopNo); syncOneCooperation(coopNo); } catch (Exception e) { LOGGER.warn(MsgBuilder.build("同步单个合作门店错误,coopNo = {}, errorMsg = {} ", coopNo, ExceptionUtils.getStackTrace(e)), e); } finally { count.countDown(); } }); count.await();