大数据开发笔记(三):Mapreduce

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: MapReduce是采用一种分而治之的思想设计出来的分布式计算框架,它由两个阶段组成:map阶段和reduce阶段。

Mapreduce思维导图


image.png


Mapreduce流程介绍



image.png

image.png


MapReduce是采用一种分而治之的思想设计出来的分布式计算框架,它由两个阶段组成:map阶段和reduce阶段。


在map阶段中:


首先读取HDFS中的文件,每个文件都以一个个block形式存在,block中的数据会被解析成多个kv对,然后调用map task的map方法;

map方法对接收到的kv对进行分片处理,转换输出成新的kv对;

然后对kv对进行分区和排序压缩,中间会涉及到map的shuffle过程,最后存入本地磁盘,供后续reduce task作为输入参数使用。

在reduce阶段中:


reduce方法将多个map task的输出,按照不同的分区传输到不同的reduce 上进行合并排序,这中间也涉及到reduce的shuffle过程(shuffle就像洗扑克牌一样)

然后reduce方法对输入的键值对汇总计算,输出计算结果

最后把reduce的输出保存在HDFS文件中。


Mapreduce概念



一、MapReduce编程模型

思想:分成两个阶段,map阶段和reduce阶段;


map阶段将大的任务,切分成小任务,分散到不同的服务器中,并行的执行(减少每个服务器的计算负载,同时减少整体的计算时间); reduce阶段,将map阶段的结果做汇总


map阶段的输入、输出都是以kv对的形式;输出结果写入map任务所在服务器的本地磁盘 reduce阶段的输入来自map阶段的输出;reduce的输出结果也是kv对,默认存储到HDFS中


二、MapReduce编程示例

1、写三个主要的代码 自定义的map类 在map方法中,编写业务代码 自定义的reduce类 在reduce方法中,编写业务代码 程序入口main 编写程序入口


2、可以本地运行


3、集群运行


三、combiner combiner原理

map端本地聚合 其本质是一个reducer类


四、shuffle

shuffle主要指的map端的输出,然后如何传输到reduce task的过程


shuflle中的执行顺序是先分区,然后在溢写之前进行排序,最后溢出的多个磁盘文件会进行合并成一个大文件。


1.map端: 输出是kv对,先写入环形缓冲区(100M),当达到80%后,缓冲区中的数据开始溢出写磁盘文件; 溢出过程中,有以下几个操作: 1、根据reduce个数对kv对做分区;


2、每个分区中,根据kv对的key做排序;


3、若有map端本地聚合combine,则对每个分区中,同组数据(默认指key相同的kv对)的values做聚合 在溢出写的过程中,可以继续向环形缓冲区写入数据;但是若写入速度大于溢出写的速度,当100m占满后,暂停向环形缓冲区中写入的过程;只执行溢出写的过程;直到全部溢出写到磁盘,才恢复向缓冲区写入。


随着不断的溢出写磁盘文件,本地磁盘会生成多个溢出文件,在map task完成之前,溢出文件会被合并成一个已分区、已排序的输出文件


reduce端: reduce task会在每个map task运行完成后,获得map task输出中,属于自己的分区数据(许多kv对); 数据先在reduce的jvm内存中,如果数据占空间变大,则合并后溢出写磁盘文件;若指定了combine,在合并时,运行它,减少写入磁盘的数据量。随着溢出文件的增多,会合并文件 所有map task复制完成后,进入“合并阶段”;维持原顺序排序每组合并后的数据,调用一次reduce方法


MR编程实战

1.wordcount: 针对实际业务场景(一个用户(user_id)的评价量,点击量,收藏量等等)

map.py
#!/usr/bin/env python
import sys
for line in sys.stdin:
   line = line.strip()
   words = line.split()
   for word in words:
     print "%s\t%s" % (word, 1)
reduce.py
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
  line = line.strip()
  word, count = line.split('\t', 1)
  try:
    count = int(count)
  except ValueError:
    continue
  if current_word == word:
    current_count += count
  else:
    if current_word:
      print "%s\t%s" % (current_word, current_count)
    current_count = count
    current_word = word
 if word == current_word:
 print "%s\t%s" % (current_word, current_count)

2.mr实现单词idf统计

map.py
import sys
for line in sys.stdin:
  ss = line.strip().split('\t')
  if len(ss) != 2:
   continue
  file_name, file_content = ss
  word_list = file_content.strip().split(' ')
  word_set = set(word_list)
  for word in word_set:
    print '\t'.join([word, '1'])
red.py
[root@master tfidf_test]# cat red.py
import sys
import math
current_word = None
sum = 0
docs_cnt = 508
for line in sys.stdin:
  ss = line.strip().split('\t')
  if len(ss) != 2:
    continue
  word, val = ss
  if current_word == None:
    current_word = word
  if current_word != word:
    idf = math.log(float(docs_cnt) / (float(sum) + 1.0))
    print '\t'.join([current_word, str(idf)])
    current_word = word
    sum = 0
   sum += int(val)
idf = math.log(float(docs_cnt) / (float(sum) + 1.0))
print '\t'.join([current_word, str(idf)])
输出当前单词+idf分

3.ip地址识别--二分法




相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
5月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
127 2
|
7月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
141 2
|
2月前
|
数据采集 机器学习/深度学习 DataWorks
DataWorks产品评测:大数据开发治理的深度体验
DataWorks产品评测:大数据开发治理的深度体验
153 1
|
5月前
|
大数据 网络安全 数据安全/隐私保护
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(二)
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(二)
218 5
|
5月前
|
XML 大数据 网络安全
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(一)
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(一)
123 4
|
6月前
|
SQL 分布式计算 大数据
代码编码原则和规范大数据开发
此文档详细规定了SQL代码的编写规范,包括代码的清晰度,执行效率,以及注释的必要性。它强调所有SQL关键字需统一使用大写或小写,并禁止使用select *操作。此外,还规定了代码头部的信息模板,字段排列方式,INSERT, SELECT子句的格式,运算符的使用,CASE语句编写规则,查询嵌套规范,表别名定义,以及SQL注释的添加方法。这些规则有助于提升代码的可读性和可维护性。
126 0
|
6月前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
194 0
|
7月前
|
分布式计算 大数据 Hadoop
MapReduce:大数据处理的基石
【8月更文挑战第31天】
219 0
|
7月前
|
机器学习/深度学习 分布式计算 算法
MaxCompute 的 MapReduce 与机器学习
【8月更文第31天】随着大数据时代的到来,如何有效地处理和分析海量数据成为了一个重要的课题。MapReduce 是一种编程模型,用于处理和生成大型数据集,其核心思想是将计算任务分解为可以并行处理的小任务。阿里云的 MaxCompute 是一个面向离线数据仓库的计算服务,提供了 MapReduce 接口来处理大规模数据集。本文将探讨如何利用 MaxCompute 的 MapReduce 功能来执行复杂的计算任务,特别是应用于机器学习场景。
157 0
|
7月前
|
数据可视化
Echarts数据可视化开发| 智慧数据平台
Echarts数据可视化开发| 智慧数据平台

热门文章

最新文章