一 官方文档背景
首先列出官方文档对于Flink滚动窗口的介绍以及示例演示:
https://help.aliyun.com/document_detail/62511.html?spm=a2c4g.11174283.6.650.73161e494aJMpz
对于具体滚动窗口的含义和参数及SQL的使用不再详尽介绍,官方文档介绍的已经相当完善(niubi);
Flink SQL支持的窗口聚合主要是两种:Window聚合和Over聚合。本文档主要介绍Window聚合。Window聚合支持两种时间属性定义窗口:Event Time和Processing Time。
本文主要验证和完善的是Flink SQL对于滚动窗口函数的event time的示例的完善;
如下是关于event time的解释
Event Time
Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。
由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要明文定义一个Watermark计算方法。
至于为什么完善,就用文章的详细描述过程来解释吧。
二 原始文档测试过程及问题
2.1 测试流程及相关准备
这里测试的整个流程如下;
通过datahub的logstash插件将文档中示例的数据放到csv文件当中,通过logstash同步到datahub当中;
这里将时间换算成Unix时间戳格式类型,并且要符合datahub微妙级别的格式即16位,如下换算以后末尾增加6个0;
并且在datahub中使用bigint方式来存储;
还有一种方式是将标准时间格式的数据以string字符串类型方式存储到datahub中然后再连接到flink当中进行计算;
两种方式都需要在flink中进行进一步的处理;
如下是使用bigint方式存储的字段进行flink处理的方式;
如下是使用string方式存储的字段进行flink处理的方式;
2.2 原始测试流程
在client源端将数据写入csv文件
然后通过logstash插件将数据写入datahub中;
如下是datahub捕获到的数据情况
通过Flink接datahub存储进行作业开发,开发代码如下,和文档中示例代码一致,做了时间类型的转换,否则会报错;
启动任务监控,写入数据以后监控发现有6条记录写入,但是输出是1条;
查询输出print结果只输出了1条完全不符合文档中描述的输出3条记录的结果;
2.3 数据丢失统计
通过如上的测试,我们通过string存储标准时间格式的方式再次进行了验证,结果依然一样,会发生数据丢失的情况;
如下是详细的解释;
需要注意这里的withOffset设置的是2000即2s;
WATERMARK wk FOR ts as withOffset(ts, 2000) --为Rowtime定义Watermark。
最终查到目标端输出的结果只有如下几条数据,其他数据均丢失;
三 原始文档测试流程疑问处理
基于官方文档滚动窗口函数连接内容源端Flink引用的数据来源是从datahub同步过来的;但是存在一个问题是标准时间格式的数据同步到datahub以后就变成了16位(微妙级别的Unix时间戳)的timestamp字段来存储时间;
而读取到flink以后滚动窗口识别的timestamp时间类型字段的Unix时间戳默认支持识别毫秒级别,即13位的时间戳,直接影响到的数据的计算;
但是在文档中并没有对这部分数据进行实际的处理影响用户测试参考,其实官方文档额外提供了另外的一篇文章--“计算列”:
https://help.aliyun.com/document_detail/110847.html?spm=a2c4g.11186623.2.11.46be1216uEKQSM#concept-nnx-bwy-bhb
详细解释了如何进行针对datahub类似的时间戳格式字段的转换以及flink SQL对应的支持处理方式;这样就解决了时间戳无法识别的问题;
四 基于“计算列”模拟测试
4.1 “计算列”模拟测试
从第三章的讲述基于“计算列”的时间处理通过logstash插件模拟再次进行测试滚动窗口的实现;
开发作业代码如下:
--SQL
--********************************************************************--
--Author: asp_dmp
--CreateTime: 2019-11-07 10:34:36
--Comment: tumble_window
--********************************************************************--
CREATE TABLE tumble_window (
username VARCHAR,
click_url VARCHAR,
`time` bigint,
ts as to_timestamp(`time`/1000),
WATERMARK FOR ts as withOffset(ts, 2000)
) WITH (
type = 'datahub',
endPoint = 'http://dh-cn-beijing-int-vpc.aliyuncs.com',
roleArn='acs:ram::xxxxxxxx:role/aliyunstreamdefaultrole',
project = 'huiyan_zrh',
topic = 'tumble_window_int'
);
CREATE TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
type='print'
);
INSERT INTO tumble_output
SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE),
TUMBLE_END(ts, INTERVAL '1' MINUTE),
username,
COUNT(click_url)
FROM tumble_window
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;
经过测试源端通过logstash写入12条数据记录;
Jark,http://taobao.com/xxx,1507600800000000 2017-10-10 10:00:00.0
Jark,http://taobao.com/xxx,1507600810000000 2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000 2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000 2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000 2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000 2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000 2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000 2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000 2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000 2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000 2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000 2017-10-10 10:04:29.0
注:对于如上表格中后边的时间格式仅为了解释使用的时间情况,测试不需要
查看logstash插入日志显示插入12条,从插入情况看并非是顺序一条条进去的;这里留个疑问,后边内容来解释
在flink侧查看发现接收到了12条数据记录,但是输出只有一条,
从如上的开发作业的实现效果来看,我们想实现的是按照每分钟用户点击网页的次数来分组统计,也就是说希望得到的是6条数据,然而只输出一条;
还是有部分数据丢失了;
然后怀疑主要原因是消息处理延迟吗?Watermark如果设置大一点,降低敏感度,是不是数据不会被丢弃?或者还有其他更好的方案吗?
4.2 Watermark参数的功能
基于4.1章节,在这里就要引申出来开发的flink SQL中引用的Watermark参数了;
使用方式如下:
WATERMARK [watermarkName] FOR AS withOffset(, offset)
对于Watermark用如下示例解释:
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
Watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据的Watermark时间含义:时间戳小于1501750580000(2017-08-03 08:56:20.000)的数据已经全部到达。
对于Watermark的使用总结如下:
a.Watermark含义是所有时间戳t'< t 的事件已经全部发生。若t(Watermark)已经生效,则后续Event Time小于t的记录将全部丢弃(后续支持用户配置,使Event Time小于t的数据也能继续更新)。
b.针对乱序的的流,Watermark至关重要。即使部分事件延迟到达,也不会过大影响窗口计算的正确性。
c.并行数据流中,当算子(Operator)有多个输入流时,算子的Event Time以最小流Event Time为准。
详细参考:
https://help.aliyun.com/document_detail/110837.html?spm=a2c4g.11186623.6.624.7ab63a98bPsoRU
五 数据“没有被抛弃”
基于Watermark的解释,其实数据不应该丢弃,而offset参数设置也是为了能够进一步保证数据不会被丢弃;
既然上述的所有测试都出现的数据丢弃的情况发生;那么我们就要回到最初的源头来考虑滚动窗口到底是要实现什么样的功能呢?
其实从业务角度来考虑的话并不难以理解,就拿官方提供的示例来讲,用户点击页面,即使并发再高,速度再快,时间总是有先后顺序的,入库也是肯定是有先后的,因此就可以模拟这样一个顺序写入的一个场景然后观察数据是否会被丢弃;
这时候有人可能会疑问还有时区的问题存在,那么就可以考虑参考官方提供时区内容部分;
https://help.aliyun.com/document_detail/96910.html?spm=a2c4g.11186623.6.622.2ffc56cfVYMyD7
当然至于像淘宝,天猫这样遍布全球的大电商,对于这样场景的统计,就不得而知了,我想可能内部会基于时区做转换吧,或者先考虑本地计算汇总然后再做总的汇总?
瞎猜的,佩服阿里云的这些大佬们,这里省略10000个钦佩感慨的字;
那么回过头来,我们就以北京时间为准,进行模拟顺序写入的方式来实现滚动窗口,看是否会出现数据丢失的情况发生;
同样,我们通过logstash一条条的顺序写入到datahub当中;
Jark,http://taobao.com/xxx,1507600800000000 2017-10-10 10:00:00.0
Jark,http://taobao.com/xxx,1507600810000000 2017-10-10 10:00:10.0
Jark,http://taobao.com/xxx,1507600849000000 2017-10-10 10:00:49.0
Jark,http://taobao.com/xxx,1507600865000000 2017-10-10 10:01:05.0
Jark,http://taobao.com/xxx,1507600918000000 2017-10-10 10:01:58.0
Timo,http://taobao.com/xxx,1507600930000000 2017-10-10 10:02:10.0
Timor,http://taobao.com/xxx,1507600982000000 2017-10-10 10:03:02.0
Timor,http://taobao.com/xxx,1507601015000000 2017-10-10 10:03:35.0
Timor,http://taobao.com/xxx,1507601045000000 2017-10-10 10:04:05.0
Timor,http://taobao.com/xxx,1507601064000000 2017-10-10 10:04:24.0
Tim,http://taobao.com/xxxx,1507601068000000 2017-10-10 10:04:28.0
Tim,http://taobao.com/xxxx,1507601069000000 2017-10-10 10:04:29.0
Tim,http://taobao.com/xxxx,1507601070000000 2017-10-10 10:04:30.0
Tim,http://taobao.com/xxxx,1507601071000000 2017-10-10 10:04:31.0
Tim,http://taobao.com/xxxx,1507601072000000 2017-10-10 10:04:32.0
Tim,http://taobao.com/xxxx,1507601078000000 2017-10-10 10:04:38.0
Tim,http://taobao.com/xxxx,1507601082000000 2017-10-10 10:04:42.0
Tim,http://taobao.com/xxxx,1507601083000000 2017-10-10 10:04:43.0
Tim,http://taobao.com/xxxx,1507601101000000 2017-10-10 10:05:01.0
Tim,http://taobao.com/xxxx,1507601110000000 2017-10-10 10:05:10.0
注:对于如上表格中后边的时间格式仅为了解释使用的时间情况,测试不需要
输入的csv文件内容如下:
数据通过logstash插入datahub日志打印如下:
这里显示的就是一条条的进行了插入
然后我们再次从flink当中查看数据统计的输出情况,数据没有丢失,并且最新的数据的输出,需要下游数据再进来以后进行计算统计;
至此,我们这里也就应该解释清楚了吧;
数据之所以丢,是因为“若t(Watermark)已经生效,则后续Event Time小于t的记录将全部丢弃。”这句话,原因从测试的整个现象来推测,是并行写入导致的(解释上述4.1章节疑问,并行写入加上我们的数据是一下子输入到datahub中的),可能我们认为应该是顺序写入的,所以出现了数据的丢失;而如果按照滚动窗口来说,就拿文档中用户点击页面来举例,点击的情况是随着时间一下一下点击出来的,所以基本是顺序写入,然后根据相应的时间间隔粒度来统计,数据就不丢失了。
对于offset则是要控制在并行写入(无序)的情况下,对于数据延迟问题的解决,结论就是在offset设置过小时,出现数据丢失的概率较大;offset设置过大,则又会出现数据处理不及时的情况,有兴趣的同学可以通过基于连续时间以及offset设置来验证,篇幅有限,各位有兴趣可搞一波事情;