我正在试图理解Threadpool库。下面是我的样本代码,它的输出如下: 代码:
import os
import time
from multiprocessing.dummy import Pool as ThreadPool
class ProcessTest:
def myfunct1(self,id, pid):
print("Function 1 -", "Process ID - ", pid,": ","ID - ", id)
def WorkerProcess(self, myId):
self.myfunct1(myId, os.getpid())
def StartWorkflow(self):
myId = [1,2,3]
pool = ThreadPool(len(myId))
pool.map(self.WorkerProcess, myId)
pool.close()
pool.join()
A = ProcessTest()
A.StartWorkflow()
输出:
Function 1 - Process ID - 24920 : ID - 3
Function 1 - Process ID - 24920 : ID - 2
Function 1 - Process ID - 24920 : ID - 1
这是那个池子。map使用PID = 24920创建一个系统进程,并对3个myid并行执行WorkerProcess函数:1、2、3。 现在,
If WorkerProcess function and myId = 1 it takes 10 seconds to complete.
If WorkerProcess function and myId = 2 it takes 5 seconds to complete.
If WorkerProcess function and myId = 3 it takes 5 seconds to complete.
如何在6秒内终止WorkerProcess函数和myId = 1的执行而不是继续执行呢?此外,此终止不应损害其他id的处理—workerprocess函数和myId = 2和3 问题来源StackOverflow 地址:/questions/59383210/terminate-peacefully-if-execution-times-goes-beyond-certain-threshold-using-pyth
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Python中,使用multiprocessing.dummy
实际上是利用线程(而非进程)来并行执行任务,因为它基于threading
模块。要优雅地终止一个超时的线程,你需要采用一种机制来检查是否已达到时间限制。这通常涉及在线程运行的函数中添加定时检查。
下面是一个修改后的例子,展示了如何为每个任务添加超时处理:
import os
import time
from multiprocessing.dummy import Pool as ThreadPool
from threading import Thread, Event
class TimeoutException(Exception):
pass
def timeout_handler(thread, timeout_duration):
def handler():
if thread.is_alive():
thread.raise_exc(TimeoutException)
timer = Timer(timeout_duration, handler)
timer.start()
return timer
class ThreadWithTimer(Thread):
def __init__(self, *args, **kwargs):
self.timer = None
super().__init__(*args, **kwargs)
def raise_exc(self, exc):
raise exc
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
finally:
# 如果设置了计时器,确保结束它以避免资源泄露
if self.timer is not None:
self.timer.cancel()
class ProcessTest:
def myfunct1(self, id, pid):
print("Function 1 -", "Process ID - ", pid, ": ", "ID - ", id)
if id == 1:
time.sleep(10) # 模拟耗时操作
else:
time.sleep(5)
def worker_with_timeout(self, myId, timeout):
thread = ThreadWithTimer(target=self.WorkerProcess, args=(myId,))
self.timer = timeout_handler(thread, timeout)
thread.start()
thread.join() # 这里会因为异常而中断或正常完成
if thread.is_alive():
print(f"Worker for ID {myId} timed out.")
else:
print(f"Worker for ID {myId} completed.")
def WorkerProcess(self, myId):
try:
self.myfunct1(myId, os.getpid())
except TimeoutException:
print(f"Task with ID {myId} exceeded the time limit and was terminated.")
def StartWorkflow(self):
myId = [1, 2, 3]
pool = ThreadPool(len(myId))
timeouts = [10, 6, 6] # 分别为每个任务设置超时时间
for idx, id in enumerate(myId):
pool.apply_async(self.worker_with_timeout, (id, timeouts[idx]))
pool.close()
pool.join()
A = ProcessTest()
A.StartWorkflow()
在这个示例中,我们定义了一个ThreadWithTimer
类,该类继承自Thread
,并添加了处理超时的功能。worker_with_timeout
方法创建了一个带有超时处理的线程,并尝试等待其完成。如果线程在指定时间内没有完成,timeout_handler
将抛出一个TimeoutException
,从而终止该线程的工作。
请注意,虽然这个方法可以用来尝试优雅地终止线程,但要注意的是在Python中强制终止线程可能会导致未预料到的副作用,比如资源泄漏或不一致的状态。因此,在设计程序时应尽量避免依赖于这种硬性终止逻辑,而是考虑任务本身的可中断性设计。