Python中的并发编程(3)线程池、锁

简介: Python中的并发编程(3)线程池、锁

concurrent.futures 提供的线程池

concurrent.futures模块提供了线程池和进程池简化了多线程/进程操作。

线程池原理是用一个任务队列让多个线程从中获取任务执行,然后返回结果。

常见的用法是创建线程池,提交任务,等待完成并获取结果

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  futures = [executor.submit(count, item) for item in number_list] # count是一个函数,item是其参数
  for future in concurrent.futures.as_completed(futures):
    print(future.result())

concurrent.futures.ThreadPoolExecutor(max_workers=5)创建了一个线程池,max_workers指定了线程数量上限。通过线程池可以创建和执行任务。

concurrent.futures使用Future类表示(未来的)任务。调用.submit()时会创建并执行一个任务(Future)。

.as_completed(futures)是一个迭代器,当futures中有任务完成时会产出该future.

Python最广为使用的并发处理库futures使用入门与内部原理 - 知乎 (zhihu.com)对这个过程做了比较好的说明:

主线程是通过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并立即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。

python-parallel-programming-cookbook-cn 1.0 文档 中的一个例子对使用顺序执行、线程池进程池三种方式进行计算的时间进行了比较:

import concurrent.futures
import time


# 一个耗时的计算
def count(number) :
    for i in range(0, 10000000):
        i=i+1
    return i * number

if __name__ == "__main__":
    number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 顺序执行
    start_time = time.time()
    for item in number_list:
        print(count(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")
    # 线程池
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
    
    # 进程池
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " + str(time.time() - start_time_2), "seconds")

结果为:

Sequential execution in 7.095552206039429 seconds
Thread pool execution in 7.140377998352051 seconds
Process pool execution in 4.240718126296997 seconds

竞争和锁

由于共享内存,多线程程序容易遇到竞争问题:两个内存对同一个变量进行修改可能导致意想不到的问题。

看下面这个计数的例子:

我们创建了一个全局变量thread_visits,在visit_counter()中修改这个变量值。

from threading import Thread
thread_visits = 0
def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        thread_visits +=  1 #  thread_visits = thread_visits + 1

if __name__ == "__main__":
    thread_count = 100
    threads = [
        Thread(target=visit_counter)
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"thread_count={thread_count}, thread_visits={thread_visits}")

执行结果:

第1次 :thread_count=100, thread_visits=7227793
第2次 :thread_count=100, thread_visits=9544020
第3次 :thread_count=100, thread_visits=9851811

执行该程序会发现每次运行thread_visits的值都不一样。

因为在 thread_visits 变量上的读取和写入操作之间有一段时间,另一个线程可以介入并操作结果。这导致了竞争。

(线程1和线程2对变量thread_visits的竞争。两个线程都对thread_visits执行了+1的操作,但最后thread_visits的是1,而不是2。)


thread_visits += 1 实际包含读写两个操作,它等价于

thread_visits = thread_visits + 1,先读取thread_visits的值并+1,再写入到thread_visits。


正确方法是使用锁保证一次只有一个线程可以处理单个代码块

from threading import Thread
from threading import Lock

thread_visits = 0
thread_visits_lock = Lock()

def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        with thread_visits_lock:
            thread_visits +=  1 #  thread_visits = thread_visits + 1

运行结果:

thread_count=100, thread_visits=10000000
 

这次我们得到了正确的结果,但花费了接近一分钟的时间。因为受保护的块不能并行运行。此外,获取和释放锁是需要一些额外操作。

将锁放在外面的时候,会发现花费的时间减少了很多。因为减少了获取和释放锁的消耗。

with thread_visits_lock:
        for _ in range(100_000):  
            thread_visits +=  1

相关文章
|
11天前
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
|
11天前
|
Python
Python 高级编程与实战:深入理解面向对象与并发编程
本文深入探讨Python的高级特性,涵盖面向对象编程(继承、多态、特殊方法、类与实例属性)、异常处理(try-except、finally)和并发编程(多线程、多进程、异步编程)。通过实战项目如聊天服务器和异步文件下载器,帮助读者掌握这些技术,编写更复杂高效的Python程序。
|
16天前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
22 0
|
1月前
|
Python
python3多线程中使用线程睡眠
本文详细介绍了Python3多线程编程中使用线程睡眠的基本方法和应用场景。通过 `time.sleep()`函数,可以使线程暂停执行一段指定的时间,从而控制线程的执行节奏。通过实际示例演示了如何在多线程中使用线程睡眠来实现计数器和下载器功能。希望本文能帮助您更好地理解和应用Python多线程编程,提高程序的并发能力和执行效率。
54 20
|
2月前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
180 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
|
2月前
|
安全 Java 程序员
面试直击:并发编程三要素+线程安全全攻略!
并发编程三要素为原子性、可见性和有序性,确保多线程操作的一致性和安全性。Java 中通过 `synchronized`、`Lock`、`volatile`、原子类和线程安全集合等机制保障线程安全。掌握这些概念和工具,能有效解决并发问题,编写高效稳定的多线程程序。
100 11
|
2月前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
并发编程能够显著提升程序的效率和响应速度。例如,网络爬虫通过并发下载将耗时从1小时缩短至20分钟;APP页面加载时间从3秒优化到200毫秒。Python支持多线程、多进程、异步I/O和协程等并发编程方式,适用于不同场景。线程通信方式包括共享变量、消息传递和同步机制,如Lock、Queue等。Python的并发编程特性使其在处理大规模数据和高并发访问时表现出色,成为许多领域的首选语言。
|
3月前
|
Java 关系型数据库 MySQL
【JavaEE“多线程进阶”】——各种“锁”大总结
乐/悲观锁,轻/重量级锁,自旋锁,挂起等待锁,普通互斥锁,读写锁,公不公平锁,可不可重入锁,synchronized加锁三阶段过程,锁消除,锁粗化
|
4月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
488 6
|
4月前
|
设计模式 安全 Java
Java 多线程并发编程
Java多线程并发编程是指在Java程序中使用多个线程同时执行,以提高程序的运行效率和响应速度。通过合理管理和调度线程,可以充分利用多核处理器资源,实现高效的任务处理。本内容将介绍Java多线程的基础概念、实现方式及常见问题解决方法。
225 1

热门文章

最新文章