利用java8 的 CompletableFuture 优化 Flink 程序

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。

一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

java

代码解读

复制代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

java

代码解读

复制代码

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();

CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
                return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
            });
executeFutures.add(executeFuture);

for (int i = 0; i < executeFutures.size(); i++) {
    executeFutures.get(i).get()
    xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

java

代码解读

复制代码

class AddFunction extends AbstractFunction {
    @Override
    public AviatorObject call(Map<String, Object> env,

                              AviatorObject arg1, AviatorObject arg2) {
        Number left = FunctionUtils.getNumberValue(arg1, env);
        Number right = FunctionUtils.getNumberValue(arg2, env);
        return new AviatorDouble(left.intValue() + right.intValue());
    }

    public String getName() {

        return "add" ;
    }
}


public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
   AviatorEvaluator.addFunction(new AddFunction());
    System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
     AviatorEvaluator.addFunction(new AddFunction());
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "testId1" , 1);
    stringObjectHashMap.put( "testId2" , 2);
    Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "ip" , "a1111" );
    // stringObjectHashMap.put("result", "a&B&C&d");
 stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
    stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
    stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
    stringObjectHashMap.put( "testId1" , "sku" );
    stringObjectHashMap.put( "a" , "123" );
    stringObjectHashMap.put( "a1" , "null" );
    stringObjectHashMap.put( "b1" , 123);
 
    AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
    AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)

 execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
    System.out.println(execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
    System.out.println( "###" + execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

java

代码解读

复制代码

## examples/function.av
fn add(x, y) {
  return x + y;
}
p(add(1,2))

java

代码解读

复制代码

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    String function = "## examples/function.av\n" +
            "\n" +
            "fn add(x, y) {\n" +
            "  return x + y;\n" +
            "}" ;
    AviatorEvaluator.defineFunction( "add" , function);
    System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。


转载来源:https://juejin.cn/post/7372114027840094223

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5天前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
169 1
Flink CDC + Hologres高性能数据同步优化实践
|
6天前
|
Java C语言
课时8:Java程序基本概念(标识符与关键字)
课时8介绍Java程序中的标识符与关键字。标识符由字母、数字、下划线和美元符号组成,不能以数字开头且不能使用Java保留字。建议使用有意义的命名,如student_name、age。关键字是特殊标记,如蓝色字体所示。未使用的关键字有goto、const;特殊单词null、true、false不算关键字。JDK1.4后新增assert,JDK1.5后新增enum。
|
6天前
|
Java 编译器
课时7:Java程序基本概念(注释)
课时7介绍了Java程序中的注释。编程语言有其语法和语义,注释有助于理解代码需求,防止断档。Java支持三类注释:单行(//)、多行(/* */)和文档注释(/** */)。注释不会被编译器编译。范例中展示了如何在代码中使用注释,并强调了注释对项目文档管理的重要性。
|
3天前
|
存储 Java 数据库连接
【YashanDB知识库】Java程序调用存储过程,在提取clob时报YAS-00004
【YashanDB知识库】Java程序调用存储过程,在提取clob时报YAS-00004
|
3天前
|
搜索推荐 Java Android开发
课时146:使用JDT开发Java程序
在 Eclipse 之中提供有 JDT环境可以实现java 程序的开发,下面就通过一些功能进行演示。 项目开发流程
|
6天前
|
Java 开发工具
课时5:第一个Java程序
课时5介绍了编写第一个Java程序的步骤,包括创建Hello.java文件、编写“Hello World”代码、编译和运行程序。主要内容有:1) 新建并编辑Hello.java;2) 编译Java源文件生成.class文件;3) 通过命令行解释执行Java程序;4) 解释主方法的作用及信息输出操作。本课强调了类定义、文件命名规则和基本程序结构的重要性,并建议初学者使用记事本编写代码以熟悉基础语法。
|
12天前
|
人工智能 算法 Java
Java高级应用开发:AI赋能下的智能代码生成与优化
本文探讨了AI技术,特别是像DeepSeek这样的智能工具,在Java高级应用开发中的应用。AI在代码生成、优化、自动化测试等方面发挥重要作用,可自动生成高质量代码片段、提出优化建议并检测潜在错误,显著提升开发效率与代码质量。未来,AI将进一步推动Java开发的智能化和自动化,为开发者带来全新的开发体验。
|
12天前
|
人工智能 Java 数据处理
Java高级应用开发:基于AI的微服务架构优化与性能调优
在现代企业级应用开发中,微服务架构虽带来灵活性和可扩展性,但也增加了系统复杂性和性能瓶颈。本文探讨如何利用AI技术,特别是像DeepSeek这样的智能工具,优化Java微服务架构。AI通过智能分析系统运行数据,自动识别并解决性能瓶颈,优化服务拆分、通信方式及资源管理,实现高效性能调优,助力开发者设计更合理的微服务架构,迎接未来智能化开发的新时代。
|
13天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
129 60
【Java并发】【线程池】带你从0-1入门线程池
|
2天前
|
存储 网络协议 安全
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
43 23