开发者学堂课程【开源 Flink 极速上手教程:Flink Ecosystems】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/331/detail/3713
Flink Ecosystems(三)
三.示例&Demo
示例的演示是通过 Docker compose 环境来做的。Docker 环境首先有一张 Kafka的数据源的表叫做 Rides 表,这张表包含了2013年初纽约市的出租车乘坐信息,这张表有几个字段。rideId 表示一次打车的唯一的id,但是在数据中会把一次打车拆分成上车和下车两个事件,所以一次打车的 ID 会出现两次。taxiId 是出租车唯一的 id。isStart 标识该事件是上车还是下车。lon 和 lat 标识事件发生的经纬度信息。eventTime 就是事件发生的时间戳。psgCnt 表示这一趟打车的乘客的人数。
有了 rides 的 schema 信息之后,需要创建一张 Kafka 的表,能够消费 Kafka的数据源。数据源的格式都是 JSON的,所以 format 设置为 JSON。
1.例一:将 Append-Only 表写入 Kafka
查询最近十分钟之内开始的行程数并将结果写入
Sink_TenMinPsgCnts 表。看查询要求应该会使用到 Append-Only,所以生成一张 Append-Only 的表,所以 Sink 表继续使用 Kafka 来创建一张 Kafka 的sink 表,将查询结果写入到这张表。
sink 表的 SQL 语句(DDL):
语句提交成功之后,可以通过 Docker 的命令在 Kafka Connector 中验证数据是否正常写入。
Demo:
首先进入 Docker compose 的文件目录,通过 Docker compose up -d 方式启动,然后启动 sql-client,然后启动 shell 命令。创建好 rides 表,通过 show tables 验证。describe 一下,看一下是否符合预期。然后创建 sink 表,写 dml。语句,把 dml 提交到集群上执行,然后到WebUI 确定作业是否跑起来。确认最后的数据是否已经写入,执行命令可以看到几个窗口的数据已经算出来了,因为是流式作业不会自动停止,所以需要在WebUI 上停止。
2.例二:查询离开每个地区的行程数并将结果写入表
查询每个地区的行程数肯定会做 upset的操作,对于 upset,Kafka 表就不能支持了,因此选了 Elasticsearch 表。
创建 sink 表,然后通过 dml 的 INTO 语句查询,然后将查询结果写入到Elasticsearch 表。在作业执行过程当中,可以通过 Elasticsearch 的页面查看写入的数据,确认数据是否正常写入 Elasticsearch。
Demo:
建表提交作业,作业提交成功,在 WebUI 上确认作业已成功运行。
在 Elasticsearch 中确认创建成功,可以查看 index 中具体的数据,可以看到对应的每个 areaId 以及 count 信息。同样这个作业也需要手动停止。
3.例三:查询 Hive 表
在 Docker 环境中,Hive 有一个单独的Connector,所以需要启动另一个单独的Connector。在 src 表中查找出现次数超过3的 key 并将结果写入 dest 表,在 Hive Connector 中已经创建好了。查询 Hive 表,当 Hive 表的数据读完之后,作业会自动结束掉。
Demo:
首先启动 Hive Connector,在 Hive Connector 中启动 Hive 的对应的sql-client,启动的时候自动连接了 Catalog,切换到 Catalog,可以看到两张表都在,然后提交查询。在 WebUI 上验证, 注意 Hive 的端口号与之前不同,看到作业已经跑起来了,因为是批作业,是会结束的,不需要手动停止。确认数据是否写入正常,可以看到数据都已写入。
4.例四:将 DataGen 表写入 Hive
DataGen 是数据生成器,生成的数据是流式的数据,所以该例子借助了Flink1.11流式数据写入 Hive 的功能。
左边的 DDL 是创建 DataGen 表,表做的比较简单,只有一个整型的字段,按照sequence 方式生成,从一到一百五,每秒钟生成五条,也就是说这个数据大约会运行三十秒钟就会结束。右边是创建另一种 Hive 表叫 dest2,如果要在 Flink 中创建 Hive 表需要使用 Hive dialect,首先把 dialect 切换成 Hive,然后创建 dest2 表,这张表与 DataGen 表 schema是完全一样的。
demo:
需要切换成 streaming,启动 sql-client,然后建 DataGen 表,创建 Hive 表,使用Hive dialect,可以动态切换,每条语句都可以使用不同的 dialect,然后提交作业,作业提交成功,在 WebUI 上确认,作业已经跑起来了,作业的名字就是dml 语句容易区分。特别强调这里的端口号与之前的不太一样。作业已成功结束,确认数据写入是否正确,执行select 命令看到结果没有问题。