Flink 任务 Jackson 解析 JSON 使用不当引发的反压问题

简介: 背景最近业务方反馈线上一个 topic 的数据延迟比较大,然后我查看了这个 topic 的数据是由一个 Flink 任务产生的,于是就找到了这个任务开始排查问题,发现这个任务是一个非常简单的任务,大致的逻辑是 kafka source -> flatmap -> filter -> map -> sink kafka.中间没有复杂的操作,我在本地写了一个简单的程序模拟线上的任务.方便大家理解, 任务的 DAG 如下图所示

背景


最近业务方反馈线上一个 topic 的数据延迟比较大,然后我查看了这个 topic 的数据是由一个 Flink 任务产生的,于是就找到了这个任务开始排查问题,发现这个任务是一个非常简单的任务,大致的逻辑是 kafka source -> flatmap -> filter -> map -> sink kafka.中间没有复杂的操作,我在本地写了一个简单的程序模拟线上的任务.方便大家理解, 任务的 DAG 如下图所示



image-20210827230429171


线上任务 source 的并行度设置的是 80 flatmap filter map 的并行度也是 80 所以和 source 都 chain 在一起,两个 sink 的并行度设置的都是 30,我这里本地就设置成 8 和 3 了.


排查过程

因为反馈说是下游 topic 的数据有延迟,我的第一反应是看任务是否有反压,通过 Flink 的 UI 上的 BackPressure 发现任务并没有反压,然后查看了第一个 operator 输入数据量的 QPS 大概是 50w/s 经过 filter 之后输出数据量的 QPS 是 15w/s 左右,然后查看了 kafka 的监控发现写入的 QPS 也是 15w/s ,这就说明确实不是因为写入 kafka 慢引起的反压,因为你产生了多少我就写入了多少,但是最终的数据有延迟,那就说明任务反压的地方是在第一个 operator 中的某个算子上,但是因为 operator chain 把所有的算子都 chain 在一起,不太方便定位反压的位置.于是我就在 flatmap 后面调用 disableChaining 打断了 operator chain.这个时候任务的 DAG 变成了下面这样.



image-20210827231856263


这个时候在查看 source 的 BackPressure 显示都是 high 状态,说明反压出现在 flatmap 算子,刚开始猜测可能是因为 flatmap 的时候数据量会暴增导致处理不过来了,所以就尝试增大了 flatmap 算子的并行度,但是发现并没有明显的效果,任务反压还是比较严重,哪怕是增大并行度到 500 效果都不是很明显,这就说明跟 flatmap 的数据量大小及并行度没有关系,没办法只能看代码了,大概看了下任务逻辑,代码非常简单,不应该会出现性能问题,唯一不足的就是代码逻辑不够简洁,略显繁琐,且有重复的逻辑,其实 filter 和 map 算子是没有必要的,优化了这些地方之后本以为就能解决反压问题,可是结果还是出现了反压.



image-20210829110127054


这就有点奇怪了,代码中没有复杂的逻辑,也没有和第三方交互,按道理来说不应该会有性能问题,再次查看代码发现代码里面解析 JSON 数据用的是 Jackson 类库,Jackson 的稳定性和性能肯定是没有问题的,在国外应用比较广泛,比如 springboot 包括 Flink 这种大型的框架内默认都使用 Jackson 更加能说明这一点,国内使用 Fastjson 类库会比较多一点.然后仔细看代码发现使用 Jackson 的时候需要实例化 ObjectMapper 对象,但是代码里面是在 flatmap 方法里面实例化 ObjectMapper 对象的,也就是说每来一条数据都需要实例化一次 ObjectMapper 对象,这显然是不合理的.刚开始没有发现是因为解析 json 被封装在一个方法里面,没有进到方法里面来看,那么究竟是不是因为 ObjectMapper 实例化影响到性能呢? 其实非常简单,我们可以在本地测试一下提前实例化好 ObjectMapper 和每条数据都实例化一遍的差别就能得到答案.测试代码如下


public static long a(String json) throws IOException {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i ++) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.readValue(json, List.class);
        }
        long end = System.currentTimeMillis();
        return end - start;
    }
    public static long b(String json) throws IOException {
        long start = System.currentTimeMillis();
        ObjectMapper objectMapper = new ObjectMapper();
        for (int i = 0; i < 100000; i ++) {
            objectMapper.readValue(json, List.class);
        }
        long end = System.currentTimeMillis();
        return end - start;
    }
    String json = "[{\"name\":\"JasonLee\",\"age\":18},{\"name\":\"JasonLee1\",\"age\":18}]";
    System.out.println(a(json));
    System.out.println(b(json));
    // 打印的结果是:
    68425
    316


从上面测试的结果可以发现,这两种写法性能相差甚大,耗时完全不在一个量级上,也就是说 jackson 的 ObjectMapper 实例化是一个非常耗时的过程,这就能解释任务为什么会在 flatmap 算子出现反压了.


解决


既然知道了反压的原因,那解决办法就非常的简单了,把 ObjectMapper 对象的实例化放在 open 方法里面即可,类似于创建 JDBC 连接,让每个 task 初始化一次 ObjectMapper 对象,然后在 flatmap 方法里面直接使用.这样就不用每条数据都初始化一次.


SingleOutputStreamOperator<JasonEntity> filter = jasonEntityDataStreamSource.flatMap(new RichFlatMapFunction<JasonEntity, JasonEntity>() {
    private ObjectMapper objectMapper = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        objectMapper = new ObjectMapper();
    }
    @Override
    public void flatMap(JasonEntity jasonEntity, Collector<JasonEntity> collector) throws Exception {
        List<Map<String, String>> list = objectMapper.readValue("[\n" +
                "  {\"name\":\"JasonLee\",\"age\":18},\n" +
                "  {\"name\":\"JasonLee1\",\"age\":18}\n" +
                "]", List.class);
        System.out.println(list.toString());
        collector.collect(jasonEntity);
    }
}).disableChaining();


当然除了改成上面的写法外,还可以直接用 fastjson 来解析 JSON 数据,jackson 和 fastjson 的性能是差不多的,网上也有很多这两者对比的文章,感兴趣的也可以自己测试一下.这里就不做对比测试了.


最后把修改后的代码提交到线上后发现没有出现反压,下游的数据自然也不会有延迟了.其实这是一个非常小的问题,或者说是一个小细节吧,但是却能带来严重的后果.所以我们在写代码的时候还是要多注意细节,尽可能提高程序的性能.

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
611 3
|
缓存 Java 调度
Java并发编程:深入解析线程池与Future任务
【7月更文挑战第9天】线程池和Future任务是Java并发编程中非常重要的概念。线程池通过重用线程减少了线程创建和销毁的开销,提高了资源利用率。而Future接口则提供了检查异步任务状态和获取任务结果的能力,使得异步编程更加灵活和强大。掌握这些概念,将有助于我们编写出更高效、更可靠的并发程序。
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
608 16
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
337 12
|
数据采集 分布式计算 DataWorks
DataWorks产品使用合集之任务工作流中遇到了日志信息显示参数值没有正确解析的问题,该如何处理
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
JSON 安全 Java
深入解析Jackson的ObjectMapper:核心功能与方法指南
深入解析Jackson的ObjectMapper:核心功能与方法指南
1003 1
|
自然语言处理 计算机视觉 Python
VisProg解析:根据自然语言指令解决复杂视觉任务
VisProg是一个神经符号系统,能够根据自然语言指令生成并执行Python程序来解决复杂的视觉任务,提供可解释的解决方案。
287 0
|
JSON 资源调度 Kubernetes
实时计算 Flink版操作报错合集之解析JSON数组时,遇到报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
387 0
|
7月前
|
JSON API 数据格式
淘宝拍立淘按图搜索API系列,json数据返回
淘宝拍立淘按图搜索API系列通过图像识别技术实现商品搜索功能,调用后返回的JSON数据包含商品标题、图片链接、价格、销量、相似度评分等核心字段,支持分页和详细商品信息展示。以下是该API接口返回的JSON数据示例及详细解析:
|
7月前
|
JSON 算法 API
Python采集淘宝商品评论API接口及JSON数据返回全程指南
Python采集淘宝商品评论API接口及JSON数据返回全程指南

推荐镜像

更多
  • DNS