Flink流式处理百万数据量CSV文件(下)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: Flink流式处理百万数据量CSV文件(下)

将文件流写入本地

image.png

源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/alluxio


Flink流式处理数据


微信图片_20220430191141.png


结合当前业务梳理流程

来源数据源:数百万数据量的CSV文件
结果保存数据:CSV或Mysql

image.png

image.png

  • 略过表头
  • 在已知几列的情况下 执行上图代码
比如有6列
那么读取csv的时候 
flink均认为是String类型去读取(人为指定的类型)

筛选异常数据

image.png

异常数据的判断标准

比如输入数据源CSV中一行数据为

image.png

若认定圈红的那一列是数字类型

那么此时因为是字符串 无法转换为数字类型

那么这一行是异常数据

将异常数据保存

image.png


根据业务灵活处理


  • 第一个全红的 2: 表示第二行
  • 第二个圈红的部分 表示 当前列数据应为Double类型但实际上是非数字类型 所以该行是异常数据


在方法内部对于全局变量的使用仅限于在方法内部使用 不会对方法之后的作用域有效

比如

image.png


过滤函数

filter 是过滤函数 
对每一行数据进行过滤
返回false则会被过滤掉

全局变量

List<Integer> rowList=new ArrayList<>();
在filter函数作用域之外
可以在filter函数中使用
仅限于filter函数之间才有效
在filter函数之后 则无法使用filter对该变量的修改
  • 保存到CSV

image.png

  • 缺陷
需要指定Tuple类
比如生成的csv文件一行有25列 那么就使用Tuple25
还需要定义25个泛型 比如Tuple25<String,String,....>
最多可支持25列
如果是超过25列那么就不支持
所以使用起来非常不方便 而且使用范围有限

我当时在这块费了时间,因为csv列数超过了25列 比如26列,我想着在增加一个Tuple26或TupleN 尝试了之后 不可以 后来找到了国内Flink钉钉群 请教了下里面的大佬 说是建议保存到Mysql中

  • 保存到Mysql

image.png


配置mysql信息和要执行的sql语句

局限性

假如我有1000个列 那么需要建立一个表有1000个列吗
如果有5000个列呢 所以这种方式 也不太好

此时已经到了项目的最后期限了 很多同事在等着我的结果 我的压力也倍增 差点准备放弃flink 用low的方式实现 最后灵机一动看到了保存到txt文本文件的方法

  • 保存到Text


image.png

这种方式简单有效

DEMO源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/flink


Flink国内钉钉群号


群号 : 23138101


后记


上面这点东西 忙活了我3-4天时间 
自我感觉 真是太笨了
国内相关的资料目前还比较少
写下这点心得和经验给需要的朋友们
避免走弯路



相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
526 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1032 0
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
448 15
|
11月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
832 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
1120 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
21662 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
251 1
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
180 1
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
233 0