最近项目里经常会有一部分大批量数据的校验处理等对接口响应有要求的情景,这时候往往只能通过多线程的方式去处理请求,能达到快速响应
下面因为多线程用的比较多,所以需要复用代码,这里我基于java8的函数式编程抽出了共同方法,不侵占业务实现,后面还有个多线程带返回值的方式。大家可以参考参考思路。
public class TaskThreadPoolService {
private Logger logger = LoggerFactory.getLogger(TaskThreadPoolService.class);
/**
* 默认线程大小
*/
private final static Integer DEFAULT_POOL_SIZE = 20;
/**
* 默认最大活跃线程大小
*/
private final static Integer DEFAULT_MAX_ALIVE_POOL_SIZE = 50;
/**
* 多线程执行无返回值方法
* 线程池大小为20,最大活跃线程数50,存活时间为0的无边界队列线程池
*
* @param voidTaskMethodInteface
* @param list
* @param <T>
*/
public <T> void splitVoidTask(VoidTaskMethodInterface<T> voidTaskMethodInteface, List<T> list) throws Exception {
splitVoidTask(voidTaskMethodInteface, list, DEFAULT_POOL_SIZE, DEFAULT_MAX_ALIVE_POOL_SIZE);
}
/**
* 多线程执行无返回值方法(可指定部分线程池参数)
*
* @param voidTaskMethodInteface
* @param list
* @param <T>
*/
public <T> void splitVoidTask(VoidTaskMethodInterface<T> voidTaskMethodInteface, List<T> list, int poolSize, int maxSize) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, maxSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
try {
for (T t : list) {
executor.submit(() -> {
try {
voidTaskMethodInteface.exec(t);
} catch (Throwable e) {
logger.error("splitVoidTask" + e.getMessage(), e);
} finally {
logger.info("do what you want to do");
}
});
}
} catch (Exception e) {
logger.error("多线程执行无返回值方法出现异常");
throw new Exception("splitVoidTask error");
} finally {
executor.shutdown();
}
}
/**
* 带返回值的多线程异步等待所有结果返回的
* 线程池大小为20,最大活跃线程数50,存活时间为0的无边界队列线程池
* 单个返回的结果不能返回null,影响结果判定
*
* @param futureTaskMethodInteface
* @param list
* @param <T>
* @param <F>
* @return
* @throws Exception
*/
public <T, F> Map<T, F> splitFutureTask(FutureTaskMethodInterface<T, F> futureTaskMethodInteface, List<T> list) throws Exception {
return splitFutureTask(futureTaskMethodInteface, list, DEFAULT_POOL_SIZE, DEFAULT_MAX_ALIVE_POOL_SIZE);
}
/**
* 带返回值的多线程异步等待所有结果返回的(可指定部分线程池参数)
* 单个返回的结果不能返回null,影响结果判定
*
* @param futureTaskMethodInteface
* @param list
* @param <T>
* @param <F>
* @return
* @throws Exception
*/
public <T, F> Map<T, F> splitFutureTask(FutureTaskMethodInterface<T, F> futureTaskMethodInteface, List<T> list, int poolSize, int maxSize) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, maxSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
try {
CountDownLatch countDownLatch = new CountDownLatch(list.size());
Map<T, Future<F>> futureMap = new HashMap<>(list.size());
for (T t : list) {
Future<F> future = executor.submit(() -> {
try {
F f = futureTaskMethodInteface.exec(t);
return f;
} catch (Throwable e) {
logger.error("splitVoidTask" + e.getMessage(), e);
return null;
} finally {
countDownLatch.countDown();
}
});
futureMap.put(t, future);
}
Map<T, F> resultMap = new HashMap<>(futureMap.size());
//检测每个线程的执行结果,如果有future的返回结果为null,则认为执行失败
for (Map.Entry<T, Future<F>> entry : futureMap.entrySet()) {
try {
if (entry.getValue().get() == null) {
throw new Exception("splitFutureTask error");
}
resultMap.put(entry.getKey(), entry.getValue().get());
} catch (Exception e) {
logger.error("splitFutureTask error params:%s,error msg:%s", VJson.writeAsString(entry.getKey()), e.getMessage());
throw new Exception("splitFutureTask error");
}
}
return resultMap;
} catch (Exception e) {
logger.error("多线程执行无返回值方法出现异常");
throw new Exception("splitFutureTask error");
} finally {
executor.shutdown();
}
}
}