目录
任务1:给全校分布式数据库课程考试班级平均分前三名的班级中每个学生发奖
任务1升级版:给全校每门课程考试班级平均分前三名的班级中每个学生发奖
介绍
在用MongoDB查询返回的数据量很大的情况下,做一些比较复杂的统计和聚合操作做花费的时间很长的时候,可以用MongoDB中的MapReduce进行实现MapReduce是个非常灵活和强大的数据聚合工具。它的好处是可以把一个聚合任务分解为多个小的任务,分配到多服务器上并行处理。MongoDB也提供了MapReduce,当然查询语肯定是JavaScript。
db.collection.mapReduce( function() {emit(key,value);}, //map 函数 function(key,values[]) {return reduceFunction}, //reduce 函数 { out: collection, query: document, sort: document, limit: number } )
MongoDB中的MapReduce主要有以下几阶段:
- Map:把一个操作Map到集合中的每一个文档
- Shuffle: 根据Key分组对文档,并且为每个不同的Key生成一系列(>=1个)的值表(List of values)。
- Finalize:此步骤不是必须的。在得到MR最终结果后,再进行一些数据“修剪”性质的处理。
分布式数据库,如何利用到大数据的平台,如何利用高效,有利的工具解决数据的大量性,如复杂性,这就需要我们有一定的数据思维模式了
MongoDB中使用emit函数向MapReduce提供Key/Value对。
Reduce函数接受两个参数:Key,emits. Key即为emit函数中的Key。 emits是一个数组,它的元素就是emit函数提供的Value。
Reduce函数的返回结果必须要能被Map或者Reduce重复使用,所以返回结果必须与emits中元素结构一致。
Map或者Reduce函数中的this关键字,代表当前被Mapping文档。
// mapreduce // mapper: 采集单元数据键值对 var mapper = function(){ // key:group by 的字段,分组标准 var key = this.major // value: 要进行聚合计算的字段 var value = 1 emit(key, value); // {{grade:2018,gender:1}, 1} } / / map -- conbine+shuffle -> reduce // {{grade:2018,gender:1}, [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]} // reducer: 聚合数据, 产生一个单值数据 var reducer = function(key,values){ // key: {grade:2018,gender:1} // values [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1] var result = Array.sum(values) return result } / / options: 设置输入输出 var options = {out:"output", query:{}} db.students.mapReduce(mapper, reducer, options) db.output.find()
语法解析
db.runCommand({ mapreduce:<collection>, map:<mapfunction>, reduce:<reducefunction>, [,query:<query filter object>] [,sort:<sorts the input objects using this key.Useful for optimization,like sorting by the emit key for fewer reduces>] [,limit:<number of objects to return from collection>] [,out:<see output options below>] [,keeptemp:<true|false>] [,finalize:<finalizefunction>] [,scope:<object where fields go into javascript global scope>] [, jsMode : boolean,default true] [,verbose:true] });
参数说明:
- Mapreduce:要操作的目标集合
- Map:映射函数(生成键值对序列,作为reduce函数参数)
- Reduce:统计函数
- Query:目标记录过滤
- Sort:目标记录排序
- Limit:限制目标记录数量
- Out:统计结果存放集合(不指定使用临时集合,在客户端断开后自动删除)
- Keeptemp:是否保留临时集合
- Finalize:最终处理函数(对reduce返回结果进行最终整理后存入结果集合)
- Scope:向map、reduce、finalize导入外部变量
- jsMode说明:为false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,为true时 BSON-->js-->map-->reduce-->BSON
- Verbose:显示详细的时间统计信息
行查询的步骤
- MapReduce对指定的集合Collection进行查询
- 对A的结果集进行mapper方法采集
- 对B的结果执行finalize方法处理
- 最终结果集输出到临时Collection中
- 断开连接,临时Collection删除或保留
案例集合
统计各班级的学生人数(MapReduce)
// MapReduce // 统计各班级的学生人数 var mapper = function() { var key = this.major + this.grade.toString() + "-0" + this.class.toString() var value = 1 emit(key,value) } // mapper输出:{"大数据2019-01",1} {"大数据2019-02",1} {"大数据2019-01",1} {"大数据2019-01",1} // mapreduce函数会对mapper输出的key-value-pair进行合并 {k1,v1} {k2,v2} {k1,v3}/... => {k1,[v1,v3]} // {"大数据2019-01",[1,1,1]} {"大数据2019-02",[1]} var reducer = function(key,valueArray) { return Array.sum(valueArray) } var option = {out:'统计各班级的学生人数'} db.stu.mapReduce(mapper,reducer,option)
看到这里我们发现,这和我们的MySQL里面的创建视图有一定的异曲同工之妙,MapReduce的操作不单单是解决数据查询里面的大量所带来的耗时问题,还可以灵活的将结果进行输出
统计各班级的学生人数(aggregate)
// 统计各班级的学生人数 db.stu.aggregate([ {$group : {_id:{major:"$major",grade:"$grade",class:'$class'}, counts:{$sum:1}}}, {$sort : {'_id.major':1,"_id.grade":1,"_id.class":1}} ])
虽然使用常规的aggregate方法也可以,但是我们发现它的语法还比较的简单,但是!对于大量的数据集,它的效率就没有MapReduce那样有优势
统计分布式课程不及格人数(输出表格)
var mapper = function() { var key = this.major + this.grade.toString() + "-0" + this.class.toString() var value = 1 for(var i=0; i<this.courses.length; i++) { if(this.courses[i].course == '分布式数据库原理与应用' && this.courses[i].score < 60) { emit(key,value) break; } } } // mapper输出:{"大数据2019-01",1} {"大数据2019-02",1} {"大数据2019-01",1} {"大数据2019-01",1} // mapreduce函数会对mapper输出的key-value-pair进行合并 {k1,v1} {k2,v2} {k1,v3}/... => {k1,[v1,v3]} // {"大数据2019-01",[1,1,1]} {"大数据2019-02",[1]} var reducer = function(key,valueArray) { return Array.sum(valueArray) } var option = {out:'各班分布式不及格学生人数'} db.stu.mapReduce(mapper,reducer,option)
求全体学生的平均身高
// 求全体学生的平均身高 // map方法将每一个集合中的文档转化成如 {"全体学生",171},{"全体学生",171}...的输出 var mapper=function(){emit("全体学生",this.height)}; // MongoDB内部将进行聚合 // reduce方法得到如{"全体学生",[171,172,173,...]}的输入,然后对 values[171,172,173,...]数组进行avg()平均值运算,得到一个value var reducer=function(key,values){return Array.avg(values)}; // options中我们指定输出的数据将存入AvgHeight集合 var options={out:"AvgHeight"}; // 使用定义好的mapper,reducer方法和options设置进行mapReduce运算 db.students.mapReduce(map,reduce,options); // 查询结果集合AvgHeight db.AvgHeight.find();
分专业-年级-班级计算平均身高
// 定义一个mapper方法,从集合中的每一个文档中提取key-value对 var mapper=function(){ // 构造key,key可以是一个字符串,如:"大数据2018-1" key=this.major+this.grade+"-"+this.class // 构造key,key可以是一个文档对象,如:{专业:"大数据",年级:2018,班级:1} // key = {专业:this.major,年级:this.grade,班级:this.class}; // 构造value,这里要计算身高,所以将height字段值作为value value=this.height; /* 返回key-value对,如: {"大数据2018-1":171},{"大数据2018-1":172}, ... , {"应数2018-1":170},{"应数2018-1":173},{"应数2018-1":171} ... , {"应数2018-2":170},{"应数2018-2":171},{"应数2018-2":172} ... , */ emit(key, value); } / * MongoDB将按照key进行数据聚合,得到values数组 {"大数据2018-1":[171,172,...]}, {"应数2018-1":[170,173,171,...]}, {"应数2018-2":[170,171,172,...]}, ... */ // 对于每个key的values数组进行reduce操作,这里我们要用数组的avg方法对[171,172,...]进行平 均值计算 var reducer=function(key,values){ return Array.avg(values); } / / options中我们指定输出的数据将存入AvgHeight集合 var options={out:"AvgHeight"}; // 使用定义好的mapper,reducer方法和options设置进行mapReduce运算 db.students.mapReduce(mapper,reducer,options); // 查询结果集合AvgHeight, 可以对结果进行排序 db.AvgHeight.find().sort();
任务1:给全校分布式数据库课程考试班级平均分前三名的班级中每个学生发奖
求分布式课程的班级平均分 |
排序取前三名的班级 |
更新前三名班级的学生文档,增加一个prize字段 |
var mapper = function() { for(var i=0;i<this.courses.length;i++) { if(this.courses[i].course=="分布式数据库原理与应用") { var key = {课程:this.courses[i].course,专业:this.major,年 级:this.grade,班号:this.class}; var value = this.courses[i].score; emit(key,value); // {课程:"分布式数据库原理与应用",班级:应用统计2018-2}, 79 } } } / / {课程:"分布式数据库原理与应用",班级:应用统计2018-2}, [79,87,67,99,......] var reducer = function(key, values){ return Array.avg(values) } / / mapreduce结果数据输出到nosql_avg_score集合 var options = {out:"nosql_avg_score"} // 执行mapreduce运算 db.students.mapReduce(mapper, reducer, options) // 查询mapreduce得到的前三名的班级信息,将查询结果放入一个游标 var cursor = db.nosql_avg_score.find().sort({"value":-1}).limit(3) // 定义一个变量n,记录游标中的位置,由于游标next方法从第一个开始获取,而且游标中的数据在前面已 经做了sort排序,所以第一条数据就是第一名, 注意,在每一次while循环结束时,n会增加1 var n=1; while(cursor.hasNext()){ // 从游标中获取一条数据 var doc = cursor.next(); // 查询条件 var querydoc = {major:doc._id.专业, grade:doc._id.年级, class:doc._id.班号} // 更新条件,设置一个prize字段,值为“第几名” var updatedoc = {$set:{prize:"第"+n+"名"}} // 执行数据更新 db.students.updateMany(querydoc, updatedoc) n++; // 循环题末尾,n自增1 } / / 查询students表验证结果 db.students.find({prize:{$type:2}}) db.students.find()
任务1升级版:给全校每门课程考试班级平均分前三名的班级中每个学生发奖
// // // |
(1)求各门课程的班级平均分 (2)按课程,平均分排序 (3)遍历排序后的数据,对每门课程前三名的班级去更新对应的学生数据 |
// 数据的分片采集: key:{major,grade,class}, value:courses.$.score => var mapper = function() { var cls = { major: this.major, grade: this.grade, class: this.class } for (var i = 0; i < this.courses.length; i++) { cls.course = this.courses[i].course; var score = this.courses[i].score; //print(cls.major, cls.grade, cls.class, score); emit(cls, score); } } // 数据的聚合处理 var reducer = function(cls, scores) { return Array.avg(scores) } // 数据的输入输出 var options = { out: "avgScore" } db.avgScore.drop() db.students.mapReduce(mapper, reducer, options) // 将已经存在的奖prize删除掉, 避免类型错误 db.students.updateMany({}, { $unset: { "prize": 1 } }); // 得到按课程和平均分排序的班级列表 var cursor = db.avgScore.find().sort({ "_id.course": 1, "value": - 1 }) // 定义个变量n,表示第几名,由于数据已经按照课程和分数排序,第一个获取的分数就是第1名 var n = 1; // 定义2个变量,存放当前处理的数据的课程字段和上一次处理的课程字段 var curCourse, lstCourse; while(cursor.hasNext()) { var doc = cursor.next(); var query = { "major": doc._id.major, "grade": doc._id.grade, "class": doc._id.class }; // 将当前处理的课程字段赋值给curCoourse curCourse = doc._id.course; // 如果当前处理的字段和上一次处理的字段一样 if (curCourse == lstCourse) { n += 1; // 只取前3名,当n小于等于3是,是前三名,更新学生prize数组 if (n <= 3) { var prizename = curCourse + "第" + n + "名"; var updatedoc = { $push: { prize: prizename } }; db.students.updateMany(query, updatedoc); } else //只取前3名,所以n大于3的情况下,这个班级不处理,直接跳过 { lstCourse = curCourse; continue; } } else // 如果当前处理的课程curCourse和上一次处理的课程不一样,要重置名词变量n,重新取新课程的前三名 { 4.6 导入与导出 Navicat 转储js脚本 备份数据库/集合 (1)右键点数据库或者集合 (2)选择“转储脚本文件” > “结构和数据” n = 1; var prizename = curCourse + "第" + n + "名"; var updatedoc = { $push: { prize: prizename } }; db.students.updateMany(query, updatedoc); } // 处理一条数据后,将当前处理的课程赋值给lstCourse lstCourse = curCourse; } d b.students.find({ prize: { $type: 2 } }) // db.students.find()