Flink流式处理百万数据量CSV文件(中)

简介: Flink流式处理百万数据量CSV文件(中)

方式二 注解方式


image.png

  • 对类添加@RabbitListener(queues = "${java.flink.queue}")注解
  • 指定队列名称 可从配置文件中读取
  • 对方法添加 @RabbitHandler 注解

三个参数

  • Object message
任意类型的消息
# 解析mq消息
String messageString=JsonUtils.toJson(message);
Message message1=JsonUtils.fromJsonObject(messageString,Message.class);
String message2 = new String(message1.getBody(), "UTF-8");
  • Message msg
手动确认
//如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = msg.getMessageProperties().getDeliveryTag();
//通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
  • Channel channel
// 处理失败,重新压入MQ
channel.basicRecover();


线程池


源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/thread

image.png

spring线程相关注解

  • @EnableAsync
    使用多线程
  • @Async
加在线程任务的方法上(需要异步执行的任务)
定义一个线程任务
通过spring提供的ThreadPoolTaskExecutor就可以使用线程池

重要参数

  • corePoolSize
    核心线程数
  • maxPoolSize
    最大线程数
  • queueCapacity
    队列容量
  • keepAliveSeconds
    活跃时间
  • waitForTasksToCompleteOnShutdown
    设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
  • rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
  • setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
  • CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行

使用线程池中的线程去执行异步任务


image.png


分布式内存文件系统Alluxio


环境搭建

  • 自定义dokcer网络
docker network create alluxio_nw
  • 安装alluxio master
docker run -d  --rm \
    -p 19999:19999 \
    -p 19998:19998 \
    --net=alluxio_nw \
    --name=alluxio-master \
    -e ALLUXIO_JAVA_OPTS=" \
       -Dalluxio.master.hostname=alluxio-master" \
    alluxio/alluxio master
  • 安装alluxio worker
docker run -d --rm \
    -p 29999:29999 \
    -p 30000:30000 \
    --net=alluxio_nw \
    --name=alluxio-worker \
    --shm-size=3971.64MB \
    -e ALLUXIO_JAVA_OPTS=" \
       -Dalluxio.worker.memory.size=3971.64MB \
       -Dalluxio.master.hostname=alluxio-master \
       -Dalluxio.worker.hostname=alluxio-worker" \
    alluxio/alluxio worker

域名转发配置

sudo vim /etc/hosts
127.0.0.1 alluxio-worker

上传alluxio文件

image.png

下载alluxio文件

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
526 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1032 0
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
448 15
|
11月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
832 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
1120 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
21662 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
251 1
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
180 1
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
233 0