将文件流写入本地
源码
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/alluxio
Flink流式处理数据
结合当前业务梳理流程
来源数据源:数百万数据量的CSV文件 结果保存数据:CSV或Mysql
- 略过表头
- 在已知几列的情况下 执行上图代码
比如有6列 那么读取csv的时候 flink均认为是String类型去读取(人为指定的类型)
筛选异常数据
异常数据的判断标准
比如输入数据源CSV中一行数据为
若认定圈红的那一列是数字类型
那么此时因为是字符串 无法转换为数字类型
那么这一行是异常数据
将异常数据保存
根据业务灵活处理
- 第一个全红的 2: 表示第二行
- 第二个圈红的部分 表示 当前列数据应为Double类型但实际上是非数字类型 所以该行是异常数据
在方法内部对于全局变量的使用仅限于在方法内部使用 不会对方法之后的作用域有效
比如
过滤函数
filter 是过滤函数 对每一行数据进行过滤 返回false则会被过滤掉
全局变量
List<Integer> rowList=new ArrayList<>(); 在filter函数作用域之外 可以在filter函数中使用 仅限于filter函数之间才有效 在filter函数之后 则无法使用filter对该变量的修改
- 保存到CSV
- 缺陷
需要指定Tuple类 比如生成的csv文件一行有25列 那么就使用Tuple25 还需要定义25个泛型 比如Tuple25<String,String,....> 最多可支持25列 如果是超过25列那么就不支持 所以使用起来非常不方便 而且使用范围有限
我当时在这块费了时间,因为csv列数超过了25列 比如26列,我想着在增加一个Tuple26或TupleN 尝试了之后 不可以 后来找到了国内Flink钉钉群 请教了下里面的大佬 说是建议保存到Mysql中
- 保存到Mysql
配置mysql信息和要执行的sql语句
局限性
假如我有1000个列 那么需要建立一个表有1000个列吗 如果有5000个列呢 所以这种方式 也不太好
此时已经到了项目的最后期限了 很多同事在等着我的结果 我的压力也倍增 差点准备放弃flink 用low的方式实现 最后灵机一动看到了保存到txt文本文件的方法
- 保存到Text
这种方式简单有效
DEMO源码
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/flink
Flink国内钉钉群号
群号 : 23138101
后记
上面这点东西 忙活了我3-4天时间 自我感觉 真是太笨了 国内相关的资料目前还比较少 写下这点心得和经验给需要的朋友们 避免走弯路