python多进程日志以及分布式日志的实现方式

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: python日志在多进程环境下的问题python日志模块logging支持多线程,但是在多进程下写入日志文件容易出现下面的问题:PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。也就是日志文件被占用的情况,原因是多个进程的文件handler对日志文件进行操作产生的。

python日志在多进程环境下的问题

python日志模块logging支持多线程,但是在多进程下写入日志文件容易出现下面的问题:


PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。


也就是日志文件被占用的情况,原因是多个进程的文件handler对日志文件进行操作产生的。


这个问题经常在TimedRotatingFileHandler、RotatingFileHandler中出现。

解决办法

题主在网上搜集了各种解决上面问题的办法,基本以下面三个方向为主:

  • 安装第三方库提供的handler
  • 重写filehandler加全局锁
  • 使用队列将消息传递


但是三种方法各有小缺陷:

  • 第三方库很久无人维护,且支持的功能比较单一,无法满足生产环境的需求。
  • 轮转日志的时候由于全局锁的存在,其他子进程无法记录日志,有丢失日志的风险。
  • 使用多进程消息队列的缺点在于使用困难,如果是多模块编程,需要将全局队列传来传去,在大型项目中显得很麻烦。


经过对官网的研究 ,题主无意中找到了一种非常方便且高效的方法,并且经过一定的修改使这种方法可用于分布式日志,且支持多语言日志的处理。

唯一的不足是需要新学习一个zmq通信协议,但是这并不是问题,如果只是想要一个解决方案并立即投入使用,只需要按照下面的方法编写,无需关注zmq的相关知识。

基于zmq的分布式日志

实现思路

  • 通过zmq的多对一通信,将多个地方的日志发送到一个地方集中处理,从而实现分布式日志。
  • 这个方法不仅可以解决python分布日志的问题,还可以很好的兼容其他语言,比如项目中还有C、java,那么可以将它们中的日志也发送过来,一并处理。

看到这很多人可能明白了,这个方法类似官网提供的SocketHandler,但本方法其实是基于QueueHandler实现的,有利于发挥zmq易用性、可插拔、并发性能好的优点。

代码实现

首先是集中处理日志的程序,也就是上面所说"多对一"中的一

import zmq
import logging
from logging import handlers
 
class ZeroMQSocketListener(handlers.QueueListener):
    def __init__(self, uri="tcp://127.0.0.1:5555", *handlers,**kwargs):
        self.respect_handler_level = True     # handler日志等级启用,允许对handler设置setLevel,False则忽视级别
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = self.ctx.socket(zmq.SUB)
        socket.bind(uri)
        socket.setsockopt_string(zmq.SUBSCRIBE, '')     # 订阅所有主题
        super().__init__(socket, *handlers, respect_handler_level=self.respect_handler_level)
 
    def dequeue(self,block):
        msg = self.queue.recv_json()
        # print('111',msg)    # 测试用
        return logging.makeLogRecord(msg)
 
def main_logger():
    # 日志集中处理区,在主程序中调用一次
    
    # handlers配置区,filter可选
    formatter = logging.Formatter("%(name)s - %(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s")
    console = logging.StreamHandler()
    console.setLevel(logging.ERROR)
    ch = handlers.TimedRotatingFileHandler(r'logs\face.log',when='M',
                                           # backupCount=180,
                                           encoding='utf-8')
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)  # add formatter to ch
    
    # 设置监听的端口,并传递handlers
    loggerListener = ZeroMQSocketListener("tcp://127.0.0.1:5555",*(ch,console))
    loggerListener.start()   # 开启一个子线程处理记录器监听
    
# 主进程调用一次,非阻塞
main_logger()

自此,日志集中处理就结束了,是不是很简单,而且需要注意,我们这里不需要用到root logger,因为ZeroMQSocketListener会自动调用各种handlers将日志内容进行处理,想当于替代了logger的工作,所以也就没必要声明一个logger出来了。


更新:

这里的main_logger()是非阻塞,也就是下面还可以写其他代码,但是如果什么代码都没有,那么主进程就会直接退出,日志就收不到了。

如果接下来不需要做其他工作,那么请在main_logger()下方使用while True:time.sleep(0.5) 将主进程阻塞。


  • 需要重点关注通信地址"tcp://127.0.0.1:5555",因为其他地方的日志都会发送到这里来。


接下来是子进程中或者是你想记录日志的任何地方,比如在其他同事的电脑里

  • subprocess.py
import logging,zmq
from logging import handlers
 
# 我们需要的handler
class ZeroMQSocketHandler(handlers.QueueHandler):
    def __init__(self, uri="tcp://127.0.0.1:5555", socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = self.ctx.socket(socktype)
        socket.connect(uri)
        super().__init__(socket)
    def enqueue(self, record):
        self.queue.send_json(record.__dict__)
    def close(self):
        self.queue.close()
        
# 创建远端日志
rmtlogger = logging.getLogger('sub_root_name')    ##
rmtlogger.setLevel(logging.INFO)     # 建议设置一下,有时候默认是WARNING级别
rmtlogger.propagate=False    # 不允许传递,日志传递到这里就发送到主进程中
 
# 配置handler
zmqhandler = ZeroMQSocketHandler()
zmqhandler.setLevel(logging.INFO)
rmtlogger.addHandler(zmqhandler)
 
# if you have submodule
# import submodule  
 
# 记录日志
rmtlogger.info("这是一条遥远的日志")
  • 如果是多进程环境下,您大可直接将上面的代码直接开启到多个子进程中,并不会出现网络问题。


logger可以通过python日志的name系统进行传递,也就是说如果子进程中还有其他模块,可以通过日志传递系统将其他模块产生的日志传递过来,最后一并发送给监听器,就像下面:

  • submodule.py
# subprocess.py的子模块,如需测试注意调用
import logging
subMolduleLogger = logging.getLogger(f'sub_root_name.modulename')
 
subMolduleLogger.info("这是一条子模块日志")
# 这部分内容需要logging基础知识
  • 上面这条日志会传递给rmtlogger,通过rmtlogger发送到主进程。


在主进程中,设置了logging.Formatter对象,可以将产生日志的名字打印出来,用于区分日志产生的位置。

多语言支持

由于zmq本身就支持多语言,比如你使用c语言或其他语言,只需要在代码中使用zmq将日志通过json发送过来,python日志可以通过dict方法重建logger对象,具体可以打印上面代码中ZeroMQSocketListener.dequeue中的msg进行摸索,实现起来还是比较简单的。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
7月前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
319 73
|
4月前
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
|
4月前
|
消息中间件 分布式计算 并行计算
Python 高级编程与实战:构建分布式系统
本文深入探讨了 Python 中的分布式系统,介绍了 ZeroMQ、Celery 和 Dask 等工具的使用方法,并通过实战项目帮助读者掌握这些技术。ZeroMQ 是高性能异步消息库,支持多种通信模式;Celery 是分布式任务队列,支持异步任务执行;Dask 是并行计算库,适用于大规模数据处理。文章结合具体代码示例,帮助读者理解如何使用这些工具构建分布式系统。
|
6月前
|
分布式计算 DataWorks 数据处理
产品测评 | 上手分布式Python计算服务MaxFrame产品最佳实践
MaxFrame是阿里云自研的分布式计算框架,专为大数据处理设计,提供高效便捷的Python开发体验。其主要功能包括Python编程接口、直接利用MaxCompute资源、与MaxCompute Notebook集成及镜像管理功能。本文基于MaxFrame最佳实践,详细介绍了在DataWorks中使用MaxFrame创建数据源、PyODPS节点和MaxFrame会话的过程,并展示了如何通过MaxFrame实现分布式Pandas处理和大语言模型数据处理。测评反馈指出,虽然MaxFrame具备强大的数据处理能力,但在文档细节和新手友好性方面仍有改进空间。
|
6月前
|
数据采集 人工智能 分布式计算
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
在数据驱动的时代,大数据分析和AI模型训练对数据预处理的效率要求极高。传统的Pandas工具在小数据集下表现出色,但面对大规模数据时力不从心。阿里云推出的Python分布式计算框架MaxFrame,以“Pandas风格”为核心设计理念,旨在降低分布式计算门槛,同时支持超大规模数据处理。MaxFrame不仅保留了Pandas的操作习惯,还通过底层优化实现了高效的分布式调度、内存管理和容错机制,并深度集成阿里云大数据生态。本文将通过实践评测,全面解析MaxFrame的能力与价值,展示其在大数据和AI场景中的卓越表现。
134 4
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
|
5月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
171 0
|
6月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
319 8
|
5月前
|
Python
云产品评测|分布式Python计算服务MaxFrame获奖名单公布!
云产品评测|分布式Python计算服务MaxFrame获奖名单公布!
120 0
|
6月前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
6月前
|
SQL 分布式计算 数据处理
云产品评测|分布式Python计算服务MaxFrame | 在本地环境中使用MaxFrame + 基于MaxFrame实现大语言模型数据处理
本文基于官方文档,介绍了由浅入深的两个部分实操测试,包括在本地环境中使用MaxFrame & 基于MaxFrame实现大语言模型数据处理,对步骤有详细说明。体验下来对MaxCompute的感受是很不错的,值得尝试并使用!
146 1

热门文章

最新文章

推荐镜像

更多