海量数据处理-Python

简介: 文章目录海量数据处理-Python海量数据处理的困难大文件生成空间受限

海量数据处理的困难

海量数据处理的困难用一句话概括,就是时空资源不够。具体来说,

  • 空间受限:无法将海量数据一次性读入内存;
  • 时间受限:无法在有限时间内,完成针对海量数据的某项处理工作。

而对于空间受限的问题,一般的解决办法是“大而化小,分而治之”的策略,既然一次性行不通,那就一部分一部分读,每读入一部分可以生成一个小文件,小文件是可以直接读入内存的,我们这样分割大数据之后,再依次处理小文件的数据。

对于时间受限的问题,我们一般的解决办法是高效的算法配合恰当的数据结构,比如哈希表,Bloom Filter,堆,倒排索引,tire树);


至于所谓的单机及集群问题,通俗点来讲,单机就是处理装载数据的机器有限(只要考虑cpu,内存,硬盘的数据交互),而集群,机器有多辆,适合分布式处理,并行计算(更多考虑节点和节点间的数据交互)。


海量数据处理Big Data Processing的大致方法包括:


分而治之/hash映射 + hash统计 + 堆/快速/归并排序;

双层桶划分

Bloom filter/Bitmap;

Trie树/数据库/倒排索引;

外排序;

分布式处理之Hadoop/Mapreduce。


大文件生成

生成2亿个IP地址

# 生成2亿个IP
def generateRandom(rangeFrom, rangeTo):
    import random
    return random.randint(rangeFrom,rangeTo)
def generageMassiveIPAddr(fileLocation,numberOfLines):
    IP = []
    with open(fileLocation, 'a+') as file_handler:
        for i in range(numberOfLines):
            IP.append('10.197.' + str(generateRandom(0,255))+'.'+ str(generateRandom(0,255)) + '\n')
        file_handler.writelines(IP)
if __name__ == '__main__':
    from time import ctime
    print(ctime())
    for i in range(20):
        print( '  ' + str(i) + ": " + ctime())
        generageMassiveIPAddr('d:\\massiveIP.txt', 10000000)
    print(ctime())

耗时14m54s,输出为:

Thu Jan  6 19:01:05 2022
  0: Thu Jan  6 19:01:05 2022
  1: Thu Jan  6 19:01:50 2022
  2: Thu Jan  6 19:02:37 2022
  3: Thu Jan  6 19:03:22 2022
  4: Thu Jan  6 19:04:07 2022
  5: Thu Jan  6 19:04:52 2022
  6: Thu Jan  6 19:05:40 2022
  7: Thu Jan  6 19:06:25 2022
  8: Thu Jan  6 19:07:10 2022
  9: Thu Jan  6 19:07:56 2022
  10: Thu Jan  6 19:08:41 2022
  11: Thu Jan  6 19:09:26 2022
  12: Thu Jan  6 19:10:10 2022
  13: Thu Jan  6 19:10:54 2022
  14: Thu Jan  6 19:11:38 2022
  15: Thu Jan  6 19:12:22 2022
  16: Thu Jan  6 19:13:06 2022
  17: Thu Jan  6 19:13:49 2022
  18: Thu Jan  6 19:14:32 2022
  19: Thu Jan  6 19:15:15 2022
Thu Jan  6 19:15:59 2022

文件IP数量为2亿条,大小为3105013kb / (1024*1024) = 2.96GB:



平均每条IP大小为:3105013 * 1024 / 200000000= 15.90

空间受限

假设内存大小为512M

可以载入的IP数量为 512*1024*1024/15.90 = 33765466

三千三百七十六万五千四百六十六

需要分块的数量为:200000000 / 33765466 = 5.923

分块读取

import pandas as pd
from tqdm import tqdm
f = open('d:\\massiveIP.txt')
reader = pd.read_csv(f, sep=',',header=None,names=["IP"], iterator=True)
loop = True
chunkSize = 33765466
chunks = []
# 需要6次分块,这里迭代7次
for i in tqdm(range(7)) :
    try:
        # print("*"*5,i,"*"*5)
        chunk = reader.get_chunk(chunkSize)
        df1 = chunk["IP"].value_counts()
        chunks.append(df1)
        del chunk
    except StopIteration:
        # loop = False
        print("Iteration is stopped.")
re_chunks = []
for i in chunks:
    df1 = i.reset_index()
    re_chunks.append(df1)
df22 = pd.concat(re_chunks, ignore_index=True)
df22.groupby(by=["index"]).agg({"IP":"sum"}).reset_index().sort_values(by=["IP"],ascending=False)

耗时2m36.1s

输出为:

100%|██████████| 7/7 [02:33<00:00, 21.88s/it]
  index IP
14312 10.197.148.78 3460
59266 10.197.77.215 3456
47632 10.197.36.112 3440
46706 10.197.32.200 3435
56153 10.197.66.179 3427
... ... ...
33713 10.197.216.28 2988
5232  10.197.116.2  2988
51331 10.197.49.216 2982
25792 10.197.189.41 2972
8555  10.197.128.195  2955
65536 rows × 2 columns

文件拆分提取

拆分小文件

# 大文件分割 
from time import ctime
def splitFile(fileLocation, targetFoler):
    import os
    import shutil
    if not os.path.exists(targetFoler):
        os.mkdir(targetFoler)
        print("create ",targetFoler, " successed")
    else:
        shutil.rmtree(targetFoler)
        print("remove ",targetFoler, " successed")
        os.mkdir(targetFoler)
        print("create ",targetFoler, " successed")
    with open(fileLocation, 'r') as file_handler:
        # 512M 512*1024*1024/15.90 = 33765466 
        block_size = 33765466 
        line = file_handler.readline()
        temp = []
        countFile = 1
        while line:
            if line =="":
                break
            for i in range(block_size):
                if i == (block_size-1):
                    # write block to small files
                    with open(targetFoler + "\\file_"+str(countFile)+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp)
                        temp = []
                        print("  file " + str(countFile) + " generated at: " + str(ctime()))
                        countFile = countFile + 1
                else:
                    temp.append(line)
                    line = file_handler.readline()
if __name__ == '__main__':
    print("Start At: " + str(ctime()))
    splitFile("d:\\massiveIP.txt", "d:\\massiveData")

耗时:7m2.9s

输出为:

Start At: Thu Jan  6 19:30:33 2022
remove  d:\massiveData  successed
create  d:\massiveData  successed
  file 1 generated at: Thu Jan  6 19:31:15 2022
  file 2 generated at: Thu Jan  6 19:31:54 2022
  file 3 generated at: Thu Jan  6 19:32:33 2022
  file 4 generated at: Thu Jan  6 19:33:11 2022
  file 5 generated at: Thu Jan  6 19:33:53 2022
  file 6 generated at: Thu Jan  6 19:34:39 2022
  file 7 generated at: Thu Jan  6 19:37:34 2022


比较小文件

import os
from time import ctime
def findIP(targetFolder):
    Result = {}
    IP = {}
    for root, dirs, files in os.walk(targetFolder):
        for f in files:
            # read small files
            file_handler = open(os.path.join(targetFolder, f), 'r')
            lines = file_handler.readlines()
            file_handler.close()
            # get IP in file, store to IP Dictionary
            for line in lines:
                if line in IP:
                    IP[line] = IP[line] + 1
                else:
                    IP[line] = 1
            # sort Dictionary
            IP = sorted(IP.items(), key=lambda d: d[1])
            # get max item(s), store to Result Dictionary
            maxItem = IP[-1][1]
            print('  File ' + str(f) + ":")
            print ("    maxItem: " + str(IP[-1]))
            tempTuple = IP.pop()
            while tempTuple[1] == maxItem:
                if tempTuple[0] in Result:
                    Result[tempTuple[0]] = Result[tempTuple[0]] + 1
                else:
                    Result[tempTuple[0]] = tempTuple[1]
                tempTuple = IP.pop()
            IP = {}
            print ('    Finished: ' + ctime())
    print(sorted(Result.items(), key=lambda d: d[1]))
if __name__ == '__main__':
    print ('Start: ' + ctime())
    findIP("d:\\massiveData")
    print ('End: ' + ctime())

耗时2m28.7s

输出为:

Start: Thu Jan  6 19:45:08 2022
  File file_1.txt:
    maxItem: ('10.197.171.38\n', 619)
    Finished: Thu Jan  6 19:45:31 2022
  File file_2.txt:
    maxItem: ('10.197.207.162\n', 626)
    Finished: Thu Jan  6 19:45:54 2022
  File file_3.txt:
    maxItem: ('10.197.237.210\n', 602)
    Finished: Thu Jan  6 19:46:16 2022
  File file_4.txt:
    maxItem: ('10.197.45.67\n', 614)
    Finished: Thu Jan  6 19:46:41 2022
  File file_5.txt:
    maxItem: ('10.197.153.232\n', 605)
    Finished: Thu Jan  6 19:47:04 2022
  File file_6.txt:
    maxItem: ('10.197.148.78\n', 612)
    Finished: Thu Jan  6 19:47:28 2022
  File file_7.txt:
    maxItem: ('10.197.210.11\n', 160)
    Finished: Thu Jan  6 19:47:36 2022
[('10.197.210.11\n', 160), ('10.197.237.210\n', 602), ('10.197.193.149\n', 602), ('10.197.130.236\n', 602), ('10.197.243.35\n', 602), ('10.197.253.239\n', 602), ('10.197.153.232\n', 605), ('10.197.148.78\n', 612), ('10.197.45.67\n', 614), ('10.197.171.38\n', 619), ('10.197.207.162\n', 626)]
End: Thu Jan  6 19:47:36 2022

备注:这个输出是错误,只考虑了局部最大IP,并没考虑全局最大IP

解决办法如下:

## 解决局部排序问题
import os
from time import ctime
def findIP(targetFolder):
    Result = {}
    IP = {}
    for root, dirs, files in os.walk(targetFolder):
        for f in files:
            # read small files
            file_handler = open(os.path.join(targetFolder, f), 'r')
            lines = file_handler.readlines()
            file_handler.close()
            # get IP in file, store to IP Dictionary
            for line in lines:
                if line in IP:
                    IP[line] = IP[line] + 1
                else:
                    IP[line] = 1
            # sort Dictionary
        IP = sorted(IP.items(), key=lambda d: d[1])
        print ('    Finished: ' + ctime())
    # print(sorted(Result.items(), key=lambda d: d[1]))
    return IP
print ('Start: ' + ctime())
ipDict = findIP("d:\\massiveData")
print ('End: ' + ctime())
print(ipDict[-10:])

耗时为:2m28.6s

输出为:

Start: Thu Jan  6 19:56:03 2022
    Finished: Thu Jan  6 19:58:31 2022
End: Thu Jan  6 19:58:31 2022
[('10.197.158.206\n', 3414), ('10.197.163.161\n', 3418), ('10.197.237.105\n', 3421), ('10.197.179.88\n', 3425), ('10.197.254.65\n', 3426), ('10.197.66.179\n', 3427), ('10.197.32.200\n', 3435), ('10.197.36.112\n', 3440), ('10.197.77.215\n', 3456), ('10.197.148.78\n', 3460)]

通过hash拆分文件

拆分小文件-依据hash

代码如下:

#  大文件分割 
from time import ctime
def splitFile(fileLocation, targetFoler):
    import os
    import shutil
    if not os.path.exists(targetFoler):
        os.mkdir(targetFoler)
        print("create ",targetFoler, " successed")
    else:
        shutil.rmtree(targetFoler)
        print("remove ",targetFoler, " successed")
        os.mkdir(targetFoler)
        print("create ",targetFoler, " successed")
    fileNum = [1,2,3,4,5,6,7,8,9]
    with open(fileLocation, 'r') as file_handler:
        # 512M 512*1024*1024/15.90 = 33765466 
        # block_size = 33765466 
        batch_size = 20000000
        line = file_handler.readline()
        temp0 = []
        temp1 = []
        temp2 = []
        temp3 = []
        temp4 = []
        temp5 = []
        temp6 = []
        while line:
            # if line =="":
            #     break
            if hash(line) % 7 == fileNum[0]:
                temp0.append(line)
                line = file_handler.readline()
                if len(temp0) > 10000000:
                    with open(targetFoler + "\\file_"+str(fileNum[0])+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp0)
                        temp0 = []
            elif  hash(line) % 7 == fileNum[1]:
                temp1.append(line)
                line = file_handler.readline()
            elif  hash(line) % 7 == fileNum[2]:
                temp2.append(line)
                line = file_handler.readline()
            elif  hash(line) % 7 == fileNum[3]:
                temp3.append(line)
                line = file_handler.readline()
            elif  hash(line) % 7 == fileNum[4]:
                temp4.append(line)
                line = file_handler.readline()
            elif  hash(line) % 7 == fileNum[5]:
                temp5.append(line)
                line = file_handler.readline()
            else:
                temp6.append(line)
                line = file_handler.readline()
            if len(temp1)  > batch_size or line =="": 
                    with open(targetFoler + "\\file_"+str(fileNum[1])+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp1)
                        temp1 = []
            if len(temp2) > batch_size or line =="":
                    with open(targetFoler + "\\file_"+str(fileNum[2])+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp2)
                        temp2 = [] 
            if len(temp3) > batch_size or line =="":
                    with open(targetFoler + "\\file_"+str(fileNum[3])+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp3)
                        temp3 = [] 
            if len(temp4) > batch_size or line =="":
                    with open(targetFoler + "\\file_"+str(fileNum[4])+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp4)
                        temp4 = []
            if len(temp5) > batch_size or line =="":
                    with open(targetFoler + "\\file_"+str(fileNum[5])+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp5)
                        temp5 = []
            if len(temp6) > batch_size or line =="":
                    with open(targetFoler + "\\file_"+str(fileNum[6])+".txt", 'a+') as file_writer:
                        file_writer.writelines(temp6)
                        temp6 = []
if __name__ == '__main__':
    print("Start At: " + str(ctime()))
    splitFile("d:\\massiveIP.txt", "d:\\massiveDataHash")

耗时:10m34.2s


求取IP前TopK(还是遍历所有文件并聚合)

#### 解决局部排序问题
import os
from time import ctime
def findIP(targetFolder):
    Result = {}
    IP = {}
    for root, dirs, files in os.walk(targetFolder):
        for f in files:
            # read small files
            file_handler = open(os.path.join(targetFolder, f), 'r')
            lines = file_handler.readlines()
            file_handler.close()
            # get IP in file, store to IP Dictionary
            for line in lines:
                if line in IP:
                    IP[line] = IP[line] + 1
                else:
                    IP[line] = 1
    return IP
print ('Start: ' + ctime())
ipDict = findIP("d:\\massiveDataHash")
print ('End: ' + ctime())
ipNum = 0
for key, value in ipDict.items():
    # print("\nKey: " + key)
    # print("Value: " + str(value))
    ipNum += value
print(ipNum)
IP = sorted(ipDict.items(), key=lambda d: d[1]) 
IP[-10:]

输出:

Start: Thu Jan  6 21:26:59 2022
End: Thu Jan  6 21:28:43 2022
200141521
[('10.197.158.206\n', 3414),
 ('10.197.163.161\n', 3418),
 ('10.197.237.105\n', 3421),
 ('10.197.179.88\n', 3425),
 ('10.197.254.65\n', 3426),
 ('10.197.66.179\n', 3427),
 ('10.197.32.200\n', 3435),
 ('10.197.36.112\n', 3440),
 ('10.197.77.215\n', 3456),
 ('10.197.148.78\n', 3460)]

求取最大IP,每个文件求最大值

## 解决局部排序问题
import os
from time import ctime
def findIP(targetFolder):
    Result = {}
    IP = {}
    for root, dirs, files in os.walk(targetFolder):
        for f in files:
            # read small files
            file_handler = open(os.path.join(targetFolder, f), 'r')
            lines = file_handler.readlines()
            file_handler.close()
            # get IP in file, store to IP Dictionary
            for line in lines:
                if line in IP:
                    IP[line] = IP[line] + 1
                else:
                    IP[line] = 1
            # sort Dictionary
            IP = sorted(IP.items(), key=lambda d: d[1])
            # # get max item(s), store to Result Dictionary
            maxItem = IP[-1][1]
            print ('  File ' + str(f) + ":")
            print( "    maxItem: " + str(IP[-1]))
            tempTuple = IP.pop()
            while tempTuple[1] == maxItem:
                if tempTuple[0] in Result:
                    Result[tempTuple[0]] = Result[tempTuple[0]] + 1
                else:
                    Result[tempTuple[0]] = tempTuple[1]
                tempTuple = IP.pop()
            IP = {}
            print ('    Finished: ' + ctime())
    print(sorted(Result.items(), key=lambda d: d[1]))
    # return IP
print ('Start: ' + ctime())
ipDict = findIP("d:\\massiveDataHash")
print ('End: ' + ctime())

耗时1m38.5s

输出:

Start: Thu Jan  6 21:59:04 2022
  File file_1.txt:
    maxItem: ('10.197.87.218\n', 2313)
    Finished: Thu Jan  6 21:59:13 2022
  File file_2.txt:
    maxItem: ('10.197.31.199\n', 3400)
    Finished: Thu Jan  6 21:59:27 2022
  File file_3.txt:
    maxItem: ('10.197.148.78\n', 3460)
    Finished: Thu Jan  6 21:59:41 2022
  File file_4.txt:
    maxItem: ('10.197.66.179\n', 3427)
    Finished: Thu Jan  6 21:59:55 2022
  File file_5.txt:
    maxItem: ('10.197.32.200\n', 3435)
    Finished: Thu Jan  6 22:00:10 2022
  File file_6.txt:
    maxItem: ('10.197.36.112\n', 3440)
    Finished: Thu Jan  6 22:00:25 2022
  File file_7.txt:
    maxItem: ('10.197.147.86\n', 3399)
    Finished: Thu Jan  6 22:00:42 2022
[('10.197.87.218\n', 2313), ('10.197.147.86\n', 3399), 
('10.197.31.199\n', 3400), ('10.197.66.179\n', 3427), 
('10.197.32.200\n', 3435), ('10.197.36.112\n', 3440), 
('10.197.148.78\n', 3460)]

构造字典-针对重复较多的键

import os
from time import ctime
def findIPAtOnce(targetFile):
    print ("Started At: " + ctime())
    Result = {}
    with open(targetFile, 'r') as file_handler:
        print ("File Read Finished At: " + ctime())
        for line in file_handler:
            if line in Result:
                Result[line] = Result[line] + 1
            else:
                Result[line] = 1
        print ("Write to Dic Finished At: " + ctime())
        Result = sorted(Result.items(), key=lambda d: d[1])
        print ("Sorting Finished At: " + ctime())
        print ('Result:')
        for i in range(10):
            print ('  ' + str(Result.pop()))
if __name__ == '__main__':
    findIPAtOnce("d:\\massiveIP.txt")

耗时:2m8.4s

输出:

Started At: Thu Jan  6 22:04:40 2022
File Read Finished At: Thu Jan  6 22:04:40 2022
Write to Dic Finished At: Thu Jan  6 22:06:48 2022
Sorting Finished At: Thu Jan  6 22:06:48 2022
Result:
  ('10.197.148.78\n', 3460)
  ('10.197.77.215\n', 3456)
  ('10.197.36.112\n', 3440)
  ('10.197.32.200\n', 3435)
  ('10.197.66.179\n', 3427)
  ('10.197.254.65\n', 3426)
  ('10.197.179.88\n', 3425)
  ('10.197.237.105\n', 3421)
  ('10.197.163.161\n', 3418)
  ('10.197.158.206\n', 3414)

时间受限

Bitmap算法

class Bitmap:
    def __init__(self, maxValue=31,array=None):
        if maxValue < 0 :
            raise BaseException("最大值不能小于0。")
        self.max = maxValue
        if array:
            self.max = int(max(array))
        self.size = self.__getIndex(self.max)[0] + 1
        self.array = [0] * self.size
        if array:
            for i in range(len(array)):
                self.set(array[i])
    # 获取数组的索引、位的索引
    def __getIndex(self, num):
        aIndex = int(num / 32)
        bIndex = num % 32
        return aIndex,bIndex
    # 改变数组长度
    def __changeLength(self, aIndex, bIndex):
        self.max = aIndex * 32 + bIndex
        self.size = aIndex + 1
        newArray = [0] * self.size
        for i in range(len(self.array)):
            newArray[i] = self.array[i]
        self.array = newArray
    '''
     添加数据
     先求得该数对应的数组索引以及位的索引
     如果要将该数对应的位置为1(value=True),则用当前数组的数与1右移该数对应的位索引求或
     如果要将该数对应的位置为0(value=False),则用当前数组的数与1右移该数对应的位索引取反后求与
    '''
    def set(self, num, value=True):
        aIndex, bIndex = self.__getIndex(int(num))
        if aIndex >= self.size:
            self.__changeLength(aIndex, bIndex)
        arrayNum = self.array[aIndex]
        if value:
            self.array[aIndex ] = arrayNum | (1 << bIndex )
        else:
            temp = 1 << bIndex
            ft = ~temp
            self.array[aIndex] = arrayNum & ft
    '''
    判断某个数是存在
    先求得该数对应的数组索引以及位的索引,用数组当前索引对应的数与1右移该数对应位的索引位求与
    结果不为0表示存在,否则不存在
    '''
    def isExist(self, num):
        aIndex, bIndex = self.__getIndex (num)
        arrayNum = self.array[aIndex]
        if arrayNum & (1 << bIndex):
            return True
        return False
    def __getNewMaxValue(self, newArray):
        for i in range(len(newArray)-1, -1, -1):
            if newArray[i]:
                isEnd = False
                for n in range((i+1)*32-1, i*32-1, -1):
                    aIndex, bIndex = self.__getIndex (n)
                    arrayNum = newArray[aIndex]
                    if arrayNum & (1 << bIndex):
                        return n
        return 0
    '''
    求自身与bitmap的交集,返回一个新的Bitmap对象
    例如:
    self表示养猫的人,bitmap表示养狗的人
    那么,and_的结果表示猫狗都养的人
    '''
    def and_(self,bitmap):
        size = self.size if self.size < bitmap.size else bitmap.size
        newArray = [0] * size
        for i in range(size):
            newArray[i] = self.array[i] & bitmap.array[i]
        maxValue = self.__getNewMaxValue(newArray)
        newBitmap = Bitmap(maxValue)
        newBitmap.array = newArray
        return newBitmap
    '''
    求自身与bitmap的并集,返回一个新的Bitmap对象
    例如:
    self表示养猫的人,bitmap表示养狗的人
    那么,or_的结果表示养猫或养猫或者猫狗都养的人
    '''
    def or_(self,bitmap):
        newArray = []
        if self.size > bitmap.size:
            newArray = self.array.copy()
        else:
            newArray = bitmap.array.copy()
        size = self.size if self.size < bitmap.size else bitmap.size
        for i in range(size):
            newArray[i] = self.array[i] | bitmap.array[i]
        maxValue = self.__getNewMaxValue(newArray)
        newBitmap = Bitmap(maxValue)
        newBitmap.array = newArray
        return newBitmap
    '''
    除去在bitmap中存在的自身元素,返回一个新的Bitmap对象
    例如:
    self表示养猫的人,bitmap表示养狗的人
    那么,andNot的结果表示只养猫的人
    '''
    def andNot(self,bitmap):
        andBitmap = self.and_(bitmap)
        newArray = self.array.copy()
        for i in range (andBitmap.size):
            newArray[i] = self.array[i] ^ andBitmap.array[i]
        maxValue = self.__getNewMaxValue (newArray)
        newBitmap = Bitmap (maxValue)
        newBitmap.array = newArray.copy()
        return newBitmap
    '''
    求出自身与bitmap的并集元素,再去除自身与bitmap的交集元素
    例如:
    self表示养猫的人,bitmap表示养狗的人
    那么,xor的结果表示只养猫或者只养狗的人
    '''
    def xor(self, bitmap):
        orBitmap = self.or_(bitmap)
        andBitmap = self.and_ (bitmap)
        newArray = orBitmap.array
        for i in range (andBitmap.size):
            newArray[i] = newArray[i] ^ andBitmap.array[i]
        maxValue = self.__getNewMaxValue (newArray)
        newBitmap = Bitmap (maxValue)
        newBitmap.array = newArray
        return newBitmap
    '''
    排序,返回排序后的列表
    desc为True降序,否则升序
    '''
    def sort(self, desc=False):
        result = []
        for i in range(self.max+1):
            if self.isExist(i):
                result.append(i)
        if desc:
            return result[::-1]
        else:
            return result
if __name__ == "__main__":
    # 利用bitmap对数组排序
    array1 = [1,31,15,45,123,344]
    bitmap1 = Bitmap(array=array1)
    print ("原数组:" + str (array1))
    print("升序排序:"+str(bitmap1.sort()))
    print ("降序排序:" + str(bitmap1.sort (desc=True)))
    #利用bitmap对数组去重
    array2 = [1, 33, 34, 1, 33, 532, 232, 532, 543, 99]
    bitmap2 = Bitmap(array=array2)
    print ("原数组:" + str (array2))
    print("去掉重复数据后的数组为:"+str(bitmap2.sort()))
    #判断某个数据是否在数组中
    num = 99
    judge = "在" if bitmap2.isExist(num) else "不在"
    print("%d%s数组中"%(num, judge))
    '''
    集合操作
    例子:列表list1表示喜欢刘亦非的人的id,列表list2表示喜欢angelababy的人
    '''
    list1 = [2, 10, 12, 43, 45, 112, 345, 645, 1243, 10982]
    list2 = [4, 16, 12, 48, 49, 345, 564, 786, 6554, 10982, 50982]
    bitmap1 = Bitmap(array=list1)
    bitmap2 = Bitmap(array=list2)
    bitmap3 = bitmap1.or_(bitmap2)
    print("喜欢刘亦非或者喜欢angelababy的人有:"+str(bitmap3.sort()))
    bitmap3 = bitmap1.and_ (bitmap2)
    print ("既喜欢刘亦非又喜欢angelababy的人有:" + str (bitmap3.sort ()))
    bitmap3 = bitmap1.andNot(bitmap2)
    print ("喜欢刘亦非但不喜欢angelababy的人有:" + str (bitmap3.sort ()))
    bitmap3 = bitmap1.xor(bitmap2)
    print ("只喜欢刘亦非或只喜欢angelababy的人有:" + str (bitmap3.sort ()))
# -*- coding:utf-8 -*-
class BitMap():
    def __init__(self, maxValue):
        self._size = int((maxValue + 31 - 1) / 31)  # 向上取正  确定数组大小
        # self._size = num / 31  # 向下取正  确定数组大小
        self.array = [0 for i in range(self._size)]  # 初始化为0
    def getElemIndex(self, num):  # 获取该数的数组下标
        return num // 31
    def getBitIndex(self, num):  # 获取该数所在数组的位下标
        return num % 31
    def set(self, num):  # 将该数所在的位置1
        elemIndex = self.getElemIndex(num)
        bitIndex = self.getBitIndex(num)
        self.array[elemIndex] = self.array[elemIndex] | (1 << bitIndex)
    def clean(self, num): # 将该数所在的位置0
        elemIndex = self.getElemIndex(num)
        bitIndex = self.getBitIndex(num)
        self.array[elemIndex] = self.array[elemIndex] & (~(1 << bitIndex))
    def find(self, num):  # 查找该数是否存在
        elemIndex = self.getElemIndex(num)
        bitIndex = self.getBitIndex(num)
        if self.array[elemIndex] & (1 << bitIndex):
            return True
        return False
if __name__ == '__main__':
    array_list = [45, 2, 78, 35, 67, 90, 879, 0, 340, 123, 46]
    results = []
    bitmap = BitMap(maxValue=max(array_list))
    for num in array_list:
        bitmap.set(num)
    for i in range(max(array_list) + 1):
        if bitmap.find(i):
            results.append(i)
    print(array_list)
    print(results)

布隆过滤器

from bitarray import bitarray
# 3rd party
import mmh3
class BloomFilter(set):
    def __init__(self, size, hash_count):
        super(BloomFilter, self).__init__()
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)
        self.size = size
        self.hash_count = hash_count
    def __len__(self):
        return self.size
    def __iter__(self):
        return iter(self.bit_array)
    def add(self, item):
        for ii in range(self.hash_count):
            index = mmh3.hash(item, ii) % self.size
            self.bit_array[index] = 1
        return self
    def __contains__(self, item):
        out = True
        for ii in range(self.hash_count):
            index = mmh3.hash(item, ii) % self.size
            if self.bit_array[index] == 0:
                out = False
        return out
def main():
    bloom = BloomFilter(100, 10)
    animals = ['dog', 'cat', 'giraffe', 'fly', 'mosquito', 'horse', 'eagle',
               'bird', 'bison', 'boar', 'butterfly', 'ant', 'anaconda', 'bear',
               'chicken', 'dolphin', 'donkey', 'crow', 'crocodile']
    # First insertion of animals into the bloom filter
    for animal in animals:
        bloom.add(animal)
    # Membership existence for already inserted animals
    # There should not be any false negatives
    for animal in animals:
        if animal in bloom:
            print('{} is in bloom filter as expected'.format(animal))
        else:
            print('Something is terribly went wrong for {}'.format(animal))
            print('FALSE NEGATIVE!')
    # Membership existence for not inserted animals
    # There could be false positives
    other_animals = ['badger', 'cow', 'pig', 'sheep', 'bee', 'wolf', 'fox',
                     'whale', 'shark', 'fish', 'turkey', 'duck', 'dove',
                     'deer', 'elephant', 'frog', 'falcon', 'goat', 'gorilla',
                     'hawk' ]
    for other_animal in other_animals:
        if other_animal in bloom:
            print('{} is not in the bloom, but a false positive'.format(other_animal))
        else:
            print('{} is not in the bloom filter as expected'.format(other_animal))
if __name__ == '__main__':
    main()

输出为:

dog is in bloom filter as expected
cat is in bloom filter as expected
giraffe is in bloom filter as expected
fly is in bloom filter as expected
mosquito is in bloom filter as expected
horse is in bloom filter as expected
eagle is in bloom filter as expected
bird is in bloom filter as expected
bison is in bloom filter as expected
boar is in bloom filter as expected
butterfly is in bloom filter as expected
ant is in bloom filter as expected
anaconda is in bloom filter as expected
bear is in bloom filter as expected
chicken is in bloom filter as expected
dolphin is in bloom filter as expected
donkey is in bloom filter as expected
crow is in bloom filter as expected
crocodile is in bloom filter as expected
badger is not in the bloom filter as expected
cow is not in the bloom filter as expected
pig is not in the bloom filter as expected
sheep is not in the bloom, but a false positive
bee is not in the bloom filter as expected
wolf is not in the bloom filter as expected
frog is not in the bloom filter as expected
falcon is not in the bloom filter as expected
goat is not in the bloom filter as expected
gorilla is not in the bloom filter as expected
hawk is not in the bloom filter as expected

字典树实现

# -*- coding:utf-8 -*-
"""
Description:大变双向字典树
迭代次数默认最大999,可以增加但是没必要。其实能深到999层,那这个序列还是选择另外的处理方式吧。
@author: WangLeAi
@date: 2018/8/15
"""
class TrieNode(object):
    def __init__(self, value=None, count=0, parent=None):
        # 值
        self.value = value
        # 频数统计
        self.count = count
        # 父结点
        self.parent = parent
        # 子节点,{value:TrieNode}
        self.children = {}
class Trie(object):
    def __init__(self):
        # 创建空的根节点
        self.root = TrieNode()
    def insert(self, sequence):
        """
        基操,插入一个序列
        :param sequence: 列表
        :return:
        """
        cur_node = self.root
        for item in sequence:
            if item not in cur_node.children:
                # 插入结点
                child = TrieNode(value=item, count=1, parent=cur_node)
                cur_node.children[item] = child
                cur_node = child
            else:
                # 更新结点
                cur_node = cur_node.children[item]
                cur_node.count += 1
    def search(self, sequence):
        """
        基操,查询是否存在完整序列
        :param sequence: 列表
        :return:
        """
        cur_node = self.root
        mark = True
        for item in sequence:
            if item not in cur_node.children:
                mark = False
                break
            else:
                cur_node = cur_node.children[item]
        # 如果还有子结点,说明序列并非完整
        if cur_node.children:
            mark = False
        return mark
    def delete(self, sequence):
        """
        基操,删除序列,准确来说是减少计数
        :param sequence: 列表
        :return:
        """
        mark = False
        if self.search(sequence):
            mark = True
            cur_node = self.root
            for item in sequence:
                cur_node.children[item].count -= 1
                if cur_node.children[item].count == 0:
                    cur_node.children.pop(item)
                    break
                else:
                    cur_node = cur_node.children[item]
        return mark
    def search_part(self, sequence, prefix, suffix, start_node=None):
        """
        递归查找子序列,返回前缀和后缀结点
        此处简化操作,仅返回一位前后缀的内容与频数
        :param sequence: 列表
        :param prefix: 前缀字典,初始传入空字典
        :param suffix: 后缀字典,初始传入空字典
        :param start_node: 起始结点,用于子树的查询
        :return:
        """
        if start_node:
            cur_node = start_node
            prefix_node = start_node.parent
        else:
            cur_node = self.root
            prefix_node = self.root
        mark = True
        # 必须从第一个结点开始对比
        for i in range(len(sequence)):
            if i == 0:
                if sequence[i] != cur_node.value:
                    for child_node in cur_node.children.values():
                        self.search_part(sequence, prefix, suffix, child_node)
                    mark = False
                    break
            else:
                if sequence[i] not in cur_node.children:
                    for child_node in cur_node.children.values():
                        self.search_part(sequence, prefix, suffix, child_node)
                    mark = False
                    break
                else:
                    cur_node = cur_node.children[sequence[i]]
        if mark:
            if prefix_node.value:
                # 前缀数量取序列词中最后一词的频数
                if prefix_node.value in prefix:
                    prefix[prefix_node.value] += cur_node.count
                else:
                    prefix[prefix_node.value] = cur_node.count
            for suffix_node in cur_node.children.values():
                if suffix_node.value in suffix:
                    suffix[suffix_node.value] += suffix_node.count
                else:
                    suffix[suffix_node.value] = suffix_node.count
            # 即使找到一部分还需继续查找子结点
            for child_node in cur_node.children.values():
                self.search_part(sequence, prefix, suffix, child_node)
if __name__ == "__main__":
    trie = Trie()
    texts = [["葬爱", "少年", "葬爱", "少年", "慕周力", "哈哈"], ["葬爱", "少年", "阿西吧"], ["烈", "烈", "风", "中"], ["忘记", "了", "爱"],
             ["埋葬", "了", "爱"]]
    for text in texts:
        trie.insert(text)
    markx = trie.search(["忘记", "了", "爱"])
    print(markx)
    markx = trie.search(["忘记", "了"])
    print(markx)
    markx = trie.search(["忘记", "爱"])
    print(markx)
    markx = trie.delete(["葬爱", "少年", "王周力"])
    print(markx)
    prefixx = {}
    suffixx = {}
    trie.search_part(["葬爱", "少年"], prefixx, suffixx)
    print(prefixx)
    print(suffixx)

输出:


相关文章
|
8月前
|
算法 数据挖掘 Python
海量数据,3行Python代码直接获取!
海量数据,3行Python代码直接获取!
220 0
|
7月前
|
数据采集 数据可视化 大数据
Python在大数据处理中的应用实践
Python在大数据处理中扮演重要角色,借助`requests`和`BeautifulSoup`抓取数据,`pandas`进行清洗预处理,面对大规模数据时,`Dask`提供分布式处理能力,而`matplotlib`和`seaborn`则助力数据可视化。通过这些工具,数据工程师和科学家能高效地管理、分析和展示海量数据。
322 4
|
7月前
|
机器学习/深度学习 数据采集 分布式计算
如何用Python处理大数据分析?
【6月更文挑战第14天】如何用Python处理大数据分析?
94 4
|
8月前
|
数据可视化 大数据 数据处理
大数据处理时的python和R语言
【5月更文挑战第5天】本文讨论了在语言Python 和R中数据处理时的框架,比如Python中的 OpenCV, Matplotlib, NumPy, Pandas, 和Seaborn。
55 1
大数据处理时的python和R语言
|
8月前
|
SQL 数据可视化 数据挖掘
2024年8个Python高效数据分析的技巧。,2024年最新Python基础面试题2024
2024年8个Python高效数据分析的技巧。,2024年最新Python基础面试题2024
2024年8个Python高效数据分析的技巧。,2024年最新Python基础面试题2024
|
8月前
|
数据可视化 大数据 Python
python大数据分析处理
python大数据分析处理
93 0
|
8月前
|
大数据 Python
Python大数据之Python进阶(三)多进程的使用
Python大数据之Python进阶(三)多进程的使用
65 0
|
8月前
|
大数据 Python
Python大数据之Python进阶(六)多线程的使用
Python大数据之Python进阶(六)多线程的使用
74 0
|
8月前
|
数据采集 前端开发 JavaScript
Python大数据之Python进阶(一)介绍
Python大数据之Python进阶(一)介绍
134 0
|
机器学习/深度学习 数据采集 数据可视化
【Python入门系列】第十七篇:Python大数据处理和分析
大数据处理和分析是指对大规模数据集进行收集、存储、处理和分析的过程。随着互联网和信息技术的发展,我们可以轻松地获取到大量的数据,这些数据包含着宝贵的信息和洞察力。然而,由于数据量庞大、复杂性高和多样性,传统的数据处理和分析方法已经无法胜任。
3068 1