使用python实现一个用户态协程

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【6月更文挑战第28天】本文探讨了如何在Python中实现类似Golang中协程(goroutines)和通道(channels)的概念。文章最后提到了`wait_for`函数在处理超时和取消操作中的作

简介

本文使用python模拟golang的协程功能。

通过模拟Golang的CSP(Communicating Sequential Processes)模型,作者创建了一个名为Queuey的类来实现无阻塞的放入和取出操作。
Queuey使用锁来同步访问,并提供了同步和异步的接口。
异步操作利用了Python的async/await语法。
通过检查调用堆栈来决定使用同步还是异步方法。
示例展示了如何在协程中使用这个类进行异步生产者-消费者模式。
mandala曼德罗符号.png

1 并发的使用场景

有很多地方需要并发地实现访问或提供服务,可以使用到异步的方式编程,其中协程是一个流行的方式。在golang中 启动协程很容易,这里将在python中也实现类似的功能。

channel 用于在协程直接通信,并且可以根据channle条件退出协程,一个缓冲channel定义如下

chanNoSize = make(chan int)

chanWithSize = make(chan int,2)

go的并发模型是一种被称之为CSP的类型,在CSP 模型中 chanel是第一类对象, 它不关注发送消息的实体。

而关注与发送消息使用的channel communicating sequential Process 简称CSP顺序通信进程,是一种用户态和系统级线程控制的混合编程模型。

csp可以被认为是一种形式语言,用于描述系统中的互动模式.

在go内部channel实现了以下几个功能:

     qcount 保存队列的项目/数据计数

     dataqsize 是循环队列大小。 这用于缓冲通道场景,也就是make的第二参数
     elemsize 是通道相对单个元素的大小
     buf 是我们使用缓冲通道时存储数据的实际循环队列
     closed 表示通道是否关闭。 

语法 close(chanWithSize) 默认为0,关闭时设置为1.

chanel 是被单独创建并且可以在进程之间传递。

通信模式类似于 boss-worker模式。 一个实体通过将消息发送到 channel
监听这个channel的实体处理时,两个实体之间是匿名的,

2 实现python版本的协程

而python这类线程 默认的开发方式为 线程内存同步方式.
我们可以使用一些内部函数,实现类型go的协程,稍微会复杂一些。

我们定义一个协程通信管理队列,类似于channel

    class Queuey():
        def __init__(self, maxsize):
            self.mutex = Lock()
            self.maxsize = maxsize
            self.items = list()
            self.getters = list()
            self.putters = list()

实现一个可冲入锁对象 self.mutex 一个锁对象是一个同步基元。
要创建一个锁 的方法是: -调用 threading.Lock()。 有以下方法。

  acquire() -- 锁定,可能会阻塞,直到获得锁。
  release() -- 解除对锁的锁定
  locked() -- 测试该锁是否被锁定

锁不属于锁定它的线程;另一个线程可以 解锁它。 如果一个线程试图锁定一个它已经锁定的锁 将会阻塞,直到另一个线程将其解锁。 死锁可能会随之产生。

acquire(blocking=True, timeout=-1) -> bool
(acquisition_lock()是一个弃用的同义词)

锁定该锁。 没有参数,如果该锁已经被 锁定(即使是被同一个线程),等待另一个线程释放 锁,一旦获得锁,则返回True。

有参数时,只有当参数为 True 时才会阻塞。

并且返回值反映了是否获得了锁。

阻塞操作是可中断的。

3 无阻塞放入和取出

先获取到锁, 如果结果队列items中不为空,那么就在待处理的 待返回对象中取出一个,唤醒一个 左部待添加的 fut 元素,并将其设置为 True, 返回队列 左部 一个元素,同时 错误信息返回None,表示没有错误(对应go的nil)

def get_noblock(self):
    with self.mutex:
        if self.items: 
            if self.putters:
                self.putters.pop(0).set_result(True)

            return self.items.pop(0), None
        else:
            fut = Future()
            self.getters.append(fut)
            return fut, None

先获取到锁, 如果结果队列items中 小于缓冲大小 maxsize,那么添加到item处理队列中,再进行 getters 操作,如果getters队列不为空,那么从items队列的左部获取一个值,并设置结果到getters左部。

如果结果队列已经达到最大缓冲大小,那么将待返回结果 future 添加到 putters 队列(如果结果队列有对象需要返回时将从这里取得并返回)。

   def put_noblock(self, item):
        with self.mutex:  
            if len(self.items) < self.maxsize:
                self.items.append(item) 
                if self.getters:
                    self.getters.pop(0).set_result(
                        self.items.pop(0)
                    )
            else:
                fut = Future()
                self.putters.append(fut)
                return fut

获取的任务返回状态默认是pending。在无阻塞的操作函数完成后,基于此,我们可以实现同步 或 异步地操作协程。

  • 同步存和同步取

         def get_sync(self):
              item, fut = self.get_noblock()
              if fut:
                  item = fut.result()
              return item
    
        def put_sync(self, item):
              while True:
                  fut = self.put_noblock(item)
                  if fut is None:
                      return
                  fut.result()
    
  • 异步取和存

     async def get_async(self):
                 item, fut = self.get_noblock()
                 if fut:
                     item = await wait_for(wrap_future(fut), None)
                 return item
    
     async def put_async(self, item):
         while True:
             fut = self.put_noblock(item)
             if fut is None:
                 return
             await wait_for(wrap_future(fut), None)
    

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def get(self):
        if sys._getframe(2).f_code.co_flags & 0x380: 
            print(f"get async item")
            return self.get_async()
        else:  # 不是协程,使用同步的方式 取值
            print(f"get sync item:")

            return self.get_sync() if len(self.items) > 0 else None

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def put(self, item):
        if sys._getframe(2).f_code.co_flags & 0x380:  
            print(f"put async item:", item)
            return self.put_async(item)
        else:   
            print(f"put sync item:", item)
            return self.put_sync(item)

判断一个函数是否异步和协程的方式,在python中可以使用一个内置的方法,

    sys._getframe(2).f_code.co_flags & 0x380

这使python知道函数需要被协程一样的使用, 一种进入函数的方式的判断,在py3中需要在 调用函数 之前加 await 关键字

sys._getframe 从调用栈中返回一个框架对象。 如果给定了可选的整数深度,则返回比堆栈顶部调用次数少的那个框架对象。

调用堆栈的顶部以下的框架对象。 如果这个深度超过了调用 栈,ValueError将被引发。 默认的深度是0,返回 调用堆栈顶部的框架。

返回的 CodeType 中代码标记如果 为 十进制 896, 这个函数应该被用于内部和专门的目的 仅用于内部和专门用途。

4 在函数 使用

最后在函数中使用这个类, 在 线程中执行 异步取值

async def aproducer(q, n):
    for i in range(n):

        await q.put(i)

    await q.put(None)

async def aconsumer(q):

    while True:

        item = await q.get() 
        if item is None:
            break
        print("Async Got:", item)

5 小结

我们可以使用 wait_for 实现类似的功能。 它等待单个Future或coroutine完成,有超时参数可以设置。

Coroutine将被包裹在Task中。 返回Future或coroutine的结果。 当超时发生时。 它取消任务并引发TimeoutError。
为了避免任务取消,将其包裹在shield()中。 如果等待被取消了,任务也被取消了。

这个函数是一个coroutine。

下一节,我们完成同步存和异步取,异步存和异步取,并对此函数的做示例,与go的实现做对比。

目录
相关文章
|
1月前
|
网络协议 调度 开发者
python中gevent基于协程的并发编程模型详细介绍
`gevent`是Python的第三方库,提供基于协程的并发模型,适用于I/O密集型任务的高效异步编程。其核心是协程调度器,在单线程中轮流执行多个协程,通过非阻塞I/O实现高并发。主要特点包括协程调度、事件循环的I/O模型、同步/异步编程支持及易用性。示例代码展示了一个使用`gevent`实现的异步TCP服务器,当客户端连接时,服务器以协程方式处理请求,实现非阻塞通信。
21 0
|
1月前
|
数据采集 数据库 C++
python并发编程:并发编程中是选择多线程呢?还是多进程呢?还是多协程呢?
python并发编程:并发编程中是选择多线程呢?还是多进程呢?还是多协程呢?
33 0
|
1月前
|
安全 调度 Python
探索Python中的并发编程:协程与多线程的比较
本文将深入探讨Python中的并发编程技术,重点比较协程与多线程的特点和应用场景。通过对协程和多线程的原理解析,以及在实际项目中的应用案例分析,读者将能够更好地理解两种并发编程模型的异同,并在实践中选择合适的方案来提升Python程序的性能和效率。
|
7天前
|
数据挖掘 程序员 调度
Python并发编程之协程与异步IO
传统的多线程和多进程模型在处理大规模并发时存在一些性能瓶颈和资源消耗问题。本文将重点介绍Python中基于协程和异步IO的并发编程方法,探讨其工作原理和实际应用,帮助开发者更好地理解并利用Python的并发编程能力。
|
8天前
|
开发者 Python
探索 Python 中的协程:从基本概念到实际应用
在现代编程中,异步处理变得越来越重要,Python 通过其内置的协程提供了强大的工具来简化这一过程。本文将深入探讨 Python 中的协程,从基本概念出发,逐步展示其实际应用,并通过具体代码示例帮助你掌握这种技术。
|
5天前
|
安全 Unix API
完整了解如何在python中处理协程和流
【6月更文挑战第25天】本文介绍异步库asyncio的概念和用法,异步编程在Python中是通过事件循环和协程实现并发,随着版本更新,API有所变化。
25 1
|
13天前
|
数据挖掘 调度 开发者
Python并发编程的艺术:掌握线程、进程与协程的同步技巧
并发编程在Python中涵盖线程、进程和协程,用于优化IO操作和响应速度。`threading`模块支持线程,`multiprocessing`处理进程,而`asyncio`则用于协程。线程通过Lock和Condition Objects同步,进程使用Queue和Pipe通信。协程利用异步事件循环避免上下文切换。了解并发模型及同步技术是提升Python应用性能的关键。
37 5
|
15天前
|
调度 开发者 UED
探索Python中的异步编程:从回调到协程
【6月更文挑战第14天】本文深入探讨了Python异步编程的演变历程,从最初的回调函数到现代的协程模型。我们将通过具体示例,展示如何利用asyncio库提升程序的执行效率和响应能力。文章旨在为读者提供一个清晰的异步编程发展脉络,并指导他们如何在项目中实际应用这些技术。
|
26天前
|
JavaScript 前端开发 程序员
Python协程与asyncio
理解Python中的协程,我们需从其底层原理开始,逐步深入。协程的核心在于控制流的非阻塞式管理,它允许在单一线程内实现并发处理,通过事件循环和协作式多任务来提高效率。
|
14天前
|
Java 开发者 计算机视觉
探索Python中的并发编程:线程与协程
本文将深入探讨Python中的并发编程,重点比较线程和协程的工作机制、优缺点及其适用场景,帮助开发者在实际项目中做出更明智的选择。

热门文章

最新文章