Python中的并发编程(4)多线程发送网络请求

简介: Python中的并发编程(4)多线程发送网络请求

多线程发送网络请求

我们使用https://www.vatcomply.com 来演示多线程发送网络请求。

该提供了汇率查询的API,我们可以像下面这样发送请求获取某种货币对其它货币的汇率。

import requests
response = requests.get("https://api.vatcomply.com/rates?base=USD")
print(response.json())

返回结果是一个json格式的文本,包含了base中查询的货币对其它货币的汇率:

{'date': '2023-12-07', 'base': 'USD', 'rates': {'EUR': 0.9284189026088572, 'USD': 1.0, 'JPY': 145.0004642094513, 'BGN': 1.8158016897224027, 'CZK': 22.612570791941327, ..., 'ZAR': 18.759260978553524}
}

下面我们比较不同方式发送多个请求的耗时。

注:本节代码来自Expert Python Programming 6.3

顺序执行

我们使用顺序执行的方式,发送5次请求:

import time
import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
 
    rates[base] = 1.
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def main():
    for base in BASES:
        fetch_rates(base)

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))

执行结果:

1 USD =     1.0 USD,   0.928 EUR,    4.02 PLN,    10.9 NOK,    22.6 CZK
1 EUR =    1.08 USD,     1.0 EUR,    4.33 PLN,    11.8 NOK,    24.4 CZK
1 PLN =   0.249 USD,   0.231 EUR,     1.0 PLN,    2.71 NOK,    5.62 CZK
1 NOK =  0.0916 USD,  0.0851 EUR,   0.369 PLN,     1.0 NOK,    2.07 CZK
1 CZK =  0.0442 USD,  0.0411 EUR,   0.178 PLN,   0.483 NOK,     1.0 CZK

time elapsed: 2.96s

顺序执行需要等待上一个请求返回后才能发起下一个请求,所以用时较长。

多线程

只需要在main函数中做一点修改,启动多个线程。

from threading import Thread
def main():
    threads = [] 
    for base in BASES:
        thread = Thread(target=fetch_rates, args=[base])
        thread.start()
        threads.append(thread)
    while threads:
        threads.pop().join()

执行结果:

1 PLN =   0.249 USD,   0.231 EUR,     1.0 PLN,    2.71 NOK,    5.62 CZK
1 NOK =  0.0916 USD,  0.0851 EUR,   0.369 PLN,     1.0 NOK,    2.07 CZK
1 EUR =    1.08 USD,     1.0 EUR,    4.33 PLN,    11.8 NOK,    24.4 CZK
1 USD =     1.0 USD,   0.928 EUR,    4.02 PLN,    10.9 NOK,    22.6 CZK
1 CZK =  0.0442 USD,  0.0411 EUR,   0.178 PLN,   0.483 NOK,     1.0 CZK

time elapsed: 0.62s

多线程的效果很好,极大地缩短了程序的耗时。因为我们连续发送了5个请求并等待结果,而不是像顺序执行中的发送一个请求后等待它返回结果后再发送下一个。

(同时我们也发现了:多线程导致任务完成的顺序改变了, 打印的结果和启动顺序’USD’, ‘EUR’, ‘PLN’, ‘NOK’, 'CZK’不同)

但上面的代码存在一些问题:

  • 没有限制线程的数量。过多的线程可能导致因请求过快而被网站封IP。
  • 线程函数中使用print,可能导致输出混乱。
  • 每个函数被委托给单独的线程,这使得控制输入处理的速率极其困难。

使用线程池

使用线程池创建指定数量的线程,这些线程将消耗队列中的工作项,直到队列变空。

线程池带来的好处:

  • 控制线程数量
  • 减少创建线程的开销。

注:这里我们用队列手动实现了线程池,但Python提供了封装好的 concurrent.futures.ThreadPoolExecutor

from queue import Empty

# 从work_queue中获取任务并执行
def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            fetch_rates(item)
            work_queue.task_done()


from threading import Thread
from queue import Queue

THREAD_POOL_SIZE = 4
def main():
  # work_queue是任务队列
    work_queue = Queue()
    for base in BASES:
        work_queue.put(base)
        
    # 创建指定数量个线程
    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
        ]
    for thread in threads:
        thread.start()

    work_queue.join()
    while threads:
        threads.pop().join()

在main函数中,我们创建了一个队列work_queue来存放需要处理的参数,然后启动了指定数量THREAD_POOL_SIZE的线程。这些线程都执行worker函数,参数都是work_queue。


worker() 函数的主体是一个 while 循环,直到队列为空时结束循环。

在每次迭代中,它尝试用 work_queue.get_nowait()以非阻塞方式获取新项目。如果队列已经为空,work_queue.get_nowait()将引发 Empty 异常,从而中断循环并结束。否则从队列中获取一个项目,调用fetch_rates(item) 并用 work_queue.task_done() 将该项目标记为已处理。当队列中的所有项目都已标记为完成时,主线程中的 work_queue.join() 函数将返回。

两个队列

线程函数中使用print,有时会出现混乱的输出。

下面我们使用一个额外的队列来收集结果,并在主线程中输出结果。

首先移除原来的print函数。

def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
 
    
    rates[base] = 1.
    # 移除print
    return base, rates

def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

修改worker函数,用results_queue收集结果:

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            results_queue.put(fetch_rates(item)) # 将结果放入results_queue
            work_queue.task_done()

在main函数中打印结果:

def main():
    work_queue = Queue()
    results_queue = Queue()
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(target=worker, args=(work_queue,results_queue))
        for _ in range(THREAD_POOL_SIZE)
        ]
    for thread in threads:
        thread.start()

    work_queue.join()
    while threads:
        threads.pop().join()
    
    # 打印结果
    while not results_queue.empty():
        present_result(*results_queue.get())

处理线程中的错误

我们的fetch_rates函数向网站发送请求时可能因为网络等原因出错,然后该线程会结束(但该任务没有完成)。主线程中的work_queue.join()会等待所有任务完成,从而程序被卡住。

我们通过在fetch_rates中添加一个随机报错模拟网络出错的情况:

import random
def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    # 随机引起一个报错
    if random.randint(0, 5) < 1:
        # simulate error by overriding status code
        response.status_code = 500
    response.raise_for_status()
    rates = response.json()["rates"]
    

    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates

如果出现了错误(异常),程序将抛出异常,然后卡住。

因此我们需要在worker中添加异常处理。

当发生异常时,程序将异常存入results_queue中;如果没有异常,则存放正常的结果;并且总是该标记任务完成。

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        # 处理错误
        try:
            result = fetch_rates(item)
        except Exception as err:
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()

在main函数中:

    # 打印结果
    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result
        present_result(*result)


程序遇到错误时,不会再卡住,在最后的打印时会抛出(raise)错误。

Throttling(节流)

过快的请求可能导致网站负载过大,从而封禁我们的IP。

因此我们需要控制请求的速度

我们将使用的算法有时称为令牌桶(token bucket),非常简单。它包括以下功能:

• 有一个包含预定义数量令牌的存储桶

• 每个令牌对应于处理一项工作的单个权限

• 每次工作人员请求一个或多个令牌(权限)时,我们都会执行以下操作:

1. 我们检查自上次重新装满桶以来已经过去了多长时间

2. 如果时间差允许,我们将与时间差相对应的令牌数量重新装满桶

3. 如果存储的数量令牌大于或等于请求的数量,我们减少存储的令牌数量并返回该值

4. 如果存储的令牌数量小于请求的数量,我们返回零

两件重要的事情是

1.始终用零令牌初始化令牌桶(?)

2.并且永远不要让它溢出。

from threading import Lock 
import time

class Throttle: 
    def __init__(self, rate): 
        self._consume_lock = Lock() # 使用锁避免冲突
        self.rate = rate # 速率,rate越大则允许的请求间隔越小
        self.tokens = 0
        self.last = None

    def consume(self, amount=1): 
        with self._consume_lock: 
            now = time.time() 
            #初始化上次时间
            if self.last is None: 
                self.last = now 
            elapsed = now - self.last 
            # 间隔时间足够,增加令牌
            if elapsed * self.rate > 1: 
                self.tokens += elapsed * self.rate
                self.last = now 
            # 避免桶溢出
            self.tokens = min(self.rate, self.tokens)
            # 如果令牌足够,则发给请求的进程
            if self.tokens >= amount: 
                self.tokens -= amount
                return amount

            return 0

这个类的用法非常简单。

我们只需在主线程中创建一个 Throttle 实例(例如 Throttle(10),rate=10,允许每1/10秒发送一个请求,rate越大则允许的请求速度越快),并将其作为参数传递给每个工作线程:

  throttle = Throttle(10)
  ...
  threads = [
        Thread(target=worker, 
               args=(work_queue, results_queue, throttle)
               )
        for _ in range(THREAD_POOL_SIZE)
        ]

在worker中,需要消耗throttle:

def worker(work_queue, results_queue, throttle):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
            
    # 尝试获取和消耗令牌
        while not throttle.consume():
            time.sleep(.1)

        # 处理错误
        ...
相关文章
|
28天前
|
Java 开发者 Kotlin
华为仓颉语言初识:并发编程之线程的基本使用
本文详细介绍了仓颉语言中线程的基本使用,包括线程创建(通过`spawn`关键字)、线程名称设置、线程执行控制(使用`get`方法阻塞主线程以获取子线程结果)以及线程取消(通过`cancel()`方法)。文章还指出仓颉线程与Java等语言的差异,例如默认不提供线程名称。掌握这些内容有助于开发者高效处理并发任务,提升程序性能。
93 2
|
5月前
|
机器学习/深度学习 人工智能 算法
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
眼疾识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了4种常见的眼疾图像数据集(白内障、糖尿病性视网膜病变、青光眼和正常眼睛) 再使用通过搭建的算法模型对数据集进行训练得到一个识别精度较高的模型,然后保存为为本地h5格式文件。最后使用Django框架搭建了一个Web网页平台可视化操作界面,实现用户上传一张眼疾图片识别其名称。
384 5
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
|
24天前
|
调度 Python
探索Python高级并发与网络编程技术。
可以看出,Python的高级并发和网络编程极具挑战,却也饱含乐趣。探索这些技术,你将会发现:它们好比是Python世界的海洋,有穿越风暴的波涛,也有寂静深海的奇妙。开始旅途,探索无尽可能吧!
54 15
|
2月前
|
数据采集 存储 监控
Python 原生爬虫教程:网络爬虫的基本概念和认知
网络爬虫是一种自动抓取互联网信息的程序,广泛应用于搜索引擎、数据采集、新闻聚合和价格监控等领域。其工作流程包括 URL 调度、HTTP 请求、页面下载、解析、数据存储及新 URL 发现。Python 因其丰富的库(如 requests、BeautifulSoup、Scrapy)和简洁语法成为爬虫开发的首选语言。然而,在使用爬虫时需注意法律与道德问题,例如遵守 robots.txt 规则、控制请求频率以及合法使用数据,以确保爬虫技术健康有序发展。
271 31
|
2月前
|
存储 监控 算法
基于 Python 哈希表算法的局域网网络监控工具:实现高效数据管理的核心技术
在当下数字化办公的环境中,局域网网络监控工具已成为保障企业网络安全、确保其高效运行的核心手段。此类工具通过对网络数据的收集、分析与管理,赋予企业实时洞察网络活动的能力。而在其运行机制背后,数据结构与算法发挥着关键作用。本文聚焦于 PHP 语言中的哈希表算法,深入探究其在局域网网络监控工具中的应用方式及所具备的优势。
88 7
|
2月前
|
存储 数据库 Python
利用Python获取网络数据的技巧
抓起你的Python魔杖,我们一起进入了网络之海,捕捉那些悠游在网络中的数据鱼,想一想不同的网络资源,是不是都像数不尽的海洋生物,我们要做的,就是像一个优秀的渔民一样,找到他们,把它们捕获,然后用他们制作出种种美味。 **1. 打开魔法之门:请求包** 要抓鱼,首先需要一个鱼网。在Python的世界里,我们就是通过所谓的“请求包”来发送“抓鱼”的请求。requests是Python中常用的发送HTTP请求的库,用它可以方便地与网络上的资源进行交互。所谓的GET,POST,DELETE,还有PUT,这些听起来像偶像歌曲一样的单词,其实就是我们鱼网的不同方式。 简单用法如下: ``` im
81 14
|
3月前
|
机器学习/深度学习 人工智能 算法
基于Python深度学习的【害虫识别】系统~卷积神经网络+TensorFlow+图像识别+人工智能
害虫识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了12种常见的害虫种类数据集【"蚂蚁(ants)", "蜜蜂(bees)", "甲虫(beetle)", "毛虫(catterpillar)", "蚯蚓(earthworms)", "蜚蠊(earwig)", "蚱蜢(grasshopper)", "飞蛾(moth)", "鼻涕虫(slug)", "蜗牛(snail)", "黄蜂(wasp)", "象鼻虫(weevil)"】 再使用通过搭建的算法模型对数据集进行训练得到一个识别精度较高的模型,然后保存为为本地h5格式文件。最后使用Djan
250 1
基于Python深度学习的【害虫识别】系统~卷积神经网络+TensorFlow+图像识别+人工智能
|
3月前
|
存储 网络协议 安全
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
147 23
|
4月前
|
机器学习/深度学习 人工智能 算法
基于Python深度学习的【蘑菇识别】系统~卷积神经网络+TensorFlow+图像识别+人工智能
蘑菇识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了9种常见的蘑菇种类数据集【"香菇(Agaricus)", "毒鹅膏菌(Amanita)", "牛肝菌(Boletus)", "网状菌(Cortinarius)", "毒镰孢(Entoloma)", "湿孢菌(Hygrocybe)", "乳菇(Lactarius)", "红菇(Russula)", "松茸(Suillus)"】 再使用通过搭建的算法模型对数据集进行训练得到一个识别精度较高的模型,然后保存为为本地h5格式文件。最后使用Django框架搭建了一个Web网页平台可视化操作界面,
280 11
基于Python深度学习的【蘑菇识别】系统~卷积神经网络+TensorFlow+图像识别+人工智能
|
5月前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
443 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析

推荐镜像

更多