阅读本文之前 需要先在 服务器端配置好 伪分布的 hadoop
可以参考博主之前的文章
!!!!
先记录一下自己遇到的坑
hadoop 找不到python
安装python 后还需要在 py文件中添加
#! python执行路径
#!/usr/local/python3/Python-3.6.5/python3
否则会出现很多莫名其妙的 bug
!!!!
hadoop 需要开启的端口不是一般的的多,,,
所以在服务器上进行部署的时候 ,把来自 本机的请求 源的 安全组 端口全部打开,否则会出现很多莫名其妙的 bug
配置万hadoop 检查 yarn 是否启动
[root@122 ~]# jps 32176 DataNode 31858 NameNode 31842 ResourceManager 25683 Jps 27334 JobHistoryServer 1114 NodeManager 32652 SecondaryNameNode
发现 resourcemanager 与 namenode datanode 都在正常运行
打开web 页面,创建 /shiyan2 目录
上传 input.txt 文件
input.txt 内容如下
2014010114 2014010216 2014010317 2014010410 2014010506 2012010609 2012010732 2012010812 2012010919 2012011023 2001010116 2001010212 2001010310 2001010411 2001010529 2013010619 2013010722 2013010812 2013010929 2013011023 2008010105 2008010216 2008010337 2008010414 2008010516 2007010619 2007010712 2007010812 2007010999 2007011023 2010010114 2010010216 2010010317 2010010410 2010010506 2015010649 2015010722 2015010812 2015010999 2015011023
根据博主肤浅的理解, hadoop 分布计算 就是根据 每个data节点 利用本身的资源 每一行处理文本数据
python 结合hadoop 相当于利用了文件的 重定向功能
首先在进行问题描述, input.txt中每 一行数据 都是一个 日期的温度值,
我们现在需要进行一个 分布式的 每年最高温度的 计算,
首先我们在本地实现
利用 python 实现这个计算还是很简单的
创建一个新目录 /now_work 名称随意 ,先将 input.txt 放入目录
书写 map 计算, map 计算就是在 可以在子节点进行的计算,
放到 本问题中 就是 从 初始数据中计算 每一行的年份 与 温度的关系
mapreduce.Job: map 0% reduce 0%
INFO mapreduce.Job: map 100% reduce 0%
INFO mapreduce.Job: map 100% reduce 100%
这三行是最终计算时的输出, 我们可以发现 map 与 reduce 分步进行的
在 本题中 map 指的就是 从 2015011023 计算出年份以及 温度输出形式为
2015 23
当所有子节点都完成了各自的计算 ,为了统计 全年最高温度 ,必须将所有文件的计算结果进行 合并 才可以得到最终结果
先列出 map.py 文件
#!/usr/local/python3/bin/python3 import sys for line in sys.stdin: fields = line.split() for item in fields: print(item[:4], ' ', item[-2:])
到了这一步 /new_work 中已经有了input.txt , 与 map.py
cd 到 /new_work 执行
cat input.txt | ./map.py
[root@122 cloudcomput]# cat input.txt | ./map.py 2014 14 2014 16 2014 17 2014 10 2014 06 #省略 n 多行
致辞我们已经完成了map 步骤,即 可以在 子节点 完成的计算任务
当然了,复现本文的 前提是你已经安装了 hadoop 以及python并配置了相关的环境变量
下边来写 reduce 步骤
由于我们得到了 2014 14 等数据
reduce.py 如下
#!/usr/local/python3/bin/python3 import sys result={} for line in sys.stdin: kvs=line.strip().split(' ') k=kvs[0] v=kvs[3] if k in result: if int(result[k])< int(v): result[k]=str(v) else: result[k]=str(v) for k,v in result.items(): print('%s\t%s' %(k,v))
现在 /new_work 中已经有了
input.txt map.py reduce .py
测试本地是否可用
cd /new_work
[root@] cat input.txt | ./map.py | sort | ./reduce.py |more 2001 29 2007 99 2008 37 2010 17 2012 32 2013 29 2014 17 2015 99
本题中数据 都是随便造的,不用太过在意
以上就是 分布式 算法的大概过程,下边我们让 hadoop 来执行脚本即可
来到hadoop 安装目录
新建 /python_script 目录
map.py reduce,py 上传 服务器/python_script目录
新建 脚本
vi run.sh
内容如下
# 配置 hadoop 的 执行命令 在安装路径的bin 文件中 HADOOP_CMD="/cloudcomput/hadoop-3.2.1/bin/hadoop" # 配置 配置 streaming jar包的 路径 STREAM_JAR_PATH="/cloudcomput/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar" #配置 需要 计算的 输入文件的路径 #如果有 多个文件可以书写为 /{文件夹名称}/*.txt 或者其他格式 INPUT_FILE_PATH_1="/shiyan2/input.txt" #配置 输出文件的 文件夹目录 OUTPUT_PATH="/output" # 在执行之前如果 存在 输出文件夹 原本会报错 , #所以执行之前先删除 output 文件夹 , 再 导出文件 $HADOOP_CMD fs -rm -r skipTrash $OUTPUT_PATH # Step 1. # hadoop jar streaming.jar $HADOOP_CMD jar $STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_1 \ -output $OUTPUT_PATH \ -mapper ./map.py \ -reducer ./reduce.py \ -file ./map.py \ -file ./reduce.py \ #配置 mapper 步骤的 算法py文件 #即 可以在 子节点进行的计算 #配置 reducer 步骤的 算法py文件 #需要 汇总计算 的文件
确保 hadoop 下没有 output 目录
执行
cd /python_script
./run.sh