从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策

通过使用实时分析,企业可以快速有效地对用户行为模式做出反应。这使他们能够利用可能错过的机会,防止问题变得更糟。

Apache Kafka 是一个流行的事件流平台,可用于实时摄取从多个垂直领域(如物联网、金融交易、库存等)的各种来源生成的数据/事件。然后,这些数据可以流式传输到多个下游应用程序或引擎中,以便进一步处理和最终分析,以支持决策。

Apache Flink 是一个强大的引擎,用于在到达 Kafka 主题时通过修改、丰富或重组流数据来优化或增强流数据。从本质上讲,Flink 是一个下游应用程序,它持续消耗来自 Kafka 主题的数据流进行处理,然后将处理后的数据摄取到各个 Kafka 主题中。最终,可以集成 Apache Druid,以使用来自 Kafka 主题的处理后的流数据进行分析、查询和做出即时业务决策。

image.png

在我之前的文章中,我解释了如何将 Flink 1.18 与 Kafka 3.7.0 集成。在本文中,我将概述将处理后的数据从 Flink 1.18.1 传输到 Kafka 2.13-3.7.0 主题的步骤。几个月前,我们发表了一篇单独的文章,详细介绍了如何将 Kafka 主题中的流数据引入 Apache Druid 进行分析和查询。你可以在这里阅读。

执行环境

我们配置了一个多节点集群(三个节点),其中每个节点至少有 8 GB RAM 和 250 GB SSD 以及 Ubuntu-22.04.2 amd64 作为操作系统。

OpenJDK 11 在每个节点上都安装了环境变量配置。JAVA_HOME

Python 3 或 Python 2 以及 Perl 5 在每个节点上都可用。

三节点 Apache Kafka-3.7.0 集群已启动并运行 Apache Zookeeper -3.5.6。在两个节点上。

Apache Druid 29.0.0 已在集群中尚未为 Kafka 代理安装 Zookeeper 的节点上安装和配置。Zookeeper 已在其他两个节点上安装和配置。Leader 代理已启动并运行在运行 Druid 的节点上。

使用 Datafaker 库开发了一个模拟器,每隔 10 秒生成一次实时虚假的金融交易 JSON 记录,并将其发布到创建的 Kafka 主题中。下面是模拟器生成的 JSON 数据馈送。

JSON的

{"timestamp":"2024-03-14T04:31:09Z ","upiID":"9972342663@ybl","name":"Kiran Marar","note":" ","amount":"14582.00","currency":"INR","geoLocation":"Latitude: 54.1841745 Longitude: 13.1060775","deviceOS":"IOS","targetApp":"PhonePe","merchantTransactionId":"ebd03de9176201455419cce11bbfed157a","merchantUserId":"65107454076524@ybl"}

解压 Druid 和 Kafka 的 leader broker 未运行的节点上的 Apache Flink-1.18.1-bin-scala_2.12.tgz 的存档

在 Flink 中运行流式处理作业

我们将深入研究从 Kafka 主题中提取数据的过程,其中传入的消息是从模拟器发布,在其上执行处理任务,然后将处理后的数据重新集成回多节点 Kafka 集群的其他主题。

我们开发了一个 Java 程序 (),作为作业提交给 Flink 来执行上述步骤,考虑 2 分钟的窗口,并计算模拟 UPI 事务数据流上同一手机号码 () 的平均交易金额。以下 jar 文件列表已包含在项目构建或类路径中。StreamingToFlinkJob.javaupi id  

image.png

使用下面的代码,我们可以在开发的 Java 类中得到 Flink 执行环境。  

爪哇岛

Configuration conf = new Configuration();
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

现在,我们应该读取模拟器已经发布到 Java 程序内 Kafka 主题的消息/流。这是代码块。

爪哇岛

KafkaSource kafkaSource = KafkaSource.<UPITransaction>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)// IP Address with port 9092 where leader broker is running in cluster
    .setTopics(IKafkaConstants.INPUT_UPITransaction_TOPIC_NAME)
    .setGroupId("upigroup")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new KafkaUPISchema())
    .build();

要从 Kafka 中检索信息,在 Flink 中设置反序列化模式对于处理 JSON 格式的事件、将原始数据转换为结构化形式至关重要。重要的是,需要设置为 no.of Kafka 主题分区,否则水印将不适用于源,并且数据不会发布到接收器。setParallelism

爪哇岛

DataStream<UPITransaction> stream = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(2)), "Kafka Source").setParallelism(1);


通过从 Kafka 成功检索事件,我们可以通过合并处理步骤来增强流式处理作业。后续代码片段读取 Kafka 数据,按手机号码 () 进行组织,并计算每个手机号码的平均价格。为了实现这一点,我们开发了一个自定义窗口函数来计算平均值,并实现了水印以有效地处理事件时间语义。下面是代码片段:upiID

爪哇岛

SerializableTimestampAssigner<UPITransaction> sz = new SerializableTimestampAssigner<UPITransaction>() {
   @Override
   public long extractTimestamp(UPITransaction transaction, long l) {
    try {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
     Date date = sdf.parse(transaction.eventTime);
     return date.getTime();
    } catch (Exception e) {
     return 0;
    }
   }
  };
 
WatermarkStrategy<UPITransaction> watermarkStrategy = WatermarkStrategy.<UPITransaction>forBoundedOutOfOrderness(Duration.ofMillis(100)).withTimestampAssigner(sz);
DataStream<UPITransaction> watermarkDataStream = stream.assignTimestampsAndWatermarks(watermarkStrategy);
 
//Instead of event time, we can use window based on processing time. Using TumblingProcessingTimeWindows  
DataStream<TransactionAgg> groupedData = watermarkDataStream.keyBy("upiId").window(TumblingEventTimeWindows.of(Time.milliseconds(2500),
       Time.milliseconds(500))).sum("amount"); 
          .apply(new TransactionAgg());

最终,在 Flink 内部执行处理逻辑(基于手机号码计算同一 UPI ID 在连续交易流流 2 分钟窗口内的平均价格)。这是 Window 函数的代码块,用于计算每个 UPI ID 或手机号码的平均金额。

爪哇岛

public class TransactionAgg
   implements WindowFunction<UPITransaction, TransactionAgg, Tuple, TimeWindow> {

  @Override
  public void apply(Tuple key, TimeWindow window, Iterable<UPITransaction> values, Collector<TransactionAgg> out) {
   Integer sum = 0; //Consider whole number
   int count = 0;
   String upiID = null ;
   for (UPITransaction value : values) {
    sum += value.amount;
    upiID = value.upiID;
    count++;
   }

   TransactionAgg output = new TransactionAgg();
   output.upiID = upiID;
   output.eventTime = window.getEnd();
   output.avgAmount = (sum / count);
   out.collect( output);
  }

 }

我们已经处理了数据。下一步是序列化对象并将其发送到其他 Kafka 主题。在开发的 Java 代码 () 中添加 a,将处理后的数据从 Flink 引擎发送到在多节点 Kafka 集群上创建的不同 Kafka 主题。下面是在将对象发送/发布到 Kafka 主题之前序列化对象的代码片段:KafkaSinkStreamingToFlinkJob.java

爪哇岛

public class KafkaTrasactionSinkSchema implements KafkaRecordSerializationSchema<TransactionAgg> {

   
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            TransactionAgg aggTransaction, KafkaSinkContext context, Long timestamp) {
        try {
            return new ProducerRecord<>(
                    topic,
                    null, // not specified  partition so setting null
                    aggTransaction.eventTime,
                    aggTransaction.upiID.getBytes(),
                    objectMapper.writeValueAsBytes(aggTransaction));
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    "Exception on serialize record: " + aggTransaction, e);
        }
       
    }
}

而且,下面是用于接收已处理数据的代码块,该数据发送回不同的 Kafka 主题。

爪哇岛

KafkaSink<TransactionAgg> sink = KafkaSink.<TransactionAgg>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
    .setRecordSerializer(new KafkaTrasactionSinkSchema(IKafkaConstants.OUTPUT_UPITRANSACTION_TOPIC_NAME))
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
 groupedData.sinkTo(sink); // DataStream that created above for TransactionAgg
  env.execute();

将 Druid 与 Kafka 主题连接起来

在最后一步中,我们需要将 Druid 与 Kafka 主题集成,以消耗 Flink 持续发布的处理后的数据流。借助 Apache Druid,我们可以直接连接 Apache Kafka,从而可以持续摄取实时数据,然后进行查询,从而在现场做出业务决策,而无需干预任何第三方系统或应用程序。Apache Druid 的另一个优点是,我们不需要配置或安装任何第三方 UI 应用程序来查看登陆或发布到 Kafka 主题的数据。为了精简这篇文章,我省略了将 Druid 与 Apache Kafka 集成的步骤。但是,几个月前,我发表了一篇关于这个主题的文章(本文前面链接)。您可以阅读它并遵循相同的方法。

结语

上面提供的代码片段仅供理解之用。它说明了从 Kafka 主题获取消息/数据流、处理消耗的数据以及最终将修改后的数据发送/推送到其他 Kafka 主题的顺序步骤。这允许 Druid 拾取修改后的数据流进行查询,作为最后一步进行分析。稍后,如果您有兴趣在自己的基础架构上执行整个代码库,我们将在 GitHub 上上传它。


目录
相关文章
|
8天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
274 33
The Past, Present and Future of Apache Flink
|
1月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
86 7
|
1月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
83 5
|
1月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
354 4
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
21天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
53 5
|
1月前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
23天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
36 1
|
1月前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
1月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
1月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
47 0

推荐镜像

更多
下一篇
DataWorks