基于阿里云官方Flink滚动窗口测试示例完善篇

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 一 官方文档背景 首先列出官方文档对于Flink滚动窗口的介绍以及示例演示:https://help.aliyun.com/document_detail/62511.html?spm=a2c4g.11174283.6.650.73161e494aJMpz对于具体滚动窗口的含义和参数及SQL的使用不再详尽介绍,官方文档介绍的已经相当完善(niubi); Flink SQL支持的窗口聚合主要是两种:Window聚合和Over聚合。

一 官方文档背景

首先列出官方文档对于Flink滚动窗口的介绍以及示例演示:
https://help.aliyun.com/document_detail/62511.html?spm=a2c4g.11174283.6.650.73161e494aJMpz
对于具体滚动窗口的含义和参数及SQL的使用不再详尽介绍,官方文档介绍的已经相当完善(niubi);
Flink SQL支持的窗口聚合主要是两种:Window聚合和Over聚合。本文档主要介绍Window聚合。Window聚合支持两种时间属性定义窗口:Event Time和Processing Time。
本文主要验证和完善的是Flink SQL对于滚动窗口函数的event time的示例的完善;
如下是关于event time的解释

Event Time
Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。
由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要明文定义一个Watermark计算方法。

至于为什么完善,就用文章的详细描述过程来解释吧。

二 原始文档测试过程及问题

2.1 测试流程及相关准备

这里测试的整个流程如下;

image

通过datahub的logstash插件将文档中示例的数据放到csv文件当中,通过logstash同步到datahub当中;
这里将时间换算成Unix时间戳格式类型,并且要符合datahub微妙级别的格式即16位,如下换算以后末尾增加6个0;

image

并且在datahub中使用bigint方式来存储;
还有一种方式是将标准时间格式的数据以string字符串类型方式存储到datahub中然后再连接到flink当中进行计算;
两种方式都需要在flink中进行进一步的处理;
如下是使用bigint方式存储的字段进行flink处理的方式;

image

如下是使用string方式存储的字段进行flink处理的方式;

image

2.2 原始测试流程

在client源端将数据写入csv文件

image

然后通过logstash插件将数据写入datahub中;

image

如下是datahub捕获到的数据情况

image

通过Flink接datahub存储进行作业开发,开发代码如下,和文档中示例代码一致,做了时间类型的转换,否则会报错;

image

启动任务监控,写入数据以后监控发现有6条记录写入,但是输出是1条;

image

查询输出print结果只输出了1条完全不符合文档中描述的输出3条记录的结果;

image

2.3 数据丢失统计

通过如上的测试,我们通过string存储标准时间格式的方式再次进行了验证,结果依然一样,会发生数据丢失的情况;
如下是详细的解释;
需要注意这里的withOffset设置的是2000即2s;

WATERMARK wk FOR ts as withOffset(ts, 2000) --为Rowtime定义Watermark。

image

最终查到目标端输出的结果只有如下几条数据,其他数据均丢失;

image

三 原始文档测试流程疑问处理

基于官方文档滚动窗口函数连接内容源端Flink引用的数据来源是从datahub同步过来的;但是存在一个问题是标准时间格式的数据同步到datahub以后就变成了16位(微妙级别的Unix时间戳)的timestamp字段来存储时间;

image

而读取到flink以后滚动窗口识别的timestamp时间类型字段的Unix时间戳默认支持识别毫秒级别,即13位的时间戳,直接影响到的数据的计算;
但是在文档中并没有对这部分数据进行实际的处理影响用户测试参考,其实官方文档额外提供了另外的一篇文章--“计算列”:
https://help.aliyun.com/document_detail/110847.html?spm=a2c4g.11186623.2.11.46be1216uEKQSM#concept-nnx-bwy-bhb
详细解释了如何进行针对datahub类似的时间戳格式字段的转换以及flink SQL对应的支持处理方式;这样就解决了时间戳无法识别的问题;

四 基于“计算列”模拟测试

4.1 “计算列”模拟测试

从第三章的讲述基于“计算列”的时间处理通过logstash插件模拟再次进行测试滚动窗口的实现;
开发作业代码如下:

--SQL
--********************************************************************--
--Author: asp_dmp
--CreateTime: 2019-11-07 10:34:36
--Comment: tumble_window
--********************************************************************--

CREATE TABLE tumble_window (
username VARCHAR,
click_url VARCHAR,
`time` bigint,
ts as to_timestamp(`time`/1000),
WATERMARK FOR ts as withOffset(ts, 2000)
) WITH (
type = 'datahub',
endPoint = 'http://dh-cn-beijing-int-vpc.aliyuncs.com',
roleArn='acs:ram::xxxxxxxx:role/aliyunstreamdefaultrole',
project = 'huiyan_zrh',
topic = 'tumble_window_int'
);

CREATE TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
type='print'
);

INSERT INTO tumble_output
SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE),
TUMBLE_END(ts, INTERVAL '1' MINUTE),
username,
COUNT(click_url)
FROM tumble_window
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;

经过测试源端通过logstash写入12条数据记录;

Jark,http://taobao.com/xxx,1507600800000000       2017-10-10 10:00:00.0  
Jark,http://taobao.com/xxx,1507600810000000       2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000       2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000       2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000       2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000       2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000      2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000      2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000      2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000      2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000       2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000       2017-10-10 10:04:29.0

注:对于如上表格中后边的时间格式仅为了解释使用的时间情况,测试不需要
查看logstash插入日志显示插入12条,从插入情况看并非是顺序一条条进去的;这里留个疑问,后边内容来解释

image

在flink侧查看发现接收到了12条数据记录,但是输出只有一条,

image

从如上的开发作业的实现效果来看,我们想实现的是按照每分钟用户点击网页的次数来分组统计,也就是说希望得到的是6条数据,然而只输出一条;
还是有部分数据丢失了;
然后怀疑主要原因是消息处理延迟吗?Watermark如果设置大一点,降低敏感度,是不是数据不会被丢弃?或者还有其他更好的方案吗?

4.2 Watermark参数的功能

基于4.1章节,在这里就要引申出来开发的flink SQL中引用的Watermark参数了;
使用方式如下:

WATERMARK [watermarkName] FOR AS withOffset(, offset)
对于Watermark用如下示例解释:
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
Watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据的Watermark时间含义:时间戳小于1501750580000(2017-08-03 08:56:20.000)的数据已经全部到达。
对于Watermark的使用总结如下:
a.Watermark含义是所有时间戳t'< t 的事件已经全部发生。若t(Watermark)已经生效,则后续Event Time小于t的记录将全部丢弃(后续支持用户配置,使Event Time小于t的数据也能继续更新)。
b.针对乱序的的流,Watermark至关重要。即使部分事件延迟到达,也不会过大影响窗口计算的正确性。
c.并行数据流中,当算子(Operator)有多个输入流时,算子的Event Time以最小流Event Time为准。
详细参考:
https://help.aliyun.com/document_detail/110837.html?spm=a2c4g.11186623.6.624.7ab63a98bPsoRU

五 数据“没有被抛弃”

基于Watermark的解释,其实数据不应该丢弃,而offset参数设置也是为了能够进一步保证数据不会被丢弃;
既然上述的所有测试都出现的数据丢弃的情况发生;那么我们就要回到最初的源头来考虑滚动窗口到底是要实现什么样的功能呢?
其实从业务角度来考虑的话并不难以理解,就拿官方提供的示例来讲,用户点击页面,即使并发再高,速度再快,时间总是有先后顺序的,入库也是肯定是有先后的,因此就可以模拟这样一个顺序写入的一个场景然后观察数据是否会被丢弃;
这时候有人可能会疑问还有时区的问题存在,那么就可以考虑参考官方提供时区内容部分;
https://help.aliyun.com/document_detail/96910.html?spm=a2c4g.11186623.6.622.2ffc56cfVYMyD7
当然至于像淘宝,天猫这样遍布全球的大电商,对于这样场景的统计,就不得而知了,我想可能内部会基于时区做转换吧,或者先考虑本地计算汇总然后再做总的汇总?
瞎猜的,佩服阿里云的这些大佬们,这里省略10000个钦佩感慨的字;
那么回过头来,我们就以北京时间为准,进行模拟顺序写入的方式来实现滚动窗口,看是否会出现数据丢失的情况发生;
同样,我们通过logstash一条条的顺序写入到datahub当中;

Jark,http://taobao.com/xxx,1507600800000000   2017-10-10 10:00:00.0  
Jark,http://taobao.com/xxx,1507600810000000   2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000   2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000   2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000   2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000   2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000  2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000  2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000  2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000  2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000   2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000   2017-10-10 10:04:29.0
Tim,http://taobao.com/xxxx,1507601070000000   2017-10-10 10:04:30.0
Tim,http://taobao.com/xxxx,1507601071000000   2017-10-10 10:04:31.0
Tim,http://taobao.com/xxxx,1507601072000000   2017-10-10 10:04:32.0
Tim,http://taobao.com/xxxx,1507601078000000   2017-10-10 10:04:38.0
Tim,http://taobao.com/xxxx,1507601082000000   2017-10-10 10:04:42.0
Tim,http://taobao.com/xxxx,1507601083000000   2017-10-10 10:04:43.0
Tim,http://taobao.com/xxxx,1507601101000000   2017-10-10 10:05:01.0
Tim,http://taobao.com/xxxx,1507601110000000   2017-10-10 10:05:10.0

注:对于如上表格中后边的时间格式仅为了解释使用的时间情况,测试不需要
输入的csv文件内容如下:

image

数据通过logstash插入datahub日志打印如下:
这里显示的就是一条条的进行了插入

image

然后我们再次从flink当中查看数据统计的输出情况,数据没有丢失,并且最新的数据的输出,需要下游数据再进来以后进行计算统计;

image

至此,我们这里也就应该解释清楚了吧;

数据之所以丢,是因为“若t(Watermark)已经生效,则后续Event Time小于t的记录将全部丢弃。”这句话,原因从测试的整个现象来推测,是并行写入导致的(解释上述4.1章节疑问,并行写入加上我们的数据是一下子输入到datahub中的),可能我们认为应该是顺序写入的,所以出现了数据的丢失;而如果按照滚动窗口来说,就拿文档中用户点击页面来举例,点击的情况是随着时间一下一下点击出来的,所以基本是顺序写入,然后根据相应的时间间隔粒度来统计,数据就不丢失了。
对于offset则是要控制在并行写入(无序)的情况下,对于数据延迟问题的解决,结论就是在offset设置过小时,出现数据丢失的概率较大;offset设置过大,则又会出现数据处理不及时的情况,有兴趣的同学可以通过基于连续时间以及offset设置来验证,篇幅有限,各位有兴趣可搞一波事情;

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
152 0
|
1月前
|
数据采集 自然语言处理 数据库
深入体验阿里云通义灵码:测试与实例展示
阿里云通义灵码是一款强大的代码生成工具,支持自然语言描述需求,快速生成高质量代码。它在测试、代码质量和用户体验方面表现出色,能够高效地生成 Python 和 Java 等语言的代码,助力开发者提升开发效率和代码质量。无论是新手还是资深开发者,都能从中受益匪浅。
深入体验阿里云通义灵码:测试与实例展示
|
4月前
|
弹性计算 测试技术 持续交付
阿里云云效产品使用合集之如何进行自动化测试
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
2月前
|
Java 流计算
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
46 1
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
|
2月前
|
弹性计算 安全 Linux
阿里云国际版使用ping命令测试ECS云服务器不通的排查方法
阿里云国际版使用ping命令测试ECS云服务器不通的排查方法
|
2月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
43 0
|
4月前
|
运维 Java Devops
阿里云云效操作报错合集之在流水线增加单元测试报错,是什么导致的
本合集将整理呈现用户在使用过程中遇到的报错及其对应的解决办法,包括但不限于账户权限设置错误、项目配置不正确、代码提交冲突、构建任务执行失败、测试环境异常、需求流转阻塞等问题。阿里云云效是一站式企业级研发协同和DevOps平台,为企业提供从需求规划、开发、测试、发布到运维、运营的全流程端到端服务和工具支撑,致力于提升企业的研发效能和创新能力。
|
4月前
|
存储 Kubernetes 测试技术
阿里云块存储问题之处理信用分低的测试用例(即不稳定Case)如何解决
阿里云块存储问题之处理信用分低的测试用例(即不稳定Case)如何解决
52 0
|
4月前
|
存储 Kubernetes 测试技术
阿里云块存储问题之生产代码与测试代码需要同步原子提交如何解决
阿里云块存储问题之生产代码与测试代码需要同步原子提交如何解决
46 0
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。