Python中的并发编程(3)线程池、锁

简介: Python中的并发编程(3)线程池、锁

concurrent.futures 提供的线程池

concurrent.futures模块提供了线程池和进程池简化了多线程/进程操作。

线程池原理是用一个任务队列让多个线程从中获取任务执行,然后返回结果。

常见的用法是创建线程池,提交任务,等待完成并获取结果

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  futures = [executor.submit(count, item) for item in number_list] # count是一个函数,item是其参数
  for future in concurrent.futures.as_completed(futures):
    print(future.result())

concurrent.futures.ThreadPoolExecutor(max_workers=5)创建了一个线程池,max_workers指定了线程数量上限。通过线程池可以创建和执行任务。

concurrent.futures使用Future类表示(未来的)任务。调用.submit()时会创建并执行一个任务(Future)。

.as_completed(futures)是一个迭代器,当futures中有任务完成时会产出该future.

Python最广为使用的并发处理库futures使用入门与内部原理 - 知乎 (zhihu.com)对这个过程做了比较好的说明:

主线程是通过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并立即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。

python-parallel-programming-cookbook-cn 1.0 文档 中的一个例子对使用顺序执行、线程池进程池三种方式进行计算的时间进行了比较:

import concurrent.futures
import time


# 一个耗时的计算
def count(number) :
    for i in range(0, 10000000):
        i=i+1
    return i * number

if __name__ == "__main__":
    number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 顺序执行
    start_time = time.time()
    for item in number_list:
        print(count(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")
    # 线程池
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
    
    # 进程池
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " + str(time.time() - start_time_2), "seconds")

结果为:

Sequential execution in 7.095552206039429 seconds
Thread pool execution in 7.140377998352051 seconds
Process pool execution in 4.240718126296997 seconds

竞争和锁

由于共享内存,多线程程序容易遇到竞争问题:两个内存对同一个变量进行修改可能导致意想不到的问题。

看下面这个计数的例子:

我们创建了一个全局变量thread_visits,在visit_counter()中修改这个变量值。

from threading import Thread
thread_visits = 0
def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        thread_visits +=  1 #  thread_visits = thread_visits + 1

if __name__ == "__main__":
    thread_count = 100
    threads = [
        Thread(target=visit_counter)
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"thread_count={thread_count}, thread_visits={thread_visits}")

执行结果:

第1次 :thread_count=100, thread_visits=7227793
第2次 :thread_count=100, thread_visits=9544020
第3次 :thread_count=100, thread_visits=9851811

执行该程序会发现每次运行thread_visits的值都不一样。

因为在 thread_visits 变量上的读取和写入操作之间有一段时间,另一个线程可以介入并操作结果。这导致了竞争。

(线程1和线程2对变量thread_visits的竞争。两个线程都对thread_visits执行了+1的操作,但最后thread_visits的是1,而不是2。)


thread_visits += 1 实际包含读写两个操作,它等价于

thread_visits = thread_visits + 1,先读取thread_visits的值并+1,再写入到thread_visits。


正确方法是使用锁保证一次只有一个线程可以处理单个代码块

from threading import Thread
from threading import Lock

thread_visits = 0
thread_visits_lock = Lock()

def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        with thread_visits_lock:
            thread_visits +=  1 #  thread_visits = thread_visits + 1

运行结果:

thread_count=100, thread_visits=10000000
 

这次我们得到了正确的结果,但花费了接近一分钟的时间。因为受保护的块不能并行运行。此外,获取和释放锁是需要一些额外操作。

将锁放在外面的时候,会发现花费的时间减少了很多。因为减少了获取和释放锁的消耗。

with thread_visits_lock:
        for _ in range(100_000):  
            thread_visits +=  1

相关文章
|
5月前
|
数据采集 存储 JSON
Python爬取知乎评论:多线程与异步爬虫的性能优化
Python爬取知乎评论:多线程与异步爬虫的性能优化
|
5月前
|
人工智能 安全 调度
Python并发编程之线程同步详解
并发编程在Python中至关重要,线程同步确保多线程程序正确运行。本文详解线程同步机制,包括互斥锁、信号量、事件、条件变量和队列,探讨全局解释器锁(GIL)的影响及解决线程同步问题的最佳实践,如避免全局变量、使用线程安全数据结构、精细化锁的使用等。通过示例代码帮助开发者理解并提升多线程程序的性能与可靠性。
213 0
|
5月前
|
数据采集 NoSQL 调度
当生成器遇上异步IO:Python并发编程的十大实战兵法
本文通过十大实战场景,详解Python中生成器与异步IO的高效结合。从协程演进、背压控制到分布式锁、性能剖析,全面展示如何利用asyncio与生成器构建高并发应用,助你掌握非阻塞编程核心技巧,提升I/O密集型程序性能。
199 0
|
7月前
|
Java 开发者 Kotlin
华为仓颉语言初识:并发编程之线程的基本使用
本文详细介绍了仓颉语言中线程的基本使用,包括线程创建(通过`spawn`关键字)、线程名称设置、线程执行控制(使用`get`方法阻塞主线程以获取子线程结果)以及线程取消(通过`cancel()`方法)。文章还指出仓颉线程与Java等语言的差异,例如默认不提供线程名称。掌握这些内容有助于开发者高效处理并发任务,提升程序性能。
274 2
|
2月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
340 1
|
2月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
328 0
|
4月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
369 1
|
5月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
6月前
|
JSON 算法 Java
打造终端里的下载利器:Python实现可恢复式多线程下载器
在数字时代,大文件下载已成为日常需求。本文教你用Python打造专业级下载器,支持断点续传、多线程加速、速度限制等功能,显著提升终端下载体验。内容涵盖智能续传、多线程分块下载、限速控制及Rich库构建现代终端界面,助你从零构建高效下载工具。
443 1

热门文章

最新文章

推荐镜像

更多