Flink流式处理百万数据量CSV文件(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: Flink流式处理百万数据量CSV文件(下)

将文件流写入本地

image.png

源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/alluxio


Flink流式处理数据


微信图片_20220430191141.png


结合当前业务梳理流程

来源数据源:数百万数据量的CSV文件
结果保存数据:CSV或Mysql

image.png

image.png

  • 略过表头
  • 在已知几列的情况下 执行上图代码
比如有6列
那么读取csv的时候 
flink均认为是String类型去读取(人为指定的类型)

筛选异常数据

image.png

异常数据的判断标准

比如输入数据源CSV中一行数据为

image.png

若认定圈红的那一列是数字类型

那么此时因为是字符串 无法转换为数字类型

那么这一行是异常数据

将异常数据保存

image.png


根据业务灵活处理


  • 第一个全红的 2: 表示第二行
  • 第二个圈红的部分 表示 当前列数据应为Double类型但实际上是非数字类型 所以该行是异常数据


在方法内部对于全局变量的使用仅限于在方法内部使用 不会对方法之后的作用域有效

比如

image.png


过滤函数

filter 是过滤函数 
对每一行数据进行过滤
返回false则会被过滤掉

全局变量

List<Integer> rowList=new ArrayList<>();
在filter函数作用域之外
可以在filter函数中使用
仅限于filter函数之间才有效
在filter函数之后 则无法使用filter对该变量的修改
  • 保存到CSV

image.png

  • 缺陷
需要指定Tuple类
比如生成的csv文件一行有25列 那么就使用Tuple25
还需要定义25个泛型 比如Tuple25<String,String,....>
最多可支持25列
如果是超过25列那么就不支持
所以使用起来非常不方便 而且使用范围有限

我当时在这块费了时间,因为csv列数超过了25列 比如26列,我想着在增加一个Tuple26或TupleN 尝试了之后 不可以 后来找到了国内Flink钉钉群 请教了下里面的大佬 说是建议保存到Mysql中

  • 保存到Mysql

image.png


配置mysql信息和要执行的sql语句

局限性

假如我有1000个列 那么需要建立一个表有1000个列吗
如果有5000个列呢 所以这种方式 也不太好

此时已经到了项目的最后期限了 很多同事在等着我的结果 我的压力也倍增 差点准备放弃flink 用low的方式实现 最后灵机一动看到了保存到txt文本文件的方法

  • 保存到Text


image.png

这种方式简单有效

DEMO源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/flink


Flink国内钉钉群号


群号 : 23138101


后记


上面这点东西 忙活了我3-4天时间 
自我感觉 真是太笨了
国内相关的资料目前还比较少
写下这点心得和经验给需要的朋友们
避免走弯路



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
监控 流计算
【极数系列】Flink集成DataSource读取文件数据(08)
【极数系列】Flink集成DataSource读取文件数据(08)
|
4月前
|
关系型数据库 Linux PostgreSQL
这个错误是因为Flink CDC在尝试访问PostgreSQL的"decoderbufs"文件时,发现该文件不存在
【1月更文挑战第23天】【1月更文挑战第111篇】这个错误是因为Flink CDC在尝试访问PostgreSQL的"decoderbufs"文件时,发现该文件不存在
47 11
|
7月前
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
2月前
|
自然语言处理 Java Scala
Flink CDC产品常见问题之大文件整库同步怎么解决
Flink CDC产品常见问题之大文件整库同步怎么解决
|
4月前
|
SQL Java 流计算
Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
【1月更文挑战第1天】【1月更文挑战第2篇】Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
98 0
|
20天前
|
存储 消息中间件 运维
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
本文主要分享友盟+ U-App 整体的技术架构,以及在实时和离线计算上面的优化方案。
346 1
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
|
2月前
|
消息中间件 分布式计算 Kafka
Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
,Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
25 2
|
2月前
|
分布式计算 Hadoop Java
Flink CDC产品常见问题之tidb cdc 数据量大了就疯狂报空指针如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 NoSQL Java
Flink CDC产品常见问题之文件增大如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
消息中间件 监控 安全
【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同
【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同