本文介绍在文件存储HDFS上搭建及使用Apache Flink的方法。
准备工作 在文件存储HDFS上使用Apache Flink,需要先完成以下准备工作。
说明 本文档的操作步骤中涉及的安装包版本号、文件夹路径,请根据实际情况进行替换。 开通文件存储HDFS服务并创建文件系统实例和挂载点,详情请参见HDFS快速入门。 在计算节点上安装JDK。 版本不能低于1.8。 在计算节点上安装Scala。 Scala下载地址:官方链接,其版本要与使用的Apache Flink版本相兼容。 下载Apache Hadoop压缩包。 Apache Hadoop下载地址:官方链接。建议您选用的Apache Hadoop版本不低于2.7.2,本文档中使用的Apache Hadoop版本为Apache Hadoop 2.7.2。 下载Apache Flink压缩包。 在文件存储HDFS上使用的Flink的版本必须为1.9.0及以上,Apache Flink下载地址:官方链接。本文档中使用的Flink版本为官方提供的预编译版本Flink 1.9.0。 配置Apache Hadoop 执行如下命令解压Apache Hadoop压缩包到指定文件夹。 tar -zxvf hadoop-2.7.2.tar.gz -C /usr/local/ 修改hadoop-env.sh配置文件。 执行如下命令打开hadoop-env.sh配置文件。 vim /usr/local/hadoop-2.7.2/etc/hadoop/hadoop-env.sh 配置JAVA_HOME目录,如下所示。 export JAVA_HOME=/usr/java/default 修改core-site.xml配置文件。 执行如下命令打开core-site.xml配置文件。 vim /usr/local/hadoop-2.7.2/etc/hadoop/core-site.xml 在core-site.xml配置文件中,配置如下信息,详情请参见挂载文件系统。 fs.defaultFS dfs://x-xxxxxxxx.cn-xxxxx.dfs.aliyuncs.com:10290 fs.dfs.impl com.alibaba.dfs.DistributedFileSystem fs.AbstractFileSystem.dfs.impl com.alibaba.dfs.DFS io.file.buffer.size 8388608 alidfs.use.buffer.size.setting true dfs.usergroupservice.impl com.alibaba.dfs.security.LinuxUserGroupService.class dfs.connection.count 16 修改mapred-site.xml配置文件。 执行如下命令打开mapred-site.xml配置文件。 vim /usr/local/hadoop-2.7.2/etc/hadoop/mapred-site.xml 在mapred-site.xml配置文件中,配置如下信息。 mapreduce.framework.name yarn 修改yarn-site.xml配置文件。 执行如下命令打开yarn-site.xml配置文件。 vim /usr/local/hadoop-2.7.2/etc/hadoop/yarn-site.xml 在yarn-site.xml配置文件中,配置如下信息。 yarn.resourcemanager.hostname xxxx
yarn.nodemanager.aux-services mapreduce_shuffle yarn.nodemanager.aux-services.mapreduce.shuffle.class org.apache.hadoop.mapred.ShuffleHandler yarn.nodemanager.resource.memory-mb 16384 yarn.nodemanager.resource.cpu-vcores 4 yarn.scheduler.maximum-allocation-vcores 4 yarn.scheduler.minimum-allocation-mb 3584 yarn.scheduler.maximum-allocation-mb 14336 修改slaves配置文件。 执行如下命令打开slaves配置文件。 vim /usr/local/hadoop-2.7.2/etc/hadoop/slaves 在slaves配置文件中,配置如下信息。 node1 node2 配置环境变量。 执行如下命令打开/etc/profile配置文件。 vim /etc/profile 在/etc/profile配置文件中,配置如下信息。 export HADOOP_HOME=/usr/local/hadoop-2.7.2 export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH 执行如下命令使配置生效。 source /etc/profile 执行如下命令配置文件存储HDFS的SDK。 您可以单击下载文件存储HDFS的SDK(此处以aliyun-sdk-dfs-1.0.3.jar为例),将其部署在Apache Hadoop生态系统组件的CLASSPATH上,详情请参见挂载文件系统。cp aliyun-sdk-dfs-1.0.3.jar /usr/local/hadoop-2.7.2/share/hadoop/hdfs 执行如下命令将${HADOOP_HOME}文件夹同步到集群的其他节点。 scp -r hadoop-2.7.2/ root@node2:/usr/local/ 验证Apache Hadoop配置 完成Apache Hadoop配置后,不需要格式化namenode,也不需要使用start-dfs.sh来启动HDFS相关服务。如需使用yarn服务,只需在resourcemanager节点启动yarn服务,具体验证Apache Hadoop配置成功的方法请参见验证安装。
编译flink-shade 下载 flink-shade源码到指定目录。 git clone https://github.com/apache/flink-shaded.git ~/flink-shade 修改flink-shade源码中的pom文件。 修改Hadoop版本为您的集群中使用的版本,在本文档中使用的Hadoop版本为2.7.2。 vim ~/flink-shaded/flink-shaded-hadoop-2-parent/pom.xml flink-shade_1 在依赖项中添加文件存储HDFS SDK,在本文档使用文件存储HDFS SDK版本为1.0.3。 vim ~/flink-shaded/flink-shaded-hadoop-2-parent/flink-shaded-hadoop-2/pom.xml ... com.aliyun.dfs aliyun-sdk-dfs 1.0.3 ... flink-shade_2 编译打包。 cd ~/flink-shaded mvn package -Dshade-sources 配置Apache Flink 执行如下命令解压Flink压缩包到指定文件夹。 tar -zxvf flink-1.9.0-bin-scala_2.11.tgz -C /usr/local/ 拷贝flink-shade编译的flink-shaded-hadoop-2-uber-x.y.z.jar到Flink的lib目录下。 cp ~/flink-shaded/flink-shaded-hadoop-2-parent/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.7.2-11.0.jar /usr/local/flink-1.9.0/lib/ 说明 在使用Apache Flink之前必须在您的集群环境变量中配置HADOOP_HOME,HADOOP_CLASSPATH和HADOOP_CONF_DIR,详情请参见配置Apache Hadoop中的步骤7:配置环境变量。 如果您使用的Flink版本中已经包含flink-shaded-hadoop-2-uber-x.y.z.jar,则需要使用编译flink-shade中编译的flink-shaded-hadoop-2-uber-x.y.z.jar进行替换。 如果您需要对Flink进行额外的配置,请参考官方文档:配置操作指南。 验证Apache Flink配置 使用Flink自带的WordCount.jar对文件存储HDFS上的数据进行读取,并将计算结果写入到文件存储HDFS,在测试之前需要先启动yarn服务。
生成测试数据。 此处使用Apache Hadoop 2.7.2自带的jar包hadoop-mapreduce-examples-2.7.2.jar中的randomtextwriter方法在文件存储HDFS上生成测试数据。
/usr/local/hadoop-2.7.2/bin/hadoop jar /usr/local/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar randomtextwriter
-D mapreduce.randomtextwriter.totalbytes=10240
-D mapreduce.randomtextwriter.bytespermap=1024
-D mapreduce.job.maps=4
-D mapreduce.job.reduces=2
dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input
其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
查看在文件存储HDFS上生成的测试数据。 /usr/local/hadoop-2.7.2/bin/hadoop fs -cat dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input/* 其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
提交wordcount程序。 /usr/local/flink-1.9.0/bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024
/usr/local/flink-1.9.0/examples/batch/WordCount.jar
--input dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input
--output dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/output
其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
查看在文件存储HDFS上的结果文件。 /usr/local/hadoop-2.7.2/bin/hadoop fs -cat dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/output 其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。