利用java8 的 CompletableFuture 优化 Flink 程序

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。

一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

java

代码解读

复制代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

java

代码解读

复制代码

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();

CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
                return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
            });
executeFutures.add(executeFuture);

for (int i = 0; i < executeFutures.size(); i++) {
    executeFutures.get(i).get()
    xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

java

代码解读

复制代码

class AddFunction extends AbstractFunction {
    @Override
    public AviatorObject call(Map<String, Object> env,

                              AviatorObject arg1, AviatorObject arg2) {
        Number left = FunctionUtils.getNumberValue(arg1, env);
        Number right = FunctionUtils.getNumberValue(arg2, env);
        return new AviatorDouble(left.intValue() + right.intValue());
    }

    public String getName() {

        return "add" ;
    }
}


public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
   AviatorEvaluator.addFunction(new AddFunction());
    System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
     AviatorEvaluator.addFunction(new AddFunction());
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "testId1" , 1);
    stringObjectHashMap.put( "testId2" , 2);
    Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "ip" , "a1111" );
    // stringObjectHashMap.put("result", "a&B&C&d");
 stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
    stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
    stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
    stringObjectHashMap.put( "testId1" , "sku" );
    stringObjectHashMap.put( "a" , "123" );
    stringObjectHashMap.put( "a1" , "null" );
    stringObjectHashMap.put( "b1" , 123);
 
    AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
    AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)

 execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
    System.out.println(execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
    System.out.println( "###" + execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

java

代码解读

复制代码

## examples/function.av
fn add(x, y) {
  return x + y;
}
p(add(1,2))

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    String function = "## examples/function.av\n" +
            "\n" +
            "fn add(x, y) {\n" +
            "  return x + y;\n" +
            "}" ;
    AviatorEvaluator.defineFunction( "add" , function);
    System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。


转载来源:https://juejin.cn/post/7372114027840094223

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4天前
|
消息中间件 资源调度 Java
用Java实现samza转换成flink
【10月更文挑战第20天】
|
9天前
|
Java 数据库连接 数据库
优化之路:Java连接池技术助力数据库性能飞跃
在Java应用开发中,数据库操作常成为性能瓶颈。频繁的数据库连接建立和断开增加了系统开销,导致性能下降。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接,显著减少连接开销,提升系统性能。文章详细介绍了连接池的优势、选择标准、使用方法及优化策略,帮助开发者实现数据库性能的飞跃。
18 4
|
7天前
|
存储 Java 开发者
成功优化!Java 基础 Docker 镜像从 674MB 缩减到 58MB 的经验分享
本文分享了如何通过 jlink 和 jdeps 工具将 Java 基础 Docker 镜像从 674MB 优化至 58MB 的经验。首先介绍了选择合适的基础镜像的重要性,然后详细讲解了使用 jlink 构建自定义 JRE 镜像的方法,并通过 jdeps 自动化模块依赖分析,最终实现了镜像的大幅缩减。此外,文章还提供了实用的 .dockerignore 文件技巧和选择安全、兼容的基础镜像的建议,帮助开发者提升镜像优化的效果。
|
12天前
|
缓存 前端开发 JavaScript
9大高性能优化经验总结,Java高级岗必备技能,强烈建议收藏
关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。本文介绍了9种性能优化方法,涵盖代码优化、数据库优化、连接池调优、架构层面优化、分布式缓存、异步化、Web前端优化、服务化、硬件升级、搜索引擎和产品逻辑优化。欢迎留言交流。
|
11天前
|
存储 缓存 Java
Java应用瘦身记:Docker镜像从674MB优化至58MB的实践指南
【10月更文挑战第22天】 在容器化时代,Docker镜像的大小直接影响到应用的部署速度和运行效率。一个轻量级的Docker镜像可以减少存储成本、加快启动时间,并提高资源利用率。本文将分享如何将一个Java基础Docker镜像从674MB缩减到58MB的实践经验。
22 1
|
12天前
|
消息中间件 监控 算法
Java性能优化:策略与实践
【10月更文挑战第21】Java性能优化:策略与实践
|
12天前
|
SQL 监控 Java
Java性能优化:提升应用效率与响应速度的全面指南
【10月更文挑战第21】Java性能优化:提升应用效率与响应速度的全面指南
|
18天前
|
Java Maven 数据安全/隐私保护
如何实现Java打包程序的加密代码混淆,避免被反编译?
【10月更文挑战第15天】如何实现Java打包程序的加密代码混淆,避免被反编译?
31 2
|
21天前
|
安全 Java Linux
java程序设置开机自启
java程序设置开机自启
|
17天前
|
缓存 Java 数据处理
java查询大量数据优化
通过结合的高性能云服务,如其提供的弹性计算资源与全球加速网络,可以进一步增强这些优化策略的效果,确保数据处理环节更加迅速、可靠。蓝易云不仅提供稳定的基础架构,还拥有强大的安全防护和灵活的服务选项,是优化大型数据处理项目不可或缺的合作伙伴。
26 0