python 小知识 - 并发编程

简介: 并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。

1. 并发基础概念

可以说,任何一种高级语言都会涉及到并发编程;我们都能说出并发的最大好处:可以提高程序的执行的效率;

并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。

并发通常会涉及到两个概念:线程和进程;

线程和进程有什么区别?划重点

  • 首先,进程(Process)是操作系统中最小资源管理单元,一个进程拥有独立的代码、数据和内存空间,是应用程序启动的一个实例;你只要在系统中执行一个程序(比如py代码),操作系统都会分配相应的资源来执行,执行完成会回收资源, 如果进程使用资源威胁到整个操作系统(比如内存飙升到99%),操作系统会强杀它。也就是资源是收操作系统控制的。
  • 其次,进程下面允许有多个任务执行单元,这个任务执行单元就是线程(thread);同时线程有不同的状态(如创建,可运行,运行中,阻塞,死亡)。线程的状态控制也是由操作系统控制。
  • 当然还有一个协程的概念;进程和线程的资源控制是操作系统的分配,对于单核cpu来说,多个线程轮流的获取的cpu的使用权,达到并发的目的。协程的一个特点是资源是控制权在于设计者手上,协程在线程下面,也是并发编程的一个组成部分。

从上面的说明,我们可以总结下:

  • 进程下面有线程,线程下面有协程
  • 进程和进程之前的资源是独立的
  • 而同一个进程下的多个线程是共享资源,比如数据和内存空间等
  • 同时线程共享同一个资源,多个线程同时改变资源时,就会出现不一致性,这个时候考虑,独占资源

通常情况下,分布式计算都是多个进程的方式,就如上面总结的,进程之间资源是独立的,就会有一个共享资源的机制来负责进程之间数据的共享和传递(通常由master类的角色完成),这个在python多进程并行处理会涉及,区别是如何实现共享数据的处理。

我觉得,理解上面的基础概念是非常重要的,它们是并发编程的底层逻辑,是所有并发编程的基础。

2. 如何并发编程

用一个python的多线程的方式来说明如何并发编程?

from concurrent import futures
from time import sleep, strftime

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(3, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        
        
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
# [22:59:24][22:59:24]   download done file = 0.png 
# [22:59:24]   download done file = 2.png 
#    download done file = 1.png 
# [22:59:25][22:59:25][22:59:25]   download done file = 4.png 
#    download done file = 3.png 
#    download done file = 5.png 
# [22:59:26][22:59:26][22:59:26]     download done file = 8.png 
#    download done file = 6.png  download done file = 7.png 

# [22:59:27]   download done file = 9.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png

python 中的concurrent.futurest提供了ThreadPoolExecutor线程池来创建线程池

  • 通过map把每个任务分配给执行函数执行
  • 并返回结果
  • with来管理线程池的关闭

从上面例子,我们需要知道:

  • 并发编程的关键是编写一个任务的处理逻辑
  • 更重要的是如何将一个大任务拆分从小任务,比如前面例子中的 files列表大任务,每个列表中的值就是一个小任务。

并发编程的关键:拆分任务,发起线程或者进程执行小任务,合并小任务结果大任务返回结果(如果需要返回任务的情况)。

希望今天的分享对你有帮助。

接下来的部分,我们将来了解python并发的特性,以及线程和多进程开发,敬请期待。

3. python的并发编程特性

我们先来看另一个例子

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)

print('all cost is ', time.time()-ts)

# one task time is  3.5929086208343506
# one task time is  4.961361646652222
# one task time is  5.242505311965942
# one task time is  5.411738872528076
# one task time is  5.5458598136901855
# all cost is  5.8427581787109375

这个例子中,并发执行的任务时CPU计算型的。执行的结果和我们预期的并不一样,经测试单纯的执行download_resource需要1.2s的时间,预期的5个并发执行5个任务耗时应该小于3s吧。实际的结果却是串行执行的结果(6s)差别不大。

这是为什么?

这就要说道,python线程的伪并发性;python的线程在处理cpu计算型的任务时其实是单线程执行的。这个原因是python的C解释器是单线程执行的,在执行多线程任务时,C解释器都会给线程上一个全局解释锁GIL(global interpreter lock),使得多个线程轮流使用cpu时间。

为什么需要GIL?
单线程的好处是线程安全的,共享的数据不会出现异常的情况。python中的一切都是对象,python解释器负责管理这些对象,包括对象的销毁并自动回收内存;python解释器确切的说Cpython解释器会给每一个对象记录一个引用计数,每多一次引用,计数就会+1,反之则-1,如果引用计数为0,Cpython解释器会回收这些资源。

在这种机制下,多线程执行时会出现什么情况?两个线程A和B同时引用一个对象obj,这个时候obj的引用计数为2;A打算撤销对obj的引用,完成第一步时引用计数减去1时,这时发生了线程切换,A挂起等待,还没判断是否需要执行销毁对象操作。B进入运行状态,这个时候B也对obj撤销引用,并完成引用计数减1,销毁对象,这个时候obj的引用数为0,释放内存。如果此时A重新唤醒,单判断obj引用计数为0,开始销毁对象,可是这个时候已经没有对象了。 所以为了保证不出现数据污染,才引入GIL。

也就是多线程情况下,Cpython解释器让每一个线程不断的获得GIL锁和释放锁,保证每一次只有一个线程在执行。

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(10, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
print(time.time()-ts)

# [23:58:06]   download done file = 5.png 
# [23:58:06]   download done file = 2.png 
# [23:58:06]   download done file = 4.png 
# [23:58:06]   download done file = 9.png 
# [23:58:06]   download done file = 1.png 
# [23:58:06]   download done file = 3.png 
# [23:58:06]   download done file = 8.png 
# [23:58:06]   download done file = 6.png 
# [23:58:06]   download done file = 7.png 
# [23:58:06]   download done file = 0.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png
# all cost is  1.0396032333374023

看上面例子,确实是并发执行了。这个任务并不是cpu计算型的。为什么?

当线程获得GIL锁时,Cpython解释器为线程设置一个check_interavl,满足条件时线程就会释放锁。
条件时check_interavl是当前线程遇见IO操作或者ticks计数达到100。上面time.sleep也可以是IO操作,所以python比较适合于IO密集型的任务(比如爬虫下载网络数据)。

需要真正利用多核cpu达到真正的并行,就需要多进程来实现。

4. python多进程

上一小节中分享了python并发的特性,如果想真正利用现在多核的cpu进行并行计算就需要利用多进程。

就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;

python的多进程方式, 有:

  • 有官方提供的concurrentProcessPoolExecutor
  • multiprocessing
  • 第三方的任务处理队列库celery(常见的中间服务机制是redis和RabbitMQ)
  • 分布式计算框架Ray(以redis为任务管理)

4.1 ProcessPoolExecutor

from concurrent import futures
from time import sleep, strftime
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    
    result_list = []
    ret = []
    with futures.ProcessPoolExecutor(workers) as executor:
        for file in sorted(files):
            future_result = executor.submit(cpu_task, file)
            result_list.append(future_result)
        for future in result_list:
            res = future.result()
            ret.append(res)
    return ret
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)
    print(res)

    print('all cost is ', time.time()-ts)

# one task time is  0.503483772277832
# one task time is  0.503483772277832
# one task time is  0.5034773349761963
# one task time is  0.5095388889312744
# one task time is  0.5119566917419434
# ['0.png', '1.png', '2.png', '3.png', '4.png']
# all cost is  0.6866297721862793

从上面的执行结果可知,每个cpu_task在0.5s,5个任务执行完只要0.68s,真正的并行计算。

从并发编程步骤来看:

  • 任务分解:cpu_task每次处理一个file
  • 通过submit并行提交任务
  • 每个submit会返回一个future对象
  • 通过future对象result()来获取进行的返回值,需要注意的是result()方法在没有获得返回值时阻塞进程

4.2 multiprocessing

multiprocessing库也提供了关于python多线程开发的基本功能和进阶功能(如资源共享和同步锁)。

我们看上面ProcessPoolExecutor同样的例子multiprocessing是怎么实现的。

from multiprocessing import Pool
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    
    result_list = []

    for file in sorted(files):
        res = p.apply_async(func=cpu_task, args=(file,), callback=end_call)
        result_list.append(res)
    p.close()
    p.join()

    for ret in result_list:
        print(ret.get())
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)
    
# one task time is  0.44005799293518066
# end_call 1.png
# one task time is  0.45325803756713867
# end_call 3.png
# one task time is  0.497267484664917
# end_call 2.png
# one task time is  0.4813237190246582
# end_call 4.png
# one task time is  0.5435652732849121
# end_call 0.png
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# all cost is  0.7456285953521729

multiprocessing是通过Pool来构建进程池,并通过apply_async异步提交任务,apply_async的返回值类型是ApplyResult, 该对象的get方法可以获取任务的返回值;close表示线程池在接受这些任务之后不再接收其他任务,而join是表示等待线程池所有任务执行完。

多进程之间的进程共享数据,multiprocessing通过multiprocessing.Manager来进行进程间通信。

一个multiprocessing.Manager对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。

Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。

from multiprocessing import Pool, Manager
import time

def cpu_task(file, l):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    l.append(file)
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    with Manager() as manager:
        l = manager.list()
        for file in sorted(files):
            res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call)
        p.close()
        p.join()

        print(list(l))
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)

# one task time is  0.5007572174072266
# end_call 2.png
# one task time is  0.5093979835510254
# end_call 3.png
# one task time is  0.5186748504638672
# end_call 4.png
# one task time is  0.518937349319458
# end_call 0.png
# one task time is  0.5269668102264404
# end_call 1.png
# ['2.png', '3.png', '4.png', '0.png', '1.png']
# all cost is  0.7396669387817383
目录
相关文章
|
3月前
|
安全 Python
Python并发编程必备技能:掌握threading模块,让你的代码跑得更快!
【8月更文挑战第22天】Python并发编程采用多线程技术实现任务的同时执行。利用`threading`模块可轻松管理和创建线程。通过`Thread`类实例化线程并用`start()`方法启动。线程同步通过`Lock`确保资源访问互斥,或用`Semaphore`控制并发数量。线程间通信则可通过`Queue`安全传递数据,实现生产者-消费者模式等功能。这些工具有效避免了竞态条件,确保了程序的正确性和效率。
56 1
|
1月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
在Python的并发编程世界中,没有万能的解决方案,只有最适合特定场景的方法。希望本文能够为你拨开迷雾,找到那条通往高效并发编程的光明大道。
42 2
|
5天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
1月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
30 3
|
1月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
深入探索:Python中的并发编程新纪元——协程与异步函数解析
26 3
|
1月前
|
数据采集 数据处理 调度
探索Python的并发编程
本文深入探讨Python中的并发编程,包括线程、进程和异步I/O。通过实例展示如何有效利用这些工具提升程序性能,并讨论在应用中需注意的问题及最佳实践。
|
2月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
在Python异步编程领域,协程与异步函数成为处理并发任务的关键工具。协程(微线程)比操作系统线程更轻量级,通过`async def`定义并在遇到`await`表达式时暂停执行。异步函数利用`await`实现任务间的切换。事件循环作为异步编程的核心,负责调度任务;`asyncio`库提供了事件循环的管理。Future对象则优雅地处理异步结果。掌握这些概念,可使代码更高效、简洁且易于维护。
26 1
|
2月前
|
负载均衡 Java 调度
探索Python的并发编程:线程与进程的比较与应用
本文旨在深入探讨Python中的并发编程,重点比较线程与进程的异同、适用场景及实现方法。通过分析GIL对线程并发的影响,以及进程间通信的成本,我们将揭示何时选择线程或进程更为合理。同时,文章将提供实用的代码示例,帮助读者更好地理解并运用这些概念,以提升多任务处理的效率和性能。
60 3
|
2月前
|
Java Serverless Python
探索Python中的并发编程与`concurrent.futures`模块
探索Python中的并发编程与`concurrent.futures`模块
27 4
|
2月前
|
UED 开发者 Python
Python并发编程新纪元:异步编程如何重塑IO与CPU密集型任务的处理方式?
在Python编程中,异步编程作为一种非阻塞模式,通过允许程序在等待IO操作时继续执行其他任务,提高了程序的响应性和吞吐量。与传统同步编程相比,它减少了线程等待时间,尤其在处理IO密集型任务时表现出色,如使用`asyncio`库进行异步HTTP请求。尽管对CPU密集型任务的直接提升有限,但结合多进程或多线程可间接提高效率。异步编程虽强大,但也带来了代码复杂度增加和调试难度提升等挑战,需要开发者掌握最佳实践来克服这些问题。随着其技术的成熟,异步编程正在逐步改变我们处理IO与CPU密集型任务的方式,成为提升性能和优化用户体验的重要工具。
22 0