Python 大数据量文本文件高效解析方案代码实现

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: Python 大数据量文本文件高效解析方案代码实现

大数据量文本文件高效解析方案代码实现

测试环境

Python 3.6.2

Win 10 内存 8G,CPU I5 1.6 GHz

背景描述

这个作品来源于一个日志解析工具的开发,这个开发过程中遇到的一个痛点,就是日志文件多,日志数据量大,解析耗时长。在这种情况下,寻思一种高效解析数据解析方案。

解决方案描述

1、采用多线程读取文件

2、采用按块读取文件替代按行读取文件

由于日志文件都是文本文件,需要读取其中每一行进行解析,所以一开始会很自然想到采用按行读取,后面发现合理配置下,按块读取,会比按行读取更高效。

按块读取来的问题就是,可能导致完整的数据行分散在不同数据块中,那怎么解决这个问题呢?解答如下:

将数据块按换行符\n切分得到日志行列表,列表第一个元素可能是一个完整的日志行,也可能是上一个数据块末尾日志行的组成部分,列表最后一个元素可能是不完整的日志行(即下一个数据块开头日志行的组成部分),也可能是空字符串(日志块中的日志行数据全部是完整的),根据这个规律,得出以下公式,通过该公式,可以得到一个新的数据块,对该数据块二次切分,可以得到数据完整的日志行

上一个日志块首部日志行 +\n + 尾部日志行 + 下一个数据块首部日志行 + \n + 尾部日志行 + ...

3、将数据解析操作拆分为可并行解析部分和不可并行解析部分

数据解析往往涉及一些不可并行的操作,比如数据求和,最值统计等,如果不进行拆分,并行解析时势必需要添加互斥锁,避免数据覆盖,这样就会大大降低执行的效率,特别是不可并行操作占比较大的情况下。

对数据解析操作进行拆分后,可并行解析操作部分不用加锁。考虑到Python GIL的问题,不可并行解析部分替换为单进程解析。

4、采用多进程解析替代多线程解析

采用多进程解析替代多线程解析,可以避开Python GIL全局解释锁带来的执行效率问题,从而提高解析效率。

5、采用队列实现“协同”效果

引入队列机制,实现一边读取日志,一边进行数据解析:

  1. 日志读取线程将日志块存储到队列,解析进程从队列获取已读取日志块,执行可并行解析操作
  2. 并行解析操作进程将解析后的结果存储到另一个队列,另一个解析进程从队列获取数据,执行不可并行解析操作。

代码实现

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from datetime import datetime
from joblib import Parallel, delayed, parallel_backend
from collections import deque
from multiprocessing import cpu_count
import threading
class LogParser(object):
    def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()):
        self.log_unparsed_queue = deque() # 用于存储未解析日志
        self.log_line_parsed_queue = deque()  # 用于存储已解析日志行
        self.is_all_files_read = False  # 标识是否已读取所有日志文件
        self.process_num_for_log_parsing = process_num_for_log_parsing # 并发解析日志文件进程数
        self.chunk_size = chunk_size # 每次读取日志的日志块大小
        self.files_read_list = [] # 存放已读取日志文件
        self.log_parsing_finished = False # 标识是否完成日志解析
    def read_in_chunks(self, filePath, chunk_size=1024*1024):
        """
        惰性函数(生成器),用于逐块读取文件。
        默认区块大小:1M
        """
        with open(filePath, 'r', encoding='utf-8') as f:            
            while True:
                chunk_data = f.read(chunk_size)
                if not chunk_data:
                    break
                yield chunk_data
    def read_log_file(self, logfile_path):
        '''
        读取日志文件
        这里假设日志文件都是文本文件,按块读取后,可按换行符进行二次切分,以便获取行日志
        '''
        temp_list = []  # 二次切分后,头,尾行日志可能是不完整的,所以需要将日志块头尾行日志相连接,进行拼接
        for chunk in self.read_in_chunks(logfile_path, self.chunk_size):
            log_chunk = chunk.split('\n')
            temp_list.extend([log_chunk[0], '\n'])
            temp_list.append(log_chunk[-1])
            self.log_unparsed_queue.append(log_chunk[1:-1])
        self.log_unparsed_queue.append(''.join(temp_list).split('\n'))
        self.files_read_list.remove(logfile_path)
    def start_processes_for_log_parsing(self):
        '''启动日志解析进程'''
        with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing):
            Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing))
        self.log_parsing_finished = True
    def parse_logs(self):
        '''解析日志'''
        method_url_re_pattern = re.compile('(HEAD|POST|GET)\s+([^\s]+?)\s+',re.DOTALL)
        url_time_taken_extractor = re.compile('HTTP/1\.1.+\|(.+)\|\d+\|', re.DOTALL)
        while self.log_unparsed_queue or self.files_read_list:
            if not self.log_unparsed_queue:
                continue
            log_line_list = self.log_unparsed_queue.popleft()
            for log_line in log_line_list:
                #### do something with log_line
                if not log_line.strip():
                    continue
                res = method_url_re_pattern.findall(log_line)
                if not res:
                    print('日志未匹配到请求URL,已忽略:\n%s' % log_line)
                    continue
                method = res[0][0]
                url = res[0][1].split('?')[0]  # 去掉了 ?及后面的url参数
                # 提取耗时
                res = url_time_taken_extractor.findall(log_line)
                if res:
                    time_taken = float(res[0])
                else:
                    print('未从日志提取到请求耗时,已忽略日志:\n%s' % log_line)
                    continue
                # 存储解析后的日志信息
                self.log_line_parsed_queue.append({'method': method,
                                                   'url': url,
                                                   'time_taken': time_taken,
                                                   })
    def collect_statistics(self):
        '''收集统计数据'''
        def _collect_statistics():
            while self.log_line_parsed_queue or not self.log_parsing_finished:
                if not self.log_line_parsed_queue:
                    continue
                log_info = self.log_line_parsed_queue.popleft()
                # do something with log_info
        with parallel_backend("multiprocessing", n_jobs=1):
            Parallel()(delayed(_collect_statistics)() for i in range(1))
    def run(self, file_path_list):
        # 多线程读取日志文件
        for file_path in file_path_list:
            thread = threading.Thread(target=self.read_log_file,
                                      name="read_log_file",
                                      args=(file_path,))
            thread.start()
            self.files_read_list.append(file_path)
        # 启动日志解析进程
        thread = threading.Thread(target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing")
        thread.start()
        # 启动日志统计数据收集进程
        thread = threading.Thread(target=self.collect_statistics, name="collect_statistics")
        thread.start()
        start = datetime.now()
        while threading.active_count() > 1:
            print('程序正在努力解析日志...')
            time.sleep(0.5)
        end = datetime.now()
        print('解析完成', 'start', start, 'end', end, '耗时', end - start)
if __name__ == "__main__":
    log_parser = LogParser()
    log_parser.run(['access.log', 'access2.log'])

注意:

需要合理的配置单次读取文件数据块的大小,不能过大,或者过小,否则都可能会导致数据读取速度变慢。笔者实践环境下,发现10M~15M每次是一个比较高效的配置。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
17天前
|
开发框架 数据建模 中间件
Python中的装饰器:简化代码,增强功能
在Python的世界里,装饰器是那些静悄悄的幕后英雄。它们不张扬,却能默默地为函数或类增添强大的功能。本文将带你了解装饰器的魅力所在,从基础概念到实际应用,我们一步步揭开装饰器的神秘面纱。准备好了吗?让我们开始这段简洁而富有启发性的旅程吧!
26 6
|
5天前
|
计算机视觉 Python
如何使用Python将TS文件转换为MP4
本文介绍了如何使用Python和FFmpeg将TS文件转换为MP4文件。首先需要安装Python和FFmpeg,然后通过`subprocess`模块调用FFmpeg命令,实现文件格式的转换。代码示例展示了具体的操作步骤,包括检查文件存在性、构建FFmpeg命令和执行转换过程。
29 7
|
2天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
15 2
|
10天前
|
数据可视化 Python
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
通过这些思维导图和分析说明表,您可以更直观地理解和选择适合的数据可视化图表类型,帮助更有效地展示和分析数据。
50 8
|
18天前
|
API Python
【Azure Developer】分享一段Python代码调用Graph API创建用户的示例
分享一段Python代码调用Graph API创建用户的示例
41 11
|
19天前
|
测试技术 Python
探索Python中的装饰器:简化代码,增强功能
在Python的世界中,装饰器是那些能够为我们的代码增添魔力的小精灵。它们不仅让代码看起来更加优雅,还能在不改变原有函数定义的情况下,增加额外的功能。本文将通过生动的例子和易于理解的语言,带你领略装饰器的奥秘,从基础概念到实际应用,一起开启Python装饰器的奇妙旅程。
34 11
|
15天前
|
Python
探索Python中的装饰器:简化代码,增强功能
在Python的世界里,装饰器就像是给函数穿上了一件神奇的外套,让它们拥有了超能力。本文将通过浅显易懂的语言和生动的比喻,带你了解装饰器的基本概念、使用方法以及它们如何让你的代码变得更加简洁高效。让我们一起揭开装饰器的神秘面纱,看看它是如何在不改变函数核心逻辑的情况下,为函数增添新功能的吧!
|
16天前
|
程序员 测试技术 数据安全/隐私保护
深入理解Python装饰器:提升代码重用与可读性
本文旨在为中高级Python开发者提供一份关于装饰器的深度解析。通过探讨装饰器的基本原理、类型以及在实际项目中的应用案例,帮助读者更好地理解并运用这一强大的语言特性。不同于常规摘要,本文将以一个实际的软件开发场景引入,逐步揭示装饰器如何优化代码结构,提高开发效率和代码质量。
42 6
|
20天前
|
Python
如何提高Python代码的可读性?
如何提高Python代码的可读性?
33 4
|
18天前
|
PHP 开发者 容器
PHP命名空间深度解析:避免命名冲突与提升代码组织####
本文深入探讨了PHP中命名空间的概念、用途及最佳实践,揭示其在解决全局命名冲突、提高代码可维护性方面的重要性。通过生动实例和详尽分析,本文将帮助开发者有效利用命名空间来优化大型项目结构,确保代码的清晰与高效。 ####
18 1
下一篇
DataWorks