Python中的并发编程(3)线程池、锁

简介: Python中的并发编程(3)线程池、锁

concurrent.futures 提供的线程池

concurrent.futures模块提供了线程池和进程池简化了多线程/进程操作。

线程池原理是用一个任务队列让多个线程从中获取任务执行,然后返回结果。

常见的用法是创建线程池,提交任务,等待完成并获取结果

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  futures = [executor.submit(count, item) for item in number_list] # count是一个函数,item是其参数
  for future in concurrent.futures.as_completed(futures):
    print(future.result())

concurrent.futures.ThreadPoolExecutor(max_workers=5)创建了一个线程池,max_workers指定了线程数量上限。通过线程池可以创建和执行任务。

concurrent.futures使用Future类表示(未来的)任务。调用.submit()时会创建并执行一个任务(Future)。

.as_completed(futures)是一个迭代器,当futures中有任务完成时会产出该future.

Python最广为使用的并发处理库futures使用入门与内部原理 - 知乎 (zhihu.com)对这个过程做了比较好的说明:

主线程是通过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并立即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。

python-parallel-programming-cookbook-cn 1.0 文档 中的一个例子对使用顺序执行、线程池进程池三种方式进行计算的时间进行了比较:

import concurrent.futures
import time


# 一个耗时的计算
def count(number) :
    for i in range(0, 10000000):
        i=i+1
    return i * number

if __name__ == "__main__":
    number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 顺序执行
    start_time = time.time()
    for item in number_list:
        print(count(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")
    # 线程池
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
    
    # 进程池
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(count, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " + str(time.time() - start_time_2), "seconds")

结果为:

Sequential execution in 7.095552206039429 seconds
Thread pool execution in 7.140377998352051 seconds
Process pool execution in 4.240718126296997 seconds

竞争和锁

由于共享内存,多线程程序容易遇到竞争问题:两个内存对同一个变量进行修改可能导致意想不到的问题。

看下面这个计数的例子:

我们创建了一个全局变量thread_visits,在visit_counter()中修改这个变量值。

from threading import Thread
thread_visits = 0
def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        thread_visits +=  1 #  thread_visits = thread_visits + 1

if __name__ == "__main__":
    thread_count = 100
    threads = [
        Thread(target=visit_counter)
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"thread_count={thread_count}, thread_visits={thread_visits}")

执行结果:

第1次 :thread_count=100, thread_visits=7227793
第2次 :thread_count=100, thread_visits=9544020
第3次 :thread_count=100, thread_visits=9851811

执行该程序会发现每次运行thread_visits的值都不一样。

因为在 thread_visits 变量上的读取和写入操作之间有一段时间,另一个线程可以介入并操作结果。这导致了竞争。

(线程1和线程2对变量thread_visits的竞争。两个线程都对thread_visits执行了+1的操作,但最后thread_visits的是1,而不是2。)


thread_visits += 1 实际包含读写两个操作,它等价于

thread_visits = thread_visits + 1,先读取thread_visits的值并+1,再写入到thread_visits。


正确方法是使用锁保证一次只有一个线程可以处理单个代码块

from threading import Thread
from threading import Lock

thread_visits = 0
thread_visits_lock = Lock()

def visit_counter():
    global thread_visits
    for _ in range(100_000):  
        with thread_visits_lock:
            thread_visits +=  1 #  thread_visits = thread_visits + 1

运行结果:

thread_count=100, thread_visits=10000000
 

这次我们得到了正确的结果,但花费了接近一分钟的时间。因为受保护的块不能并行运行。此外,获取和释放锁是需要一些额外操作。

将锁放在外面的时候,会发现花费的时间减少了很多。因为减少了获取和释放锁的消耗。

with thread_visits_lock:
        for _ in range(100_000):  
            thread_visits +=  1

相关文章
|
2天前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
【7月更文挑战第17天】Python并发编程中,异步编程(如`asyncio`)在IO密集型任务中提高效率,利用等待时间执行其他任务。但对CPU密集型任务,由于GIL限制,多线程效率不高,此时应选用`multiprocessing`进行多进程并行计算以突破限制。选择合适的并发策略是关键:异步适合IO,多进程适合CPU。理解这些能帮助构建高效并发程序。
13 6
|
3天前
|
安全 Python
在Python中,实现多线程
【7月更文挑战第16天】在Python中,实现多线程
17 6
|
1天前
|
开发框架 并行计算 .NET
从菜鸟到大神:Python并发编程深度剖析,IO与CPU的异步战争!
【7月更文挑战第18天】Python并发涉及多线程、多进程和异步IO(asyncio)。异步IO适合IO密集型任务,如并发HTTP请求,能避免等待提高效率。多进程在CPU密集型任务中更优,因可绕过GIL限制实现并行计算。通过正确选择并发策略,开发者能提升应用性能和响应速度。
|
1天前
|
监控 安全 调度
在Python中,线程管理
【7月更文挑战第18天】在Python中,线程管理
6 3
|
4天前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
【7月更文挑战第15天】Python 3.5+引入的协程和异步函数革新了并发编程。协程,轻量级线程,由程序控制切换,降低开销。异步函数是协程的高级形式,允许等待异步操作。通过`asyncio`库,如示例所示,能并发执行任务,提高I/O密集型任务效率,实现并发而非并行,优化CPU利用率。理解和掌握这些工具对于构建高效网络应用至关重要。
17 6
|
1天前
|
UED 开发者 Python
Python并发编程新纪元:异步编程如何重塑IO与CPU密集型任务的处理方式?
【7月更文挑战第18天】Python异步编程提升IO任务效率,非阻塞模式减少等待时间,优化用户体验。asyncio库与await关键字助力编写非阻塞代码,示例展示异步HTTP请求。CPU密集型任务中,异步编程结合多进程可提升效率。异步编程挑战包括代码复杂性,解决策略包括使用类型提示、异步框架及最佳实践。异步编程重塑任务处理方式,成为现代Python开发的关键。
8 2
|
1天前
|
开发者 Python
Python线程
【7月更文挑战第18天】Python线程
5 2
|
1天前
|
Python
Python线程是操作系统能够进行运算的最小单位
【7月更文挑战第18天】Python线程是操作系统能够进行运算的最小单位
4 1
|
3天前
|
数据采集 并行计算 数据处理
工具人必看:Python并发编程工具箱大揭秘,IO与CPU密集型任务的最佳拍档!
【7月更文挑战第16天】Python并发编程助力IO密集型(asyncio+aiohttp,异步Web爬虫示例)和CPU密集型(multiprocessing,并行计算数组和)任务。asyncio利用单线程异步IO提升Web应用效率,multiprocessing通过多进程克服GIL限制,实现多核并行计算。善用这些工具,可优化不同场景下的程序性能。
9 1
|
4天前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
12 2