一、代码实现
import logging import os import threading import time from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer logger = logging.getLogger(__name__) class LogWatcher(FileSystemEventHandler): def __init__(self, log_file, on_modified_callback=None): """" 初始化 LogWatcher 类的实例。 参数: - log_file:日志文件的路径 - on_modified_callback:可选的回调函数,在文件修改时调用 属性: - log_file:日志文件的路径 - file_object:日志文件对象 - on_modified_callback:文件修改回调函数 - last_line:最后一行文本 - observer:观察者对象 - match_string:需要匹配的字符串 - stop_watching:停止监视的标志 """ self.log_file = log_file self.file_object = open(log_file, 'rb') self.on_modified_callback = on_modified_callback self.last_line = self.get_last_line() # 初始化时获取最后一行文本 self.observer = Observer() self.observer.schedule(self, ".", recursive=False) self.match_string = None self.stop_watching = False def start(self): """ 启动观察者对象,开始监视文件变化。 """ self.observer.start() def stop(self): """ 停止观察者对象,结束监视文件变化。 """ self.observer.stop() self.observer.join() self.file_object.close() def get_last_line(self): """ 获取日志文件的最后一行文本。它通过将文件指针移动到文件末尾,然后逐个字符向前搜索,直到找到换行符为止。 返回值: - 最后一行文本,如果文件为空则返回None """ # 将文件指针移动到文件末尾 self.file_object.seek(0, os.SEEK_END) # 获取当前文件指针的位置(此时指针在最后一行的末尾) position = self.file_object.tell() try: # 尝试向前移动两个字节 new_position = max(position - 2, 0) self.file_object.seek(new_position, os.SEEK_SET) except OSError as e: # 如果发生错误,可能是文件太小,返回None return None # 逐个字符向前搜索,确保文件指针最终停在当前行的第一个字符处 while True: # read(1)读取的是指针位置的下一个字符,每次调用read(1)都会读取一个字符,并将指针向后移动一个字符的位置。 char = self.file_object.read(1).decode('utf-8', errors='ignore') if char == '\n': break if new_position == 0: # 如果已经到达文件开头,跳出循环 break # 尝试向前移动一个字节位置,确保不越界到文件开头 new_position = max(new_position - 1, 0) # 将文件指针移动到新的位置 self.file_object.seek(new_position, os.SEEK_SET) # last_line = self.file_object.readline().decode('utf-8', errors='ignore').strip() last_line = self.file_object.read(position - new_position).decode('utf-8', errors='ignore').strip() # 输出调试信息 logger.debug(f'Reading line: {last_line}') return last_line def on_modified(self, event): """ on_modified方法是FileSystemEventHandler的回调方法,当日志文件发生变化时,都会调用这个方法。 参数: - event:文件变化事件对象 """ # 注意,这里一个要用绝对路径比较,不能直接使用 event.src_path == self.log_file, # event.src_path == self.log_file 的值为false # if event.src_path == self.log_file: if os.path.abspath(event.src_path) == os.path.abspath(self.log_file): # 在文件发生变化时,实时获取最后一行文本 self.last_line = self.get_last_line() # 用户可在外部传入一个回调方法,在文本发生变化时执行该事件 if self.on_modified_callback: self.on_modified_callback() # 调用基类的同名方法,以便执行基类的默认行为 super(LogWatcher, self).on_modified(event) def tail_last_line_and_match(self, match_string=None, max_match_seconds=10): """ 实时监控日志文件的变化,并实时获取最后一行文本。如果匹配到指定的字符串,停止监视。 参数: - match_string:需要匹配的字符串 """ self.match_string = match_string self.start() end_time = time.time() + max_match_seconds try: while not self.stop_watching and time.time() <= end_time: if self.match_string and self.match_string in self.last_line: self.stop_watching = True except KeyboardInterrupt: pass self.stop_watching = True # 停止监视循环 def write_logs(log_file): """在新线程中写入日志""" for i in range(10): with open(log_file, 'a') as file: file.write(f'New log entry {i}\n') time.sleep(1) # 每秒写入一次日志 if __name__ == '__main__': import logging logging.basicConfig(level=logging.DEBUG) log_file = 'demo.log' # 创建日志文件并写入示例日志 with open(log_file, 'w') as file: file.write('This is the first line of the log.\n') file.write('This is the second line of the log.\n') log_watcher = LogWatcher(log_file) # 启动新线程写入日志 write_thread = threading.Thread(target=write_logs, args=(log_file,)) write_thread.start() # 启动实时监控日志文件变化,并打印最后一行文本,直到匹配到指定字符串或超时才停止监视 log_watcher.tail_last_line_and_match(match_string='New log entry 9', max_match_seconds=20) # 等待写入线程结束 write_thread.join()
三、Demo验证
运行代码,控制台的输出结果:
DEBUG:__main__:Reading line: This is the second line of the log. DEBUG:__main__:Reading line: New log entry 0 DEBUG:__main__:Reading line: New log entry 1 DEBUG:__main__:Reading line: New log entry 2 DEBUG:__main__:Reading line: New log entry 3 DEBUG:__main__:Reading line: New log entry 4 DEBUG:__main__:Reading line: New log entry 5 DEBUG:__main__:Reading line: New log entry 6 DEBUG:__main__:Reading line: New log entry 7 DEBUG:__main__:Reading line: New log entry 8 DEBUG:__main__:Reading line: New log entry 9 Process finished with exit code 0