从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策

通过使用实时分析,企业可以快速有效地对用户行为模式做出反应。这使他们能够利用可能错过的机会,防止问题变得更糟。

Apache Kafka 是一个流行的事件流平台,可用于实时摄取从多个垂直领域(如物联网、金融交易、库存等)的各种来源生成的数据/事件。然后,这些数据可以流式传输到多个下游应用程序或引擎中,以便进一步处理和最终分析,以支持决策。

Apache Flink 是一个强大的引擎,用于在到达 Kafka 主题时通过修改、丰富或重组流数据来优化或增强流数据。从本质上讲,Flink 是一个下游应用程序,它持续消耗来自 Kafka 主题的数据流进行处理,然后将处理后的数据摄取到各个 Kafka 主题中。最终,可以集成 Apache Druid,以使用来自 Kafka 主题的处理后的流数据进行分析、查询和做出即时业务决策。

image.png

在我之前的文章中,我解释了如何将 Flink 1.18 与 Kafka 3.7.0 集成。在本文中,我将概述将处理后的数据从 Flink 1.18.1 传输到 Kafka 2.13-3.7.0 主题的步骤。几个月前,我们发表了一篇单独的文章,详细介绍了如何将 Kafka 主题中的流数据引入 Apache Druid 进行分析和查询。你可以在这里阅读。

执行环境

我们配置了一个多节点集群(三个节点),其中每个节点至少有 8 GB RAM 和 250 GB SSD 以及 Ubuntu-22.04.2 amd64 作为操作系统。

OpenJDK 11 在每个节点上都安装了环境变量配置。JAVA_HOME

Python 3 或 Python 2 以及 Perl 5 在每个节点上都可用。

三节点 Apache Kafka-3.7.0 集群已启动并运行 Apache Zookeeper -3.5.6。在两个节点上。

Apache Druid 29.0.0 已在集群中尚未为 Kafka 代理安装 Zookeeper 的节点上安装和配置。Zookeeper 已在其他两个节点上安装和配置。Leader 代理已启动并运行在运行 Druid 的节点上。

使用 Datafaker 库开发了一个模拟器,每隔 10 秒生成一次实时虚假的金融交易 JSON 记录,并将其发布到创建的 Kafka 主题中。下面是模拟器生成的 JSON 数据馈送。

JSON的

{"timestamp":"2024-03-14T04:31:09Z ","upiID":"9972342663@ybl","name":"Kiran Marar","note":" ","amount":"14582.00","currency":"INR","geoLocation":"Latitude: 54.1841745 Longitude: 13.1060775","deviceOS":"IOS","targetApp":"PhonePe","merchantTransactionId":"ebd03de9176201455419cce11bbfed157a","merchantUserId":"65107454076524@ybl"}

解压 Druid 和 Kafka 的 leader broker 未运行的节点上的 Apache Flink-1.18.1-bin-scala_2.12.tgz 的存档

在 Flink 中运行流式处理作业

我们将深入研究从 Kafka 主题中提取数据的过程,其中传入的消息是从模拟器发布,在其上执行处理任务,然后将处理后的数据重新集成回多节点 Kafka 集群的其他主题。

我们开发了一个 Java 程序 (),作为作业提交给 Flink 来执行上述步骤,考虑 2 分钟的窗口,并计算模拟 UPI 事务数据流上同一手机号码 () 的平均交易金额。以下 jar 文件列表已包含在项目构建或类路径中。StreamingToFlinkJob.javaupi id  

image.png

使用下面的代码,我们可以在开发的 Java 类中得到 Flink 执行环境。  

爪哇岛

Configuration conf = new Configuration();
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

现在,我们应该读取模拟器已经发布到 Java 程序内 Kafka 主题的消息/流。这是代码块。

爪哇岛

KafkaSource kafkaSource = KafkaSource.<UPITransaction>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)// IP Address with port 9092 where leader broker is running in cluster
    .setTopics(IKafkaConstants.INPUT_UPITransaction_TOPIC_NAME)
    .setGroupId("upigroup")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new KafkaUPISchema())
    .build();

要从 Kafka 中检索信息,在 Flink 中设置反序列化模式对于处理 JSON 格式的事件、将原始数据转换为结构化形式至关重要。重要的是,需要设置为 no.of Kafka 主题分区,否则水印将不适用于源,并且数据不会发布到接收器。setParallelism

爪哇岛

DataStream<UPITransaction> stream = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(2)), "Kafka Source").setParallelism(1);


通过从 Kafka 成功检索事件,我们可以通过合并处理步骤来增强流式处理作业。后续代码片段读取 Kafka 数据,按手机号码 () 进行组织,并计算每个手机号码的平均价格。为了实现这一点,我们开发了一个自定义窗口函数来计算平均值,并实现了水印以有效地处理事件时间语义。下面是代码片段:upiID

爪哇岛

SerializableTimestampAssigner<UPITransaction> sz = new SerializableTimestampAssigner<UPITransaction>() {
   @Override
   public long extractTimestamp(UPITransaction transaction, long l) {
    try {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
     Date date = sdf.parse(transaction.eventTime);
     return date.getTime();
    } catch (Exception e) {
     return 0;
    }
   }
  };
 
WatermarkStrategy<UPITransaction> watermarkStrategy = WatermarkStrategy.<UPITransaction>forBoundedOutOfOrderness(Duration.ofMillis(100)).withTimestampAssigner(sz);
DataStream<UPITransaction> watermarkDataStream = stream.assignTimestampsAndWatermarks(watermarkStrategy);
 
//Instead of event time, we can use window based on processing time. Using TumblingProcessingTimeWindows  
DataStream<TransactionAgg> groupedData = watermarkDataStream.keyBy("upiId").window(TumblingEventTimeWindows.of(Time.milliseconds(2500),
       Time.milliseconds(500))).sum("amount"); 
          .apply(new TransactionAgg());

最终,在 Flink 内部执行处理逻辑(基于手机号码计算同一 UPI ID 在连续交易流流 2 分钟窗口内的平均价格)。这是 Window 函数的代码块,用于计算每个 UPI ID 或手机号码的平均金额。

爪哇岛

public class TransactionAgg
   implements WindowFunction<UPITransaction, TransactionAgg, Tuple, TimeWindow> {

  @Override
  public void apply(Tuple key, TimeWindow window, Iterable<UPITransaction> values, Collector<TransactionAgg> out) {
   Integer sum = 0; //Consider whole number
   int count = 0;
   String upiID = null ;
   for (UPITransaction value : values) {
    sum += value.amount;
    upiID = value.upiID;
    count++;
   }

   TransactionAgg output = new TransactionAgg();
   output.upiID = upiID;
   output.eventTime = window.getEnd();
   output.avgAmount = (sum / count);
   out.collect( output);
  }

 }

我们已经处理了数据。下一步是序列化对象并将其发送到其他 Kafka 主题。在开发的 Java 代码 () 中添加 a,将处理后的数据从 Flink 引擎发送到在多节点 Kafka 集群上创建的不同 Kafka 主题。下面是在将对象发送/发布到 Kafka 主题之前序列化对象的代码片段:KafkaSinkStreamingToFlinkJob.java

爪哇岛

public class KafkaTrasactionSinkSchema implements KafkaRecordSerializationSchema<TransactionAgg> {

   
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            TransactionAgg aggTransaction, KafkaSinkContext context, Long timestamp) {
        try {
            return new ProducerRecord<>(
                    topic,
                    null, // not specified  partition so setting null
                    aggTransaction.eventTime,
                    aggTransaction.upiID.getBytes(),
                    objectMapper.writeValueAsBytes(aggTransaction));
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    "Exception on serialize record: " + aggTransaction, e);
        }
       
    }
}

而且,下面是用于接收已处理数据的代码块,该数据发送回不同的 Kafka 主题。

爪哇岛

KafkaSink<TransactionAgg> sink = KafkaSink.<TransactionAgg>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
    .setRecordSerializer(new KafkaTrasactionSinkSchema(IKafkaConstants.OUTPUT_UPITRANSACTION_TOPIC_NAME))
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
 groupedData.sinkTo(sink); // DataStream that created above for TransactionAgg
  env.execute();

将 Druid 与 Kafka 主题连接起来

在最后一步中,我们需要将 Druid 与 Kafka 主题集成,以消耗 Flink 持续发布的处理后的数据流。借助 Apache Druid,我们可以直接连接 Apache Kafka,从而可以持续摄取实时数据,然后进行查询,从而在现场做出业务决策,而无需干预任何第三方系统或应用程序。Apache Druid 的另一个优点是,我们不需要配置或安装任何第三方 UI 应用程序来查看登陆或发布到 Kafka 主题的数据。为了精简这篇文章,我省略了将 Druid 与 Apache Kafka 集成的步骤。但是,几个月前,我发表了一篇关于这个主题的文章(本文前面链接)。您可以阅读它并遵循相同的方法。

结语

上面提供的代码片段仅供理解之用。它说明了从 Kafka 主题获取消息/数据流、处理消耗的数据以及最终将修改后的数据发送/推送到其他 Kafka 主题的顺序步骤。这允许 Druid 拾取修改后的数据流进行查询,作为最后一步进行分析。稍后,如果您有兴趣在自己的基础架构上执行整个代码库,我们将在 GitHub 上上传它。


目录
相关文章
|
1月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
614 13
Apache Flink 2.0-preview released
|
23天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
48 7
|
23天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
67 5
|
23天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
58 4
|
23天前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
60 4
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
70 3
|
18天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
23天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
45 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
19天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
23天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
54 2

推荐镜像

更多
下一篇
无影云桌面