Generating and Processing Massive Data with Python
Reference: https://blog.csdn.net/quicktest/article/details/7453189
Overview
Generating 100 Million Records
The code is as follows:
# Generate 100 million IP addresses
def generateRandom(rangeFrom, rangeTo):
    import random
    return random.randint(rangeFrom, rangeTo)

def generageMassiveIPAddr(fileLocation, numberOfLines):
    IP = []
    file_handler = open(fileLocation, 'a+')
    for i in range(numberOfLines):
        IP.append('10.197.' + str(generateRandom(0, 255)) + '.' +
                  str(generateRandom(0, 255)) + '\n')
    file_handler.writelines(IP)
    file_handler.close()

if __name__ == '__main__':
    from time import ctime
    print(ctime())
    for i in range(10):
        print(' ' + str(i) + ": " + ctime())
        generageMassiveIPAddr('d:\\massiveIP.txt', 10000000)
    print(ctime())
The program output is as follows:
Thu Dec 30 13:01:34 2021
 0: Thu Dec 30 13:01:34 2021
 1: Thu Dec 30 13:02:12 2021
 2: Thu Dec 30 13:02:50 2021
 3: Thu Dec 30 13:03:28 2021
 4: Thu Dec 30 13:04:07 2021
 5: Thu Dec 30 13:04:45 2021
 6: Thu Dec 30 13:05:25 2021
 7: Thu Dec 30 13:06:07 2021
 8: Thu Dec 30 13:06:46 2021
 9: Thu Dec 30 13:07:25 2021
Thu Dec 30 13:08:04 2021
As the timestamps show, each batch of 10 million records takes about 40 s, so generating all 100 million records took 6 min 30 s, i.e. 390 s in total.
The generated file is 1.4 GB in size.
Direct Read Test
Loading the data
The code is as follows:
import pandas as pd
from time import ctime

print(ctime())
df = pd.read_csv("d:\\massiveIP.txt", header=None, names=["IP"])
print(ctime())
Loading took 29 s; the output is as follows:
Thu Dec 30 13:20:24 2021
Thu Dec 30 13:20:53 2021
Check the memory usage:
df.info()
The output is as follows:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 1 columns):
 #   Column  Dtype
---  ------  -----
 0   IP      object
dtypes: object(1)
memory usage: 762.9+ MB
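Note that the "762.9+ MB" reported here only covers the pointer array of the object column (100,000,000 rows × 8 bytes ≈ 762.9 MB); the IP strings themselves are not included. If you want the true footprint, pandas can measure it with deep introspection, although scanning 100 million strings takes a while. A small sketch:

# deep=True also measures the string objects, not just the 8-byte pointers
print(df.memory_usage(deep=True))
# equivalently: df.info(memory_usage="deep")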
Finding the Maximum Number of Duplicates
Counting the duplicates of each value
We can use value_counts():
value_counts() is a quick way to see how many distinct values a column contains and how many times each distinct value occurs in that column. It is a Series method, so when working with a DataFrame you first select the column (or row) you want to apply it to.
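A minimal illustration on a throwaway Series (hypothetical values, just to show the shape of the result):

import pandas as pd

s = pd.Series(["10.197.0.1", "10.197.0.2", "10.197.0.1"])
print(s.value_counts())
# 10.197.0.1    2
# 10.197.0.2    1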
%%time
df1 = df["IP"].value_counts()
df1
Output:
Wall time: 31.6 s
10.197.87.47      1678
10.197.38.53      1677
10.197.42.238     1676
10.197.28.183     1676
10.197.63.208     1674
                  ...
10.197.30.195     1381
10.197.91.33      1379
10.197.7.231      1376
10.197.11.136     1366
10.197.241.199    1358
Name: IP, Length: 65536, dtype: int64
This took 31.6 s. Note that the result has 65,536 distinct values, which matches the 256 × 256 possible combinations of the last two octets generated earlier.
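As a side note (not something attempted in the original run): because the column holds only 65,536 distinct strings repeated many times, loading it as a pandas categorical should shrink the in-memory size considerably. A hedged sketch of that variant:

import pandas as pd

# hypothetical variant: store the repeated IP strings as a categorical column
df_cat = pd.read_csv("d:\\massiveIP.txt", header=None, names=["IP"],
                     dtype={"IP": "category"})
df_cat.info(memory_usage="deep")
df_cat["IP"].value_counts()  # the same counting API still works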
Generating 500 Million Records
Since generating 100 million records posed no problem, let's now generate 500 million.
Change the main block to the following (the loop count goes from 10 to 50):
if __name__ == '__main__':
    from time import ctime
    print(ctime())
    for i in range(50):  # previously 10, now changed to 50
        print(' ' + str(i) + ": " + ctime())
        generageMassiveIPAddr('d:\\massiveIP.txt', 10000000)
    print(ctime())
Generation took 27 min 35.8 s.
The generated file is 7.04 GB, roughly 7,559,142,441 bytes. With 500 million records in total, that works out to about 15.12 bytes per record.
The output is:
Thu Dec 30 15:04:51 2021
 0: Thu Dec 30 15:04:51 2021
 1: Thu Dec 30 15:05:32 2021
 2: Thu Dec 30 15:06:12 2021
 3: Thu Dec 30 15:06:51 2021
 4: Thu Dec 30 15:07:29 2021
 5: Thu Dec 30 15:08:08 2021
 6: Thu Dec 30 15:08:48 2021
 7: Thu Dec 30 15:09:30 2021
 8: Thu Dec 30 15:10:11 2021
 9: Thu Dec 30 15:10:51 2021
 10: Thu Dec 30 15:11:30 2021
 11: Thu Dec 30 15:12:10 2021
 12: Thu Dec 30 15:12:54 2021
 13: Thu Dec 30 15:13:42 2021
 14: Thu Dec 30 15:14:23 2021
 15: Thu Dec 30 15:15:05 2021
 16: Thu Dec 30 15:15:44 2021
 17: Thu Dec 30 15:16:25 2021
 18: Thu Dec 30 15:17:05 2021
 19: Thu Dec 30 15:17:45 2021
 20: Thu Dec 30 15:18:23 2021
 21: Thu Dec 30 15:19:03 2021
 22: Thu Dec 30 15:19:47 2021
 23: Thu Dec 30 15:20:34 2021
 36: Thu Dec 30 15:29:28 2021
 37: Thu Dec 30 15:30:12 2021
 38: Thu Dec 30 15:30:58 2021
 39: Thu Dec 30 15:31:46 2021
Thu Dec 30 15:32:27 2021
Direct Read Test
Loading the data
The code is as follows:
import pandas as pd
from time import ctime

print(ctime())
df = pd.read_csv("d:\\massiveIP.txt", header=None, names=["IP"])
print(ctime())
Open Resource Monitor and you can watch the memory usage climb:
VS Code's memory usage grew quickly; I kept closing QQ, DingTalk, and unused browser windows, but in the end...
After 2 min 49.5 s, the output was:
MemoryError: Unable to allocate 3.73 GiB for an array with shape (500000000,) and data type object
The 3.73 GiB requested here is only the array of object pointers for 500 million rows (500,000,000 × 8 bytes); the string objects themselves would need far more on top of that. Even after the run is interrupted, the interpreter does not release the memory, but you can release it manually:
a = []
for x in locals().keys():
    print(x)
    a.append(x)

import gc
# for i in a:
#     del locals()[x]
gc.collect()
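A more direct sketch, assuming the large DataFrame is still bound to the name df in the session, is simply to drop that reference and collect:

import gc

del df        # drop the reference to the large DataFrame (assumes it is bound to df)
gc.collect()  # ask the garbage collector to return the freed memory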
If you want to increase the available memory, see:
https://blog.csdn.net/com_fang_bean/article/details/106862052
But here I won't adjust the memory; instead, I'll load the data by other means.
Loading the Data in Chunks
Code:
import pandas as pd
from tqdm import tqdm

f = open('d:\\massiveIP.txt')
reader = pd.read_csv(f, sep=',', header=None, names=["IP"], iterator=True)
loop = True
chunkSize = 100000000
chunks = []
for i in tqdm(range(10)):
    try:
        chunk = reader.get_chunk(chunkSize)
        df1 = chunk["IP"].value_counts()
        chunks.append(df1)
        del chunk
    except StopIteration:
        # loop = False
        print("Iteration is stopped.")
This took 6 min 3.6 s.
The output is:
100%|██████████| 10/10 [06:03<00:00, 36.33s/it]
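As an aside, the same chunked pass can also be written with the chunksize parameter of read_csv, which returns an iterator of DataFrames; a roughly equivalent sketch (same file path, column name, and chunk size assumed):

import pandas as pd

chunks = []
for chunk in pd.read_csv("d:\\massiveIP.txt", header=None, names=["IP"],
                         chunksize=100000000):
    chunks.append(chunk["IP"].value_counts())  # count IPs within this chunk only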
Collecting the Per-Chunk Statistics
Code:
re_chunks = []
for i in chunks:
    df1 = i.reset_index()
    re_chunks.append(df1)
df22 = pd.concat(re_chunks, ignore_index=True)
df22
The output is:
                 index    IP
0         10.197.87.47  1678
1         10.197.38.53  1677
2        10.197.42.238  1676
3        10.197.28.183  1676
4        10.197.63.208  1674
...                ...   ...
327675  10.197.215.196  1380
327676   10.197.18.130  1379
327677  10.197.251.175  1371
327678    10.197.57.85  1368
327679   10.197.115.87  1358
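Because the same IP is counted separately in every chunk, its total is still split across several rows of df22 at this point; the groupby step below sums those partial counts. A toy sketch with hypothetical counts:

import pandas as pd

# per-chunk value_counts() results for the same two hypothetical IPs
c1 = pd.Series({"10.197.0.1": 3, "10.197.0.2": 5}, name="IP")
c2 = pd.Series({"10.197.0.1": 4, "10.197.0.2": 1}, name="IP")

merged = pd.concat([c1.reset_index(), c2.reset_index()], ignore_index=True)
print(merged.groupby("index").agg({"IP": "sum"}))  # 10.197.0.1 -> 7, 10.197.0.2 -> 6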
Getting the total count per IP via groupby, aggregation, reset_index, and sorting
df22.groupby(by=["index"]).agg({"IP":"sum"}).reset_index().sort_values(by=["IP"],ascending=False)
The output is as follows:
                 index    IP
32949    10.197.213.31  7982
48006    10.197.37.219  7972
63967      10.197.93.7  7961
40524   10.197.240.167  7946
45524      10.197.28.6  7945
...                ...   ...
54610    10.197.60.172  7302
8240    10.197.127.141  7293
59005    10.197.76.210  7292
38627    10.197.233.73  7286
11341   10.197.138.168  7282
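If only the maximum number of duplicates is needed, which is the question this section set out to answer, a short follow-up on the aggregated frame (a sketch reusing df22 from above):

ip_totals = df22.groupby(by=["index"]).agg({"IP": "sum"})
print(ip_totals["IP"].idxmax(), ip_totals["IP"].max())  # most-duplicated IP and its count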
Verifying the Result
df22["IP"].sum()
The output is as follows:
500000000
This matches the original record count, which confirms the process is correct. With that, processing massive amounts of data with pandas is complete.