图解大数据 | 应用Map-Reduce进行大数据统计

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Hadoop使用一套Map-Reduce的计算框架,解决了大数据处理的难题。本教程ShowMeAI通过几个实例和代码,详细给大家讲解Hadoop使用Map-Reduce进行数据统计的方法。

ShowMeAI研究中心

作者:韩信子@ShowMeAI
教程地址http://www.showmeai.tech/tutorials/84
本文地址http://www.showmeai.tech/article-detail/170
声明:版权所有,转载请联系平台与作者并注明出处

收藏ShowMeAI查看更多精彩内容


1. 引言

本教程ShowMeAI详细给大家讲解Hadoop使用Map-Reduce进行数据统计的方法,关于Hadoop与map-reduce的基础知识,大家可以回顾ShowMeAI的基础知识讲解篇 分布式平台Hadoop与Map-reduce详解

尽管大部分人使用 Hadoop 都是用 java 完成,但是 Hadoop 程序可以用 Python、C++、ruby 等完成。本示例教大家用 Python 完成 MapReduce 实例统计输入文件的单词的词频。

  • 输入:文本文件
  • 输出:单词和词频信息,用 \t 隔开

2. Python实现 MapReduce 代码

使用 Python 完成 MapReduce 需要利用 Hadoop 流的 API,通过 STDIN(标准输入)、STDOUT(标准输出)在 Map 函数和 Reduce 函数之间传递数据。

我们会利用 Python 的 sys.stdin 读取输入数据,并把我们的输出传送给 sys.stdout。Hadoop 流将会完成其他的工作。

一个抽象的Hadoop大数据处理流程如下图所示

实操案例; 应用map-reduce进行大数据统计; Map-Reduce; Hadoop大数据处理流程; 4-1

对于本文提到的任务,我们做一个更详细的拆解,整个Hadoop Map-Reduce过程如下图所示

实操案例; 应用map-reduce进行大数据统计; Map-Reduce; Hadoop; Map-Reduce过程; 4-2

从上图,我们可以看到,我们在当前任务中,需要核心通过代码完成的步骤是:

  • Map:产生词与次数标记键值对
  • Reduce:聚合同一个词(key)的值,完成统计

下面我们来看看,通过 Python 如何完成这里的 Map 和 Reduce 阶段。

2.1 Map阶段:mapper.py

在这里,我们假设map阶段使用到的 Python 脚本存放地址为 ShowMeAI/hadoop/code/mapper.py

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

解释一下上述代码:

  • 文件从STDIN读取文件。
  • 把单词切开,并把单词和词频输出STDOUT。
  • Map 脚本不会计算单词的总数,而是直接输出 <word> 1(Reduce阶段会完成统计工作)。

为了使脚本可执行,增加 mapper.py 的可执行权限:

chmod +x ShowMeAI/hadoop/code/mapper.py

2.2 Reduce阶段:reducer.py

在这里,我们假设reduce阶段使用到的 Python 脚本存放地址为 ShowMeAI/hadoop/code/reducer.py

#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count如果不是数字的话,直接忽略掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要忘记最后的输出
    print "%s\t%s" % (current_word, current_count)

文件会读取 mapper.py 的结果作为 reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到STDOUT。

为了是脚本可执行,增加 reducer.py 的可执行权限

chmod +x ShowMeAI/hadoop/code/reducer.py

3.本地测试MapReduce流程

通常我们在把数据处理流程提交到集群进行运行之前,会本地做一个简单测试,我们会借助linux的管道命令 (cat data | map | sort | reduce) 对数据流进行串接,验证我们写的 mapper.pyreducer.py脚本功能是否正常。这种测试方式,能保证输出的最终结果是我们期望的。

测试的命令如下:

cd ShowMeAI/hadoop/code/
echo "foo foo quux labs foo bar quux" | python mapper.py
echo ``"foo foo quux labs foo bar quux"` `| python mapper.py | sort -k1, 1  | python reducer.py

其中的 sort 过程主要是完成以 key 为基准的排序,方便 reduce 阶段进行聚合统计。

4. Hadoop集群运行Python代码

4.1 数据准备

我们对以下三个文件进行词频统计,先根据下述路径下载:

将文件放置到 ShowMeAI/hadoop/datas/ 目录下。

4.2 执行程序

把本地的数据文件拷贝到分布式文件系统HDFS中。

bin/hadoop dfs -copyFromLocal ShowMeAI/hadoop/datas  hdfs_in

查看:

bin/hadoop dfs -ls


查看具体的文件:

bin/hadoop dfs -ls /user/showmeai/hdfs_in


执行MapReduce job:

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file ShowMeAI/hadoop/code/mapper.py     -mapper ShowMeAI/hadoop/code/mapper.py \
-file ShowMeAI/hadoop/code/reducer.py    -reducer ShowMeAI/hadoop/code/reducer.py \
-input /user/showmeai/hdfs_in/*    -output /user/showmeai/hdfs_out

实例输出:

实操案例; 应用map-reduce进行大数据统计; Map-Reduce; 运行结果; 4-3

查看输出结果是否在目标目录 /user/showmeai/hdfs_out

bin/hadoop dfs -ls /user/showmeai/hdfs_out


查看结果:

bin/hadoop dfs -cat /user/showmeai/hdfs_out2/part-00000

输出:

实操案例; 应用map-reduce进行大数据统计; Map-Reduce; 运行结果; 4-4

5. Mapper 和 Reducer代码优化

5.1 Python中的迭代器和生成器

我们这里对Map-Reduce的代码优化主要基于迭代器和生成器,对这个部分不熟悉的同学可以参考ShowMeAI的 Python 部分内容 → 《图解 Python | 迭代器与生成器》

5.2 优化Mapper 和 Reducer代码

mapper.py
#!/usr/bin/env python
import sys
def read_input(file):
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print "%s%s%d" % (word, separator, 1)

if __name__ == "__main__":
    main()

reducer.py
#!/usr/bin/env python
from operator import itemgetter
from itertools import groupby
import sys

def read_mapper_output(file, separator = '\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator = '\t'):
    data = read_mapper_output(sys.stdin, separator = separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except valueError:
            pass

if __name__ == "__main__":
    main()


我们对代码中的 groupby 做一个简单代码功能演示讲解,如下:

from itertools import groupby
from operator import itemgetter

things = [('2009-09-02', 11),
          ('2009-09-02', 3),
          ('2009-09-03', 10),
          ('2009-09-03', 4),
          ('2009-09-03', 22),
          ('2009-09-06', 33)]

sss = groupby(things, itemgetter(0))
for key, items in sss:
    print key
    for subitem in items:
        print subitem
    print '-' * 20


结果:

2009-09-02
('2009-09-02', 11)
('2009-09-02', 3)
--------------------
2009-09-03
('2009-09-03', 10)
('2009-09-03', 4)
('2009-09-03', 22)
--------------------
2009-09-06
('2009-09-06', 33)
--------------------

代码中:

  • groupby(things, itemgetter(0)) 以第0列为排序目标
  • groupby(things, itemgetter(1)) 以第1列为排序目标
  • groupby(things) 以整行为排序目标

6. 参考资料

【大数据技术与处理】推荐阅读

ShowMeAI 系列教程推荐

ShowMeAI用知识加速每一次技术成长

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
SQL 存储 分布式计算
ODPS技术架构深度剖析与实战指南——从零开始掌握阿里巴巴大数据处理平台的核心要义与应用技巧
【10月更文挑战第9天】ODPS是阿里巴巴推出的大数据处理平台,支持海量数据的存储与计算,适用于数据仓库、数据挖掘等场景。其核心组件涵盖数据存储、计算引擎、任务调度、资源管理和用户界面,确保数据处理的稳定、安全与高效。通过创建项目、上传数据、编写SQL或MapReduce程序,用户可轻松完成复杂的数据处理任务。示例展示了如何使用ODPS SQL查询每个用户的最早登录时间。
183 1
|
28天前
|
机器学习/深度学习 人工智能 运维
智能化运维:AI与大数据在IT运维中的应用探索####
本文旨在探讨人工智能(AI)与大数据分析技术如何革新传统IT运维模式,提升运维效率与服务质量。通过具体案例分析,揭示AI算法在故障预测、异常检测及自动化修复等方面的实际应用成效,同时阐述大数据如何助力实现精准运维管理,降低运营成本,提升用户体验。文章还将简要讨论实施智能化运维面临的挑战与未来发展趋势,为IT管理者提供决策参考。 ####
|
2月前
|
机器学习/深度学习 存储 大数据
云计算与大数据技术的融合应用
云计算与大数据技术的融合应用
|
3月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
78 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
3月前
|
存储 Java API
详细解析HashMap、TreeMap、LinkedHashMap等实现类,帮助您更好地理解和应用Java Map。
【10月更文挑战第19天】深入剖析Java Map:不仅是高效存储键值对的数据结构,更是展现设计艺术的典范。本文从基本概念、设计艺术和使用技巧三个方面,详细解析HashMap、TreeMap、LinkedHashMap等实现类,帮助您更好地理解和应用Java Map。
72 3
ly~
|
3月前
|
供应链 搜索推荐 安全
大数据模型的应用
大数据模型在多个领域均有广泛应用。在金融领域,它可用于风险评估与预测、智能营销及反欺诈检测,助力金融机构做出更加精准的决策;在医疗领域,大数据模型能够协助疾病诊断与预测、优化医疗资源管理和加速药物研发;在交通领域,该技术有助于交通流量预测、智能交通管理和物流管理,从而提升整体交通效率;电商领域则借助大数据模型实现商品推荐、库存管理和价格优化,增强用户体验与企业效益;此外,在能源和制造业中,大数据模型的应用范围涵盖从需求预测到设备故障预测等多个方面,全面推动了行业的智能化转型与升级。
ly~
233 2
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
56 3
ly~
|
3月前
|
机器学习/深度学习 人工智能 自然语言处理
大数据在智慧金融中的应用
在智能算法交易中,深度学习揭示价格波动的复杂动力学,强化学习依据市场反馈优化策略,助力投资者获取阿尔法收益。智能监管合规利用自然语言处理精准解读法规,实时追踪监管变化,确保机构紧跟政策。大数据分析监控交易,预警潜在违规行为,变被动防御为主动预防。数智化营销通过多维度数据分析,构建细致客户画像,提供个性化产品推荐。智慧客服借助 AI 技术提升服务质量,增强客户满意度。
ly~
166 3
ly~
|
3月前
|
供应链 搜索推荐 大数据
大数据在零售业中的应用
在零售业中,大数据通过分析顾客的购买记录、在线浏览习惯等数据,帮助零售商理解顾客行为并提供个性化服务。例如,分析网站点击路径以了解顾客兴趣,并利用历史购买数据开发智能推荐系统,提升销售和顾客满意度。此外,大数据还能优化库存管理,通过分析销售数据和市场需求,更准确地预测需求,减少库存积压和缺货现象,提高资金流动性。
ly~
540 2
ly~
|
3月前
|
供应链 监控 搜索推荐
大数据的应用场景
大数据在众多行业中的应用场景广泛,涵盖金融、零售、医疗保健、交通物流、制造、能源、政府公共服务及教育等领域。在金融行业,大数据用于风险评估、精准营销、反欺诈以及决策支持;零售业则应用于商品推荐、供应链管理和门店运营优化等;医疗保健领域利用大数据进行疾病预测、辅助诊断和医疗质量评估;交通物流业通过大数据优化物流配送、交通管理和运输安全;制造业则在生产过程优化、设备维护和供应链协同方面受益;能源行业运用大数据提升智能电网管理和能源勘探效率;政府和公共服务部门借助大数据改善城市管理、政务服务及公共安全;教育行业通过大数据实现个性化学习和资源优化配置;体育娱乐业则利用大数据提升赛事分析和娱乐制作水平。
ly~
780 2