开发者学堂课程【开源 Flink 极客训练营:Flink Ecosystems】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/760/detail/13343
Flink Ecosystems
三.示例&Demo
DataGen Connector
为了解决这个问题,提供了一个这种内置的这种可能,当然这些内置的主要的作用,一方面是帮助这个新用户能够尽快熟悉,能够更快的去体验强大的功能。另一个也是为帮助开发人员去做一些代码的调试或者说是性能测试等,几个可量可数,首先是这个this sentence,比如这里创建了一个this这样一张表,指定了几个字段。如果去写这个工具,做这张图片的表,肯定是会负责去为生成数据,并不是事先要存储在某个地方。做一些比较比较细腻控制,比如说指定每秒钟生成多少行数据,整形这种字段可以指定从小到大来创建,比如从一到一年,一共生成1000条数据,通过random的方式来创建random方式,当然随机创建随机来指定这个数值。那么random的这个最大最小值也可以指定,像这个分类型的这种字段可以指定它的长度是多少。
主要用处是打印到所用的数据输出上
主要是可以用来做性能的测试,是在你不关心这个数据,实际写出去用它来做一个性能测试,排除掉你这个写数据的对性能的影响,你只需要写出来就可以了。
有一张数据源表,我们把它叫做外表,这张表包含2013年初纽约市的这个出租车乘坐的信息,这张表有几个字段,第一行是一次打车的一个唯一的一个ID。是会把一次打车拆分成了上车和下车两个事件,所以说一次打车的ID会出现两次,然后taxi ID就是出租车的唯一ID,然后表示了你这几个事件是上车还是下车,浪费SP ect的表示了你这个恶事件所发生的这个对应数据,然后even Tom是事件发生时间,表示你这一趟打车的这个乘客的人数。
创建Kafka源表
将Append-Only表写入Kafka
查询最近10分钟之内开始的行程数并将结果写入 Sink_TenMinPsgCnts表
docker-compose exec kafka kafka-consoleconsumersh-bootstrap server kafka:9092--topic TenMinPsgCnts--from-beginning
首先在包含中进入目录后启动,启动成功如图
看命令是否存入
将Upsert表写入Elasticsearch
查询离开每个地区的行程数并将结果写入Sink_AreaCnts表
将Upsert表写入Elasticsearch
列出所有索引:http://localhost:9200/_cat/indices?v
查看area-cnts索引:http://localhost:9200/area-cnts?pretty=true
查看area-cnts索引的统计信息:http://localhost:9200/areacnts/_stats?pretty=true
查看area-cnts索引的内容:http://localhost:9200/area-cnts/search?pretty=true
查询Hive表
在src表中查找出现次数超过3的key,并将key和出现次数写入dest表
启动成功
查看数据进行的正常
将DataGen表写入Hive
Flink1.11开始支持流式数据写入Hive
注意需要把mode换成swimming,之前是代词,这里我们要切换之后去启动
建表成功如图:
如图表示上传作业成功,作业名字比较容易区分出来,这个作业大概运行30秒,端口号是18081。
如图表示数据上传成功:


















