前言
最近公司让做一个'没有必要'的需求
需求针对的对象
这是同一个csv文件的不同展示形式
- Excel展示形式
这个csv文件中可能有数百万条数据
需求
将所有的异常数据检测出来
- 什么样的数据是异常数据
圈红的数据我手动添加了一个a
原本是数字类型
现在变成了一个字符串类型
那么程序中将字符串类型转换为数字类型的话
就会报错
那这个值就是异常数据
- 为什么我说是 '没有必要的需求'
- 百万级别的数据量的csv一般都是由数据库导出来的
- 数据库的列字段都是定义好的 比如是decimal类型的数据类型 导出来的话 那么也肯定是数字而不会是字符串
- 而出现数字成字符串 是由于人工手动录入的情况下 才会出现 而这种情况又比较少
- 该csv数据集用于跑python算法 比如通过pandas读取csv 统计某一个列数据的和 若全是数字则可以统计 若某一行数据是字符串 则会出现异常 那么可以通过pandas的方式做异常值数据处理 比如剔除这一行即可
- 综上 花费人力物力去处理这一个没有必要的需求真的有些'没必要'
但领导发话了呀 这是客户的需求
所以do it
大致实现思路
读取该csv文件
解析csv每一行数据
检验每一行数据是否是异常数据
实现方式
- 普通方式
通过java读取csv 然后一行一行处理
这种方式 若单机内存太小 很容易造成内存溢出
而且方式很low 没有多大挑战性
对个人技术能力没有提升
所以这种方式pass
- Flink 流式处理
刚好头段时间 自己学习到了Flink
之前一直是纸上谈兵
现在终于有了用武之地
选好了技术方案 let's do it!
业务逻辑图
接下来简要说说此流程上的核心技术的实现原理
rabbitmq
DEMO源码
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/rabbitmq
发送消息
重点配置说明
- durable
是否持久化,是否将队列持久化到mnesia数据库中,有专门的表保存队列声明 - exclusive
①当前定义的队列是connection的channel是共享的,其他的connection是访问不到的
②当connection关闭的时候,队列将被删除 - autoDelete
当最后一个consumer(消费者)断开之后,队列将自动删除
监听消息
方式一
重要参数说明
- autoack
autoAck(同no-ack)为true的时候 消息发送到操作系统的套接字缓冲区时即任务消息已经被消费 但如果此时套接字缓冲区崩溃 消息在未被消费者应用程序消费的情况下就被队列删除 所以,如果想要保证消息可靠的达到消费者端 建议将autoAck字段设置为false 这样当上面套接字缓冲区崩溃的情况同样出现 仍然能保证消息被重新消费