开发者社区 问答 正文

如果使用python执行时间超过某个阈值,则和平终止

我正在试图理解Threadpool库。下面是我的样本代码,它的输出如下: 代码:

import os
import time
from multiprocessing.dummy import Pool as ThreadPool
class ProcessTest:
    def myfunct1(self,id, pid):
        print("Function 1 -", "Process ID - ", pid,": ","ID - ", id)

    def WorkerProcess(self, myId):
        self.myfunct1(myId, os.getpid())

    def StartWorkflow(self):
        myId = [1,2,3]
        pool = ThreadPool(len(myId))
        pool.map(self.WorkerProcess, myId)
        pool.close()
        pool.join()

A = ProcessTest()
A.StartWorkflow()

输出:

Function 1 - Process ID -  24920 :  ID -  3
Function 1 - Process ID -  24920 :  ID -  2
Function 1 - Process ID -  24920 :  ID -  1

这是那个池子。map使用PID = 24920创建一个系统进程,并对3个myid并行执行WorkerProcess函数:1、2、3。 现在,

If WorkerProcess function and myId = 1 it takes 10 seconds to complete.
If WorkerProcess function and myId = 2 it takes 5 seconds to complete.
If WorkerProcess function and myId = 3 it takes 5 seconds to complete.

如何在6秒内终止WorkerProcess函数和myId = 1的执行而不是继续执行呢?此外,此终止不应损害其他id的处理—workerprocess函数和myId = 2和3 问题来源StackOverflow 地址:/questions/59383210/terminate-peacefully-if-execution-times-goes-beyond-certain-threshold-using-pyth

展开
收起
kun坤 2019-12-27 10:43:22 625 分享 版权
阿里云 AI 助理回答

在Python中,使用multiprocessing.dummy实际上是利用线程(而非进程)来并行执行任务,因为它基于threading模块。要优雅地终止一个超时的线程,你需要采用一种机制来检查是否已达到时间限制。这通常涉及在线程运行的函数中添加定时检查。

下面是一个修改后的例子,展示了如何为每个任务添加超时处理:

import os
import time
from multiprocessing.dummy import Pool as ThreadPool
from threading import Thread, Event

class TimeoutException(Exception):
    pass

def timeout_handler(thread, timeout_duration):
    def handler():
        if thread.is_alive():
            thread.raise_exc(TimeoutException)
    timer = Timer(timeout_duration, handler)
    timer.start()
    return timer

class ThreadWithTimer(Thread):
    def __init__(self, *args, **kwargs):
        self.timer = None
        super().__init__(*args, **kwargs)

    def raise_exc(self, exc):
        raise exc

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            # 如果设置了计时器,确保结束它以避免资源泄露
            if self.timer is not None:
                self.timer.cancel()

class ProcessTest:
    def myfunct1(self, id, pid):
        print("Function 1 -", "Process ID - ", pid, ": ", "ID - ", id)
        if id == 1:
            time.sleep(10)  # 模拟耗时操作
        else:
            time.sleep(5)

    def worker_with_timeout(self, myId, timeout):
        thread = ThreadWithTimer(target=self.WorkerProcess, args=(myId,))
        self.timer = timeout_handler(thread, timeout)
        thread.start()
        thread.join()  # 这里会因为异常而中断或正常完成
        if thread.is_alive():
            print(f"Worker for ID {myId} timed out.")
        else:
            print(f"Worker for ID {myId} completed.")

    def WorkerProcess(self, myId):
        try:
            self.myfunct1(myId, os.getpid())
        except TimeoutException:
            print(f"Task with ID {myId} exceeded the time limit and was terminated.")

    def StartWorkflow(self):
        myId = [1, 2, 3]
        pool = ThreadPool(len(myId))
        timeouts = [10, 6, 6]  # 分别为每个任务设置超时时间
        for idx, id in enumerate(myId):
            pool.apply_async(self.worker_with_timeout, (id, timeouts[idx]))
        pool.close()
        pool.join()

A = ProcessTest()
A.StartWorkflow()

在这个示例中,我们定义了一个ThreadWithTimer类,该类继承自Thread,并添加了处理超时的功能。worker_with_timeout方法创建了一个带有超时处理的线程,并尝试等待其完成。如果线程在指定时间内没有完成,timeout_handler将抛出一个TimeoutException,从而终止该线程的工作。

请注意,虽然这个方法可以用来尝试优雅地终止线程,但要注意的是在Python中强制终止线程可能会导致未预料到的副作用,比如资源泄漏或不一致的状态。因此,在设计程序时应尽量避免依赖于这种硬性终止逻辑,而是考虑任务本身的可中断性设计。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答
问答分类:
问答标签:
问答地址: