Flink流式处理百万数据量CSV文件(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink流式处理百万数据量CSV文件(上)

前言


最近公司让做一个'没有必要'的需求

需求针对的对象

这是同一个csv文件的不同展示形式

  • Excel展示形式

image.png

这个csv文件中可能有数百万条数据

需求

将所有的异常数据检测出来

  • 什么样的数据是异常数据

image.png

圈红的数据我手动添加了一个a

原本是数字类型

现在变成了一个字符串类型

那么程序中将字符串类型转换为数字类型的话

就会报错

那这个值就是异常数据

  • 为什么我说是 '没有必要的需求'
  • 百万级别的数据量的csv一般都是由数据库导出来的
  • 数据库的列字段都是定义好的 比如是decimal类型的数据类型 导出来的话 那么也肯定是数字而不会是字符串
  • 而出现数字成字符串 是由于人工手动录入的情况下 才会出现 而这种情况又比较少
  • 该csv数据集用于跑python算法 比如通过pandas读取csv 统计某一个列数据的和 若全是数字则可以统计 若某一行数据是字符串 则会出现异常 那么可以通过pandas的方式做异常值数据处理 比如剔除这一行即可
  • 综上 花费人力物力去处理这一个没有必要的需求真的有些'没必要'

但领导发话了呀 这是客户的需求

所以do it

大致实现思路

读取该csv文件

解析csv每一行数据

检验每一行数据是否是异常数据

实现方式

  • 普通方式

通过java读取csv 然后一行一行处理

这种方式 若单机内存太小 很容易造成内存溢出

而且方式很low 没有多大挑战性

对个人技术能力没有提升

所以这种方式pass

  • Flink 流式处理

刚好头段时间 自己学习到了Flink

之前一直是纸上谈兵

现在终于有了用武之地

选好了技术方案 let's do it!


业务逻辑图


image.png

接下来简要说说此流程上的核心技术的实现原理


rabbitmq

DEMO源码


https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/rabbitmq


发送消息


image.png

重点配置说明

  • durable
    是否持久化,是否将队列持久化到mnesia数据库中,有专门的表保存队列声明
  • exclusive
    ①当前定义的队列是connection的channel是共享的,其他的connection是访问不到的
    ②当connection关闭的时候,队列将被删除
  • autoDelete
    当最后一个consumer(消费者)断开之后,队列将自动删除


监听消息


方式一

image.png

重要参数说明

  • autoack

image.png


autoAck(同no-ack)为true的时候
消息发送到操作系统的套接字缓冲区时即任务消息已经被消费
但如果此时套接字缓冲区崩溃
消息在未被消费者应用程序消费的情况下就被队列删除
所以,如果想要保证消息可靠的达到消费者端
建议将autoAck字段设置为false
这样当上面套接字缓冲区崩溃的情况同样出现
仍然能保证消息被重新消费


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
202 15
|
1月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
399 4
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
51 0
|
5月前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7874 10
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
4月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
4月前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
5月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
18427 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
4月前
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 监控 大数据
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
|
4月前
|
流计算
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免