Flink SQL Client综合实战

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 使用工具Flink SQL Client完成各种实时处理的操作

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

本篇概览

《Flink SQL Client初探》一文中,我们体验了Flink SQL Client的基本功能,今天来通过实战更深入学习和体验Flink SQL;

实战内容

本次实战主要是通过Flink SQL Client消费kafka的实时消息,再用各种SQL操作对数据进行查询统计,内容汇总如下:

  1. DDL创建Kafka表
  2. 窗口统计;
  3. 数据写入ElasticSearch
  4. 联表操作

    版本信息

  5. Flink:1.10.0
  6. Flink所在操作系统:CentOS Linux release 7.7.1908
  7. JDK:1.8.0_211
  8. Kafka:2.4.0(scala:2.12)
  9. Mysql:5.7.29

    数据源准备

  10. 本次实战用的数据,来源是阿里云天池公开数据集的一份淘宝用户行为数据集,获取方式请参考《准备数据集用于flink学习》
  11. 获取到数据集文件后转成kafka消息发出,这样我们使用Flink SQL时就按照实时消费kafka消息的方式来操作,具体的操作方式请参考《将CSV的数据发送到kafka》
  12. 上述操作完成后,一百零四万条淘宝用户行为数据就会通过kafka消息顺序发出,咱们的实战就有不间断实时数据可用 了,消息内容如下:

    {"user_id":1004080,"item_id":2258662,"category_id":79451,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
    {"user_id":100814,"item_id":5071478,"category_id":1107469,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
    {"user_id":114321,"item_id":4306269,"category_id":4756105,"behavior":"pv","ts":"2017-11-24T23:47:48Z"}
    
  13. 上述消息中每个字段的含义如下表:

列名称 说明
用户ID 整数类型,序列化后的用户ID
商品ID 整数类型,序列化后的商品ID
商品类目ID 整数类型,序列化后的商品所属类目ID
行为类型 字符串,枚举类型,包括('pv', 'buy', 'cart', 'fav')
时间戳 行为发生的时间戳
时间字符串 根据时间戳字段生成的时间字符串

jar准备

实战过程中要用到下面这五个jar文件:

  1. flink-jdbc_2.11-1.10.0.jar
  2. flink-json-1.10.0.jar
  3. flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
  4. flink-sql-connector-kafka_2.11-1.10.0.jar
  5. mysql-connector-java-5.1.48.jar

我已将这些文件打包上传到GitHub,下载地址:https://raw.githubusercontent.com/zq2599/blog_download_files/master/files/sql_lib.zip

请在flink安装目录下新建文件夹sql_lib,然后将这五个jar文件放进去;

Elasticsearch准备

如果您装了docker和docker-compose,那么下面的命令可以快速部署elasticsearch和head工具:

wget https://raw.githubusercontent.com/zq2599/blog_demos/master/elasticsearch_docker_compose/docker-compose.yml && \
docker-compose up -d

准备完毕,开始操作吧;

DDL创建Kafka表

  1. 进入flink目录,启动flink:bin/start-cluster.sh
  2. 启动Flink SQL Client:bin/sql-client.sh embedded -l sql_lib
  3. 启动成功显示如下:
    在这里插入图片描述
  4. 执行以下命令即可创建kafka表,请按照自己的信息调整参数:

    CREATE TABLE user_behavior (
     user_id BIGINT,
     item_id BIGINT,
     category_id BIGINT,
     behavior STRING,
     ts TIMESTAMP(3),
     proctime as PROCTIME(),   -- 处理时间列
     WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
    ) WITH (
     'connector.type' = 'kafka',  -- kafka connector
     'connector.version' = 'universal',  -- universal 支持 0.11 以上的版本
     'connector.topic' = 'user_behavior',  -- kafka topic
     'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
     'connector.properties.zookeeper.connect' = '192.168.50.43:2181',  -- zk 地址
     'connector.properties.bootstrap.servers' = '192.168.50.43:9092',  -- broker 地址
     'format.type' = 'json'  -- 数据源格式为 json
    );
    
  5. 执行SELECT * FROM user_behavior;看看原始数据,如果消息正常应该和下图类似:
    6.

    窗口统计

  6. 下面的SQL是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START返回的数据格式是timestamp,这里再调用DATE_FORMAT函数将其格式化成了字符串:

    SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
    DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
    COUNT(*)
    FROM user_behavior
    WHERE behavior = 'pv'
    GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);
    
  7. 得到数据如下所示:
    在这里插入图片描述

    数据写入ElasticSearch

  8. 确保elasticsearch已部署好;
  9. 执行以下语句即可创建es表,请按照您自己的es信息调整下面的参数:

    CREATE TABLE pv_per_minute ( 
     start_time STRING,
     end_time STRING,
     pv_cnt BIGINT
    ) WITH (
     'connector.type' = 'elasticsearch', -- 类型
     'connector.version' = '6',  -- elasticsearch版本
     'connector.hosts' = 'http://192.168.133.173:9200',  -- elasticsearch地址
     'connector.index' = 'pv_per_minute',  -- 索引名,相当于数据库表名
     'connector.document-type' = 'user_behavior', -- type,相当于数据库库名
     'connector.bulk-flush.max-actions' = '1',  -- 每条数据都刷新
     'format.type' = 'json',  -- 输出数据格式json
     'update-mode' = 'append'
    );
    
  10. 执行以下语句,就会将每分钟的pv总数写入es的pv_per_minute索引:

    INSERT INTO pv_per_minute
    SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time, 
    DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time, 
    COUNT(*) AS pv_cnt
    FROM user_behavior
    WHERE behavior = 'pv'
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
    
  11. 用es-head查看,发现数据已成功写入:
    在这里插入图片描述

    联表操作

  12. 当前user_behavior表的category_id表示商品类目,例如11120表示计算机书籍,61626表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;
  13. 如果我们将这五千多种类目分成6个大类,例如11120属于教育类,61626属于服装类,那么应该有个大类和类目的关系表;
  14. 这个大类和类目的关系表在MySQL创建,表名叫category_info,建表语句如下:

    CREATE TABLE `category_info`(
    `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    `parent_id` bigint ,
    `category_id` bigint ,
    PRIMARY KEY ( `id` )
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    
  15. category_info所有数据来自对原始数据中category_id字段的提取,并且随机将它们划分为6个大类,该表的数据请在我的GitHub下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql

  16. 请在MySQL上建表category_info,并将上述数据全部写进去;
  17. 在Flink SQL Client执行以下语句创建这个维表,mysql信息请按您自己配置调整:

    CREATE TABLE category_info (
     parent_id BIGINT, -- 商品大类
     category_id BIGINT  -- 商品详细类目
    ) WITH (
     'connector.type' = 'jdbc',
     'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo',
     'connector.table' = 'category_info',
     'connector.driver' = 'com.mysql.jdbc.Driver',
     'connector.username' = 'root',
     'connector.password' = '123456',
     'connector.lookup.cache.max-rows' = '5000',
     'connector.lookup.cache.ttl' = '10min'
    );
    
  18. 尝试联表查询:

    SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
    FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
    ON U.category_id = C.category_id;
    
  19. 如下图,联表查询成功,每条记录都能对应大类:
    在这里插入图片描述

  20. 再试试联表统计,每个大类的总浏览量:

    SELECT C.parent_id, COUNT(*) AS pv_count
    FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
    ON U.category_id = C.category_id
    WHERE behavior = 'pv'
    GROUP BY C.parent_id;
    
  21. 如下图,数据是动态更新的:
    在这里插入图片描述

  22. 执行以下语句,可以在统计时将大类ID转成中文名:

    SELECT CASE C.parent_id
    WHEN 1 THEN '服饰鞋包'
    WHEN 2 THEN '家装家饰'
    WHEN 3 THEN '家电'
    WHEN 4 THEN '美妆'
    WHEN 5 THEN '母婴'
    WHEN 6 THEN '3C数码'
    ELSE '其他'
    END AS category_name,
    COUNT(*) AS pv_count
    FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
    ON U.category_id = C.category_id
    WHERE behavior = 'pv'
    GROUP BY C.parent_id;
    
  23. 效果如下图:
    在这里插入图片描述
    至此,我们借助Flink SQL Client体验了Flink SQL丰富的功能,如果您也在学习Flink SQL,希望本文能给您一些参考;

欢迎关注阿里云开发者社区:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
233 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
2月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
|
4月前
|
SQL 运维 监控
SQL查询太慢?实战讲解YashanDB SQL调优思路
本文是Meetup第十期“调优实战专场”的第二篇技术文章,上一篇《高效查询秘诀,解码YashanDB优化器分组查询优化手段》中,我们揭秘了YashanDB分组查询优化秘诀,本文将通过一个案例,助你快速上手YashanDB慢日志功能,精准定位“慢SQL”后进行优化。
|
4月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
102 5
|
6月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
806 26
|
7月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
945 2
探索Flink动态CEP:杭州银行的实战案例
|
7月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
437 14
|
10月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
12月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
331 13
|
12月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
210 9

热门文章

最新文章