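Hadoop can automatically ship files and archives to the nodes that run the tasks. You only need to add the corresponding option (-file) when launching the Hadoop Streaming job.
When running a streaming program, use the -file option to name each local file that should be distributed. Every file shipped this way is placed in the task's working directory, so the scripts can refer to it by its bare name (for example `python mapper.py wordwhite`).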
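As an illustration, the hypothetical Python helper below (not part of Hadoop) assembles a streaming command as an argument list. By default it emits one -file flag per local file. With `use_generic_files=True` it instead emits a single comma-separated -files list placed before the streaming options, which is where Hadoop 2.x expects generic options. The jar name is a placeholder.

```python
import shlex
from typing import List, Sequence


def build_streaming_cmd(jar: str, input_path: str, output_path: str,
                        mapper: str, reducer: str, files: Sequence[str],
                        use_generic_files: bool = False) -> List[str]:
    """Assemble a hadoop-streaming command line as an argv list.

    Hypothetical helper for illustration only; it is not part of Hadoop.
    """
    cmd = ["hadoop", "jar", jar]
    if use_generic_files and files:
        # Generic options such as -files must come before the streaming options.
        cmd += ["-files", ",".join(files)]
    cmd += ["-input", input_path, "-output", output_path,
            "-mapper", mapper, "-reducer", reducer]
    if not use_generic_files:
        # Streaming's own -file option (deprecated in 2.x): one flag per file.
        for f in files:
            cmd += ["-file", f]
    return cmd


if __name__ == "__main__":
    argv = build_streaming_cmd(
        "hadoop-streaming.jar",  # placeholder jar name
        "/input/The_Man_of_Property",
        "/output/wordcount/wordwhitetest",
        "python mapper.py wordwhite", "python reducer.py",
        ["./mapper.py", "./reducer.py", "./wordwhite"])
    # Print a shell-safe version of the command.
    print(" ".join(shlex.quote(a) for a in argv))
```

With -files, the shipped files still land in the task's working directory under their base names, so the mapper command itself does not change.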
- wordwhite (the whitelist: only the words listed here are counted)
$ vim wordwhite
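The mapper reads wordwhite as plain text, one word per line. The actual contents are not shown here, but since the mapper only emits whitelisted words, the result at the end implies the file contained at least `the`, `and` and `had`. A minimal sketch that writes and reloads such a file, assuming that one-word-per-line format (the word list is illustrative and `load_whitelist` is a hypothetical name):

```python
import os
import tempfile


def load_whitelist(path):
    """Return the set of non-empty, stripped lines in a one-word-per-line file."""
    with open(path, "r") as fd:
        return {line.strip() for line in fd if line.strip()}


if __name__ == "__main__":
    # Illustrative contents only; the article does not list the real file.
    words = ["the", "and", "had"]
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "wordwhite")
        with open(path, "w") as fd:
            fd.write("\n".join(words) + "\n\n")  # the trailing blank line is ignored
        print(sorted(load_whitelist(path)))
```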
- mapper program
$ vim mapper.py
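#!/usr/bin/env python
import sys

def read_wordwhite(file):
    # Load the whitelist: one word per line, blank lines ignored
    word_set = set()
    with open(file, 'r') as fd:
        for line in fd:
            word = line.strip()
            if word:
                word_set.add(word)
    return word_set

def mapper(file_fd):
    word_set = read_wordwhite(file_fd)
    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            # Emit only whitelisted words, as "word\t1"
            if word != "" and (word in word_set):
                print("%s\t%s" % (word, 1))

if __name__ == "__main__":
    # The whitelist file name is passed as the first argument; -file ships the
    # file into the task's working directory, so a bare relative name works.
    if len(sys.argv) > 1:
        file_fd = sys.argv[1]
        mapper(file_fd)
    else:
        sys.exit("usage: mapper.py <wordwhite file>")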
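To check the mapper's filtering without a cluster, its inner loop can be pulled out into a generator and fed an in-memory stream instead of sys.stdin. A sketch, with `map_lines` a hypothetical name and the sample text invented for the test. As in the script above, matching is case-sensitive and punctuation is not stripped, so `The` is not counted as `the`.

```python
import io


def map_lines(lines, word_set):
    """Yield "word\t1" for every whitespace-separated token found in word_set."""
    for line in lines:
        for word in line.split():  # split() with no argument never yields ""
            if word in word_set:
                yield "%s\t%s" % (word, 1)


if __name__ == "__main__":
    fake_stdin = io.StringIO("The man had a house\nand the house had a man\n")
    for record in map_lines(fake_stdin, {"the", "and", "had"}):
        print(record)
```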
- reducer program
$ vim reducer.py
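#!/usr/bin/env python
import sys

def reducer():
    # Input arrives sorted by key, so identical words are adjacent
    current_word = None
    word_sum = 0
    for line in sys.stdin:
        word_list = line.strip().split('\t')
        if len(word_list) < 2:
            continue  # skip malformed lines
        word = word_list[0].strip()
        word_value = word_list[1].strip()
        if current_word is None:
            current_word = word
        if current_word != word:
            # Key changed: emit the finished word and reset the counter
            print("%s\t%s" % (current_word, str(word_sum)))
            current_word = word
            word_sum = 0
        word_sum += int(word_value)
    # Emit the last word (nothing is printed for empty input)
    if current_word is not None:
        print("%s\t%s" % (current_word, str(word_sum)))

if __name__ == "__main__":
    reducer()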
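The reducer only works because Hadoop sorts the map output by key before it reaches the reduce phase, so all records for one word arrive together. A minimal sketch of that assumption, using `sorted()` as a stand-in for the shuffle and `itertools.groupby` in place of the manual `current_word` bookkeeping above (a swapped-in technique that gives the same totals on well-formed, sorted input; `reduce_sorted` is a hypothetical name):

```python
from itertools import groupby


def reduce_sorted(lines):
    """Sum counts per word, assuming lines arrive sorted by word (as after the shuffle)."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    pairs = (p for p in pairs if len(p) == 2)  # skip malformed lines
    for word, group in groupby(pairs, key=lambda p: p[0]):
        yield "%s\t%d" % (word, sum(int(v) for _, v in group))


if __name__ == "__main__":
    map_output = ["the\t1", "had\t1", "the\t1", "and\t1", "had\t1"]
    # sorted() stands in for Hadoop's sort-by-key between map and reduce.
    for line in reduce_sorted(sorted(map_output)):
        print(line)
    # Without the sort, the same word is split across several groups:
    print(list(reduce_sorted(map_output)))
```

On a single machine the same sort step is commonly reproduced with a shell pipeline such as `cat The_Man_of_Property | python mapper.py wordwhite | sort | python reducer.py`.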
- run_streaming script
$ vim runstreaming.sh
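#!/bin/bash
# Jar location is an assumption (typical Hadoop 2.6 layout); adjust to your installation
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /input/The_Man_of_Property \
    -output /output/wordcount/wordwhitetest \
    -mapper "python mapper.py wordwhite" \
    -reducer "python reducer.py" \
    -file ./mapper.py \
    -file ./reducer.py \
    -file ./wordwhite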
- Run the program
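First upload the test file The_Man_of_Property to HDFS and create the wordcount output directory. The job's own output directory (/output/wordcount/wordwhitetest) must not exist yet, otherwise the job fails at submission.
$ hadoop fs -put ./The_Man_of_Property /input/
$ hadoop fs -mkdir /output/wordcount
Note: this run used a pseudo-distributed Hadoop 2.6 environment.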
$ ./runstreaming.sh
18/01/26 13:30:27 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./mapper.py, ./reducer.py, ./wordwhite, /tmp/hadoop-unjar7204532228900236640/] [] /tmp/streamjob7580948745512643345.jar tmpDir=null
18/01/26 13:30:29 INFO client.RMProxy: Connecting to ResourceManager at /
18/01/26 13:30:29 INFO client.RMProxy: Connecting to ResourceManager at /
18/01/26 13:30:31 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/26 13:30:31 INFO mapreduce.JobSubmitter: number of splits:2
18/01/26 13:30:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516345010544_0008
18/01/26 13:30:32 INFO impl.YarnClientImpl: Submitted application application_1516345010544_0008
18/01/26 13:30:32 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1516345010544_0008/
18/01/26 13:30:32 INFO mapreduce.Job: Running job: job_1516345010544_0008
18/01/26 13:30:40 INFO mapreduce.Job: Job job_1516345010544_0008 running in uber mode : false
18/01/26 13:30:40 INFO mapreduce.Job: map 0% reduce 0%
18/01/26 13:30:50 INFO mapreduce.Job: map 50% reduce 0%
18/01/26 13:30:51 INFO mapreduce.Job: map 100% reduce 0%
18/01/26 13:30:58 INFO mapreduce.Job: map 100% reduce 100%
18/01/26 13:30:59 INFO mapreduce.Job: Job job_1516345010544_0008 completed successfully
18/01/26 13:30:59 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=73950
FILE: Number of bytes written=582815
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=636501
HDFS: Number of bytes written=27
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=12815
Total time spent by all reduces in occupied slots (ms)=5251
Total time spent by all map tasks (ms)=12815
Total time spent by all reduce tasks (ms)=5251
Total vcore-milliseconds taken by all map tasks=12815
Total vcore-milliseconds taken by all reduce tasks=5251
Total megabyte-milliseconds taken by all map tasks=13122560
Total megabyte-milliseconds taken by all reduce tasks=5377024
Map-Reduce Framework
Map input records=2866
Map output records=9243
Map output bytes=55458
Map output materialized bytes=73956
Input split bytes=198
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=73956
Reduce input records=9243
Reduce output records=3
Spilled Records=18486
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=332
CPU time spent (ms)=3700
Physical memory (bytes) snapshot=707719168
Virtual memory (bytes) snapshot=8333037568
Total committed heap usage (bytes)=598736896
Shuffle Errors
File Input Format Counters
Bytes Read=636303
File Output Format Counters
Bytes Written=27
18/01/26 13:30:59 INFO streaming.StreamJob: Output directory: /output/wordcount/wordwhitetest
- Check the results
$ hadoop fs -ls /output/wordcount/wordwhitetest/
Found 2 items
-rw-r--r-- 1 centos supergroup 0 2018-01-26 13:30 /output/wordcount/wordwhitetest/_SUCCESS
-rw-r--r-- 1 centos supergroup 27 2018-01-26 13:30 /output/wordcount/wordwhitetest/part-00000
$ hadoop fs -text /output/wordcount/wordwhitetest/part-00000
and	2573
had	1526
the	5144
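The counters in the job log can be cross-checked against this output with a little arithmetic. Every emitted "word\t1" record reaches the reducer, so the three counts must add up to Map output records (9243). There is one output line per distinct whitelisted word (Reduce output records=3), and each line "word\tcount\n" is 9 bytes, matching Bytes Written=27. The values below are copied from the log and output above:

```python
# Values copied from the job counters and the part-00000 output.
results = {"and": 2573, "had": 1526, "the": 5144}
map_output_records = 9243
reduce_output_records = 3
bytes_written = 27

# Each mapper record is a single "word\t1", so the per-word sums add up to it.
assert sum(results.values()) == map_output_records
# One output line per distinct whitelisted word.
assert len(results) == reduce_output_records
# Each output line is "word\tcount\n".
assert sum(len("%s\t%d\n" % (w, c)) for w, c in results.items()) == bytes_written
print("counters are consistent")
```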
2. Hadoop Streaming syntax reference
