使用 Kafka 和 Flink 构建实时数据处理系统

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 引言 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要。流计算的出现,就是为了更好地解决这类数据在处理过程中遇到的问题。

引言

在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要。流计算的出现,就是为了更好地解决这类数据在处理过程中遇到的问题。与传统架构不同,流计算模型在数据流动的过程中实时地进行捕捉和处理,并根据业务需求对数据进行计算分析,最终把结果保存或者分发给需要的组件。本文将从实时数据产生和流向的各个环节出发,通过一个具有实际意义的案例,向读者介绍如何使用 Apache Kafka 和 Apache Flink 构建一个实时的数据处理系统,当然本文只是抛砖引玉,因为构建一个良好健壮的实时数据处理系统并不是一篇文章可以说清楚的。在阅读本文前,假设您已经对 Apache Kafka 分布式消息系统有了基本的了解,并且可以使用 Flink SQL 编写业务逻辑。接下来,就让我们一起看看如何构建一个简易的实时数据处理系统吧。

关于Kafka

Kafka 是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统,最早是由 Linkedin 开发,并于 2011 年开源并贡献给 Apache 软件基金会。一般来说,Kafka 有以下几个典型的应用场景:
  • 作为消息队列。由于 Kafka 拥有高吞吐量,并且内置消息主题分区,备份,容错等特性,使得它更适合使用在大规模,高强度的消息数据处理的系统中。
  • 流计算系统的数据源。流数据产生系统作为 Kafka 消息数据的生产者将数据流分发给 Kafka 消息主题,流数据计算系统 (Flink,Storm,Spark Streaming等) 实时消费并计算数据。这也是本文将要介绍的应用场景。
  • 系统用户行为数据源。这种场景下,系统将用户的行为数据,如访问页面,停留时间,搜索日志,感兴趣的话题等数据实时或者周期性的发布到 Kafka 消息主题,作为对接系统数据的来源。
  • 日志聚集。Kafka 可以作为一个日志收集系统的替代解决方案,我们可以将系统日志数据按类别汇集到不同的 Kafka 消息主题中。
  • 事件源。在基于事件驱动的系统中,我们可以将事件设计成合理的格式,作为 Kafka 消息数据存储起来,以便相应系统模块做实时或者定期处理。由于 Kafka 支持大数据量存储,并且有备份和容错机制,所以可以让事件驱动型系统更加健壮和高效。
当然 Kafka 还可以支持其他的应用场景,在这里我们就不一一罗列了。关于 Kafka 更详细的介绍,请读者参考 Kafka 官网。需要指出的是,本文使用的 Kafka 版本是基于 Scala 2.10 版本构建的 0.8.2.1 版本。

关于Flink

Flink支持多种数据源:Kafka、MQ、SLS、Datahub 等,原生支持写入到 MQ、OTS、常见关系数据库等存储介质,提供了不同的抽象级别以开发流式或批处理应用。
  • 最底层级的抽象仅仅提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API中。它允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
  • 实际上,大多数应用并不需要上述的低层级抽象,而是针对 核心API(Core APIs) 进行编程,比如 DataStream API(有界或无界流数据)以及 DataSet API(有界数据集)。这些流畅的API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows),状态(state)等等。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
低层级的 过程函数 与 DataStream API 相集成,使其可以对某些特定的操作进行低层级的抽象。DataSet API 为有界数据集提供了额外的原语,例如循环与迭代。
  • Table API 是以  为中心的声明式DSL,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表具有附加的模式(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了 什么逻辑操作应该执行 而不是准确地确定 这些操作代码的看上去如何 。 尽管Table API可以通过多种类型的用户定义的函数进行扩展,其仍不如 核心API 更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序还可以在执行之前通过应用优化规则的优化器。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
  • Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。

案例介绍与Flink SQL编程实现

1.案例介绍

该案例中,我们假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。

2.案例分析

对于某一个访问论坛的用户,我们需要对他的行为数据做一个抽象,以便于解释网页话题热度的计算过程。
首先,我们通过一个向量来定义用户对于某个网页的行为即点击的网页,开始时间,停留时间,以及是否点赞,可以表示如下:
(page001.html,0, 1, 0.5, 1)
向量的第一项表示网页的 ID,第二项表示用户网页开始点击时间,第三项表示从进入网站到离开对该网页的点击次数,第四项表示停留时间,以秒为单位,第五项是代表是否点赞,1 为赞,-1 表示踩,0 表示中立。
其次,我们再按照各个行为对计算网页话题热度的贡献,给其设定一个权重,在本文中,我们假设点击次数权重是 0.8,因为用户可能是由于没有其他更好的话题,所以再次浏览这个话题。停留时间权重是 0.8,因为用户可能同时打开多个 tab 页,但他真正关注的只是其中一个话题。是否点赞权重是 1,因为这一般表示用户对该网页的话题很有兴趣。
最后,我们定义用下列公式计算某条行为数据对于该网页热度的贡献值。
f(x,y,z)=0.8x+0.8y+z
那么对于上面的行为数据 (page001.html, 1, 0.5, 1),利用公式可得:
H(page001)=f(x,y,z)= 0.8x+0.8y+z=0.8*1+0.8*0.5+1*1=2.2
读者可以留意到,在这个过程中,我们忽略了用户本身,也就是说我们不关注用户是谁,而只关注它对于网页热度所做的贡献。

3.生产行为数据信息

在本案例中我们将使用一段程序来模拟用户行为,该程序每隔 5 秒钟会随机的向 user-behavior-topic 主题推送 0 到 50 条行为数据消息,显然,这个程序扮演消息生产者的角色,在实际应用中,这个功能一般会由一个系统来提供。为了简化消息处理,我们定义消息的格式如下:
网页 ID|开始点击时间|点击次数|停留时间 (分钟)|是否点赞
ID(varchar)|firsttime(timestamp)|count(bigint)|timegap(bigint)|positive(boolean)

4.编写Flink SQL编写程序实时处理数据

在弄清楚了要解决的问题之后,就可以开始编码实现了。对于本案例中的问题,在实现上的基本步骤如下:
  • 创建源表和结果表
  • 创建子查询,使用 ROW_NUMBER() 窗口函数来对数据根据排序列进行排序并标上排名
  • 外层查询中,对排名进行过滤,只取前N条,如N=5,那么就是取 Top 5 的数据,即网页热度排名


-- Kafka源表
create table input(
id             varchar,
count          bigint,
timegap        bigint,
positive       boolean,
firsttime      timestamp,
WATERMARK wk FOR firsttime as withOffset(firsttime, 2000)
)with(
type='kafka'
endpoint='xxx'
......
);
-- Hbase存储表
create table output(
window_start  TIMESTAMP,
window_end    TIMESTAMP,
id            bigint,
contribution  double
)with(
type='ALIHBASE'
endpoint='xxx'
......
);
-- 5秒钟窗口贡献度统计
CREATE VIEW group_view AS
SELECT id,
      TUMBLE_START(firsttime, INTERVAL '5' SECOND) AS start_time,
      (0.8*count+0.8*timegap+positive) as contribution
FROM input
GROUP BY id, TUMBLE(firsttime, INTERVAL '1' MINUTE);

-- 统计每5秒 top5 贡献值ID,并输出
insert into output
select
(
   SELECT
   id, firsttime, contribution,
   ROW_NUMBER() OVER (PARTITION BY firsttime ORDER BY contribution DESC) as rownum,
   FROM
   group_view
)
WHERE rownum <= 5;


注意事项

利用 Flink 构建一个高效健壮的流数据计算系统,我们还需要注意以下方面。
  • 需要合理的设置时间窗口,即需要保证Flink的计算窗口合理地统计到热度最高的话题,理论上Flink的窗口可以是无界的,也可以是很小的时间窗口,但是合理的窗口大小设计对业务逻辑的影响。
  • 虽然本文案例中,我们只是把 (近) 实时计算结果打印出来,但是实际上很多时候这些结果会被保存到数据库,HDFS, 或者发送回 Kafka, 以供其他系统利用这些数据做进一步的业务处理,Flink可以直接实现这些功能。
  • 由于流计算对实时性要求很高,所以任何由于 JVM Full GC 引起的系统暂停都是不可接受的。Flink 采用类似 DBMS 的 sort 和 join 算法,直接操作二进制数据,从而使序列化/反序列化带来的开销达到最小。所以 Flink 的内部实现更像 C/C++ 而非 Java。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。如果要操作多块MemorySegment就像操作一块大的连续内存一样,Flink会使用逻辑视图(AbstractPagedInputView)来方便操作。
  • Flink内部支持exactly-once,要想达到端到端(Soruce到Sink)的exactly-once,需要Blink外部Soruce和Sink的支持,比如Source要支持精准的offset,Sink要支持两阶段提交,也就是继承TwoPhaseCommitSinkFunction。
  • Flink中当所有输入的barrier没有完全到来的时候,早到来的event在exactly-once的情况向会进行缓存(不进行处理),而at-least-once的模式下即使所有输入的barrier没有完全到来的时候,早到来的event也会进行处理。也就是说对于at-least-once模式下,对于下游节点而言,本来数据属于checkpoint n的数据在checkpoint n-1里面也可能处理过了。所以我们建议,Flink的checkpoint模式设置为exactl-once模式。

结束语

本文包含了集成Flink和 Kafka 分布式消息系统的基本知识,但是需要指出的是,在实际问题中,我们可能面临更多的问题,如性能优化,内存不足,以及其他未曾遇到的问题。希望通过本文的阅读,读者能对使用 Flink SQL 和 Kafka 构建实时数据处理系统有一个基本的认识,为读者进行更深入的研究提供一个参考依据。读者在阅读本文的时候发现任何问题或者有任何建议,请不吝赐教,留下您的评论,我会及时回复。希望我们可以一起讨论,共同进步。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
17天前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
190 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
消息中间件 存储 传感器
131 0
|
5月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
287 0
|
5月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
195 12
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
439 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
290 1
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
655 0
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版
  • 下一篇
    开通oss服务