以下是一个在 Python 中使用协程自定义实现互斥锁的示例代码:
import asyncio class CustomMutex: def __init__(self): self.lock = asyncio.Lock() async def acquire(self): await self.lock.acquire() async def release(self): self.lock.release()
在上述代码中,我们定义了一个 CustomMutex
类。
__init__
方法初始化了一个 asyncio.Lock
对象。
acquire
方法用于获取锁,它使用 await
等待锁的获取。
release
方法用于释放锁。
下面是一个使用这个自定义互斥锁的示例:
mutex = CustomMutex() async def task1(): await mutex.acquire() print("Task 1 acquired the lock") # 模拟一些操作 await asyncio.sleep(2) print("Task 1 releasing the lock") await mutex.release() async def task2(): print("Task 2 waiting for the lock") await mutex.acquire() print("Task 2 acquired the lock") # 模拟一些操作 await asyncio.sleep(3) print("Task 2 releasing the lock") await mutex.release() async def main(): tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())] await asyncio.gather(*tasks) asyncio.run(main())
在这个示例中,task1
和 task2
两个协程都试图获取互斥锁进行操作。由于锁的互斥性,它们会按照获取锁的顺序依次执行相关操作。
希望这个示例对您理解如何在 Python 协程中自定义互斥锁有所帮助!如果您在实际应用中有更复杂的需求,可能需要对锁的行为进行更多的定制和优化。