海量数据处理的困难
海量数据处理的困难用一句话概括,就是时空资源不够。具体来说,
- 空间受限:无法将海量数据一次性读入内存;
- 时间受限:无法在有限时间内,完成针对海量数据的某项处理工作。
而对于空间受限的问题,一般的解决办法是“大而化小,分而治之”的策略,既然一次性行不通,那就一部分一部分读,每读入一部分可以生成一个小文件,小文件是可以直接读入内存的,我们这样分割大数据之后,再依次处理小文件的数据。
对于时间受限的问题,我们一般的解决办法是高效的算法配合恰当的数据结构,比如哈希表,Bloom Filter,堆,倒排索引,tire树);
至于所谓的单机及集群问题,通俗点来讲,单机就是处理装载数据的机器有限(只要考虑cpu,内存,硬盘的数据交互),而集群,机器有多辆,适合分布式处理,并行计算(更多考虑节点和节点间的数据交互)。
海量数据处理Big Data Processing的大致方法包括:
分而治之/hash映射 + hash统计 + 堆/快速/归并排序;
双层桶划分
Bloom filter/Bitmap;
Trie树/数据库/倒排索引;
外排序;
分布式处理之Hadoop/Mapreduce。
大文件生成
生成2亿个IP地址
# 生成2亿个IP def generateRandom(rangeFrom, rangeTo): import random return random.randint(rangeFrom,rangeTo) def generageMassiveIPAddr(fileLocation,numberOfLines): IP = [] with open(fileLocation, 'a+') as file_handler: for i in range(numberOfLines): IP.append('10.197.' + str(generateRandom(0,255))+'.'+ str(generateRandom(0,255)) + '\n') file_handler.writelines(IP) if __name__ == '__main__': from time import ctime print(ctime()) for i in range(20): print( ' ' + str(i) + ": " + ctime()) generageMassiveIPAddr('d:\\massiveIP.txt', 10000000) print(ctime())
耗时14m54s,输出为:
Thu Jan 6 19:01:05 2022 0: Thu Jan 6 19:01:05 2022 1: Thu Jan 6 19:01:50 2022 2: Thu Jan 6 19:02:37 2022 3: Thu Jan 6 19:03:22 2022 4: Thu Jan 6 19:04:07 2022 5: Thu Jan 6 19:04:52 2022 6: Thu Jan 6 19:05:40 2022 7: Thu Jan 6 19:06:25 2022 8: Thu Jan 6 19:07:10 2022 9: Thu Jan 6 19:07:56 2022 10: Thu Jan 6 19:08:41 2022 11: Thu Jan 6 19:09:26 2022 12: Thu Jan 6 19:10:10 2022 13: Thu Jan 6 19:10:54 2022 14: Thu Jan 6 19:11:38 2022 15: Thu Jan 6 19:12:22 2022 16: Thu Jan 6 19:13:06 2022 17: Thu Jan 6 19:13:49 2022 18: Thu Jan 6 19:14:32 2022 19: Thu Jan 6 19:15:15 2022 Thu Jan 6 19:15:59 2022
文件IP数量为2亿条,大小为3105013kb / (1024*1024) = 2.96GB:
平均每条IP大小为:3105013 * 1024 / 200000000= 15.90
空间受限
假设内存大小为512M
可以载入的IP数量为 512*1024*1024/15.90 = 33765466
三千三百七十六万五千四百六十六
需要分块的数量为:200000000 / 33765466 = 5.923
分块读取
import pandas as pd from tqdm import tqdm f = open('d:\\massiveIP.txt') reader = pd.read_csv(f, sep=',',header=None,names=["IP"], iterator=True) loop = True chunkSize = 33765466 chunks = [] # 需要6次分块,这里迭代7次 for i in tqdm(range(7)) : try: # print("*"*5,i,"*"*5) chunk = reader.get_chunk(chunkSize) df1 = chunk["IP"].value_counts() chunks.append(df1) del chunk except StopIteration: # loop = False print("Iteration is stopped.") re_chunks = [] for i in chunks: df1 = i.reset_index() re_chunks.append(df1) df22 = pd.concat(re_chunks, ignore_index=True) df22.groupby(by=["index"]).agg({"IP":"sum"}).reset_index().sort_values(by=["IP"],ascending=False)
耗时2m36.1s
输出为:
100%|██████████| 7/7 [02:33<00:00, 21.88s/it] index IP 14312 10.197.148.78 3460 59266 10.197.77.215 3456 47632 10.197.36.112 3440 46706 10.197.32.200 3435 56153 10.197.66.179 3427 ... ... ... 33713 10.197.216.28 2988 5232 10.197.116.2 2988 51331 10.197.49.216 2982 25792 10.197.189.41 2972 8555 10.197.128.195 2955 65536 rows × 2 columns
文件拆分提取
拆分小文件
# 大文件分割 from time import ctime def splitFile(fileLocation, targetFoler): import os import shutil if not os.path.exists(targetFoler): os.mkdir(targetFoler) print("create ",targetFoler, " successed") else: shutil.rmtree(targetFoler) print("remove ",targetFoler, " successed") os.mkdir(targetFoler) print("create ",targetFoler, " successed") with open(fileLocation, 'r') as file_handler: # 512M 512*1024*1024/15.90 = 33765466 block_size = 33765466 line = file_handler.readline() temp = [] countFile = 1 while line: if line =="": break for i in range(block_size): if i == (block_size-1): # write block to small files with open(targetFoler + "\\file_"+str(countFile)+".txt", 'a+') as file_writer: file_writer.writelines(temp) temp = [] print(" file " + str(countFile) + " generated at: " + str(ctime())) countFile = countFile + 1 else: temp.append(line) line = file_handler.readline() if __name__ == '__main__': print("Start At: " + str(ctime())) splitFile("d:\\massiveIP.txt", "d:\\massiveData")
耗时:7m2.9s
输出为:
Start At: Thu Jan 6 19:30:33 2022 remove d:\massiveData successed create d:\massiveData successed file 1 generated at: Thu Jan 6 19:31:15 2022 file 2 generated at: Thu Jan 6 19:31:54 2022 file 3 generated at: Thu Jan 6 19:32:33 2022 file 4 generated at: Thu Jan 6 19:33:11 2022 file 5 generated at: Thu Jan 6 19:33:53 2022 file 6 generated at: Thu Jan 6 19:34:39 2022 file 7 generated at: Thu Jan 6 19:37:34 2022
比较小文件
import os from time import ctime def findIP(targetFolder): Result = {} IP = {} for root, dirs, files in os.walk(targetFolder): for f in files: # read small files file_handler = open(os.path.join(targetFolder, f), 'r') lines = file_handler.readlines() file_handler.close() # get IP in file, store to IP Dictionary for line in lines: if line in IP: IP[line] = IP[line] + 1 else: IP[line] = 1 # sort Dictionary IP = sorted(IP.items(), key=lambda d: d[1]) # get max item(s), store to Result Dictionary maxItem = IP[-1][1] print(' File ' + str(f) + ":") print (" maxItem: " + str(IP[-1])) tempTuple = IP.pop() while tempTuple[1] == maxItem: if tempTuple[0] in Result: Result[tempTuple[0]] = Result[tempTuple[0]] + 1 else: Result[tempTuple[0]] = tempTuple[1] tempTuple = IP.pop() IP = {} print (' Finished: ' + ctime()) print(sorted(Result.items(), key=lambda d: d[1])) if __name__ == '__main__': print ('Start: ' + ctime()) findIP("d:\\massiveData") print ('End: ' + ctime())
耗时2m28.7s
输出为:
Start: Thu Jan 6 19:45:08 2022 File file_1.txt: maxItem: ('10.197.171.38\n', 619) Finished: Thu Jan 6 19:45:31 2022 File file_2.txt: maxItem: ('10.197.207.162\n', 626) Finished: Thu Jan 6 19:45:54 2022 File file_3.txt: maxItem: ('10.197.237.210\n', 602) Finished: Thu Jan 6 19:46:16 2022 File file_4.txt: maxItem: ('10.197.45.67\n', 614) Finished: Thu Jan 6 19:46:41 2022 File file_5.txt: maxItem: ('10.197.153.232\n', 605) Finished: Thu Jan 6 19:47:04 2022 File file_6.txt: maxItem: ('10.197.148.78\n', 612) Finished: Thu Jan 6 19:47:28 2022 File file_7.txt: maxItem: ('10.197.210.11\n', 160) Finished: Thu Jan 6 19:47:36 2022 [('10.197.210.11\n', 160), ('10.197.237.210\n', 602), ('10.197.193.149\n', 602), ('10.197.130.236\n', 602), ('10.197.243.35\n', 602), ('10.197.253.239\n', 602), ('10.197.153.232\n', 605), ('10.197.148.78\n', 612), ('10.197.45.67\n', 614), ('10.197.171.38\n', 619), ('10.197.207.162\n', 626)] End: Thu Jan 6 19:47:36 2022
备注:这个输出是错误,只考虑了局部最大IP,并没考虑全局最大IP
解决办法如下:
## 解决局部排序问题 import os from time import ctime def findIP(targetFolder): Result = {} IP = {} for root, dirs, files in os.walk(targetFolder): for f in files: # read small files file_handler = open(os.path.join(targetFolder, f), 'r') lines = file_handler.readlines() file_handler.close() # get IP in file, store to IP Dictionary for line in lines: if line in IP: IP[line] = IP[line] + 1 else: IP[line] = 1 # sort Dictionary IP = sorted(IP.items(), key=lambda d: d[1]) print (' Finished: ' + ctime()) # print(sorted(Result.items(), key=lambda d: d[1])) return IP print ('Start: ' + ctime()) ipDict = findIP("d:\\massiveData") print ('End: ' + ctime()) print(ipDict[-10:])
耗时为:2m28.6s
输出为:
Start: Thu Jan 6 19:56:03 2022 Finished: Thu Jan 6 19:58:31 2022 End: Thu Jan 6 19:58:31 2022 [('10.197.158.206\n', 3414), ('10.197.163.161\n', 3418), ('10.197.237.105\n', 3421), ('10.197.179.88\n', 3425), ('10.197.254.65\n', 3426), ('10.197.66.179\n', 3427), ('10.197.32.200\n', 3435), ('10.197.36.112\n', 3440), ('10.197.77.215\n', 3456), ('10.197.148.78\n', 3460)]
通过hash拆分文件
拆分小文件-依据hash
代码如下:
# 大文件分割 from time import ctime def splitFile(fileLocation, targetFoler): import os import shutil if not os.path.exists(targetFoler): os.mkdir(targetFoler) print("create ",targetFoler, " successed") else: shutil.rmtree(targetFoler) print("remove ",targetFoler, " successed") os.mkdir(targetFoler) print("create ",targetFoler, " successed") fileNum = [1,2,3,4,5,6,7,8,9] with open(fileLocation, 'r') as file_handler: # 512M 512*1024*1024/15.90 = 33765466 # block_size = 33765466 batch_size = 20000000 line = file_handler.readline() temp0 = [] temp1 = [] temp2 = [] temp3 = [] temp4 = [] temp5 = [] temp6 = [] while line: # if line =="": # break if hash(line) % 7 == fileNum[0]: temp0.append(line) line = file_handler.readline() if len(temp0) > 10000000: with open(targetFoler + "\\file_"+str(fileNum[0])+".txt", 'a+') as file_writer: file_writer.writelines(temp0) temp0 = [] elif hash(line) % 7 == fileNum[1]: temp1.append(line) line = file_handler.readline() elif hash(line) % 7 == fileNum[2]: temp2.append(line) line = file_handler.readline() elif hash(line) % 7 == fileNum[3]: temp3.append(line) line = file_handler.readline() elif hash(line) % 7 == fileNum[4]: temp4.append(line) line = file_handler.readline() elif hash(line) % 7 == fileNum[5]: temp5.append(line) line = file_handler.readline() else: temp6.append(line) line = file_handler.readline() if len(temp1) > batch_size or line =="": with open(targetFoler + "\\file_"+str(fileNum[1])+".txt", 'a+') as file_writer: file_writer.writelines(temp1) temp1 = [] if len(temp2) > batch_size or line =="": with open(targetFoler + "\\file_"+str(fileNum[2])+".txt", 'a+') as file_writer: file_writer.writelines(temp2) temp2 = [] if len(temp3) > batch_size or line =="": with open(targetFoler + "\\file_"+str(fileNum[3])+".txt", 'a+') as file_writer: file_writer.writelines(temp3) temp3 = [] if len(temp4) > batch_size or line =="": with open(targetFoler + "\\file_"+str(fileNum[4])+".txt", 'a+') as file_writer: file_writer.writelines(temp4) temp4 = [] if len(temp5) > batch_size or line =="": with open(targetFoler + "\\file_"+str(fileNum[5])+".txt", 'a+') as file_writer: file_writer.writelines(temp5) temp5 = [] if len(temp6) > batch_size or line =="": with open(targetFoler + "\\file_"+str(fileNum[6])+".txt", 'a+') as file_writer: file_writer.writelines(temp6) temp6 = [] if __name__ == '__main__': print("Start At: " + str(ctime())) splitFile("d:\\massiveIP.txt", "d:\\massiveDataHash")
耗时:10m34.2s
求取IP前TopK(还是遍历所有文件并聚合)
#### 解决局部排序问题 import os from time import ctime def findIP(targetFolder): Result = {} IP = {} for root, dirs, files in os.walk(targetFolder): for f in files: # read small files file_handler = open(os.path.join(targetFolder, f), 'r') lines = file_handler.readlines() file_handler.close() # get IP in file, store to IP Dictionary for line in lines: if line in IP: IP[line] = IP[line] + 1 else: IP[line] = 1 return IP print ('Start: ' + ctime()) ipDict = findIP("d:\\massiveDataHash") print ('End: ' + ctime()) ipNum = 0 for key, value in ipDict.items(): # print("\nKey: " + key) # print("Value: " + str(value)) ipNum += value print(ipNum) IP = sorted(ipDict.items(), key=lambda d: d[1]) IP[-10:]
输出:
Start: Thu Jan 6 21:26:59 2022 End: Thu Jan 6 21:28:43 2022 200141521 [('10.197.158.206\n', 3414), ('10.197.163.161\n', 3418), ('10.197.237.105\n', 3421), ('10.197.179.88\n', 3425), ('10.197.254.65\n', 3426), ('10.197.66.179\n', 3427), ('10.197.32.200\n', 3435), ('10.197.36.112\n', 3440), ('10.197.77.215\n', 3456), ('10.197.148.78\n', 3460)]
求取最大IP,每个文件求最大值
## 解决局部排序问题 import os from time import ctime def findIP(targetFolder): Result = {} IP = {} for root, dirs, files in os.walk(targetFolder): for f in files: # read small files file_handler = open(os.path.join(targetFolder, f), 'r') lines = file_handler.readlines() file_handler.close() # get IP in file, store to IP Dictionary for line in lines: if line in IP: IP[line] = IP[line] + 1 else: IP[line] = 1 # sort Dictionary IP = sorted(IP.items(), key=lambda d: d[1]) # # get max item(s), store to Result Dictionary maxItem = IP[-1][1] print (' File ' + str(f) + ":") print( " maxItem: " + str(IP[-1])) tempTuple = IP.pop() while tempTuple[1] == maxItem: if tempTuple[0] in Result: Result[tempTuple[0]] = Result[tempTuple[0]] + 1 else: Result[tempTuple[0]] = tempTuple[1] tempTuple = IP.pop() IP = {} print (' Finished: ' + ctime()) print(sorted(Result.items(), key=lambda d: d[1])) # return IP print ('Start: ' + ctime()) ipDict = findIP("d:\\massiveDataHash") print ('End: ' + ctime())
耗时1m38.5s
输出:
Start: Thu Jan 6 21:59:04 2022 File file_1.txt: maxItem: ('10.197.87.218\n', 2313) Finished: Thu Jan 6 21:59:13 2022 File file_2.txt: maxItem: ('10.197.31.199\n', 3400) Finished: Thu Jan 6 21:59:27 2022 File file_3.txt: maxItem: ('10.197.148.78\n', 3460) Finished: Thu Jan 6 21:59:41 2022 File file_4.txt: maxItem: ('10.197.66.179\n', 3427) Finished: Thu Jan 6 21:59:55 2022 File file_5.txt: maxItem: ('10.197.32.200\n', 3435) Finished: Thu Jan 6 22:00:10 2022 File file_6.txt: maxItem: ('10.197.36.112\n', 3440) Finished: Thu Jan 6 22:00:25 2022 File file_7.txt: maxItem: ('10.197.147.86\n', 3399) Finished: Thu Jan 6 22:00:42 2022 [('10.197.87.218\n', 2313), ('10.197.147.86\n', 3399), ('10.197.31.199\n', 3400), ('10.197.66.179\n', 3427), ('10.197.32.200\n', 3435), ('10.197.36.112\n', 3440), ('10.197.148.78\n', 3460)]
构造字典-针对重复较多的键
import os from time import ctime def findIPAtOnce(targetFile): print ("Started At: " + ctime()) Result = {} with open(targetFile, 'r') as file_handler: print ("File Read Finished At: " + ctime()) for line in file_handler: if line in Result: Result[line] = Result[line] + 1 else: Result[line] = 1 print ("Write to Dic Finished At: " + ctime()) Result = sorted(Result.items(), key=lambda d: d[1]) print ("Sorting Finished At: " + ctime()) print ('Result:') for i in range(10): print (' ' + str(Result.pop())) if __name__ == '__main__': findIPAtOnce("d:\\massiveIP.txt")
耗时:2m8.4s
输出:
Started At: Thu Jan 6 22:04:40 2022 File Read Finished At: Thu Jan 6 22:04:40 2022 Write to Dic Finished At: Thu Jan 6 22:06:48 2022 Sorting Finished At: Thu Jan 6 22:06:48 2022 Result: ('10.197.148.78\n', 3460) ('10.197.77.215\n', 3456) ('10.197.36.112\n', 3440) ('10.197.32.200\n', 3435) ('10.197.66.179\n', 3427) ('10.197.254.65\n', 3426) ('10.197.179.88\n', 3425) ('10.197.237.105\n', 3421) ('10.197.163.161\n', 3418) ('10.197.158.206\n', 3414)
时间受限
Bitmap算法
class Bitmap: def __init__(self, maxValue=31,array=None): if maxValue < 0 : raise BaseException("最大值不能小于0。") self.max = maxValue if array: self.max = int(max(array)) self.size = self.__getIndex(self.max)[0] + 1 self.array = [0] * self.size if array: for i in range(len(array)): self.set(array[i]) # 获取数组的索引、位的索引 def __getIndex(self, num): aIndex = int(num / 32) bIndex = num % 32 return aIndex,bIndex # 改变数组长度 def __changeLength(self, aIndex, bIndex): self.max = aIndex * 32 + bIndex self.size = aIndex + 1 newArray = [0] * self.size for i in range(len(self.array)): newArray[i] = self.array[i] self.array = newArray ''' 添加数据 先求得该数对应的数组索引以及位的索引 如果要将该数对应的位置为1(value=True),则用当前数组的数与1右移该数对应的位索引求或 如果要将该数对应的位置为0(value=False),则用当前数组的数与1右移该数对应的位索引取反后求与 ''' def set(self, num, value=True): aIndex, bIndex = self.__getIndex(int(num)) if aIndex >= self.size: self.__changeLength(aIndex, bIndex) arrayNum = self.array[aIndex] if value: self.array[aIndex ] = arrayNum | (1 << bIndex ) else: temp = 1 << bIndex ft = ~temp self.array[aIndex] = arrayNum & ft ''' 判断某个数是存在 先求得该数对应的数组索引以及位的索引,用数组当前索引对应的数与1右移该数对应位的索引位求与 结果不为0表示存在,否则不存在 ''' def isExist(self, num): aIndex, bIndex = self.__getIndex (num) arrayNum = self.array[aIndex] if arrayNum & (1 << bIndex): return True return False def __getNewMaxValue(self, newArray): for i in range(len(newArray)-1, -1, -1): if newArray[i]: isEnd = False for n in range((i+1)*32-1, i*32-1, -1): aIndex, bIndex = self.__getIndex (n) arrayNum = newArray[aIndex] if arrayNum & (1 << bIndex): return n return 0 ''' 求自身与bitmap的交集,返回一个新的Bitmap对象 例如: self表示养猫的人,bitmap表示养狗的人 那么,and_的结果表示猫狗都养的人 ''' def and_(self,bitmap): size = self.size if self.size < bitmap.size else bitmap.size newArray = [0] * size for i in range(size): newArray[i] = self.array[i] & bitmap.array[i] maxValue = self.__getNewMaxValue(newArray) newBitmap = Bitmap(maxValue) newBitmap.array = newArray return newBitmap ''' 求自身与bitmap的并集,返回一个新的Bitmap对象 例如: self表示养猫的人,bitmap表示养狗的人 那么,or_的结果表示养猫或养猫或者猫狗都养的人 ''' def or_(self,bitmap): newArray = [] if self.size > bitmap.size: newArray = self.array.copy() else: newArray = bitmap.array.copy() size = self.size if self.size < bitmap.size else bitmap.size for i in range(size): newArray[i] = self.array[i] | bitmap.array[i] maxValue = self.__getNewMaxValue(newArray) newBitmap = Bitmap(maxValue) newBitmap.array = newArray return newBitmap ''' 除去在bitmap中存在的自身元素,返回一个新的Bitmap对象 例如: self表示养猫的人,bitmap表示养狗的人 那么,andNot的结果表示只养猫的人 ''' def andNot(self,bitmap): andBitmap = self.and_(bitmap) newArray = self.array.copy() for i in range (andBitmap.size): newArray[i] = self.array[i] ^ andBitmap.array[i] maxValue = self.__getNewMaxValue (newArray) newBitmap = Bitmap (maxValue) newBitmap.array = newArray.copy() return newBitmap ''' 求出自身与bitmap的并集元素,再去除自身与bitmap的交集元素 例如: self表示养猫的人,bitmap表示养狗的人 那么,xor的结果表示只养猫或者只养狗的人 ''' def xor(self, bitmap): orBitmap = self.or_(bitmap) andBitmap = self.and_ (bitmap) newArray = orBitmap.array for i in range (andBitmap.size): newArray[i] = newArray[i] ^ andBitmap.array[i] maxValue = self.__getNewMaxValue (newArray) newBitmap = Bitmap (maxValue) newBitmap.array = newArray return newBitmap ''' 排序,返回排序后的列表 desc为True降序,否则升序 ''' def sort(self, desc=False): result = [] for i in range(self.max+1): if self.isExist(i): result.append(i) if desc: return result[::-1] else: return result if __name__ == "__main__": # 利用bitmap对数组排序 array1 = [1,31,15,45,123,344] bitmap1 = Bitmap(array=array1) print ("原数组:" + str (array1)) print("升序排序:"+str(bitmap1.sort())) print ("降序排序:" + str(bitmap1.sort (desc=True))) #利用bitmap对数组去重 array2 = [1, 33, 34, 1, 33, 532, 232, 532, 543, 99] bitmap2 = Bitmap(array=array2) print ("原数组:" + str (array2)) print("去掉重复数据后的数组为:"+str(bitmap2.sort())) #判断某个数据是否在数组中 num = 99 judge = "在" if bitmap2.isExist(num) else "不在" print("%d%s数组中"%(num, judge)) ''' 集合操作 例子:列表list1表示喜欢刘亦非的人的id,列表list2表示喜欢angelababy的人 ''' list1 = [2, 10, 12, 43, 45, 112, 345, 645, 1243, 10982] list2 = [4, 16, 12, 48, 49, 345, 564, 786, 6554, 10982, 50982] bitmap1 = Bitmap(array=list1) bitmap2 = Bitmap(array=list2) bitmap3 = bitmap1.or_(bitmap2) print("喜欢刘亦非或者喜欢angelababy的人有:"+str(bitmap3.sort())) bitmap3 = bitmap1.and_ (bitmap2) print ("既喜欢刘亦非又喜欢angelababy的人有:" + str (bitmap3.sort ())) bitmap3 = bitmap1.andNot(bitmap2) print ("喜欢刘亦非但不喜欢angelababy的人有:" + str (bitmap3.sort ())) bitmap3 = bitmap1.xor(bitmap2) print ("只喜欢刘亦非或只喜欢angelababy的人有:" + str (bitmap3.sort ()))
# -*- coding:utf-8 -*- class BitMap(): def __init__(self, maxValue): self._size = int((maxValue + 31 - 1) / 31) # 向上取正 确定数组大小 # self._size = num / 31 # 向下取正 确定数组大小 self.array = [0 for i in range(self._size)] # 初始化为0 def getElemIndex(self, num): # 获取该数的数组下标 return num // 31 def getBitIndex(self, num): # 获取该数所在数组的位下标 return num % 31 def set(self, num): # 将该数所在的位置1 elemIndex = self.getElemIndex(num) bitIndex = self.getBitIndex(num) self.array[elemIndex] = self.array[elemIndex] | (1 << bitIndex) def clean(self, num): # 将该数所在的位置0 elemIndex = self.getElemIndex(num) bitIndex = self.getBitIndex(num) self.array[elemIndex] = self.array[elemIndex] & (~(1 << bitIndex)) def find(self, num): # 查找该数是否存在 elemIndex = self.getElemIndex(num) bitIndex = self.getBitIndex(num) if self.array[elemIndex] & (1 << bitIndex): return True return False if __name__ == '__main__': array_list = [45, 2, 78, 35, 67, 90, 879, 0, 340, 123, 46] results = [] bitmap = BitMap(maxValue=max(array_list)) for num in array_list: bitmap.set(num) for i in range(max(array_list) + 1): if bitmap.find(i): results.append(i) print(array_list) print(results)
布隆过滤器
from bitarray import bitarray # 3rd party import mmh3 class BloomFilter(set): def __init__(self, size, hash_count): super(BloomFilter, self).__init__() self.bit_array = bitarray(size) self.bit_array.setall(0) self.size = size self.hash_count = hash_count def __len__(self): return self.size def __iter__(self): return iter(self.bit_array) def add(self, item): for ii in range(self.hash_count): index = mmh3.hash(item, ii) % self.size self.bit_array[index] = 1 return self def __contains__(self, item): out = True for ii in range(self.hash_count): index = mmh3.hash(item, ii) % self.size if self.bit_array[index] == 0: out = False return out def main(): bloom = BloomFilter(100, 10) animals = ['dog', 'cat', 'giraffe', 'fly', 'mosquito', 'horse', 'eagle', 'bird', 'bison', 'boar', 'butterfly', 'ant', 'anaconda', 'bear', 'chicken', 'dolphin', 'donkey', 'crow', 'crocodile'] # First insertion of animals into the bloom filter for animal in animals: bloom.add(animal) # Membership existence for already inserted animals # There should not be any false negatives for animal in animals: if animal in bloom: print('{} is in bloom filter as expected'.format(animal)) else: print('Something is terribly went wrong for {}'.format(animal)) print('FALSE NEGATIVE!') # Membership existence for not inserted animals # There could be false positives other_animals = ['badger', 'cow', 'pig', 'sheep', 'bee', 'wolf', 'fox', 'whale', 'shark', 'fish', 'turkey', 'duck', 'dove', 'deer', 'elephant', 'frog', 'falcon', 'goat', 'gorilla', 'hawk' ] for other_animal in other_animals: if other_animal in bloom: print('{} is not in the bloom, but a false positive'.format(other_animal)) else: print('{} is not in the bloom filter as expected'.format(other_animal)) if __name__ == '__main__': main()
输出为:
dog is in bloom filter as expected cat is in bloom filter as expected giraffe is in bloom filter as expected fly is in bloom filter as expected mosquito is in bloom filter as expected horse is in bloom filter as expected eagle is in bloom filter as expected bird is in bloom filter as expected bison is in bloom filter as expected boar is in bloom filter as expected butterfly is in bloom filter as expected ant is in bloom filter as expected anaconda is in bloom filter as expected bear is in bloom filter as expected chicken is in bloom filter as expected dolphin is in bloom filter as expected donkey is in bloom filter as expected crow is in bloom filter as expected crocodile is in bloom filter as expected badger is not in the bloom filter as expected cow is not in the bloom filter as expected pig is not in the bloom filter as expected sheep is not in the bloom, but a false positive bee is not in the bloom filter as expected wolf is not in the bloom filter as expected frog is not in the bloom filter as expected falcon is not in the bloom filter as expected goat is not in the bloom filter as expected gorilla is not in the bloom filter as expected hawk is not in the bloom filter as expected
字典树实现
# -*- coding:utf-8 -*- """ Description:大变双向字典树 迭代次数默认最大999,可以增加但是没必要。其实能深到999层,那这个序列还是选择另外的处理方式吧。 @author: WangLeAi @date: 2018/8/15 """ class TrieNode(object): def __init__(self, value=None, count=0, parent=None): # 值 self.value = value # 频数统计 self.count = count # 父结点 self.parent = parent # 子节点,{value:TrieNode} self.children = {} class Trie(object): def __init__(self): # 创建空的根节点 self.root = TrieNode() def insert(self, sequence): """ 基操,插入一个序列 :param sequence: 列表 :return: """ cur_node = self.root for item in sequence: if item not in cur_node.children: # 插入结点 child = TrieNode(value=item, count=1, parent=cur_node) cur_node.children[item] = child cur_node = child else: # 更新结点 cur_node = cur_node.children[item] cur_node.count += 1 def search(self, sequence): """ 基操,查询是否存在完整序列 :param sequence: 列表 :return: """ cur_node = self.root mark = True for item in sequence: if item not in cur_node.children: mark = False break else: cur_node = cur_node.children[item] # 如果还有子结点,说明序列并非完整 if cur_node.children: mark = False return mark def delete(self, sequence): """ 基操,删除序列,准确来说是减少计数 :param sequence: 列表 :return: """ mark = False if self.search(sequence): mark = True cur_node = self.root for item in sequence: cur_node.children[item].count -= 1 if cur_node.children[item].count == 0: cur_node.children.pop(item) break else: cur_node = cur_node.children[item] return mark def search_part(self, sequence, prefix, suffix, start_node=None): """ 递归查找子序列,返回前缀和后缀结点 此处简化操作,仅返回一位前后缀的内容与频数 :param sequence: 列表 :param prefix: 前缀字典,初始传入空字典 :param suffix: 后缀字典,初始传入空字典 :param start_node: 起始结点,用于子树的查询 :return: """ if start_node: cur_node = start_node prefix_node = start_node.parent else: cur_node = self.root prefix_node = self.root mark = True # 必须从第一个结点开始对比 for i in range(len(sequence)): if i == 0: if sequence[i] != cur_node.value: for child_node in cur_node.children.values(): self.search_part(sequence, prefix, suffix, child_node) mark = False break else: if sequence[i] not in cur_node.children: for child_node in cur_node.children.values(): self.search_part(sequence, prefix, suffix, child_node) mark = False break else: cur_node = cur_node.children[sequence[i]] if mark: if prefix_node.value: # 前缀数量取序列词中最后一词的频数 if prefix_node.value in prefix: prefix[prefix_node.value] += cur_node.count else: prefix[prefix_node.value] = cur_node.count for suffix_node in cur_node.children.values(): if suffix_node.value in suffix: suffix[suffix_node.value] += suffix_node.count else: suffix[suffix_node.value] = suffix_node.count # 即使找到一部分还需继续查找子结点 for child_node in cur_node.children.values(): self.search_part(sequence, prefix, suffix, child_node) if __name__ == "__main__": trie = Trie() texts = [["葬爱", "少年", "葬爱", "少年", "慕周力", "哈哈"], ["葬爱", "少年", "阿西吧"], ["烈", "烈", "风", "中"], ["忘记", "了", "爱"], ["埋葬", "了", "爱"]] for text in texts: trie.insert(text) markx = trie.search(["忘记", "了", "爱"]) print(markx) markx = trie.search(["忘记", "了"]) print(markx) markx = trie.search(["忘记", "爱"]) print(markx) markx = trie.delete(["葬爱", "少年", "王周力"]) print(markx) prefixx = {} suffixx = {} trie.search_part(["葬爱", "少年"], prefixx, suffixx) print(prefixx) print(suffixx)
输出: