利用java8 的 CompletableFuture 优化 Flink 程序

简介: 本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。

一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

java

代码解读

复制代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

java

代码解读

复制代码

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();

CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
                return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
            });
executeFutures.add(executeFuture);

for (int i = 0; i < executeFutures.size(); i++) {
    executeFutures.get(i).get()
    xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

java

代码解读

复制代码

class AddFunction extends AbstractFunction {
    @Override
    public AviatorObject call(Map<String, Object> env,

                              AviatorObject arg1, AviatorObject arg2) {
        Number left = FunctionUtils.getNumberValue(arg1, env);
        Number right = FunctionUtils.getNumberValue(arg2, env);
        return new AviatorDouble(left.intValue() + right.intValue());
    }

    public String getName() {

        return "add" ;
    }
}


public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
   AviatorEvaluator.addFunction(new AddFunction());
    System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
     AviatorEvaluator.addFunction(new AddFunction());
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "testId1" , 1);
    stringObjectHashMap.put( "testId2" , 2);
    Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "ip" , "a1111" );
    // stringObjectHashMap.put("result", "a&B&C&d");
 stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
    stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
    stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
    stringObjectHashMap.put( "testId1" , "sku" );
    stringObjectHashMap.put( "a" , "123" );
    stringObjectHashMap.put( "a1" , "null" );
    stringObjectHashMap.put( "b1" , 123);
 
    AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
    AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)

 execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
    System.out.println(execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
    System.out.println( "###" + execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

java

代码解读

复制代码

## examples/function.av
fn add(x, y) {
  return x + y;
}
p(add(1,2))

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    String function = "## examples/function.av\n" +
            "\n" +
            "fn add(x, y) {\n" +
            "  return x + y;\n" +
            "}" ;
    AviatorEvaluator.defineFunction( "add" , function);
    System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。


转载来源:https://juejin.cn/post/7372114027840094223

相关文章
|
27天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
4天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
398 17
|
7天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
19天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
7天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
355 2
|
22天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
24天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2598 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
6天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
277 2
|
4天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
106 65
|
23天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1581 17
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码