使用python实现一个用户态协程

本文涉及的产品
云原生网关 MSE Higress,422元/月
可观测监控 Prometheus 版,每月50GB免费额度
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 【6月更文挑战第28天】本文探讨了如何在Python中实现类似Golang中协程(goroutines)和通道(channels)的概念。文章最后提到了`wait_for`函数在处理超时和取消操作中的作

简介

本文使用python模拟golang的协程功能。

通过模拟Golang的CSP(Communicating Sequential Processes)模型,作者创建了一个名为Queuey的类来实现无阻塞的放入和取出操作。
Queuey使用锁来同步访问,并提供了同步和异步的接口。
异步操作利用了Python的async/await语法。
通过检查调用堆栈来决定使用同步还是异步方法。
示例展示了如何在协程中使用这个类进行异步生产者-消费者模式。
mandala曼德罗符号.png

1 并发的使用场景

有很多地方需要并发地实现访问或提供服务,可以使用到异步的方式编程,其中协程是一个流行的方式。在golang中 启动协程很容易,这里将在python中也实现类似的功能。

channel 用于在协程直接通信,并且可以根据channle条件退出协程,一个缓冲channel定义如下

chanNoSize = make(chan int)

chanWithSize = make(chan int,2)

go的并发模型是一种被称之为CSP的类型,在CSP 模型中 chanel是第一类对象, 它不关注发送消息的实体。

而关注与发送消息使用的channel communicating sequential Process 简称CSP顺序通信进程,是一种用户态和系统级线程控制的混合编程模型。

csp可以被认为是一种形式语言,用于描述系统中的互动模式.

在go内部channel实现了以下几个功能:

     qcount 保存队列的项目/数据计数

     dataqsize 是循环队列大小。 这用于缓冲通道场景,也就是make的第二参数
     elemsize 是通道相对单个元素的大小
     buf 是我们使用缓冲通道时存储数据的实际循环队列
     closed 表示通道是否关闭。 

语法 close(chanWithSize) 默认为0,关闭时设置为1.

chanel 是被单独创建并且可以在进程之间传递。

通信模式类似于 boss-worker模式。 一个实体通过将消息发送到 channel
监听这个channel的实体处理时,两个实体之间是匿名的,

2 实现python版本的协程

而python这类线程 默认的开发方式为 线程内存同步方式.
我们可以使用一些内部函数,实现类型go的协程,稍微会复杂一些。

我们定义一个协程通信管理队列,类似于channel

    class Queuey():
        def __init__(self, maxsize):
            self.mutex = Lock()
            self.maxsize = maxsize
            self.items = list()
            self.getters = list()
            self.putters = list()

实现一个可冲入锁对象 self.mutex 一个锁对象是一个同步基元。
要创建一个锁 的方法是: -调用 threading.Lock()。 有以下方法。

  acquire() -- 锁定,可能会阻塞,直到获得锁。
  release() -- 解除对锁的锁定
  locked() -- 测试该锁是否被锁定

锁不属于锁定它的线程;另一个线程可以 解锁它。 如果一个线程试图锁定一个它已经锁定的锁 将会阻塞,直到另一个线程将其解锁。 死锁可能会随之产生。

acquire(blocking=True, timeout=-1) -> bool
(acquisition_lock()是一个弃用的同义词)

锁定该锁。 没有参数,如果该锁已经被 锁定(即使是被同一个线程),等待另一个线程释放 锁,一旦获得锁,则返回True。

有参数时,只有当参数为 True 时才会阻塞。

并且返回值反映了是否获得了锁。

阻塞操作是可中断的。

3 无阻塞放入和取出

先获取到锁, 如果结果队列items中不为空,那么就在待处理的 待返回对象中取出一个,唤醒一个 左部待添加的 fut 元素,并将其设置为 True, 返回队列 左部 一个元素,同时 错误信息返回None,表示没有错误(对应go的nil)

def get_noblock(self):
    with self.mutex:
        if self.items: 
            if self.putters:
                self.putters.pop(0).set_result(True)

            return self.items.pop(0), None
        else:
            fut = Future()
            self.getters.append(fut)
            return fut, None

先获取到锁, 如果结果队列items中 小于缓冲大小 maxsize,那么添加到item处理队列中,再进行 getters 操作,如果getters队列不为空,那么从items队列的左部获取一个值,并设置结果到getters左部。

如果结果队列已经达到最大缓冲大小,那么将待返回结果 future 添加到 putters 队列(如果结果队列有对象需要返回时将从这里取得并返回)。

   def put_noblock(self, item):
        with self.mutex:  
            if len(self.items) < self.maxsize:
                self.items.append(item) 
                if self.getters:
                    self.getters.pop(0).set_result(
                        self.items.pop(0)
                    )
            else:
                fut = Future()
                self.putters.append(fut)
                return fut

获取的任务返回状态默认是pending。在无阻塞的操作函数完成后,基于此,我们可以实现同步 或 异步地操作协程。

  • 同步存和同步取

         def get_sync(self):
              item, fut = self.get_noblock()
              if fut:
                  item = fut.result()
              return item
    
        def put_sync(self, item):
              while True:
                  fut = self.put_noblock(item)
                  if fut is None:
                      return
                  fut.result()
    
  • 异步取和存

     async def get_async(self):
                 item, fut = self.get_noblock()
                 if fut:
                     item = await wait_for(wrap_future(fut), None)
                 return item
    
     async def put_async(self, item):
         while True:
             fut = self.put_noblock(item)
             if fut is None:
                 return
             await wait_for(wrap_future(fut), None)
    

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def get(self):
        if sys._getframe(2).f_code.co_flags & 0x380: 
            print(f"get async item")
            return self.get_async()
        else:  # 不是协程,使用同步的方式 取值
            print(f"get sync item:")

            return self.get_sync() if len(self.items) > 0 else None

如果是以协程的方式操作函数,则使用协程。 如果是协程,使用 异步的方式 取值,否则使用同步的方式

    def put(self, item):
        if sys._getframe(2).f_code.co_flags & 0x380:  
            print(f"put async item:", item)
            return self.put_async(item)
        else:   
            print(f"put sync item:", item)
            return self.put_sync(item)

判断一个函数是否异步和协程的方式,在python中可以使用一个内置的方法,

    sys._getframe(2).f_code.co_flags & 0x380

这使python知道函数需要被协程一样的使用, 一种进入函数的方式的判断,在py3中需要在 调用函数 之前加 await 关键字

sys._getframe 从调用栈中返回一个框架对象。 如果给定了可选的整数深度,则返回比堆栈顶部调用次数少的那个框架对象。

调用堆栈的顶部以下的框架对象。 如果这个深度超过了调用 栈,ValueError将被引发。 默认的深度是0,返回 调用堆栈顶部的框架。

返回的 CodeType 中代码标记如果 为 十进制 896, 这个函数应该被用于内部和专门的目的 仅用于内部和专门用途。

4 在函数 使用

最后在函数中使用这个类, 在 线程中执行 异步取值

async def aproducer(q, n):
    for i in range(n):

        await q.put(i)

    await q.put(None)

async def aconsumer(q):

    while True:

        item = await q.get() 
        if item is None:
            break
        print("Async Got:", item)

5 小结

我们可以使用 wait_for 实现类似的功能。 它等待单个Future或coroutine完成,有超时参数可以设置。

Coroutine将被包裹在Task中。 返回Future或coroutine的结果。 当超时发生时。 它取消任务并引发TimeoutError。
为了避免任务取消,将其包裹在shield()中。 如果等待被取消了,任务也被取消了。

这个函数是一个coroutine。

下一节,我们完成同步存和异步取,异步存和异步取,并对此函数的做示例,与go的实现做对比。

目录
相关文章
|
30天前
|
调度 Python
python知识点100篇系列(20)-python协程与异步编程asyncio
【10月更文挑战第8天】协程(Coroutine)是一种用户态内的上下文切换技术,通过单线程实现代码块间的切换执行。Python中实现协程的方法包括yield、asyncio模块及async/await关键字。其中,async/await结合asyncio模块可更便捷地编写和管理协程,支持异步IO操作,提高程序并发性能。协程函数、协程对象、Task对象等是其核心概念。
|
20天前
|
NoSQL 关系型数据库 MySQL
python协程+异步总结!
本文介绍了Python中的协程、asyncio模块以及异步编程的相关知识。首先解释了协程的概念和实现方法,包括greenlet、yield关键字、asyncio装饰器和async/await关键字。接着详细讲解了协程的意义和应用场景,如提高IO密集型任务的性能。文章还介绍了事件循环、Task对象、Future对象等核心概念,并提供了多个实战案例,包括异步Redis、MySQL操作、FastAPI框架和异步爬虫。最后提到了uvloop作为asyncio的高性能替代方案。通过这些内容,读者可以全面了解和掌握Python中的异步编程技术。
37 0
|
20天前
|
数据采集 缓存 程序员
python协程使用教程
1. **协程**:介绍了协程的概念、与子程序的区别、优缺点,以及如何在 Python 中使用协程。 2. **同步与异步**:解释了同步与异步的概念,通过示例代码展示了同步和异步处理的区别和应用场景。 3. **asyncio 模块**:详细介绍了 asyncio 模块的概述、基本使用、多任务处理、Task 概念及用法、协程嵌套与返回值等。 4. **aiohttp 与 aiofiles**:讲解了 aiohttp 模块的安装与使用,包括客户端和服务器端的简单实例、URL 参数传递、响应内容读取、自定义请求等。同时介绍了 aiofiles 模块的安装与使用,包括文件读写和异步迭代
23 0
|
1月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
深入探索:Python中的并发编程新纪元——协程与异步函数解析
27 3
|
2月前
|
Python
Python中的异步编程与协程实践
【9月更文挑战第28天】本文旨在通过一个简单易懂的示例,介绍如何在Python中利用asyncio库实现异步编程和协程。我们将通过代码示例来展示如何编写高效的并发程序,并解释背后的原理。
|
2月前
|
数据库 开发者 Python
实战指南:用Python协程与异步函数优化高性能Web应用
在快速发展的Web开发领域,高性能与高效响应是衡量应用质量的重要标准。随着Python在Web开发中的广泛应用,如何利用Python的协程(Coroutine)与异步函数(Async Functions)特性来优化Web应用的性能,成为了许多开发者关注的焦点。本文将从实战角度出发,通过具体案例展示如何运用这些技术来提升Web应用的响应速度和吞吐量。
30 1
|
2月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
在Python异步编程领域,协程与异步函数成为处理并发任务的关键工具。协程(微线程)比操作系统线程更轻量级,通过`async def`定义并在遇到`await`表达式时暂停执行。异步函数利用`await`实现任务间的切换。事件循环作为异步编程的核心,负责调度任务;`asyncio`库提供了事件循环的管理。Future对象则优雅地处理异步结果。掌握这些概念,可使代码更高效、简洁且易于维护。
26 1
|
2月前
|
调度 开发者 Python
探索Python中的异步编程:理解asyncio和协程
【9月更文挑战第22天】在现代软件工程中,异步编程是提升应用性能的关键技术之一。本文将深入探讨Python语言中的异步编程模型,特别是asyncio库的使用和协程的概念。我们将了解如何通过事件循环和任务来处理并发操作,以及如何用协程来编写非阻塞的代码。文章不仅会介绍理论知识,还会通过实际的代码示例展示如何在Python中实现高效的异步操作。
|
1月前
|
数据采集 调度 Python
Python编程异步爬虫——协程的基本原理(一)
Python编程异步爬虫——协程的基本原理(一)
|
1月前
|
数据采集 Python
Python编程异步爬虫——协程的基本原理(二)
Python编程异步爬虫——协程的基本原理(二)
下一篇
无影云桌面