项目里多线程常用姿势,很给力的使用方法-阿里云开发者社区

开发者社区> 开发与运维> 正文

项目里多线程常用姿势,很给力的使用方法

简介: 多线程

最近项目里经常会有一部分大批量数据的校验处理等对接口响应有要求的情景,这时候往往只能通过多线程的方式去处理请求,能达到快速响应

下面因为多线程用的比较多,所以需要复用代码,这里我基于java8的函数式编程抽出了共同方法,不侵占业务实现,后面还有个多线程带返回值的方式。大家可以参考参考思路。

public class TaskThreadPoolService {

private Logger logger = LoggerFactory.getLogger(TaskThreadPoolService.class);

/**
 * 默认线程大小
 */
private final static Integer DEFAULT_POOL_SIZE = 20;

/**
 * 默认最大活跃线程大小
 */
private final static Integer DEFAULT_MAX_ALIVE_POOL_SIZE = 50;

/**
 * 多线程执行无返回值方法
 * 线程池大小为20,最大活跃线程数50,存活时间为0的无边界队列线程池
 *
 * @param voidTaskMethodInteface
 * @param list
 * @param <T>
 */
public <T> void splitVoidTask(VoidTaskMethodInterface<T> voidTaskMethodInteface, List<T> list) throws Exception {
    splitVoidTask(voidTaskMethodInteface, list, DEFAULT_POOL_SIZE, DEFAULT_MAX_ALIVE_POOL_SIZE);
}

/**
 * 多线程执行无返回值方法(可指定部分线程池参数)
 *
 * @param voidTaskMethodInteface
 * @param list
 * @param <T>
 */
public <T> void splitVoidTask(VoidTaskMethodInterface<T> voidTaskMethodInteface, List<T> list, int poolSize, int maxSize) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, maxSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    try {
        for (T t : list) {
            executor.submit(() -> {
                try {
                    voidTaskMethodInteface.exec(t);
                } catch (Throwable e) {
                    logger.error("splitVoidTask" + e.getMessage(), e);
                } finally {
                    logger.info("do what you want to do");
                }
            });
        }
    } catch (Exception e) {
        logger.error("多线程执行无返回值方法出现异常");
        throw new Exception("splitVoidTask error");
    } finally {
        executor.shutdown();
    }
}

/**
 * 带返回值的多线程异步等待所有结果返回的
 * 线程池大小为20,最大活跃线程数50,存活时间为0的无边界队列线程池
 * 单个返回的结果不能返回null,影响结果判定
 *
 * @param futureTaskMethodInteface
 * @param list
 * @param <T>
 * @param <F>
 * @return
 * @throws Exception
 */
public <T, F> Map<T, F> splitFutureTask(FutureTaskMethodInterface<T, F> futureTaskMethodInteface, List<T> list) throws Exception {
    return splitFutureTask(futureTaskMethodInteface, list, DEFAULT_POOL_SIZE, DEFAULT_MAX_ALIVE_POOL_SIZE);
}

/**
 * 带返回值的多线程异步等待所有结果返回的(可指定部分线程池参数)
 * 单个返回的结果不能返回null,影响结果判定
 *
 * @param futureTaskMethodInteface
 * @param list
 * @param <T>
 * @param <F>
 * @return
 * @throws Exception
 */
public <T, F> Map<T, F> splitFutureTask(FutureTaskMethodInterface<T, F> futureTaskMethodInteface, List<T> list, int poolSize, int maxSize) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, maxSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    try {
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        Map<T, Future<F>> futureMap = new HashMap<>(list.size());
        for (T t : list) {
            Future<F> future = executor.submit(() -> {
                try {
                    F f = futureTaskMethodInteface.exec(t);
                    return f;
                } catch (Throwable e) {
                    logger.error("splitVoidTask" + e.getMessage(), e);
                    return null;
                } finally {
                    countDownLatch.countDown();
                }
            });
            futureMap.put(t, future);
        }
        Map<T, F> resultMap = new HashMap<>(futureMap.size());
        //检测每个线程的执行结果,如果有future的返回结果为null,则认为执行失败
        for (Map.Entry<T, Future<F>> entry : futureMap.entrySet()) {
            try {
                if (entry.getValue().get() == null) {
                    throw new Exception("splitFutureTask error");
                }
                resultMap.put(entry.getKey(), entry.getValue().get());
            } catch (Exception e) {
                logger.error("splitFutureTask error params:%s,error msg:%s", VJson.writeAsString(entry.getKey()), e.getMessage());
                throw new Exception("splitFutureTask error");
            }
        }
        return resultMap;
    } catch (Exception e) {
        logger.error("多线程执行无返回值方法出现异常");
        throw new Exception("splitFutureTask error");
    } finally {
        executor.shutdown();
    }
}
}

版权声明:本文首发在云栖社区,遵循云栖社区版权声明:本文内容由互联网用户自发贡献,版权归用户作者所有,云栖社区不为本文内容承担相关法律责任。云栖社区已升级为阿里云开发者社区。如果您发现本文中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,阿里云开发者社区将协助删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章