Python中的并发编程

简介: 本文介绍了Python中的并发编程,并具体介绍了线程、进程、协程的基本用法和案例。

蚂蚁.png

1. 并发的概念

1.1 并发和并行

并发逻辑上同时处理多件事情,并行实际上同时做多件事情。
并发不一定通过并行实现,也可以通过多任务实现。例如玩游戏时听歌,但歌曲播放和游戏运行并不一定是同时(同一个CPU时间)发生的,可能第1个CPU时间播放歌曲,然后第2个CPU时间执行游戏,这样交替执行
并行要求同时执行,即同一个CPU时间两个事情都发生,为了实现并行,计算机必须能同时执行多个计算任务,如多核CPU或多个CPU。

并发和并行不互斥,并行是并发的一种实现方式。
并发-并行

1.2 Python实现并发的方式:进程、线程、协程

Python实现并发

进程是程序运行时的一个实例。进程通信只能携带原始字节,因此Python的对象需要序列化为原始字节才能在进程间通信。

线程是一个进程中的执行单元。一个进程启动后,会创建主线程,并且可以调用操作系统API创建更多线程。一个进程内的线程可以共享内存空间,轻松共享数据。

可以看出线程比进程更轻量级,更方便进行数据共享。
进程间通信需要通过管道、socket、消息队列等方式。

GIL全称为全局解释器锁,每个Python解释器程序是一个进程,虽然可以在一个进程中启动多个线程,但同一时间只有一个Python线程可以持有GIL,其它线程无法执行。所以Python中无法通过线程实现并行计算。
在这里插入图片描述

协程是可以挂起自身并在以后恢复的函数。Python 协程通常在事件循环的监督下在单个线程中运行。协程支持协作式多任务处理:一个协程必须使用 yieldawait 关键字显式放弃控制权,另一个协程才可以并发(而非并行)开展工作。

1.3 小结

本章首先介绍了并发并行的概念,并发逻辑上同时处理多件事情,并行实际上同时做多件事情,并行是并发的一种实现。
然后介绍了Python中实现并发的3种方式:进程、线程、协程
此外,还介绍了由于GIL的存在导致Python多线程同一时间只能有一个线程执行。

2. 线程

2.1 通过threading库使用线程

在Python中,threading 库提供了线程的接口,包括线程的创建启动同步

2.1.1 例1 使用线程旋转指针

场景:为程序耗时长的操作显示动画,表示程序正常运行没有卡死。

具体来说,我们将启动一个耗时3秒的函数,在这3秒内,在终端显示指针旋转的动画。下面用线程来实现这个操作。

首先我们定义旋转函数spin和阻塞函数slow

  • spin函数每隔0.1s依次打印\|/-,看起来就像是指针转动:
  • slow函数用来模拟一个耗时的操作。这里我们直接调用time.sleep(3) 等待3秒,然后返回一个结果。
    spin函数-指针旋转
    spin函数的实现如下:
    ```python
    import itertools
    import time
    def spin(msg: str) -> None:
    for char in itertools.cycle(r'|/-'):
      status = f'\r{char} {msg}' 
      print(status, end='', flush=True)
      time.sleep(0.1)
    
    blanks = ' ' * len(status)
    print(f'\r{blanks}\r', end='')

if name == 'main':
spin("thinking...")


`slow`函数的实现如下:
```python
# 阻塞3秒,并返回42
def slow() -> int:
    time.sleep(3) 
    return 42

time.sleep() 阻塞它所在的线程,但是释放 GIL,其他 Python 线程可以继续运行。

现在,我们要用线程实现并发,使得(看上去)slowspin同时进行。
下面对spin函数做了一些修改,通过threading.Event信号量来同步线程

import itertools
import time
from threading import Thread, Event

# 旋转
def spin(msg: str, done: Event) -> None:  # done用于同步线程
    for char in itertools.cycle(r'\|/-'): 
        status = f'\r{char} {msg}' 
        print(status, end='', flush=True)
        if done.wait(.1): #等待/阻塞 。除非有其他线程set了这个事件,则返回True;或者经过指定的时间(0.1s)后,返回 False。
            break
    blanks = ' ' * len(status)
    print(f'\r{blanks}\r', end='')

# 阻塞3秒,并返回42
def slow() -> int:
    time.sleep(3) 
    return 42

使用线程来并发执行两个函数。我们需要两个线程分别执行spinnerslow,但程序本身会启动一个线程,因此我们只需启动spinner线程。

def supervisor() -> int: 
    done = Event()  # 信号量,用于线程同步
    spinner = Thread(target=spin, args=('thinking!', done)) # 使用Thread创建线程实例spinner。
    print(f'spinner object: {spinner}') 
    spinner.start() # 启动spinner线程
    result = slow()  # 调用slow,阻塞 main 线程。同时,次线程spinner运行旋转指针动画
    done.set() # 设置done为真,唤醒等待done的线程。结束spinner中的循环。
    spinner.join() # 等待spinner 线程结束。-貌似这里加不加都不影响。
    return result

def main() -> None:
    result = supervisor() 
    print(f'Answer: {result}')

if __name__ == '__main__':
    main()

spin和slow”同时“执行

程序的执行顺序,主要步骤都发生在supervisor函数中,我们从supervisor开始看。
由于GIL的存在,同一时刻只有一个线程在执行。所以下面是一个顺序执行的过程。
执行过程大致如下:
双线程执行过程

主线程:创建spinner线程,启动spinner线程
spinner线程:输出字符,然后遇到done.wait(.1) 阻塞自己。
主线程:调用slow函数,遇到time.sleep(3) 阻塞
spinner线程:done.wait(.1) 超过了0.1秒返回False,继续输出字符。重复进行阻塞0.1秒、输出字符。
3秒后......
主线程:slow执行完毕,返回结果42。主线程继续执行done.set(),这会唤醒等待done的线程spinner
spinner线程:运行到done.wait(.1),由于主线程执行了done.set()使得这里的结果为True,所以执行break,结束循环。执行循环下面的print语句后spinner线程结束。
主线程:返回结果。

注:本例代码主要来自《流畅的Python》(第二版) 19.4.1### 2.1.2 例2 计算因子

2.1.2 例2 并行计算

第二个例子我们看一个(失败的)并行计算的例子:
用n个线程并行计算n个数各自的因子

基准方法
顺序执行。

import time

# 计算number的因子
def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i

numbers = [2139079, 1214759, 1516637, 1852285, 14256346, 12456533]
start = time.time()

for number in numbers:
    list(factorize(number))

end = time.time()
delta = end - start
print(f'串行方法花费了 {delta:.3f} 秒')

多线程方式
可以像例1中使用Thread函数实现线程:

def get_factor(number):
    factors = list(factorize(number))
    return factors

start = time.time()
threads = []
for number in numbers:
    thread = Thread(target=get_factor, args=(number,))
    thread.start() # 启动
    threads.append(thread)

# 等待所有线程完成
for thread in threads:
    thread.join() # 等待完成

end = time.time()
delta = end - start
print(f'Thread方法花费了 {delta:.3f} 秒')

实现线程的另一种方式是继承Thread类并实现run方法:

from threading import Thread

# 继承Thread,需要实现run方法,在run方法中执行要做的事情
class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        self.factors = list(factorize(self.number))


start = time.time()

threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start() # 启动
    threads.append(thread)

# 等待所有线程完成
for thread in threads:
    thread.join() # 等待完成

end = time.time()
delta = end - start
print(f'Thread方法花费了 {delta:.3f} 秒')

运行结果:
多线程的并行计算
你会发现这个多线程的版本并没有变快,这并不意外。因为GIL的存在,多线程无法同时执行,甚至因为创建和切换线程产生额外的开销导致耗时增加。

注:本例代码来自《Effective Python》(第二版) 第53章

2.2 线程池

线程池原理是用一个任务队列让多个线程从中获取任务执行,然后返回结果。
concurrent.futures模块提供了线程池和进程池简化了多线程/进程操作。常见的用法是创建线程池,提交任务,等待完成并获取结果

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(count, item) for item in number_list] # count是一个函数,item是其参数
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
  • concurrent.futures.ThreadPoolExecutor(max_workers=5)创建了一个线程池,max_workers指定了线程数量上限。通过线程池可以创建和执行任务。
  • concurrent.futures使用Future类表示(未来的)任务。调用.submit()时会创建并执行一个任务(Future)。
  • .as_completed(futures)是一个迭代器,当futures中有任务完成时会产出该future.

Python最广为使用的并发处理库futures使用入门与内部原理对这个过程做了比较好的说明:
futures线程池
主线程是通过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并立即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。

python-parallel-programming-cookbook-cn 1.0 文档 中的一个例子对使用顺序执行、线程池进程池三种方式进行计算的时间进行了比较:

import concurrent.futures
import time


# 一个耗时的计算
def count(number) :
    for i in range(0, 10000000):
        i=i+1
    return i * number

if __name__ == "__main__":
    number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 顺序执行
    start_time = time.time()
    for item in number_list:
        print(count(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")
    # 线程池
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")

    # 进程池
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " + str(time.time() - start_time_2), "seconds")

结果为:

Sequential execution in 7.095552206039429 seconds
Thread pool execution in 7.140377998352051 seconds
Process pool execution in 4.240718126296997 seconds

2.3 多线程发送网络请求

我们使用https://www.vatcomply.com 来演示多线程发送网络请求。
该提供了汇率查询的API,我们可以像下面这样发送请求获取某种货币对其它货币的汇率。

import requests
response = requests.get("https://api.vatcomply.com/rates?base=USD")
print(response.json())

返回结果是一个json格式的文本,包含了base中查询的货币对其它货币的汇率:

{
   'date': '2023-12-07', 'base': 'USD', 'rates': {
   'EUR': 0.9284189026088572, 'USD': 1.0, 'JPY': 145.0004642094513, 'BGN': 1.8158016897224027, 'CZK': 22.612570791941327, ..., 'ZAR': 18.759260978553524}
}

下面我们比较不同方式发送多个请求的耗时。

2.3.1 顺序执行

我们使用顺序执行的方式,发送5次请求:

import time
import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]

    rates[base] = 1.
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def main():
    for base in BASES:
        fetch_rates(base)

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))

执行结果:

1 USD =     1.0 USD,   0.928 EUR,    4.02 PLN,    10.9 NOK,    22.6 CZK
1 EUR =    1.08 USD,     1.0 EUR,    4.33 PLN,    11.8 NOK,    24.4 CZK
1 PLN =   0.249 USD,   0.231 EUR,     1.0 PLN,    2.71 NOK,    5.62 CZK
1 NOK =  0.0916 USD,  0.0851 EUR,   0.369 PLN,     1.0 NOK,    2.07 CZK
1 CZK =  0.0442 USD,  0.0411 EUR,   0.178 PLN,   0.483 NOK,     1.0 CZK

time elapsed: 2.96s

顺序执行需要等待上一个请求返回后才能发起下一个请求,所以用时较长。

2.3.2 多线程

只需要在main函数中做一点修改,启动多个线程。

from threading import Thread
def main():
    threads = [] 
    for base in BASES:
        thread = Thread(target=fetch_rates, args=[base])
        thread.start()
        threads.append(thread)
    while threads:
        threads.pop().join()

执行结果:

1 PLN =   0.249 USD,   0.231 EUR,     1.0 PLN,    2.71 NOK,    5.62 CZK
1 NOK =  0.0916 USD,  0.0851 EUR,   0.369 PLN,     1.0 NOK,    2.07 CZK
1 EUR =    1.08 USD,     1.0 EUR,    4.33 PLN,    11.8 NOK,    24.4 CZK
1 USD =     1.0 USD,   0.928 EUR,    4.02 PLN,    10.9 NOK,    22.6 CZK
1 CZK =  0.0442 USD,  0.0411 EUR,   0.178 PLN,   0.483 NOK,     1.0 CZK

time elapsed: 0.62s

多线程的效果很好,极大地缩短了程序的耗时。因为我们连续发送了5个请求并等待结果,而不是像顺序执行中的发送一个请求后等待它返回结果后再发送下一个。
(同时我们也发现了:​多线程导致任务完成的顺序改变了, 打印的结果和启动顺序'USD', 'EUR', 'PLN', 'NOK', 'CZK'不同)

但上面的代码存在一些问题:

  • 没有限制线程的数量。过多的线程可能导致因请求过快而被网站封IP。
  • 线程函数中使用print,可能导致输出混乱。
  • 每个函数被委托给单独的线程,这使得控制输入处理的速率极其困难。

    2.3.3 使用线程池

使用线程池创建指定数量的线程,这些线程将消耗队列中的工作项,直到队列变空。
线程池带来的好处:

  • 控制线程数量
  • 减少创建线程的开销。

注:这里我们用队列手动实现了线程池,但Python提供了封装好的 concurrent.futures.ThreadPoolExecutor

from queue import Empty

# 从work_queue中获取任务并执行
def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            fetch_rates(item)
            work_queue.task_done()


from threading import Thread
from queue import Queue

THREAD_POOL_SIZE = 4
def main():
    # work_queue是任务队列
    work_queue = Queue()
    for base in BASES:
        work_queue.put(base)

    # 创建指定数量个线程
    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
        ]
    for thread in threads:
        thread.start()

    work_queue.join()
    while threads:
        threads.pop().join()

main函数中,我们创建了一个队列work_queue来存放需要处理的参数,然后启动了指定数量THREAD_POOL_SIZE的线程。这些线程都执行worker函数,参数都是work_queue

worker() 函数的主体是一个 while 循环,直到队列为空时结束循环。
在每次迭代中,它尝试用 work_queue.get_nowait()以非阻塞方式获取新项目。如果队列已经为空,work_queue.get_nowait()将引发 Empty 异常,从而中断循环并结束。否则从队列中获取一个项目,调用fetch_rates(item) 并用 work_queue.task_done() 将该项目标记为已处理。当队列中的所有项目都已标记为完成时,主线程中的 work_queue.join() 函数将返回。

2.3.4 两个队列

线程函数中使用print,有时会出现混乱的输出。
下面我们使用一个额外的队列来收集结果,并在主线程中输出结果。

首先移除原来的print函数。

def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]


    rates[base] = 1.
    # 移除print
    return base, rates

def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

修改worker函数,用results_queue收集结果:

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            results_queue.put(fetch_rates(item)) # 将结果放入results_queue
            work_queue.task_done()

在main函数中打印结果:

def main():
    work_queue = Queue()
    results_queue = Queue()
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(target=worker, args=(work_queue,results_queue))
        for _ in range(THREAD_POOL_SIZE)
        ]
    for thread in threads:
        thread.start()

    work_queue.join()
    while threads:
        threads.pop().join()

    # 打印结果
    while not results_queue.empty():
        present_result(*results_queue.get())

2.3.5 处理线程中的错误

我们的fetch_rates函数向网站发送请求时可能因为网络等原因出错,然后该线程会结束(但该任务没有完成)。主线程中的work_queue.join()会等待所有任务完成,从而程序被卡住。

我们通过在fetch_rates中添加一个随机报错模拟网络出错的情况:

import random
def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    # 随机引起一个报错
    if random.randint(0, 5) < 1:
        # simulate error by overriding status code
        response.status_code = 500
    response.raise_for_status()
    rates = response.json()["rates"]


    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates

如果出现了错误(异常),程序将抛出异常,然后卡住。

因此我们需要在worker中添加异常处理。
当发生异常时,程序将异常存入results_queue中;如果没有异常,则存放正常的结果;并且总是该标记任务完成。

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        # 处理错误
        try:
            result = fetch_rates(item)
        except Exception as err:
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()

在main函数中:

    # 打印结果
    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result
        present_result(*result)

程序遇到错误时,不会再卡住,在最后的打印时会抛出(raise)错误。

2.3.6 Throttling(节流)

过快的请求可能导致网站负载过大,从而封禁我们的IP。
因此我们需要控制请求的速度
我们将使用的算法有时称为令牌桶(token bucket),非常简单。它包括以下功能:
• 有一个包含预定义数量令牌的存储桶
• 每个令牌对应于处理一项工作的单个权限
• 每次工作人员请求一个或多个令牌(权限)时,我们都会执行以下操作:

1. 我们检查自上次重新装满桶以来已经过去了多长**时间** 
2. 如果时间差允许,我们将与时间差相对应的令牌数量重新装满桶 
3. 如果存储的数量令牌大于或等于请求的数量,我们减少存储的令牌数量并返回该值 
4. 如果存储的令牌数量小于请求的数量,我们返回零

两件重要的事情是
1.始终用零令牌初始化令牌桶(?)
2.并且永远不要让它溢出。

from threading import Lock 
import time

class Throttle: 
    def __init__(self, rate): 
        self._consume_lock = Lock() # 使用锁避免冲突
        self.rate = rate # 速率,rate越大则允许的请求间隔越小
        self.tokens = 0
        self.last = None

    def consume(self, amount=1): 
        with self._consume_lock: 
            now = time.time() 
            #初始化上次时间
            if self.last is None: 
                self.last = now 
            elapsed = now - self.last 
            # 间隔时间足够,增加令牌
            if elapsed * self.rate > 1: 
                self.tokens += elapsed * self.rate
                self.last = now 
            # 避免桶溢出
            self.tokens = min(self.rate, self.tokens)
            # 如果令牌足够,则发给请求的进程
            if self.tokens >= amount: 
                self.tokens -= amount
                return amount

            return 0

这个类的用法非常简单。
我们只需在主线程中创建一个 Throttle 实例(例如 Throttle(10)rate=10,允许每1/10秒发送一个请求,rate越大则允许的请求速度越快),并将其作为参数传递给每个工作线程:

    throttle = Throttle(10)
    ...
    threads = [
        Thread(target=worker, 
               args=(work_queue, results_queue, throttle)
               )
        for _ in range(THREAD_POOL_SIZE)
        ]

worker中,需要消耗throttle

def worker(work_queue, results_queue, throttle):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break

        # 尝试获取和消耗令牌
        while not throttle.consume():
            time.sleep(.1)

        # 处理错误
        ...

如果没有足够的令牌,则该线程会等待直到能够获取令牌时继续执行。

注:本节代码来自Expert Python Programming 6.3

2.4 PyQt 多线程

2.4.1 卡住的计时器

我们定义了一个计时器,每秒钟更新一次显示的数字。此外我们定义了一个耗时5秒的任务oh_no,和按钮“危险”绑定。

当我们点击“危险”按钮时,程序去执行oh_no,导致显示停止更新了。
卡住

import sys
import time
from PyQt6.QtCore import QTimer
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.counter = 0
        layout = QVBoxLayout()
        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)
        layout.addWidget(self.l)
        layout.addWidget(b)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()

        # 定时器,每1秒更新一次文本
        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def oh_no(self):
        time.sleep(5)
    def recurring_timer(self):
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)

app = QApplication(sys.argv)
window = MainWindow()
app.exec()

QT提供了线程的接口,主要通过两个类实现
QRunnable: 工作的容器
QThreadPool:线程池

继承QRunnable并实现run方法:

class Worker(QRunnable):
    """
    Worker thread
    """
    @pyqtSlot()
    def run(self):
        """
        Your code goes in this function
        """
        print("Thread start")
        time.sleep(5)
        print("Thread complete")

创建线程池:

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.threadpool = QThreadPool()
        print(
        "Multithreading with maximum %d threads" % self.
        threadpool.maxThreadCount()
        )

使用线程池启动任务:

def oh_no(self):
    worker = Worker()
    self.threadpool.start(worker)

使用线程后,当我们点击危险时会启动额外的线程去执行任务,不会阻塞Qt的显示。
不卡了

2.4.2 进度条

当我们执行一个耗时的任务时,常见的做法是添加一个进度条来让用户了解任务的进度。

为此,我们需要在任务中发送进度信息,然后在Qt窗口中更新进度。
进度条

1.导入相关库

import sys
import time
from PyQt6.QtCore import QObject, QRunnable, QThreadPool, QTimer,\
                         pyqtSignal, pyqtSlot
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QProgressBar,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

2.在任务中使用信号量发送进度

# 信号量,用于表示进度
class WorkerSignals(QObject):
    progress = pyqtSignal(int)

class Worker(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        total_n = 1000
        for n in range(total_n):
            progress_pc = int(100 * float(n + 1) / total_n) #Progress 0-100% as int
            self.signals.progress.emit(progress_pc) # 通过信号发送当前进度值
            time.sleep(0.01)

3.在窗口中接收信号,并在进度条中显示

class MainWindow(QMainWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        layout = QVBoxLayout()
        self.progressbar = QProgressBar() # 进度条
        button = QPushButton("启动")
        button.pressed.connect(self.execute)

        layout.addWidget(self.progressbar)
        layout.addWidget(button)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()

        self.threadpool = QThreadPool()
        print(
            "Multithreading with maximum %d threads" % self.
            threadpool.maxThreadCount()
        )
    def execute(self):
        worker = Worker()
        # 和update_progress连接,
        worker.signals.progress.connect(self.update_progress)
        # Execute
        self.threadpool.start(worker)
    # 接收progress信号,并显示
    def update_progress(self, progress_value):
        self.progressbar.setValue(progress_value)

2.5 小结

在GIL的限制下,Python线程对于并行计算没有用处,但是对于等待(IO、网络、后台任务)是有用处的。
我们看到了线程的一些实例,例如网络请求,进度条。

3. 竞争和锁

由于共享内存,多线程容易遇到竞争问题:两个内存对同一个变量进行修改可能导致意想不到的问题。

看一个计数的例子:我们创建了一个全局变量thread_visits,在visit_counter()中修改这个变量值。

from threading import Thread
thread_visits = 0
def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        thread_visits +=  1 #  thread_visits = thread_visits + 1

if __name__ == "__main__":
    thread_count = 100
    threads = [
        Thread(target=visit_counter)
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"thread_count={thread_count}, thread_visits={thread_visits}")

执行结果:

第1次 :thread_count=100, thread_visits=7227793
第2次 :thread_count=100, thread_visits=9544020
第3次 :thread_count=100, thread_visits=9851811

执行该程序会发现每次运行thread_visits的值都不一样
因为在 thread_visits 变量上的读取和写入操作之间有一段时间,另一个线程可以介入并操作结果。这导致了竞争。
在这里插入图片描述

(线程1和线程2对变量thread_visits的竞争。两个线程都对thread_visits执行了+1的操作,但最后thread_visits的是1,而不是2。)

thread_visits += 1 实际包含读写两个操作,它等价于
thread_visits = thread_visits + 1,先读取thread_visits的值并+1,再写入到thread_visits

正确方法是使用保证一次只有一个线程可以处理单个代码块
在这里插入图片描述

from threading import Thread
from threading import Lock

thread_visits = 0
thread_visits_lock = Lock()

def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        with thread_visits_lock:
            thread_visits +=  1 #  thread_visits = thread_visits + 1

运行结果:

thread_count=100, thread_visits=10000000

这次我们得到了正确的结果,但花费了接近一分钟的时间。因为受保护的块不能并行运行。此外,获取和释放锁是需要一些额外操作。

将锁放在外面的时候,会发现花费的时间减少了很多。因为减少了获取和释放锁的消耗。

    with thread_visits_lock:
        for _ in range(100_000):  
            thread_visits +=  1

4. 进程

4.1 使用多进程

multiprocessing模块提供了使用进程的方法,使用起来和线程threading模块非常类似。
multiprocessing模块包含一个与Thread类非常相似的Process类。
你可以将Thread替换成Process

import itertools
import time
# 从multiprocessing导入
from multiprocessing import Process, Event
from multiprocessing import synchronize 

# 旋转
def spin(msg: str, done: synchronize.Event ) -> None: 
    for char in itertools.cycle(r'\|/-'): 
        status = f'\r{char} {msg}' 
        print(status, end='', flush=True)
        if done.wait(.1): 
            break
    blanks = ' ' * len(status)
    print(f'\r{blanks}\r', end='')

# 阻塞3秒,并返回42
def slow() -> int:
    time.sleep(3) 
    return 42

def supervisor() -> int: 
    done = Event() 
    # Thread 替换成Process
    spinner = Process(target=spin, args=('thinking!', done)) 
    print(f'spinner object: {spinner}') 
    spinner.start() 
    result = slow() 
    done.set() 
    spinner.join() 
    return result

def main() -> None:
    result = supervisor() 
    print(f'Answer: {result}')

if __name__ == '__main__':
    main()

同样,我们用进程改写2.1.2 中的并行计算。

import time
from multiprocessing import Process
# 计算number的因子
def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i
numbers = [2139079, 1214759, 1516637, 1852285, 14256346, 12456533]
def get_factor(number):
    factors = list(factorize(number))
    return factors

if __name__ == '__main__':
    start = time.time()
    processes = []
    for number in numbers:
        process = Process(target=get_factor, args=(number,))
        process.start() # 启动
        processes.append(process)


    for process in processes:
        process.join() # 等待完成

    end = time.time()
    delta = end - start
    print(f'Process {delta:.3f} 秒')

我们发现修改为进程后,计算耗费时间减少了一些:

(之前的)顺序执行花费 2.478 秒

Process 1.744 秒

由于进程启动和通信需要耗费一定时间,所以并不明显。
如果把numbers中的数字加大,时间减少的会更明显:

numbers = [4139079, 2214759, 4516637, 6852285, 44256346, 62456533]

运行结果:

顺序执行花费 11.079 秒

Process 6.870 秒

multiprocessing还提供了进程池Pool,可以方便地处理一系列输入:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

5. 异步

5.1 异步编程

Python3.4后新增了asyncio模块,支持异步编程。
异步是在一个线程中通过任务切换的方式让多个任务”同时“进展。asyncio不涉及线程/进程切换,减少了线程/进程创建、上下文切换的开销,更轻量级。asyncio的核心是事件循环,不断监听/执行队列中的任务。
事件循环

由于asyncio是在一个线程中通过任务切换的方式执行多任务,所以这些任务需要是非阻塞的。如果某个任务是阻塞的,比如常规的sleep函数、数值计算等,那么这个任务会占据线程,让其它任务没有机会执行。

5.2 async和await

在函数定义的def关键字之前加上async,就可以定义一个协程

async def async_hello(): 
    print("hello, world!") 
    await asyncio.sleep(1) # 异步的睡眠任务。如果用常规的time.sleep()会阻塞程序。
    print("1秒钟过去了...")

async 关键字定义的函数很特殊。调用时,它们不会执行内部代码,而是返回一个协程对象(coroutine object)。

In [2]: async_hello()
Out[2]: <coroutine object async_hello at 0x0000012904713CC8>

await在异步任务启动之后,暂停当前 async 函数的执行,把执行权交给其他任务。等到异步任务结束,再把执行权交回 async 函数,继续往下执行。
在上面这个async_hello()的例子中,当执行到await asyncio.sleep(1)时,会启动任务asyncio.sleep(1),并交出执行权,让其他任务执行。1秒后,任务asyncio.sleep(1)完成了,会继续执行async_hello()的下一行print("1秒钟过去了...")

事件循环中安排其执行之前,协程对象不会执行任何操作。 下面我们来执行这个协程。

import asyncio
async def async_hello(): 
    print("hello, world!") 
    await asyncio.sleep(1)
    print("1秒钟过去了...") 



# 1.获取事件循环
loop = asyncio.get_event_loop()
# 2.执行协程
loop.run_until_complete(async_hello())
# 3.关闭事件循环
loop.close()

# 上面三步等价于:
asyncio.run(async_hello()) # python3.7新增asyncio.run()执行协程

5.3 执行多个任务/协程

如果您有多个任务或协程等待,可以使用 asyncio.gather() 将它们聚合到一个对象中。

import asyncio 
import random
async def print_number(number):
    await asyncio.sleep(random.random())
    print(number) 
    return number

async def main():
    results = await asyncio.gather(*[ 
            print_number(number) 
                for number in range(10) 
        ]) 
    print("results=", results)

asyncio.run(main())

运行结果:

6 8 9 5 0 7 3 4 1 2
results= [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

asyncio.gather() 用于收集多个协程以并发执行它们。结果是一个对象,表示运行所有提供的协程的future结果。

5.4 异步编程的实例

网络IO是一个合适用异步编程处理的任务,可惜requests库没有提供异步请求的方法,不过aiohttp提供了异步 HTTP方法 。

import asyncio
import time
import aiohttp

async def get_rates(session: aiohttp.ClientSession, base: str):
    async with session.get(
        f"https://api.vatcomply.com/rates?base={base}"
    ) as response:
        rates = (await response.json())['rates']
        rates[base] = 1.
        return base, rates

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def present_result(base, rates):
    rates_line = ", ".join(
    [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

async def main():
    async with aiohttp.ClientSession() as session:
        for result in await asyncio.gather(*[
           get_rates(session, base) for base in BASES]):
            present_result(*result)

if __name__ == "__main__":
    started = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))
1 USD =     1.0 USD,   0.916 EUR,    3.98 PLN,    10.4 NOK,    22.5 CZK
1 EUR =    1.09 USD,     1.0 EUR,    4.34 PLN,    11.3 NOK,    24.5 CZK
1 PLN =   0.251 USD,    0.23 EUR,     1.0 PLN,    2.61 NOK,    5.65 CZK
1 NOK =  0.0962 USD,  0.0881 EUR,   0.383 PLN,     1.0 NOK,    2.16 CZK
1 CZK =  0.0445 USD,  0.0407 EUR,   0.177 PLN,   0.462 NOK,     1.0 CZK

time elapsed: 1.05s

参考

  • 《流畅的Python》(第二版)第19章 Python并发模型
  • 《Effective Python》(第二版) 第53章
  • 一文看懂Python协程
  • Python 异步编程入门
相关文章
|
9天前
|
安全 数据安全/隐私保护 数据中心
Python并发编程大挑战:线程安全VS进程隔离,你的选择影响深远!
【7月更文挑战第9天】Python并发:线程共享内存,高效但需处理线程安全(GIL限制并发),适合IO密集型;进程独立内存,安全但通信复杂,适合CPU密集型。使用`threading.Lock`保证线程安全,`multiprocessing.Queue`实现进程间通信。选择取决于任务性质和性能需求。
20 1
|
7天前
|
数据采集 数据库 Python
Python并发编程新篇章:asyncio库使用全攻略,轻松驾驭异步世界!
【7月更文挑战第11天】Python的asyncio开启异步编程时代,通过案例展示如何用它和aiohttp构建并发爬虫。安装aiohttp后,定义异步函数`fetch`进行HTTP请求,返回状态码和内容长度。在`main`中,并发执行多个`fetch`任务,利用`asyncio.gather`收集结果。使用`async with`管理HTTP会话资源,确保释放。通过这种方式,爬虫性能大幅提升,适用于高并发场景。学习asyncio是提升并发性能的关键。
31 14
|
1天前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
【7月更文挑战第17天】Python并发编程中,异步编程(如`asyncio`)在IO密集型任务中提高效率,利用等待时间执行其他任务。但对CPU密集型任务,由于GIL限制,多线程效率不高,此时应选用`multiprocessing`进行多进程并行计算以突破限制。选择合适的并发策略是关键:异步适合IO,多进程适合CPU。理解这些能帮助构建高效并发程序。
13 6
|
22小时前
|
开发框架 并行计算 .NET
从菜鸟到大神:Python并发编程深度剖析,IO与CPU的异步战争!
【7月更文挑战第18天】Python并发涉及多线程、多进程和异步IO(asyncio)。异步IO适合IO密集型任务,如并发HTTP请求,能避免等待提高效率。多进程在CPU密集型任务中更优,因可绕过GIL限制实现并行计算。通过正确选择并发策略,开发者能提升应用性能和响应速度。
|
3天前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
【7月更文挑战第15天】Python 3.5+引入的协程和异步函数革新了并发编程。协程,轻量级线程,由程序控制切换,降低开销。异步函数是协程的高级形式,允许等待异步操作。通过`asyncio`库,如示例所示,能并发执行任务,提高I/O密集型任务效率,实现并发而非并行,优化CPU利用率。理解和掌握这些工具对于构建高效网络应用至关重要。
16 6
|
21小时前
|
UED 开发者 Python
Python并发编程新纪元:异步编程如何重塑IO与CPU密集型任务的处理方式?
【7月更文挑战第18天】Python异步编程提升IO任务效率,非阻塞模式减少等待时间,优化用户体验。asyncio库与await关键字助力编写非阻塞代码,示例展示异步HTTP请求。CPU密集型任务中,异步编程结合多进程可提升效率。异步编程挑战包括代码复杂性,解决策略包括使用类型提示、异步框架及最佳实践。异步编程重塑任务处理方式,成为现代Python开发的关键。
8 2
|
2天前
|
数据采集 并行计算 数据处理
工具人必看:Python并发编程工具箱大揭秘,IO与CPU密集型任务的最佳拍档!
【7月更文挑战第16天】Python并发编程助力IO密集型(asyncio+aiohttp,异步Web爬虫示例)和CPU密集型(multiprocessing,并行计算数组和)任务。asyncio利用单线程异步IO提升Web应用效率,multiprocessing通过多进程克服GIL限制,实现多核并行计算。善用这些工具,可优化不同场景下的程序性能。
8 1
|
3天前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
12 2
|
4天前
|
消息中间件 安全 数据处理
Python中的并发编程:理解多线程与多进程的区别与应用
在Python编程中,理解并发编程是提高程序性能和响应速度的关键。本文将深入探讨多线程和多进程的区别、适用场景及实际应用,帮助开发者更好地利用Python进行并发编程。
|
10天前
|
数据处理 调度 Python
Python并发编程实战指南:深入理解线程(threading)与进程(multiprocessing)的奥秘,打造高效并发应用!
【7月更文挑战第8天】Python并发编程探索:使用`threading`模块创建线程处理任务,虽受限于GIL,适合I/O密集型工作。而`multiprocessing`模块通过进程实现多核利用,适用于CPU密集型任务。通过实例展示了线程和进程的创建与同步,强调了根据任务类型选择合适并发模型的重要性。

相关实验场景

更多