在文件存储 HDFS 上使用 Apache Flink

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要为大家介绍在文件存储HDFS上搭建及使用Apache Flink的方法。

111.jpg
镜像下载、域名解析、时间同步请点击 阿里巴巴开源镜像站

一、准备工作

在文件存储HDFS上使用Apache Flink,需要先完成以下准备工作。

说明 本文档的操作步骤中涉及的安装包版本号、文件夹路径,请根据实际情况进行替换。

  1. 开通文件存储HDFS服务并创建文件系统实例和挂载点,详情请参见HDFS快速入门
  2. 在计算节点上安装JDK。版本不能低于1.8。
  3. 在计算节点上安装Scala。Scala下载地址:官方链接,其版本要与使用的Apache Flink版本相兼容。
  4. 下载Apache Hadoop压缩包。Apache Hadoop下载地址:官方链接。建议您选用的Apache Hadoop版本不低于2.7.2,本文档中使用的Apache Hadoop版本为Apache Hadoop 2.7.2。
  5. 下载Apache Flink压缩包。在文件存储HDFS上使用的Flink的版本必须为1.9.0及以上,Apache Flink下载地址:官方链接。本文档中使用的Flink版本为官方提供的预编译版本Flink 1.9.0。

二、配置Apache Hadoop

1、执行如下命令解压Apache Hadoop压缩包到指定文件夹。

tar -zxvf hadoop-2.7.2.tar.gz -C /usr/local/

2、修改hadoop-env.sh配置文件。

  • 执行如下命令打开hadoop-env.sh配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/hadoop-env.sh
  • 配置JAVA_HOME目录,如下所示。
export JAVA_HOME=/usr/java/default

3、修改core-site.xml配置文件。

  • 执行如下命令打开core-site.xml配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/core-site.xml
  • 在core-site.xml配置文件中,配置如下信息,详情请参见挂载文件系统
<configuration>
<property>
     <name>fs.defaultFS</name>
     <value>dfs://x-xxxxxxxx.cn-xxxxx.dfs.aliyuncs.com:10290</value>
     <!-- 该地址填写您的挂载点地址 -->
</property>
<property>
     <name>fs.dfs.impl</name>
     <value>com.alibaba.dfs.DistributedFileSystem</value>
</property>
<property>
     <name>fs.AbstractFileSystem.dfs.impl</name>
     <value>com.alibaba.dfs.DFS</value>
</property>
<property>
     <name>io.file.buffer.size</name>
     <value>8388608</value>
</property>
<property>
     <name>alidfs.use.buffer.size.setting</name>
     <value>true</value>
</property>
<property>
     <name>dfs.usergroupservice.impl</name>
     <value>com.alibaba.dfs.security.LinuxUserGroupService.class</value>
</property>
  <property>
     <name>dfs.connection.count</name>
     <value>16</value>
</property>
</configuration>

4、修改mapred-site.xml配置文件。

  • 执行如下命令打开mapred-site.xml配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/mapred-site.xml
  • 在mapred-site.xml配置文件中,配置如下信息。
<configuration>
<property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
</property>
</configuration>

5、修改yarn-site.xml配置文件。

  • 执行如下命令打开yarn-site.xml配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/yarn-site.xml
  • 在yarn-site.xml配置文件中,配置如下信息。
<configuration>
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>xxxx</value>
  <!-- 该地址填写集群中yarn的resourcemanager的hostname -->
</property>
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>16384</value>
    <!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>4</value>
     <!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>4</value>
    <!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>3584</value>
    <!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>14336</value>
    <!-- 根据您当前的集群能力进行配置此项 -->
</property>
</configuration>

6、修改slaves配置文件。

  • 执行如下命令打开slaves配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/slaves
  • 在slaves配置文件中,配置如下信息。
node1
node2

7、配置环境变量。

  • 执行如下命令打开/etc/profile配置文件。
vim /etc/profile
  • 在/etc/profile配置文件中,配置如下信息。
export HADOOP_HOME=/usr/local/hadoop-2.7.2
export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
  • 执行如下命令使配置生效。
source /etc/profile

8、执行如下命令配置文件存储HDFS的SDK。您可以单击下载文件存储HDFS的SDK(此处以aliyun-sdk-dfs-1.0.3.jar为例),将其部署在Apache Hadoop生态系统组件的CLASSPATH上,详情请参见挂载文件系统

cp aliyun-sdk-dfs-1.0.3.jar  /usr/local/hadoop-2.7.2/share/hadoop/hdfs

9、执行如下命令将${HADOOP_HOME}文件夹同步到集群的其他节点。

scp -r hadoop-2.7.2/ root@node2:/usr/local/

三、验证Apache Hadoop配置

完成Apache Hadoop配置后,不需要格式化namenode,也不需要使用start-dfs.sh来启动HDFS相关服务。如需使用yarn服务,只需在resourcemanager节点启动yarn服务,具体验证Apache Hadoop配置成功的方法请参见验证安装

四、编译flink-shade

1、下载 flink-shade源码到指定目录。

git clone https://github.com/apache/flink-shaded.git  ~/flink-shade

2、修改flink-shade源码中的pom文件。修改Hadoop版本为您的集群中使用的版本,在本文档中使用的Hadoop版本为2.7.2。

vim  ~/flink-shaded/flink-shaded-hadoop-2-parent/pom.xml

1.png
在依赖项中添加文件存储HDFS SDK,在本文档使用文件存储HDFS SDK版本为1.0.3。

vim  ~/flink-shaded/flink-shaded-hadoop-2-parent/flink-shaded-hadoop-2/pom.xml
...
<dependency>
       <groupId>com.aliyun.dfs</groupId>
       <artifactId>aliyun-sdk-dfs</artifactId>
       <version>1.0.3</version>
</dependency>
...

2.png
3、编译打包。

cd ~/flink-shaded
mvn package -Dshade-sources

五、配置Apache Flink

1、执行如下命令解压Flink压缩包到指定文件夹。

tar -zxvf flink-1.9.0-bin-scala_2.11.tgz -C /usr/local/

2、拷贝flink-shade编译的flink-shaded-hadoop-2-uber-x.y.z.jar到Flink的lib目录下。

cp  ~/flink-shaded/flink-shaded-hadoop-2-parent/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.7.2-11.0.jar /usr/local/flink-1.9.0/lib/

说明

  • 在使用Apache Flink之前必须在您的集群环境变量中配置HADOOP_HOME,HADOOP_CLASSPATH和HADOOP_CONF_DIR,详情请参见配置Apache Hadoop中的步骤7:配置环境变量。
  • 如果您使用的Flink版本中已经包含flink-shaded-hadoop-2-uber-x.y.z.jar,则需要使用编译flink-shade中编译的flink-shaded-hadoop-2-uber-x.y.z.jar进行替换。
  • 如果您需要对Flink进行额外的配置,请参考官方文档:配置操作指南

六、验证Apache Flink配置

使用Flink自带的WordCount.jar对文件存储HDFS上的数据进行读取,并将计算结果写入到文件存储HDFS,在测试之前需要先启动yarn服务。
1、生成测试数据。此处使用Apache Hadoop 2.7.2自带的jar包hadoop-mapreduce-examples-2.7.2.jar中的randomtextwriter方法在文件存储HDFS上生成测试数据。

/usr/local/hadoop-2.7.2/bin/hadoop jar  /usr/local/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar 
randomtextwriter \
-D mapreduce.randomtextwriter.totalbytes=10240 \
-D mapreduce.randomtextwriter.bytespermap=1024 \
-D mapreduce.job.maps=4  \
-D mapreduce.job.reduces=2  \
dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input \

其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
2、查看在文件存储HDFS上生成的测试数据。

/usr/local/hadoop-2.7.2/bin/hadoop fs -cat dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input/*

其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
3、提交wordcount程序。

/usr/local/flink-1.9.0/bin/flink run 
-m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 \
/usr/local/flink-1.9.0/examples/batch/WordCount.jar \
--input dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input \
--output dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/output \

其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
4、查看在文件存储HDFS上的结果文件。

/usr/local/hadoop-2.7.2/bin/hadoop fs -cat dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/output

其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
3.png

阿里巴巴开源镜像站 提供全面,高效和稳定的镜像下载服务。钉钉搜索 ' 21746399 ‘ 加入镜像站官方用户交流群。”

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
505 13
Apache Flink 2.0-preview released
|
25天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
59 3
|
30天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
52 1
|
29天前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
分布式计算 Java Hadoop
Flink1.4 HDFS Connector
原文来源于:Flink1.4 HDFS Connector 此连接器提供一个 Sink,将分区文件写入 Hadoop FileSystem 支持的任何文件系统。
1923 0
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
814 7
阿里云实时计算Flink在多行业的应用和实践
|
7天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
569 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4天前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。

推荐镜像

更多