# 基于Python实现MapReduce

### 一、什么是MapReduce

• Map阶段：在这个阶段，输入数据集被分割成小块，并由多个Map任务处理。每个Map任务将输入数据映射为一系列(key, value)对，并生成中间结果。
• Reduce阶段：在这个阶段，中间结果被重新分组和排序，以便相同key的中间结果被传递到同一个Reduce任务。每个Reduce任务将具有相同key的中间结果合并、计算，并生成最终的输出。

from collections import defaultdict
def mapper(word):
    """Emit a (word, 1) key-value pair for one input word."""
    return (word, 1)
def reducer(key_value_pair):
    """Sum all values grouped under one key; return (key, total)."""
    word, counts = key_value_pair
    return word, sum(counts)
def map_reduce_function(input_list, mapper, reducer):
    """Run a tiny in-process map-reduce over a list of items.

    - input_list: items fed one-by-one to mapper
    - mapper: maps an item to a (key, value) pair
    - reducer: folds one (key, [values]) pair into a (key, result) pair
    - return: lazy iterator of reduced (key, result) pairs
    """
    # Shuffle phase: group every mapped value under its key,
    # preserving first-seen key order.
    grouped = defaultdict(list)
    for key, value in (mapper(item) for item in input_list):
        grouped[key].append(value)
    # Reduce phase: collapse each (key, values) group lazily.
    return map(reducer, grouped.items())
if __name__ == "__main__":
    # Demo: count occurrences of each word in a short sentence.
    input_words = "python best language".split(" ")
    word_counts = list(map_reduce_function(input_words, mapper, reducer))
    print(word_counts)

[('python', 1), ('best', 1), ('language', 1)]

### 二、基于多线程实现MapReduce

from collections import defaultdict
def __init__(self, input_list, mapper, shuffler):
    # NOTE(review): the enclosing class header was lost in extraction —
    # presumably a threading.Thread subclass (the method below is `run`),
    # which would also need a super().__init__() call; confirm against the
    # original article.
    self.input_list = input_list  # items handed to the mapper
    self.mapper = mapper          # function mapping one item to (key, value)
    self.shuffler = shuffler      # shared defaultdict(list) grouping values per key
def run(self):
    # Map phase for this worker: apply the mapper to every input item and
    # group each resulting value under its key in the shared shuffler.
    # NOTE(review): concurrent appends to the shared dict's lists rely on
    # CPython's GIL making list.append atomic — confirm this is intended.
    map_results = map(self.mapper, self.input_list)
    for key, value in map_results:
        self.shuffler[key].append(value)
def reducer(key_value_pair):
    """Collapse one (key, values) pair into (key, sum-of-values)."""
    k, vals = key_value_pair
    return k, sum(vals)
def mapper(word):
    """Map one word to the pair (word, 1) for downstream counting."""
    return (word, 1)
# NOTE(review): garbled excerpt — the enclosing function's `def` line, the
# thread creation/start/join code, and the lines consuming `chunk` were lost
# in extraction; `input_list` and `chunk_size` are undefined at this scope.
shuffler = defaultdict(list)

for i in range(0, len(input_list), chunk_size):
    chunk = input_list[i:i+chunk_size]
    # (the per-chunk thread dispatch was lost here)

# This `return` only makes sense inside the missing function definition.
return map(reducer, shuffler.items())
if __name__ == "__main__":
    # NOTE(review): `result` is never assigned in this excerpt — the call to
    # the (missing) map_reduce_function was lost in extraction; as written
    # this raises NameError.
    words = "python is the best language for programming and python is easy to learn".split(" ")
    for i in result:
        print(i)

### 三、基于多进程实现MapReduce

from collections import defaultdict
import multiprocessing
def mapper(chunk):
    """Count how often each word occurs in one text chunk.

    - chunk: whitespace-separated text
    - return: defaultdict(int) mapping word -> occurrence count
    """
    counts = defaultdict(int)
    for token in chunk.split():
        counts[token] += 1
    return counts
def reducer(word_counts):
    """Merge per-chunk word-count mappings into one total count.

    - word_counts: iterable of mappings word -> count
    - return: defaultdict(int) with per-word totals
    """
    totals = defaultdict(int)
    for partial in word_counts:
        for token, freq in partial.items():
            totals[token] += freq
    return totals
def chunks(lst, n):
    """Yield successive n-sized slices of lst (last one may be shorter)."""
    start = 0
    while start < len(lst):
        yield lst[start:start + n]
        start += n
def map_reduce_function(text, num_processes):
    """Word-count `text` in parallel across `num_processes` worker processes.

    - text: whitespace-separated input text
    - num_processes: pool size and upper bound on the number of chunks
    - return: mapping of word -> total count

    Bug fix vs. the original: slicing `text` by characters could cut a word
    in half at a chunk boundary, miscounting it; chunking the *word list*
    keeps every word intact. Also guards empty input, where the original
    produced a chunk size of 0 (range() step 0 -> ValueError).
    """
    words = text.split()
    if not words:
        return reducer([])
    # Ceiling division: at most num_processes chunks, each of whole words.
    chunk_size = (len(words) + num_processes - 1) // num_processes
    # Re-join each group so mapper still receives a plain string.
    chunks_list = [" ".join(group) for group in chunks(words, chunk_size)]
    with multiprocessing.Pool(processes=num_processes) as pool:
        word_counts = pool.map(mapper, chunks_list)
    return reducer(word_counts)
if __name__ == "__main__":
    # Demo: parallel word count of a sample sentence with 4 workers.
    sample_text = "python is the best language for programming and python is easy to learn"
    worker_count = 4
    counts = map_reduce_function(sample_text, worker_count)
    for word in counts:
        print(word, counts[word])

### 四、在100GB的文件中检索数据

1. 文件太大，读取速度慢

2. 文件太大，内存消耗特别大

from datetime import datetime
import multiprocessing
"""
Generator: read a file's contents chunk by chunk.
- file_path: path of the file
- chunk_size: chunk size, default 1MB
"""
# NOTE(review): garbled excerpt — the enclosing generator's `def` line and the
# line that actually reads each chunk (presumably `chunk = file.read(chunk_size)`)
# were lost in extraction; as written, `chunk` is undefined.
with open(file_path, 'r', encoding='utf-8') as file:
    while True:
        if not chunk:
            break
        yield chunk
def search_in_chunk(chunk: str, keyword: str):
    """Scan one text chunk line by line, printing every line containing keyword.

    - chunk: block of text to scan
    - keyword: substring to look for
    """
    for line in chunk.split('\n'):
        if keyword in line:
            print(f"找到了:", line)
def search_in_file(file_path: str, keyword: str, chunk_size=1024*1024):
    """Search a file for a keyword, scanning fixed-size chunks in a process pool.

    - file_path: path of the file to search
    - keyword: keyword to look for
    - chunk_size: size of each chunk read, default 1MB

    Fixes vs. the garbled original: the loop that reads chunks was missing
    (`chunk` was undefined), and the pool was never closed/joined, so
    Pool.__exit__ would terminate workers before queued tasks ran.
    NOTE: a match that straddles a chunk boundary can still be missed —
    known limitation of fixed-size chunking.
    """
    with multiprocessing.Pool() as pool:
        with open(file_path, 'r', encoding='utf-8') as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                pool.apply_async(search_in_chunk, args=(chunk, keyword))
        # Wait for all queued searches to finish before the pool is torn down.
        pool.close()
        pool.join()

if __name__ == "__main__":
    # Time a keyword search over a large text file.
    start = datetime.now()
    search_in_file("file.txt", "张三")
    end = datetime.now()
    print(f"搜索完成，耗时 {end - start}")

|
1月前
|
SQL 分布式计算 数据可视化

51 4
|
6月前
|

27 0
|
6月前
|

centos7 伪分布式 hadoop 利用 python 执行 mapreduce
centos7 伪分布式 hadoop 利用 python 执行 mapreduce
29 0
|

Python实现一个最简单的MapReduce编程模型WordCount
Python实现一个最简单的MapReduce编程模型WordCount
123 0
|

Python实现一个最简单的MapReduce编程模型WordCount
Python实现一个最简单的MapReduce编程模型WordCount
134 0
|

|

MapReduce是一种编程模型，通过将工作分成独立的任务并在一组机器上并行执行任务，可以处理和生成大量数据。 MapReduce编程风格的灵感来自函数式编程结构map和reduce，它们通常用于处理数据列表。
2232 0
|

1472 0
|