开发者社区> 问答> 正文

消耗Watchdog队列事件的Python并行线程

赏金将在3天后过期。这个问题的答案有资格获得+50声望奖励。弗林(Furin)希望对该问题“引起更多关注”:

嗨!任何人都可以给我有关使用队列在python中传递事件的建议吗?

我有这段代码,每当外部程序(TCPdump)在我的目录中创建一个* pcap文件时,该事件便会将事件放入队列中。我的问题是,尽管我从process()函数获得了打印信息,但我总是得到一个空队列。

我究竟做错了什么?队列是否正确定义并在两个类之间共享?

EDIT ----------------- 我也许理解为什么我有一个空队列,我认为这是因为我正在打印自己初始化的队列,直到它被Handler类填充。我修改了代码,并创建了两个应该占用相同队列的进程,但是现在执行卡在queue.put()和线程ReadPcapFiles()上了。

这里是更新的代码:

import time
import pyshark
import concurrent.futures
import threading
import logging
from queue import Queue
from multiprocessing import Process
from watchdog.observers import Observer, api
from watchdog.events import PatternMatchingEventHandler

class Handler(PatternMatchingEventHandler):
    patterns = ["\*pcap", "\*pcapng"]

    def __init__(self, queue):
        PatternMatchingEventHandler.__init__(self)
        self.queue = queue

    def process(self, event):
        #print(f'event type: {event.event_type}  path : {event.src_path}')   
        self.queue.put(event.src_path)
        logging.info(f"Storing message: {self.queue.qsize()}")
        print("Producer queue: ", list(self.queue.queue))
        #self.queue.get()

    def on_created(self, event):
        self.process(event)          


def StartWatcher(watchdogq, event):
    path = 'C:\\...'
    handler = Handler(watchdogq)
    observer = Observer()
    while not event.is_set():
        observer.schedule(handler, path, recursive=False)
        print("About to start observer")
        observer.start()
        try:
            while True:
                time.sleep(1)
        except Exception as error:
            observer.stop()
            print("Error: " + str(error))
        observer.join()


def ReadPcapFiles(consumerq, event):
    while not event.is_set() or not consumerq.empty():
        print("Consumer queue: ", consumerq.get())
        #print("Consumer queue: ", list(consumerq.queue))

    # pcapfile = pyshark.FileCapture(self.queue.get())
    #     for packet in pcapfile:
    #         countPacket +=1 

if __name__ == '__main__':
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
    logging.getLogger().setLevel(logging.DEBUG)

    queue = Queue()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(StartWatcher,queue, event)
        executor.submit(ReadPcapFiles,queue, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

旧代码:

import time
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

class Handler(PatternMatchingEventHandler):
    patterns = ["\*pcap", "\*pcapng"]

    def __init__(self, queue):
        PatternMatchingEventHandler.__init__(self)
        self.queue = queue

    def process(self, event):
        print(f'event type: {event.event_type}  path : {event.src_path}')   
        self.queue.put(event.src_path)

    def on_created(self, event):
        self.process(event)

class Watcher():
    def __init__(self, path):
        self.queue = Queue()
        self.observer = Observer()
        self.handler = Handler(self.queue)
        self.path = path

    def start(self): 
        self.observer.schedule(self.handler, self.path, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(1)
                self.queue.get()
                print(list(self.queue.queue))
        except Exception as error:
            self.observer.stop()
            print("Error: " + str(error))
        self.observer.join()  

if __name__ == '__main__':
    watcher = Watcher('C:\\...')
    watcher.start()

问题来源:stackoverflow

展开
收起
is大龙 2020-03-24 17:54:11 1479 0
1 条回答
写回答
取消 提交回答
  • 这对我有用(谢谢,我从这个答案中得到了主要思想,谢谢!),但是请注意,我认为这是一种解决方法,因此,如果有人对此有更好的解决方案,或者可以更好地解释Python中这种行为的原因,请执行毫不犹豫地回答!

    我的猜测是我有两个主要问题: -我正在另一个线程内启动Watchdog进程(这以某种方式阻塞了我的队列消耗线程)。 -Python线程不能真正并行运行,因此有必要启动一个独立的进程。

    这是我的代码:

    import time
    import pyshark
    import threading
    import logging
    import os
    from queue import Queue
    from multiprocessing import Process, Pool
    from watchdog.observers import Observer, api
    from watchdog.events import PatternMatchingEventHandler
    from concurrent.futures import ThreadPoolExecutor
    
    class Handler(PatternMatchingEventHandler):
        patterns = ["\*pcap", "\*pcapng"]
    
        def __init__(self, queue):
            PatternMatchingEventHandler.__init__(self)
            self.queue = queue
    
        def process(self, event):  
            self.queue.put(event.src_path)
            logging.info(f"Storing message: {self.queue.qsize()}")
            print("Producer queue: ", list(self.queue.queue))
    
    
        def on_created(self, event):
            #wait that the transfer of the file is finished before processing it
            file_size = -1
            while file_size != os.path.getsize(event.src_path):
                file_size = os.path.getsize(event.src_path)
                time.sleep(1)
    
            self.process(event)         
    
    def ConsumeQueue(consumerq):
        while True:
            if not consumerq.empty(): 
                pool = Pool()
                pool.apply_async(ReadPcapFiles, (consumerq.get(), ))
            else:    
                time.sleep(1)
    
    def ReadPcapFiles(get_event):        
        createdFile = get_event
        print(f"This is my event in ReadPacapFile {createdFile}")
    
        countPacket = 0
        bandwidth = 0
        pcapfile = pyshark.FileCapture(createdFile)
        for packet in pcapfile:
            countPacket +=1
            bandwidth = bandwidth + int(packet.length)
        print(f"Packet nr {countPacket}")
        print(f"Byte per second {bandwidth}")
    
    
    if __name__ == '__main__':
    
        format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
        logging.getLogger().setLevel(logging.DEBUG)
    
        queue = Queue()
        path = 'C:\\...'
    
        worker = threading.Thread(target=ConsumeQueue, args=(queue, ), daemon=True)
        print("About to start worker")
        worker.start()
    
        event_handler = Handler(queue)
        observer = Observer()
        observer.schedule(event_handler, path, recursive=False)
        print("About to start observer")
        observer.start()
    
        try:
            while True:
                time.sleep(1)
        except Exception as error:
            observer.stop()
            print("Error: " + str(error))
        observer.join()
    

    回答来源:stackoverflow

    2020-03-24 17:54:22
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
From Python Scikit-Learn to Sc 立即下载
Data Pre-Processing in Python: 立即下载
双剑合璧-Python和大数据计算平台的结合 立即下载