python 小知识 - 并发编程

简介: 并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。

1. 并发基础概念

可以说,任何一种高级语言都会涉及到并发编程;我们都能说出并发的最大好处:可以提高程序的执行的效率;

并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。

并发通常会涉及到两个概念:线程和进程;

线程和进程有什么区别?划重点

  • 首先,进程(Process)是操作系统中最小资源管理单元,一个进程拥有独立的代码、数据和内存空间,是应用程序启动的一个实例;你只要在系统中执行一个程序(比如py代码),操作系统都会分配相应的资源来执行,执行完成会回收资源, 如果进程使用资源威胁到整个操作系统(比如内存飙升到99%),操作系统会强杀它。也就是资源是收操作系统控制的。
  • 其次,进程下面允许有多个任务执行单元,这个任务执行单元就是线程(thread);同时线程有不同的状态(如创建,可运行,运行中,阻塞,死亡)。线程的状态控制也是由操作系统控制。
  • 当然还有一个协程的概念;进程和线程的资源控制是操作系统的分配,对于单核cpu来说,多个线程轮流的获取的cpu的使用权,达到并发的目的。协程的一个特点是资源是控制权在于设计者手上,协程在线程下面,也是并发编程的一个组成部分。

从上面的说明,我们可以总结下:

  • 进程下面有线程,线程下面有协程
  • 进程和进程之前的资源是独立的
  • 而同一个进程下的多个线程是共享资源,比如数据和内存空间等
  • 同时线程共享同一个资源,多个线程同时改变资源时,就会出现不一致性,这个时候考虑,独占资源

通常情况下,分布式计算都是多个进程的方式,就如上面总结的,进程之间资源是独立的,就会有一个共享资源的机制来负责进程之间数据的共享和传递(通常由master类的角色完成),这个在python多进程并行处理会涉及,区别是如何实现共享数据的处理。

我觉得,理解上面的基础概念是非常重要的,它们是并发编程的底层逻辑,是所有并发编程的基础。

2. 如何并发编程

用一个python的多线程的方式来说明如何并发编程?

from concurrent import futures
from time import sleep, strftime

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(3, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        
        
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
# [22:59:24][22:59:24]   download done file = 0.png 
# [22:59:24]   download done file = 2.png 
#    download done file = 1.png 
# [22:59:25][22:59:25][22:59:25]   download done file = 4.png 
#    download done file = 3.png 
#    download done file = 5.png 
# [22:59:26][22:59:26][22:59:26]     download done file = 8.png 
#    download done file = 6.png  download done file = 7.png 

# [22:59:27]   download done file = 9.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png

python 中的concurrent.futurest提供了ThreadPoolExecutor线程池来创建线程池

  • 通过map把每个任务分配给执行函数执行
  • 并返回结果
  • with来管理线程池的关闭

从上面例子,我们需要知道:

  • 并发编程的关键是编写一个任务的处理逻辑
  • 更重要的是如何将一个大任务拆分从小任务,比如前面例子中的 files列表大任务,每个列表中的值就是一个小任务。

并发编程的关键:拆分任务,发起线程或者进程执行小任务,合并小任务结果大任务返回结果(如果需要返回任务的情况)。

希望今天的分享对你有帮助。

接下来的部分,我们将来了解python并发的特性,以及线程和多进程开发,敬请期待。

3. python的并发编程特性

我们先来看另一个例子

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)

print('all cost is ', time.time()-ts)

# one task time is  3.5929086208343506
# one task time is  4.961361646652222
# one task time is  5.242505311965942
# one task time is  5.411738872528076
# one task time is  5.5458598136901855
# all cost is  5.8427581787109375

这个例子中,并发执行的任务时CPU计算型的。执行的结果和我们预期的并不一样,经测试单纯的执行download_resource需要1.2s的时间,预期的5个并发执行5个任务耗时应该小于3s吧。实际的结果却是串行执行的结果(6s)差别不大。

这是为什么?

这就要说道,python线程的伪并发性;python的线程在处理cpu计算型的任务时其实是单线程执行的。这个原因是python的C解释器是单线程执行的,在执行多线程任务时,C解释器都会给线程上一个全局解释锁GIL(global interpreter lock),使得多个线程轮流使用cpu时间。

为什么需要GIL?
单线程的好处是线程安全的,共享的数据不会出现异常的情况。python中的一切都是对象,python解释器负责管理这些对象,包括对象的销毁并自动回收内存;python解释器确切的说Cpython解释器会给每一个对象记录一个引用计数,每多一次引用,计数就会+1,反之则-1,如果引用计数为0,Cpython解释器会回收这些资源。

在这种机制下,多线程执行时会出现什么情况?两个线程A和B同时引用一个对象obj,这个时候obj的引用计数为2;A打算撤销对obj的引用,完成第一步时引用计数减去1时,这时发生了线程切换,A挂起等待,还没判断是否需要执行销毁对象操作。B进入运行状态,这个时候B也对obj撤销引用,并完成引用计数减1,销毁对象,这个时候obj的引用数为0,释放内存。如果此时A重新唤醒,单判断obj引用计数为0,开始销毁对象,可是这个时候已经没有对象了。 所以为了保证不出现数据污染,才引入GIL。

也就是多线程情况下,Cpython解释器让每一个线程不断的获得GIL锁和释放锁,保证每一次只有一个线程在执行。

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(10, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
print(time.time()-ts)

# [23:58:06]   download done file = 5.png 
# [23:58:06]   download done file = 2.png 
# [23:58:06]   download done file = 4.png 
# [23:58:06]   download done file = 9.png 
# [23:58:06]   download done file = 1.png 
# [23:58:06]   download done file = 3.png 
# [23:58:06]   download done file = 8.png 
# [23:58:06]   download done file = 6.png 
# [23:58:06]   download done file = 7.png 
# [23:58:06]   download done file = 0.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png
# all cost is  1.0396032333374023

看上面例子,确实是并发执行了。这个任务并不是cpu计算型的。为什么?

当线程获得GIL锁时,Cpython解释器为线程设置一个check_interavl,满足条件时线程就会释放锁。
条件时check_interavl是当前线程遇见IO操作或者ticks计数达到100。上面time.sleep也可以是IO操作,所以python比较适合于IO密集型的任务(比如爬虫下载网络数据)。

需要真正利用多核cpu达到真正的并行,就需要多进程来实现。

4. python多进程

上一小节中分享了python并发的特性,如果想真正利用现在多核的cpu进行并行计算就需要利用多进程。

就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;

python的多进程方式, 有:

  • 有官方提供的concurrentProcessPoolExecutor
  • multiprocessing
  • 第三方的任务处理队列库celery(常见的中间服务机制是redis和RabbitMQ)
  • 分布式计算框架Ray(以redis为任务管理)

4.1 ProcessPoolExecutor

from concurrent import futures
from time import sleep, strftime
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    
    result_list = []
    ret = []
    with futures.ProcessPoolExecutor(workers) as executor:
        for file in sorted(files):
            future_result = executor.submit(cpu_task, file)
            result_list.append(future_result)
        for future in result_list:
            res = future.result()
            ret.append(res)
    return ret
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)
    print(res)

    print('all cost is ', time.time()-ts)

# one task time is  0.503483772277832
# one task time is  0.503483772277832
# one task time is  0.5034773349761963
# one task time is  0.5095388889312744
# one task time is  0.5119566917419434
# ['0.png', '1.png', '2.png', '3.png', '4.png']
# all cost is  0.6866297721862793

从上面的执行结果可知,每个cpu_task在0.5s,5个任务执行完只要0.68s,真正的并行计算。

从并发编程步骤来看:

  • 任务分解:cpu_task每次处理一个file
  • 通过submit并行提交任务
  • 每个submit会返回一个future对象
  • 通过future对象result()来获取进行的返回值,需要注意的是result()方法在没有获得返回值时阻塞进程

4.2 multiprocessing

multiprocessing库也提供了关于python多线程开发的基本功能和进阶功能(如资源共享和同步锁)。

我们看上面ProcessPoolExecutor同样的例子multiprocessing是怎么实现的。

from multiprocessing import Pool
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    
    result_list = []

    for file in sorted(files):
        res = p.apply_async(func=cpu_task, args=(file,), callback=end_call)
        result_list.append(res)
    p.close()
    p.join()

    for ret in result_list:
        print(ret.get())
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)
    
# one task time is  0.44005799293518066
# end_call 1.png
# one task time is  0.45325803756713867
# end_call 3.png
# one task time is  0.497267484664917
# end_call 2.png
# one task time is  0.4813237190246582
# end_call 4.png
# one task time is  0.5435652732849121
# end_call 0.png
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# all cost is  0.7456285953521729

multiprocessing是通过Pool来构建进程池,并通过apply_async异步提交任务,apply_async的返回值类型是ApplyResult, 该对象的get方法可以获取任务的返回值;close表示线程池在接受这些任务之后不再接收其他任务,而join是表示等待线程池所有任务执行完。

多进程之间的进程共享数据,multiprocessing通过multiprocessing.Manager来进行进程间通信。

一个multiprocessing.Manager对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。

Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。

from multiprocessing import Pool, Manager
import time

def cpu_task(file, l):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    l.append(file)
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    with Manager() as manager:
        l = manager.list()
        for file in sorted(files):
            res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call)
        p.close()
        p.join()

        print(list(l))
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)

# one task time is  0.5007572174072266
# end_call 2.png
# one task time is  0.5093979835510254
# end_call 3.png
# one task time is  0.5186748504638672
# end_call 4.png
# one task time is  0.518937349319458
# end_call 0.png
# one task time is  0.5269668102264404
# end_call 1.png
# ['2.png', '3.png', '4.png', '0.png', '1.png']
# all cost is  0.7396669387817383
目录
相关文章
|
安全 Python
Python并发编程必备技能:掌握threading模块,让你的代码跑得更快!
【8月更文挑战第22天】Python并发编程采用多线程技术实现任务的同时执行。利用`threading`模块可轻松管理和创建线程。通过`Thread`类实例化线程并用`start()`方法启动。线程同步通过`Lock`确保资源访问互斥,或用`Semaphore`控制并发数量。线程间通信则可通过`Queue`安全传递数据,实现生产者-消费者模式等功能。这些工具有效避免了竞态条件,确保了程序的正确性和效率。
165 1
|
2月前
|
人工智能 安全 调度
Python并发编程之线程同步详解
并发编程在Python中至关重要,线程同步确保多线程程序正确运行。本文详解线程同步机制,包括互斥锁、信号量、事件、条件变量和队列,探讨全局解释器锁(GIL)的影响及解决线程同步问题的最佳实践,如避免全局变量、使用线程安全数据结构、精细化锁的使用等。通过示例代码帮助开发者理解并提升多线程程序的性能与可靠性。
|
2月前
|
数据采集 NoSQL 调度
当生成器遇上异步IO:Python并发编程的十大实战兵法
本文通过十大实战场景,详解Python中生成器与异步IO的高效结合。从协程演进、背压控制到分布式锁、性能剖析,全面展示如何利用asyncio与生成器构建高并发应用,助你掌握非阻塞编程核心技巧,提升I/O密集型程序性能。
89 0
|
3月前
|
数据采集 搜索推荐 调度
当生成器遇上异步IO:Python并发编程的十大实战兵法
生成器与异步IO是Python并发编程中的两大利器,二者结合可解决诸多复杂问题。本文通过十个真实场景展示其强大功能:从优雅追踪日志文件、API调用流量整形,到实时数据流反压控制、大文件分片处理等,每个场景都体现了生成器按需生成数据与异步IO高效利用I/O的优势。两者配合不仅内存可控、响应及时,还能实现资源隔离与任务独立调度,为高并发系统提供优雅解决方案。这种组合如同乐高积木,虽单个模块简单,但组合后却能构建出复杂高效的系统。
82 0
|
11月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
在Python的并发编程世界中,没有万能的解决方案,只有最适合特定场景的方法。希望本文能够为你拨开迷雾,找到那条通往高效并发编程的光明大道。
145 2
|
6月前
|
Python
Python 高级编程与实战:深入理解面向对象与并发编程
本文深入探讨Python的高级特性,涵盖面向对象编程(继承、多态、特殊方法、类与实例属性)、异常处理(try-except、finally)和并发编程(多线程、多进程、异步编程)。通过实战项目如聊天服务器和异步文件下载器,帮助读者掌握这些技术,编写更复杂高效的Python程序。
|
6月前
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
|
8月前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
并发编程能够显著提升程序的效率和响应速度。例如,网络爬虫通过并发下载将耗时从1小时缩短至20分钟;APP页面加载时间从3秒优化到200毫秒。Python支持多线程、多进程、异步I/O和协程等并发编程方式,适用于不同场景。线程通信方式包括共享变量、消息传递和同步机制,如Lock、Queue等。Python的并发编程特性使其在处理大规模数据和高并发访问时表现出色,成为许多领域的首选语言。
168 3
|
10月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
11月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
123 3

热门文章

最新文章

推荐镜像

更多