Map-Reduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,
然后再将结果合并成最终结果(REDUCE)。MongoDB提供的Map-Reduce非常灵活,对于大规模数据分析也相当实用。
MapReduce 命令
- 以下是MapReduce的基本语法:
>db.collection.mapReduce(
function() {emit(key,value);}, //map 函数
function(key,values) {return reduceFunction}, //reduce 函数
{
out: collection,
query: document,
sort: document,
limit: number
}
)
db.collection.mapReduce(
<map>,
<reduce>,
{
out: <collection>,
query: <document>,
sort: <document>,
limit: <number>,
finalize: <function>,
scope: <document>,
jsMode: <boolean>,
verbose: <boolean>,
bypassDocumentValidation: <boolean>
}
)
MapReduce要实现两个函数:Map和Reduce。
Map函数调用emit(key,value)遍历一个或多个集合中所有的记录,进行分组(group by),
然后将key与value传给Reduce函数进行处理,输出结果。
(1)MapReduce使用自定义JavaScript函数执行map和reduce操作,所以是基于js引擎,单线程执行,效率不高,
比Aggregation复杂,适合用做后台统计等。
(2)MapReduce支持分片操作,可以进行拆分,分发到不同的机器上执行(多服务器并行做数据集合处理),
然后再将不同的机器处理的结果汇集起 来,输出结果,。
(3)MapReduce能执行单一聚合的所有操作count、distinct、group,
但group 在当数据量非常大的时候,处理能力就不太好,先筛选再分组,不支持 分片,对数据量有所限制,效率不高。
参数说明:
参数说明:
map:是JavaScript 函数,负责将每一个输入文档转换为零或多个文档,通过key进行分组,生成键值对序列,作为 reduce 函数参数。
reduce:是JavaScript 函数,对map操作的输出做合并的化简的操作(将key-values变成key-value,也就是把values数组变成一个单一的值value)。
out:reduce执行完,存放的集合,如果不指定集合,则使用默认的临时集合,在MapReduce的连接关闭后自动就被删除了。
- out:{<action>:<collectionName>
- [, db:<dbName>]
- [, sharded:<boolean>]
- [, nonAtomic:<boolean>]}
query:过滤的条件,对符合条件的文档执行map函数。(query。limit,sort可以随意组合)。
sort :对文档进行排序,sort和limit结合的sort排序参数(也是在发往map函数前给文档排序),可以优化分组机制。
limit :发往map函数的文档数量的上限(要是没有limit,单独使用sort的用处不大)。
finalize:可以对reduce输出结果再一次修改,跟group的finalize一样,不过MapReduce没有group的4MB文档的输出限制。
scope:向map、reduce、finalize导入外部变量。
verbose:是否包括结果信息中的时间信息,默认为fasle。
"timing" : {
"mapTime" : 0,
"emitLoop" : 2,
"reduceTime" : 0,
"mode" : "mixed",
"total" : 0
}
以下实例在集合 orders 中查找 status:"A" 的数据,并根据 cust_id 来分组,并计算 amount 的总和。
使用 MapReduce
- 考虑以下文档结构存储用户的文章,文档存储了用户的 user_name 和文章的 status 字段:
db.posts.insert([
{
"post_text": "Spring",
"user_name": "fly",
"status":"active"
},
{
"post_text": "Struts2",
"user_name": "fly",
"status":"active"
},
{
"post_text": "Hibernate",
"user_name": "fly",
"status":"disabled"
},
{
"post_text": "Mybatis",
"user_name": "fly",
"status":"active"
},
{
"post_text": "Redis",
"user_name": "fly",
"status":"disabled"
},
{
"post_text": "Restful",
"user_name": "cc",
"status":"active"
},
{
"post_text": "Dubbo",
"user_name": "cc",
"status":"disabled"
},
{
"post_text": "SpringCloud",
"user_name": "cc",
"status":"active"
},
{
"post_text": "SpringBoot",
"user_name": "cc",
"status":"disabled"
},
{
"post_text": "MongoDB",
"user_name": "cc",
"status":"active"
},
{
"post_text": "MySQL",
"user_name": "cc",
"status":"active"
}
])
- 现在,我们将在 posts 集合中使用 mapReduce 函数来选取已发布的文章(status:"active"),并通过user_name分组,计算每个用户的文章数:
>db.posts.mapReduce(
function() { emit(this.user_name,1); },
function(key, values) {return Array.sum(values)},
{
query:{status:"active"},
out:"post_total"
}
)
- 以上 mapReduce 输出结果为:
/* 1 */
{
"result" : "post_total",
"timeMillis" : 409.0,
"counts" : {
"input" : 7,
"emit" : 7,
"reduce" : 2,
"output" : 2
},
"ok" : 1.0,
"_o" : {
"result" : "post_total",
"timeMillis" : 409,
"counts" : {
"input" : 7,
"emit" : 7,
"reduce" : 2,
"output" : 2
},
"ok" : 1.0
},
"_keys" : [
"result",
"timeMillis",
"counts",
"ok"
],
"_db" : {
"_mongo" : {
"slaveOk" : true,
"host" : "localhost:27017",
"defaultDB" : "mongotest",
"authStatus" : {
"authRequired" : true,
"isMaster" : true,
"replSetGetStatus" : true
},
"_readMode" : "commands",
"_writeMode" : "commands"
},
"_name" : "mongotest"
},
"_coll" : {
"_mongo" : {
"slaveOk" : true,
"host" : "localhost:27017",
"defaultDB" : "mongotest",
"authStatus" : {
"authRequired" : true,
"isMaster" : true,
"replSetGetStatus" : true
},
"_readMode" : "commands",
"_writeMode" : "commands"
},
"_db" : {
"_mongo" : {
"slaveOk" : true,
"host" : "localhost:27017",
"defaultDB" : "mongotest",
"authStatus" : {
"authRequired" : true,
"isMaster" : true,
"replSetGetStatus" : true
},
"_readMode" : "commands",
"_writeMode" : "commands"
},
"_name" : "mongotest"
},
"_shortName" : "post_total",
"_fullName" : "mongotest.post_total"
}
}
- 结果表明,共有 7个符合查询条件(status:"active")的文档, 在map函数中生成了 7个键值对文档,最后使用reduce函数将相同的键值分为 2 组。
- 具体参数说明:
result:reduce执行完,存放的集合,如果不指定集合,则使用默认的临时集合,在MapReduce的连接关闭后自动就被删除了。
timeMillis:执行花费的时间,毫秒为单位
input:满足条件被发送到map函数的文档个数
emit:在map函数中emit被调用的次数,也就是所有集合中的数据总量
ouput:结果集合中的文档个数(count对调试非常有帮助)
ok:是否成功,成功为1
err:如果失败,这里可以有失败原因,不过从经验上来看,原因比较模糊,作用不大
使用 find 操作符来查看 mapReduce 的查询结果:
>db.posts.mapReduce(
function() { emit(this.user_name,1); },
function(key, values) {return Array.sum(values)},
{
query:{status:"active"},
out:"post_total"
}
).find()
- 以上查询显示如下结果,两个用户 tom 和 mark 有两个发布的文章:
/* 1 */
{
"_id" : "cc",
"value" : 4.0
}
/* 2 */
{
"_id" : "fly",
"value" : 3.0
}
- 用类似的方式,MapReduce可以被用来构建大型复杂的聚合查询。
- Map函数和Reduce函数可以使用 JavaScript 来实现,使得MapReduce的使用非常灵活和强大。
附录:参数详解
- map函数
【map函数】
map是JavaScript 函数,负责将每一个输入文档转换为零或多个文档,通过key进行分组,生成键值对序列,
作为 reduce 函数参数。
function() {
emit(key, value);
}
key对文档进行分组,value是要统计的数据,value可以是JSON对象(emit只能容纳MongoDB的最大BSON文件大小的一半)。
我们对订单的详细统计每个产品类型卖出了多少个。
- 订单数据
db.item.insert( [
{
"quantity" : 2,
"price" : 5.0,
"pnumber" : "p003"
},{
"quantity" : 2,
"price" : 8.0,
"pnumber" : "p002"
},{
"quantity" : 1,
"price" : 4.0,
"pnumber" : "p002"
},{
"quantity" : 2,
"price" : 4.0,
"pnumber" : "p001"
},{
"quantity" : 4,
"price" : 10.0,
"pnumber" : "p003"
},{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p001"
},{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p003"
},{
"quantity" : 5,
"price" : 10.0,
"pnumber" : "p002"
}
])
我们先通过 pnumber进行分组,然后在对 quantity相加
相当于select pnumber,sum(quantity) from item group by pnumber
varmap= function() { emit(this.pnumber,this.quantity)}
var reduce=function(key,values){return {'pumber':key,'quantity':Array.sum(values)}}
db.item.mapReduce( map,
reduce,
{ out: "map_reduce_data" }
)
db.map_reduce_data.find()
- 输出结果
/* 1 */
{
"_id" : "p001",
"value" : {
"pumber" : "p001",
"quantity" : 12.0
}
}
/* 2 */
{
"_id" : "p002",
"value" : {
"pumber" : "p002",
"quantity" : 8.0
}
}
/* 3 */
{
"_id" : "p003",
"value" : {
"pumber" : "p003",
"quantity" : 16.0
}
}
- query过滤的条件
对符合条件的文档将会执行map函数。(query。limit,sort可以随意组合),
我们对订单的详细的每次每种产品卖出的数量要大于5的并统计每个产品类型卖出了多少个。
我们先通过 pnumber进行分组,然后在对 quantity相加
相当于select pnumber,sum(quantity) from item where quantity>5 group by pnumber
varmap= function() { emit(this.pnumber,this.quantity)}
var reduce=function(key,values){return {'pumber':key,'quantity':Array.sum(values)}}
db.item.mapReduce( map,
reduce,
{ query:{'quantity':{$gt:5}},
out: "map_reduce_data" }
)
db.map_reduce_data.find()
/* 1 */
{
"_id" : "p001",
"value" : 10.0
}
/* 2 */
{
"_id" : "p003",
"value" : 10.0
}
- value是JSON对象
value可以是JSON格式,我们对订单的详细统计每个产品类型出现次数。
我们先通过 pnumber进行分组,然后在对 quantity相加 相当于select pnumber,count(*) from item group by pnumber。
varmap= function() {emit(this.pnumber,{count:1});}
var reduce=function(key,values){
var count=0;
values.forEach(function(val){ count+=val.count;});
return {'pumber':key,"count":count};
}
db.item.mapReduce( map,
reduce,
{ out: "map_reduce_data" }
)
db.map_reduce_data.find()
/* 1 */
{
"_id" : "p001",
"value" : {
"pumber" : "p001",
"count" : 2.0
}
}
/* 2 */
{
"_id" : "p002",
"value" : {
"pumber" : "p002",
"count" : 3.0
}
}
/* 3 */
{
"_id" : "p003",
"value" : {
"pumber" : "p003",
"count" : 3.0
}
}
- emit多次的循环
可以对emit多次的循环,可以根据输入文档的项目字段中的元素的数量(键,值)多次调用:
function() {
this.items.forEach(function(item){ emit(key, value); });
}
- 数据:
db.orders.insert( [
{
"onumber" : "001",
"item" : [{
"quantity" : 2,
"price" : 5.0,
"pnumber" : "p003"
},{
"quantity" : 2,
"price" : 8.0,
"pnumber" : "p002"
}]
},{
"onumber" : "002",
"item" : [{
"quantity" : 1,
"price" : 4.0,
"pnumber" : "p002"
},{
"quantity" : 2,
"price" : 4.0,
"pnumber" : "p001"
},{
"quantity" : 4,
"price" : 10.0,
"pnumber" : "p003"
}]
},{
"onumber" : "003",
"item" : [{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p001"
},{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p003"
}]
},{
"onumber" : "004",
"item" : [{
"quantity" : 5,
"price" : 10.0,
"pnumber" : "p002"
}]
}
])
我们对统计订单中对应的产品销售了多少个,我们先通过 pnumber进行分组,然后在对 quantity相加。
varmap= function() { this.item.forEach(function(it){ emit(it.pnumber,it.quantity); })}
var reduce=function(key,values){return {'pumber':key,'quantity':Array.sum(values)}}
db.orders.mapReduce( map,
reduce,
{ out: "map_reduce_data" }
)
- 输出结果:
db.map_reduce_data.find()
/* 1 */
{
"_id" : "p001",
"value" : {
"pumber" : "p001",
"quantity" : 12.0
}
}
/* 2 */
{
"_id" : "p002",
"value" : {
"pumber" : "p002",
"quantity" : 8.0
}
}
/* 3 */
{
"_id" : "p003",
"value" : {
"pumber" : "p003",
"quantity" : 16.0
}
}
> varmap= function() { this.item.forEach(function(it){ emit(it.pnumber,it.quantity); })}
也可以这样写
> varmap= function() { for(var i=0;i<this.item.length;i++){ var it=item[i]; emit(it.pnumber,it.quantity); }}
- reduce函数
reduce是JavaScript 函数,对map操作的输出做合并的化简的操作
(将key-values变成key-value,也就是把values数组变成一个单一的值value)。
function(key, values) {
...
return result;
}
values:值参数是一个数组,返回对象的类型必须与由map函数发出的值的类型相同。
reduce:函数应该交换:即结果中元素的顺序不影响reduce函数的输出。
reduce( key, [ A, B ] ) == reduce( key, [ B, A ] )
对map操作的输出做合并的化简的操作(将key-values变成key-value,也就是把values数组变成一个单一的值value),
我们对订单的详细统计每个产品类型卖出了多少个和每种产品出现次数。
我们先通过 pnumber进行分组,然后在对 quantity相加
相当于select pnumber,count(*),sum(quantity) from item group by pnumber
varmap= function() {
var value={count:1, quantity:this.quantity};
emit(this.pnumber,value);
}
var reduce=function(key,values){
varreducedVal= { count: 0, quantity: 0 };
for (vari=0; i < values.length; i++) {
reducedVal.count += values[i].count;
reducedVal.quantity += values[i].quantity;
}
return reducedVal;
}
db.item.mapReduce( map,
reduce,
{ out: "map_reduce_data" }
).find()
- 输出结果:
/* 1 */
{
"_id" : "p001",
"value" : {
"count" : 2.0,
"quantity" : 12.0
}
}
/* 2 */
{
"_id" : "p002",
"value" : {
"count" : 3.0,
"quantity" : 8.0
}
}
/* 3 */
{
"_id" : "p003",
"value" : {
"count" : 3.0,
"quantity" : 16.0
}
}
参考来源: http://www.runoob.com/mongodb/mongodb-map-reduce.html
参考来源:http://blog.csdn.net/congcong68/article/details/51471460