简介
本文使用python模拟golang的协程功能。
通过模拟Golang的CSP(Communicating Sequential Processes)模型,作者创建了一个名为Queuey
的类来实现无阻塞的放入和取出操作。Queuey
使用锁来同步访问,并提供了同步和异步的接口。
异步操作利用了Python的async/await
语法。
通过检查调用堆栈来决定使用同步还是异步方法。
示例展示了如何在协程中使用这个类进行异步生产者-消费者模式。
1 并发的使用场景
有很多地方需要并发地实现访问或提供服务,可以使用到异步的方式编程,其中协程是一个流行的方式。在golang中 启动协程很容易,这里将在python中也实现类似的功能。
channel 用于在协程直接通信,并且可以根据channle条件退出协程,一个缓冲channel定义如下
chanNoSize = make(chan int)
chanWithSize = make(chan int,2)
go的并发模型是一种被称之为CSP的类型,在CSP 模型中 chanel是第一类对象, 它不关注发送消息的实体。
而关注与发送消息使用的channel communicating sequential Process 简称CSP顺序通信进程,是一种用户态和系统级线程控制的混合编程模型。
csp可以被认为是一种形式语言,用于描述系统中的互动模式.
在go内部channel实现了以下几个功能:
qcount 保存队列的项目/数据计数
dataqsize 是循环队列大小。 这用于缓冲通道场景,也就是make的第二参数
elemsize 是通道相对单个元素的大小
buf 是我们使用缓冲通道时存储数据的实际循环队列
closed 表示通道是否关闭。
语法 close(chanWithSize) 默认为0,关闭时设置为1.
chanel 是被单独创建并且可以在进程之间传递。
通信模式类似于 boss-worker模式。 一个实体通过将消息发送到 channel
监听这个channel的实体处理时,两个实体之间是匿名的,
2 实现python版本的协程
而python这类线程 默认的开发方式为 线程内存同步方式.
我们可以使用一些内部函数,实现类型go的协程,稍微会复杂一些。
我们定义一个协程通信管理队列,类似于channel
class Queuey():
def __init__(self, maxsize):
self.mutex = Lock()
self.maxsize = maxsize
self.items = list()
self.getters = list()
self.putters = list()
实现一个可冲入锁对象 self.mutex 一个锁对象是一个同步基元。
要创建一个锁 的方法是: -调用 threading.Lock()。 有以下方法。
acquire() -- 锁定,可能会阻塞,直到获得锁。
release() -- 解除对锁的锁定
locked() -- 测试该锁是否被锁定
锁不属于锁定它的线程;另一个线程可以 解锁它。 如果一个线程试图锁定一个它已经锁定的锁 将会阻塞,直到另一个线程将其解锁。 死锁可能会随之产生。
acquire(blocking=True, timeout=-1) -> bool
(acquisition_lock()是一个弃用的同义词)
锁定该锁。 没有参数,如果该锁已经被 锁定(即使是被同一个线程),等待另一个线程释放 锁,一旦获得锁,则返回True。
有参数时,只有当参数为 True 时才会阻塞。
并且返回值反映了是否获得了锁。
阻塞操作是可中断的。
3 无阻塞放入和取出
先获取到锁, 如果结果队列items中不为空,那么就在待处理的 待返回对象中取出一个,唤醒一个 左部待添加的 fut 元素,并将其设置为 True, 返回队列 左部 一个元素,同时 错误信息返回None,表示没有错误(对应go的nil)
def get_noblock(self):
with self.mutex:
if self.items:
if self.putters:
self.putters.pop(0).set_result(True)
return self.items.pop(0), None
else:
fut = Future()
self.getters.append(fut)
return fut, None
先获取到锁, 如果结果队列items中 小于缓冲大小 maxsize,那么添加到item处理队列中,再进行 getters 操作,如果getters队列不为空,那么从items队列的左部获取一个值,并设置结果到getters左部。
如果结果队列已经达到最大缓冲大小,那么将待返回结果 future 添加到 putters 队列(如果结果队列有对象需要返回时将从这里取得并返回)。
def put_noblock(self, item):
with self.mutex:
if len(self.items) < self.maxsize:
self.items.append(item)
if self.getters:
self.getters.pop(0).set_result(
self.items.pop(0)
)
else:
fut = Future()
self.putters.append(fut)
return fut
获取的任务返回状态默认是pending。在无阻塞的操作函数完成后,基于此,我们可以实现同步 或 异步地操作协程。
同步存和同步取
def get_sync(self): item, fut = self.get_noblock() if fut: item = fut.result() return item def put_sync(self, item): while True: fut = self.put_noblock(item) if fut is None: return fut.result()
异步取和存
async def get_async(self): item, fut = self.get_noblock() if fut: item = await wait_for(wrap_future(fut), None) return item async def put_async(self, item): while True: fut = self.put_noblock(item) if fut is None: return await wait_for(wrap_future(fut), None)
如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式
def get(self):
if sys._getframe(2).f_code.co_flags & 0x380:
print(f"get async item")
return self.get_async()
else: # 不是协程,使用同步的方式 取值
print(f"get sync item:")
return self.get_sync() if len(self.items) > 0 else None
如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式
def put(self, item):
if sys._getframe(2).f_code.co_flags & 0x380:
print(f"put async item:", item)
return self.put_async(item)
else:
print(f"put sync item:", item)
return self.put_sync(item)
判断一个函数是否异步和协程的方式,在python中可以使用一个内置的方法,
sys._getframe(2).f_code.co_flags & 0x380
这使python知道函数需要被协程一样的使用, 一种进入函数的方式的判断,在py3中需要在 调用函数 之前加 await 关键字
sys._getframe 从调用栈中返回一个框架对象。 如果给定了可选的整数深度,则返回比堆栈顶部调用次数少的那个框架对象。
调用堆栈的顶部以下的框架对象。 如果这个深度超过了调用 栈,ValueError将被引发。 默认的深度是0,返回 调用堆栈顶部的框架。
返回的 CodeType 中代码标记如果 为 十进制 896, 这个函数应该被用于内部和专门的目的 仅用于内部和专门用途。
4 在函数 使用
最后在函数中使用这个类, 在 线程中执行 异步取值
async def aproducer(q, n):
for i in range(n):
await q.put(i)
await q.put(None)
async def aconsumer(q):
while True:
item = await q.get()
if item is None:
break
print("Async Got:", item)
5 小结
我们可以使用 wait_for 实现类似的功能。 它等待单个Future或coroutine完成,有超时参数可以设置。
Coroutine将被包裹在Task中。 返回Future或coroutine的结果。 当超时发生时。 它取消任务并引发TimeoutError。
为了避免任务取消,将其包裹在shield()中。 如果等待被取消了,任务也被取消了。
这个函数是一个coroutine。
下一节,我们完成同步存和异步取,异步存和异步取,并对此函数的做示例,与go的实现做对比。