使用SpringAOP对IN查询进行多线程拆分效率提升巨大

简介: 本文介绍了一种通过多线程优化大批量 IN 查询性能的方法,并提供了一个基于 Spring AOP 的注解方案。该方案允许用户通过添加注解来自动拆分查询参数,利用多线程并发执行查询并合并结果,从而显著提高接口响应速度。适用于大批量查询场景,尤其是结果能够简单合并的情况。文中详细描述了如何定义 AOP 注解及其实现逻辑。

前言

这个功能是我2021年写的,效果不错,然后在新公司又改良了,经过实践很有用!

我们在项目中经常遇到IN查询,同时IN的参数太多甚至大几百上千,会导致PG性能下降严重进而接口反应太慢。这个应该是前期没规划好,但是事已至此还是要对此进行优化。第一个就是想到通过多线程去查,比如原来是

SELECT * FROM device WHERE id IN (1, 2, 3, 4)

拆分为

SELECT * FROM device WHERE id IN (1, 2)SELECT * FROM device WHERE id IN (3, 4)

并行执行,然后将返回结果合并。

因为用的地方多,每次都要写很麻烦,所以结合SpringAOP写了一个基于注解优化方案,只需要打上注解就可以提升性能了。实现效果以及具体实现逻辑如下:

java

代码解读

复制代码

@SplitWorkAnnotation(setThreadPool = LIST_DEVICE_EXECUTOR, splitLimit = 20, splitGroupNum = 10)
public listDeviceDetail(Long projectId,@NeedSplitParam List<Long> deviceId){
......
}

适用场景和不适用场景

主要适用大批量IN查询,或者某个参数特别大导致性能问题的同时结果能简单合并的,就是说符合以下公式的:

fun(a,b,bigList) = fun(a,b,bigListPart1) + fun(a,b,bigListPart2) 这里的加可以是:合并运算,SUM,COUNT以及求TOPN(合并后再取TOPN)

不适用的典型场景有分页以及不符合上面公式的场景

定义AOP注解

需要定义的注解参数:

  • setThreadPool:线程池,可能阻塞比较大,不要用公共的线程池最好自己定义一个
  • handlerReturnClass:返回值回调函数,对应不同返回值处理逻辑:可能是合并可能取前十条可能求和
  • splitLimit:超过多少需要拆分
  • splitGroupNum:拆分时每组多少个

java

代码解读

复制代码

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SplitWorkAnnotation {

    /**
     * 设置线程池
     *
     * @return {@link ThreadPoolEnum}
     */
    ThreadPoolEnum setThreadPool();

    /**
     * 返回值处理
     *
     * @return {@link Class}<{@link ?} {@link extends} {@link HandleReturn}>
     */
    Class<? extends HandleReturn> handlerReturnClass() default MergeFunction.class;

    /**
     * 超过多少开始拆分 >
     *
     * @return int
     */
    int splitLimit() default 1000;

    /**
     * 拆分后每组多少
     *
     * @return int
     */
    int splitGroupNum() default 100;
}

标记需要拆分参数的注解

加在需要拆分的参数上,只支持一个。因为两两组合情况非常复杂,也一般不符合实际使用情况。

java

代码解读

复制代码


@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface NeedSplitParam {


}

使用AOP实现拆分多线程并发调用合并逻辑

java

代码解读

复制代码

@Aspect
@Component
@Slf4j
public class SplitWorkAspect {

    /**
     * 切入点表达式,拦截方法上有@NeedSplitParaAnnotation注解的所有方法
     *
     * @return void
     * @author tangsiqi
     * @date 2021/8/9 18:17
     */
    @Pointcut("@annotation( com.demo.SplitWorkAnnotation)")
    public void needSplit() {
    }


    /**
     * @param pjp
     * @return {@link Object}
     * @throws Throwable
     */
    @Around("needSplit()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        Signature signature = pjp.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method targetMethod = methodSignature.getMethod();
        SplitWorkAnnotation splitWorkAnnotation = targetMethod.getAnnotation(SplitWorkAnnotation.class);
        Object[] args = pjp.getArgs();

        int splitLimit = splitWorkAnnotation.splitLimit();
        int splitGroupNum = splitWorkAnnotation.splitGroupNum();
        if (args == null || args.length == 0 || splitLimit <= splitGroupNum) {
            return pjp.proceed();
        }

        int needSplitParamIndex = -1;
        for (int i = 0; i < targetMethod.getParameters().length; i++) {
            Parameter parameter = targetMethod.getParameters()[i];
            NeedSplitParam needSplitParam = parameter.getAnnotation(NeedSplitParam.class);
            if (needSplitParam != null) {
                needSplitParamIndex = i;
                break;
            }
        }

        if (needSplitParamIndex == -1) {
            return pjp.proceed();
        }
        Object needSplitParam = args[needSplitParamIndex];


        //只能处理Object[] 和 Collection
        if (!(needSplitParam instanceof Object[]) && !(needSplitParam instanceof List) && !(needSplitParam instanceof Set)) {
            return pjp.proceed();
        }
        //如果目标参数长度小于拆分下限跳过
        boolean notMeetSplitLen = (needSplitParam instanceof Object[] && ((Object[]) needSplitParam).length <= splitLimit)
                || (needSplitParam instanceof List && ((List) needSplitParam).size() <= splitLimit)
                || (needSplitParam instanceof Set && ((Set) needSplitParam).size() <= splitLimit);
        if (notMeetSplitLen) {
            return pjp.proceed();
        }

        // 去重,这一步看情况也可以不要
        if (needSplitParam instanceof List) {
            List<?> list = (List<?>) needSplitParam;
            if (list.size() > 1) {
                needSplitParam = new ArrayList<>(new HashSet<>(list));
            }
        }
        //算出拆分成几批次
        int batchNum = getBatchNum(needSplitParam, splitGroupNum);
        if (batchNum == 1) {
            return pjp.proceed();
        }
        CompletableFuture<?>[] futures = new CompletableFuture[batchNum];
        ThreadPoolEnum threadPool = splitWorkAnnotation.setThreadPool();
        if (threadPool == null) {
            return pjp.proceed();
        }

        try {
            for (int currentBatch = 0; currentBatch < batchNum; currentBatch++) {
                int finalNeedSplitParamIndex = needSplitParamIndex;
                int finalCurrentBatch = currentBatch;
                Object finalNeedSplitParam = needSplitParam;
                futures[currentBatch] = CompletableFuture.supplyAsync(() -> {
                    Object[] dest = new Object[args.length];
                    //这一步很重要!!!因为多线程运行不能用原理的参数列表了,不然会导致混乱
                    System.arraycopy(args, 0, dest, 0, args.length);
                    try {
                        //将其他参数保持不变,将需要拆分的参数替换为part参数
                        dest[finalNeedSplitParamIndex] = getPartParam(finalNeedSplitParam, splitGroupNum, finalCurrentBatch);
                        return pjp.proceed(dest);
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                }, threadPool.getThreadPoolExecutor());
            }
            CompletableFuture<Void> all = CompletableFuture.allOf(futures);
            all.get();
            Class<? extends HandleReturn> handleReturn = splitWorkAnnotation.handlerReturnClass();

            List<Object> resultList = new ArrayList<>(futures.length);
            for (CompletableFuture<?> future : futures) {
                resultList.add(future.get());
            }
            //获取到每个part的结果然后调用处理函数
            return handleReturn.getDeclaredMethods()[0].invoke(handleReturn.getDeclaredConstructor().newInstance(), resultList);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取批次数目
     *
     * @param needSplitParam1
     * @param splitGroupNum
     * @return {@link Integer}
     */
    public Integer getBatchNum(Object needSplitParam1, Integer splitGroupNum) {
        if (needSplitParam1 instanceof Object[]) {
            Object[] splitParam = (Object[]) needSplitParam1;
            return splitParam.length % splitGroupNum == 0 ? splitParam.length / splitGroupNum : splitParam.length / splitGroupNum + 1;
        } else if (needSplitParam1 instanceof Collection) {
            Collection<?> splitParam = (Collection<?>) needSplitParam1;
            return splitParam.size() % splitGroupNum == 0 ? splitParam.size() / splitGroupNum : splitParam.size() / splitGroupNum + 1;
        } else {
            return 1;
        }
    }

    /**
     * 获取当前批次参数
     *
     * @param needSplitParam
     * @param splitGroupNum
     * @param batch
     * @return {@link Object}
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws InstantiationException
     * @throws IllegalAccessException
     */
    public Object getPartParam(Object needSplitParam, Integer splitGroupNum, Integer batch) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        if (needSplitParam instanceof Object[]) {
            Object[] splitParam = (Object[]) needSplitParam;
            int end = Math.min((batch + 1) * splitGroupNum, splitParam.length);
            return Arrays.copyOfRange(splitParam, batch * splitGroupNum, end);
        } else if (needSplitParam instanceof List) {
            List<?> splitParam = (List<?>) needSplitParam;
            int end = Math.min((batch + 1) * splitGroupNum, splitParam.size());
            return splitParam.subList(batch * splitGroupNum, end);
        } else if (needSplitParam instanceof Set) {
            List splitParam = new ArrayList<>((Set) needSplitParam);
            int end = Math.min((batch + 1) * splitGroupNum, splitParam.size());
            //参数具体化了
            Set<?> set = (Set<?>) needSplitParam.getClass().getDeclaredConstructor().newInstance();
            set.addAll(splitParam.subList(batch * splitGroupNum, end));
            return set;
        } else {
            return null;
        }
    }
}

定义处理返回值的接口

java

代码解读

复制代码

/**
 * 处理返回结果接口
 *
 * @author: TangSiQi
 * @date: 2021年08月13日15:42
 **/
public interface HandleReturn {

    /**
     * 处理返回结果方法
     *
     * @param t 拆分后多次请求结果
     * @return R 处理后的返回结果
     * @author tangsiqi
     * @date 2021/8/13 15:55
     */
    Object handleReturn(List t);
}

实现了一个简单合并的

java

代码解读

复制代码

/**
 * 集合List等合并策略
 *
 * @author: TangSiQi
 * @date: 2021年08月13日15:32
 **/
public class MergeFunction implements HandleReturn {

    @Override
    public Object handleReturn(List results) {
        if (results == null) {
            return null;
        }
        if (results.size() <= 1) {
            //todo
            return results.get(0);
        }
        //这里自己要知道具体返回类型
        List first = (List) results.get(0);
        for (int i = 1; i < results.size(); i++) {
            first.addAll((List) results.get(i));
        }
        return first;
    }
}


转载来源:https://juejin.cn/post/7408859165433577482

相关文章
|
8月前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
SQL 监控 关系型数据库
『叶问』#40,MySQL进程号、连接ID、查询ID、InnoDB线程与系统线程如何对应
『叶问』#40,MySQL进程号、连接ID、查询ID、InnoDB线程与系统线程如何对应
615 0
|
小程序 数据库
通过队列解决sqllite多线程报错的问题(实现多线程增删改查,以字典形式查询结果)
小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错 ProgrammingError: Recursive use of cursors not allowed. 就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。 刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。
1051 0
|
SQL Java 测试技术
再送一波干货,测试2000线程并发下同时查询1000万条数据库表及索引优化
原文:再送一波干货,测试2000线程并发下同时查询1000万条数据库表及索引优化 继上篇文章《绝对干货,教你4分钟插入1000万条数据到mysql数据库表,快快进来》发布后在博客园首页展示得到了挺多的阅读量,我这篇文章就是对上篇文章的千万级数据库表在高并发访问下如何进行测试访问 这篇文章的知识点如下: 1.
2016 0
|
SQL 网络协议 关系型数据库
如何在MySQL中查询OS线程id(LWP)?
如何在MySQL中查询OS线程id(LWP)? [root@LHRDB ~]# ps -Lf 16833UID        PID  PPID   LWP  C NLWP STIME TTY      STAT   TIME CMDmysql   ...
1751 0
|
SQL
SQL Server 2005 查询处理器未能为执行并行查询启动必要的线程资源。
今天早上程序突然报:Microsoft OLE DB Provider for SQL Server 错误 '80040e14' 查询处理器未能为执行并行查询启动必要的线程资源。 赶快重启了sql服务,恢复正常,不知道什么原因,打算本周安装sql server 2005 sp3.
1346 0
|
28天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
62 1