背景
要给前端出个接口,一个数据展示接口,要从好几个表把数据统计出来,所以打算用多线程来查,查完了返回给前端
用到的东西
就需求来说,join可以做到,countdownlatch也可以做到,我喜欢用countdownlatch做,代码看起来简单点,然后涉及到要开启多线程,所以还是自己来维护一个线程池来用。
代码
线程池部分
@Configuration @EnableAsync //开启多线程 public class ThreadPoolConfig { private static final long serialVersionUID = -4778941758120026886L; /** 线程池维护线程的最少数量 */ private int minPoolSize = 4; /** 线程池维护线程的最大数量 */ private int maxPoolSize = 16; /** 线程池维护线程所允许的空闲时间 */ private int idleSeconds = 1800; /** 线程池所使用的缓冲队列 */ private int queueBlockSize = 30; private ThreadPoolExecutor executor; @Bean("taskExecutor") public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(minPoolSize); // 设置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueBlockSize); // 设置线程空闲时间,当超过核心线程之外的线程在空闲到达之后会被销毁(秒) executor.setKeepAliveSeconds(idleSeconds); // 设置默认线程名称 executor.setThreadNamePrefix("ThreadExcutor"); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); //拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //执行初始化 executor.initialize(); return executor; } public ThreadPoolConfig() { this.executor = new ThreadPoolExecutor(minPoolSize, maxPoolSize, idleSeconds, TimeUnit.SECONDS, /* 时间单位,秒 */ new ArrayBlockingQueue<Runnable>(queueBlockSize), new ThreadPoolExecutor.CallerRunsPolicy()); /* 重试添加当前加入失败的任务 */ } public void execute(Runnable task) { executor.execute(task); } public <T> Future<T> submit(Callable<T> task){ return executor.submit(task); } }
countdownlatch代码
public QuotaInfoVo getQuotaInfo() { StopWatch sw = new StopWatch(); sw.start(); QuotaInfoVo info=new QuotaInfoVo(); final CountDownLatch latch = new CountDownLatch(4); String id=tmpFcrmId; //这个是从配置文件拿的 if (StringUtils.isEmpty(id)){ throw new ServiceException(ResultCode.ERROR); } taskExecutor.execute(new Task1(info,latch,id)); taskExecutor.execute(new Task2(info,latch,id)); taskExecutor.execute(new Task3(info,latch,id)); taskExecutor.execute(new Task4(info,latch,id)); try { //主线程等待所有统计指标执行完毕 latch.await(5, TimeUnit.SECONDS); //latch.await(); sw.stop(); log.info("getQuotaInfo finish ,total time is {}",sw.getTotalTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } return info; }
然后给一个task的例子
private class Task1 implements Runnable{ private QuotaInfoVo quotaInfoVo; private CountDownLatch countDownLatch; private String id; public OrderCount(QuotaInfoVo quotaInfoVo,CountDownLatch countDownLatch,String id){ this.quotaInfoVo=quotaInfoVo; this.countDownLatch=countDownLatch; this.id=id; } @Override public void run() { //这里写自己的逻辑处理 //处理完了记得countDown countDownLatch.countDown(); } }
注意事项
最主要需要注意的就是主线程的这个部分:
try { //主线程等待所有统计指标执行完毕 latch.await(5, TimeUnit.SECONDS); //latch.await(); sw.stop(); log.info("getQuotaInfo finish ,total time is {}",sw.getTotalTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); }
latch.await如果不加等待时间的话,当某个线程执行出错,卡在那里,那么主线程会一直等待,这肯定是我们不想看到的,所以await一定要加等待时间。