
# Basic MapReduce Patterns

## Counting and Summing

```
class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
         sum = sum + c
      Emit(term t, count sum)
```
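The following is a minimal single-process Python sketch of the mapper and reducer above. It simulates the map, shuffle (group-by-key), and reduce phases in memory. `run_mapreduce`, `map_fn`, and `reduce_fn` are hypothetical names chosen for illustration, and the sketch assumes whitespace splitting is good enough for term extraction. A real framework such as Hadoop would distribute these phases across machines.

```python
from collections import defaultdict

def map_fn(doc_id, doc):
    # Emit (term, 1) for every term occurrence, like Mapper.Map
    for term in doc.split():
        yield term, 1

def reduce_fn(term, counts):
    # Sum all partial counts for one term, like Reducer.Reduce
    yield term, sum(counts)

def run_mapreduce(inputs, mapper, reducer):
    # Simulated shuffle: group every emitted value by its key
    groups = defaultdict(list)
    for key, value in inputs:
        for k, v in mapper(key, value):
            groups[k].append(v)
    # Reduce phase: one call per distinct key, in sorted key order
    result = {}
    for k in sorted(groups):
        for out_k, out_v in reducer(k, groups[k]):
            result[out_k] = out_v
    return result

docs = [(1, "a b a"), (2, "b c")]
print(run_mapreduce(docs, map_fn, reduce_fn))
```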

```
class Mapper
   method Map(docid id, doc d)
      H = new AssociativeArray
      for all term t in doc d do
         H{t} = H{t} + 1
      for all term t in H do
         Emit(term t, count H{t})
```
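This mapper variant accumulates per-document counts before emitting anything. The Python sketch below uses `collections.Counter` in place of the associative array `H`. The function name is hypothetical. For the document "a b a a" it emits two records instead of four.

```python
from collections import Counter

def map_with_local_counts(doc_id, doc):
    # H: per-document associative array term -> count
    h = Counter()
    for term in doc.split():
        h[term] += 1
    # Emit each distinct term of this document exactly once
    for term, count in h.items():
        yield term, count

emitted = list(map_with_local_counts(1, "a b a a"))
print(emitted)
```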

```
class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
         sum = sum + c
      Emit(term t, count sum)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
         sum = sum + c
      Emit(term t, count sum)
```
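The combiner runs on each map task's local output before the shuffle. The Python sketch below assumes each input "split" corresponds to one map task and uses hypothetical helper names. The same summing function can serve as both combiner and reducer, because addition is associative and commutative. The sketch also returns how many records crossed the simulated shuffle.

```python
from collections import defaultdict

def mapper(doc_id, doc):
    for term in doc.split():
        yield term, 1

def sum_counts(term, counts):
    # Shared by Combiner and Reducer: addition can be applied in any grouping
    return term, sum(counts)

def group(pairs):
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups

def run_with_combiner(splits):
    shuffled = []
    for split in splits:
        # Local output of one map task, combined before it is shuffled
        local = [kv for doc_id, doc in split for kv in mapper(doc_id, doc)]
        shuffled.extend(sum_counts(k, vs) for k, vs in group(local).items())
    totals = dict(sum_counts(k, vs) for k, vs in group(shuffled).items())
    return totals, len(shuffled)

splits = [[(1, "a b a")], [(2, "a c")]]
counts, records_shuffled = run_with_combiner(splits)
print(counts, records_shuffled)
```

Without the combiner, five `(term, 1)` records would be shuffled here. With it, the duplicate `a` from the first split is merged locally, so only four records are shuffled.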

# Non-Basic MapReduce Patterns

## Iterative Message Passing (Graph Processing)

```
class Mapper
   method Map(id n, object N)
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         Emit(id m, message getMessage(N))

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      messages = []
      for all s in [s1, s2,...] do
         if IsObject(s) then
            M = s
         else               // s is a message
            messages.add(s)
      M.State = calculateState(M.State, messages)
      Emit(id m, item M)
```
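The Python sketch below runs one round of this pattern in memory. The `Node` class, the function names, and the dict-based graph are all hypothetical. `calculate_state` receives the node's current state plus its messages, matching the `calculateState(state s, data [...])` signature used in the case studies. The sketch assumes every message target exists in the graph. As a toy check of the plumbing, each message is `1` and the new state is the sum of messages, which gives each node's in-degree.

```python
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class Node:
    state: object
    out: list = field(default_factory=list)    # OutgoingRelations

def map_node(n, node, get_message):
    yield n, node                               # pass the graph structure through
    for m in node.out:
        yield m, get_message(node)              # one message per neighbour

def reduce_node(m, values, calculate_state):
    node, messages = None, []
    for s in values:
        if isinstance(s, Node):                 # IsObject(s)
            node = s
        else:                                   # s is a message
            messages.append(s)
    # Build a new Node instead of mutating this round's input
    return m, Node(calculate_state(node.state, messages), node.out)

def run_round(graph, get_message, calculate_state):
    grouped = defaultdict(list)                 # simulated shuffle by node id
    for n, node in graph.items():
        for key, value in map_node(n, node, get_message):
            grouped[key].append(value)
    return dict(reduce_node(m, vs, calculate_state) for m, vs in grouped.items())

graph = {"A": Node(0, ["B", "C"]), "B": Node(0, ["C"]), "C": Node(0, [])}
result = run_round(graph, lambda N: 1, lambda s, msgs: sum(msgs))
print({n: node.state for n, node in result.items()})
```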

### Case Study: Availability Propagation Through the Category Tree

```
class N
   State in {True = 2, False = 1, null = 0},
      initialized 1 or 2 for end-of-line categories, 0 otherwise
   method getMessage(object N)
      return N.State
   method calculateState(state s, data [d1, d2,...])
      return max( [s, d1, d2,...] )
```
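The following Python sketch repeats message-passing rounds until no state changes. It makes three assumptions. First, each category's outgoing relation points to its parent, so availability flows up the tree. Second, the new state is the maximum of the category's own state and the received states, which lets end-of-line categories (which receive no messages) keep their initial value. Third, the category names are invented test data. The graph structure is kept in a separate dict instead of being re-emitted, for brevity.

```python
from collections import defaultdict

# State encoding from the pseudocode: True = 2, False = 1, null = 0
TRUE, FALSE, NULL = 2, 1, 0

def availability_round(state, out):
    # Map: every category sends getMessage(N) = N.State along its relations
    inbox = defaultdict(list)
    for n, targets in out.items():
        for m in targets:
            inbox[m].append(state[n])
    # Reduce: new state = max(own state, received states)
    return {n: max([s] + inbox[n]) for n, s in state.items()}

def propagate(state, out):
    # Iterate MapReduce rounds until a fixed point is reached
    while True:
        new_state = availability_round(state, out)
        if new_state == state:
            return state
        state = new_state

out = {"laptops": ["electronics"], "tv": ["electronics"],
       "electronics": ["root"], "books": ["root"], "root": []}
initial = {"laptops": FALSE, "tv": TRUE, "electronics": NULL,
           "books": FALSE, "root": NULL}
result = propagate(initial, out)
print(result)
```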

### Case Study: Breadth-First Search

```
class N
   State is distance,
      initialized 0 for source node, INFINITY for all other nodes
   method getMessage(N)
      return N.State + 1
   method calculateState(state s, data [d1, d2,...])
      return min( [s, d1, d2,...] )
```
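In the Python sketch below, each MapReduce round extends the known distances by one hop, and the loop stops once a round changes nothing. The graph and function names are illustrative assumptions. The current distance is included in the `min`, so the source keeps distance 0 and unreachable nodes stay at infinity.

```python
import math
from collections import defaultdict

def bfs_round(dist, out):
    # Map: each node sends State + 1 to its neighbours (getMessage)
    inbox = defaultdict(list)
    for n, targets in out.items():
        for m in targets:
            inbox[m].append(dist[n] + 1)
    # Reduce: keep the smallest of the current distance and the messages
    return {n: min([d] + inbox[n]) for n, d in dist.items()}

def bfs(out, source):
    dist = {n: (0 if n == source else math.inf) for n in out}
    while True:
        new_dist = bfs_round(dist, out)
        if new_dist == dist:
            return dist
        dist = new_dist

graph = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": [], "E": ["A"]}
dist = bfs(graph, "A")
print(dist)
```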

### Case Study: PageRank and Mapper-Side Data Aggregation

```
class N
   State is PageRank
   method getMessage(object N)
      return N.State / N.OutgoingRelations.size()
   method calculateState(state s, data [d1, d2,...])
      return ( sum([d1, d2,...]) )
```

```
class Mapper
   method Initialize
      H = new AssociativeArray
   method Map(id n, object N)
      p = N.PageRank / N.OutgoingRelations.size()
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         H{m} = H{m} + p
   method Close
      for all id n in H do
         Emit(id n, value H{n})

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      p = 0
      for all s in [s1, s2,...] do
         if IsObject(s) then
            M = s
         else
            p = p + s
      M.PageRank = p
      Emit(id m, item M)
```
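The Python sketch below follows the simplified PageRank shown in the pseudocode, with no damping factor. Each simulated map task keeps its own `H` and emits one partial sum per target node when it closes, and the reducer adds up the partial sums. The split assignment, the graph, and the function names are hypothetical. The sketch assumes every node has at least one outgoing link.

```python
from collections import defaultdict

def mapper_task(split, rank, out):
    h = defaultdict(float)                # Initialize: one H per map task
    for n in split:                       # Map: once per node in the split
        p = rank[n] / len(out[n])         # assumes out[n] is non-empty
        for m in out[n]:
            h[m] += p                     # aggregate in memory, do not emit yet
    return list(h.items())                # Close: emit one partial sum per target

def pagerank_round(rank, out, splits):
    partial = defaultdict(float)
    for split in splits:                  # shuffle + Reduce: add partial sums
        for m, p in mapper_task(split, rank, out):
            partial[m] += p
    return {n: partial[n] for n in rank}

out = {"A": ["B", "C"], "B": ["C"], "C": ["A"]}
splits = [["A", "B"], ["C"]]              # two simulated map tasks
rank = {n: 1 / 3 for n in out}
for _ in range(50):
    rank = pagerank_round(rank, out, splits)
print({n: round(r, 3) for n, r in rank.items()})
```

For this small graph the fixed point satisfies A = C and B = A / 2, so the ranks approach A = C = 0.4 and B = 0.2.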

## Distinct Values (Counting Unique Items)

```
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2
```

```
class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(record [g, f], count 1)

class Reducer
   method Reduce(record [g, f], counts [n1, n2, ...])
      Emit(record [g, f], null )
```

```
class Mapper
   method Map(record [g, f], null)
      Emit(value g, count 1)

class Reducer
   method Reduce(value g, counts [n1, n2,...])
      Emit(value g, sum( [n1, n2,...] ) )
```
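The two-phase solution can be checked against the example records with the Python sketch below. The function names are hypothetical. Phase 1 relies on the shuffle to collapse duplicate `(g, f)` keys, so its reducer only needs to output each key once. Phase 2 then counts the distinct pairs per category.

```python
from collections import defaultdict

records = [
    (1, ["a", "b"]),
    (2, ["a", "d", "e"]),
    (1, ["b"]),
    (3, ["a", "b"]),
]

def phase1(records):
    # Map emits ((g, f), 1); grouping by key removes duplicate pairs
    grouped = defaultdict(list)
    for f, categories in records:
        for g in categories:
            grouped[(g, f)].append(1)
    return list(grouped)                  # Reduce: each distinct (g, f) once

def phase2(pairs):
    # Map emits (g, 1) per distinct pair; Reduce sums per category g
    counts = defaultdict(int)
    for g, f in pairs:
        counts[g] += 1
    return dict(counts)

print(phase2(phase1(records)))
```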

```
class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(value f, category g)

class Reducer
   method Initialize
      H = new AssociativeArray : category -> count
   method Reduce(value f, categories [g1, g2,...])
      [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
      for all category g in [g1', g2',...]
         H{g} = H{g} + 1
   method Close
      for all category g in H do
         Emit(category g, count H{g})
```
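The one-phase variant groups by `f` and keeps the per-category counter `H` in the reducer across `Reduce` calls. The Python sketch below simulates a single reducer on the same example records, and the function name is hypothetical. With several reducers, each would emit partial counts per category at `Close`, and those partial counts would still need to be summed.

```python
from collections import defaultdict

records = [
    (1, ["a", "b"]),
    (2, ["a", "d", "e"]),
    (1, ["b"]),
    (3, ["a", "b"]),
]

def distinct_counts_one_phase(records):
    # Map: emit (f, g) for every category g of a record
    by_f = defaultdict(list)
    for f, categories in records:
        for g in categories:
            by_f[f].append(g)
    # Reducer state H: category -> count, shared across Reduce calls
    h = defaultdict(int)
    for f, gs in by_f.items():
        for g in set(gs):                 # ExcludeDuplicates
            h[g] += 1
    return dict(h)                        # Close: emit accumulated counts

print(distinct_counts_one_phase(records))
```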

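## Cross-Correlation

Pairs Approach

• Combiners bring only limited benefit, because most item pairs are likely to be distinct.
• Memory cannot be used effectively, because counts are not accumulated in memory.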

```
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         for all item j in [i1, i2,...]
            Emit(pair [i j], count 1)

class Reducer
   method Reduce(pair [i j], counts [c1, c2,...])
      s = sum([c1, c2,...])
      Emit(pair[i j], count s)
```
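The Python sketch below implements the pairs approach literally. It emits one `((i, j), 1)` record per ordered pair, including `i == j` as the pseudocode does. The shuffle groups records by pair, and the reducer sums each group. The function names and the sample tuples are illustrative.

```python
from collections import defaultdict

def pairs_map(items):
    # Emit ((i, j), 1) for every ordered pair of items in the tuple
    for i in items:
        for j in items:
            yield (i, j), 1

def pairs_cooccurrence(tuples):
    grouped = defaultdict(list)           # shuffle: group records by pair
    for items in tuples:
        for pair, one in pairs_map(items):
            grouped[pair].append(one)
    return {pair: sum(ones) for pair, ones in grouped.items()}   # Reduce

tuples = [["x", "y", "z"], ["x", "y"]]
counts = pairs_cooccurrence(tuples)
print(counts[("x", "y")], counts[("y", "z")], len(counts))
```

Even for these two short tuples, the mapper emits 13 intermediate records (9 + 4). This illustrates why combiners help little when most keys are distinct.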

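Stripes Approach

• There are relatively few intermediate keys, which reduces the cost of sorting.
• Combiners can be used effectively.
• Counts can be accumulated in memory, although this can cause problems if it is not implemented carefully.
• It is more complex to implement.
• In general, "stripes" is faster than "pairs".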

```
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         H = new AssociativeArray : item -> counter
         for all item j in [i1, i2,...]
            H{j} = H{j} + 1
         Emit(item i, stripe H)

class Reducer
   method Reduce(item i, stripes [H1, H2,...])
      H = new AssociativeArray : item -> counter
      H = merge-sum( [H1, H2,...] )
      for all item j in H.keys()
         Emit(pair [i j], H{j})
```
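In the Python sketch below, each stripe is a `collections.Counter`, and the reducer's `merge-sum` is an element-wise sum of the stripes collected for an item. The function names and the sample data are illustrative. On the same two tuples used for the pairs approach, the mapper emits only 5 stripes, one per item occurrence, instead of 13 pair records.

```python
from collections import Counter, defaultdict

def stripes_map(items):
    # One stripe (associative array neighbour -> count) per item i
    for i in items:
        stripe = Counter()
        for j in items:
            stripe[j] += 1
        yield i, stripe

def stripes_cooccurrence(tuples):
    grouped = defaultdict(list)           # shuffle: group stripes by item
    for items in tuples:
        for i, stripe in stripes_map(items):
            grouped[i].append(stripe)
    result = {}
    for i, stripes in grouped.items():    # Reduce: element-wise merge-sum
        merged = sum(stripes, Counter())
        for j, c in merged.items():
            result[(i, j)] = c
    return result

tuples = [["x", "y", "z"], ["x", "y"]]
counts = stripes_cooccurrence(tuples)
print(counts[("x", "y")], counts[("y", "z")], len(counts))
```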

## Expressing Relational Patterns with MapReduce

### Selection

```
class Mapper
   method Map(rowkey key, tuple t)
      if t satisfies the predicate
         Emit(tuple t, null)
```
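Selection needs only a mapper. The Python sketch below emits each matching tuple as the key with no value. The predicate is passed in as a function, and the rows and the age-threshold predicate are made-up examples.

```python
def selection_map(rowkey, t, predicate):
    # Emit (t, None) only if the tuple satisfies the predicate
    if predicate(t):
        yield t, None

def select(rows, predicate):
    # Map-only job: collect the emitted keys, no reducer needed
    return [out for key, t in rows.items()
            for out, _ in selection_map(key, t, predicate)]

rows = {1: ("alice", 34), 2: ("bob", 17), 3: ("carol", 52)}
print(select(rows, lambda r: r[1] >= 18))
```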

### Projection

```
class Mapper
   method Map(rowkey key, tuple t)
      tuple g = project(t)  // extract required fields to tuple g
      Emit(tuple g, null)

class Reducer
```
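The Reducer body is cut off in this text. The Python sketch below assumes the reducer simply emits each distinct projected tuple once, which is the usual reason for emitting the projected tuple as the key. The field positions, rows, and function names are hypothetical.

```python
from collections import defaultdict

def projection_map(rowkey, t, fields):
    # project(t): keep only the required field positions
    yield tuple(t[i] for i in fields), None

def project(rows, fields):
    grouped = defaultdict(list)           # shuffle: identical projections meet
    for key, t in rows.items():
        for g, value in projection_map(key, t, fields):
            grouped[g].append(value)
    # Assumed Reducer: emit each distinct projected tuple once
    return sorted(grouped)

rows = {1: ("alice", "NY", 34), 2: ("bob", "LA", 17), 3: ("carol", "NY", 52)}
print(project(rows, [1]))
```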

dongzhumao
