python 小知识 - 并发编程

简介: 并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。

1. 并发基础概念

可以说,任何一种高级语言都会涉及到并发编程;我们都能说出并发的最大好处:可以提高程序的执行的效率;

并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。

并发通常会涉及到两个概念:线程和进程;

线程和进程有什么区别?划重点

  • 首先,进程(Process)是操作系统中最小资源管理单元,一个进程拥有独立的代码、数据和内存空间,是应用程序启动的一个实例;你只要在系统中执行一个程序(比如py代码),操作系统都会分配相应的资源来执行,执行完成会回收资源, 如果进程使用资源威胁到整个操作系统(比如内存飙升到99%),操作系统会强杀它。也就是资源是收操作系统控制的。
  • 其次,进程下面允许有多个任务执行单元,这个任务执行单元就是线程(thread);同时线程有不同的状态(如创建,可运行,运行中,阻塞,死亡)。线程的状态控制也是由操作系统控制。
  • 当然还有一个协程的概念;进程和线程的资源控制是操作系统的分配,对于单核cpu来说,多个线程轮流的获取的cpu的使用权,达到并发的目的。协程的一个特点是资源是控制权在于设计者手上,协程在线程下面,也是并发编程的一个组成部分。

从上面的说明,我们可以总结下:

  • 进程下面有线程,线程下面有协程
  • 进程和进程之前的资源是独立的
  • 而同一个进程下的多个线程是共享资源,比如数据和内存空间等
  • 同时线程共享同一个资源,多个线程同时改变资源时,就会出现不一致性,这个时候考虑,独占资源

通常情况下,分布式计算都是多个进程的方式,就如上面总结的,进程之间资源是独立的,就会有一个共享资源的机制来负责进程之间数据的共享和传递(通常由master类的角色完成),这个在python多进程并行处理会涉及,区别是如何实现共享数据的处理。

我觉得,理解上面的基础概念是非常重要的,它们是并发编程的底层逻辑,是所有并发编程的基础。

2. 如何并发编程

用一个python的多线程的方式来说明如何并发编程?

from concurrent import futures
from time import sleep, strftime

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(3, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        
        
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
# [22:59:24][22:59:24]   download done file = 0.png 
# [22:59:24]   download done file = 2.png 
#    download done file = 1.png 
# [22:59:25][22:59:25][22:59:25]   download done file = 4.png 
#    download done file = 3.png 
#    download done file = 5.png 
# [22:59:26][22:59:26][22:59:26]     download done file = 8.png 
#    download done file = 6.png  download done file = 7.png 

# [22:59:27]   download done file = 9.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png

python 中的concurrent.futurest提供了ThreadPoolExecutor线程池来创建线程池

  • 通过map把每个任务分配给执行函数执行
  • 并返回结果
  • with来管理线程池的关闭

从上面例子,我们需要知道:

  • 并发编程的关键是编写一个任务的处理逻辑
  • 更重要的是如何将一个大任务拆分从小任务,比如前面例子中的 files列表大任务,每个列表中的值就是一个小任务。

并发编程的关键:拆分任务,发起线程或者进程执行小任务,合并小任务结果大任务返回结果(如果需要返回任务的情况)。

希望今天的分享对你有帮助。

接下来的部分,我们将来了解python并发的特性,以及线程和多进程开发,敬请期待。

3. python的并发编程特性

我们先来看另一个例子

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)

print('all cost is ', time.time()-ts)

# one task time is  3.5929086208343506
# one task time is  4.961361646652222
# one task time is  5.242505311965942
# one task time is  5.411738872528076
# one task time is  5.5458598136901855
# all cost is  5.8427581787109375

这个例子中,并发执行的任务时CPU计算型的。执行的结果和我们预期的并不一样,经测试单纯的执行download_resource需要1.2s的时间,预期的5个并发执行5个任务耗时应该小于3s吧。实际的结果却是串行执行的结果(6s)差别不大。

这是为什么?

这就要说道,python线程的伪并发性;python的线程在处理cpu计算型的任务时其实是单线程执行的。这个原因是python的C解释器是单线程执行的,在执行多线程任务时,C解释器都会给线程上一个全局解释锁GIL(global interpreter lock),使得多个线程轮流使用cpu时间。

为什么需要GIL?
单线程的好处是线程安全的,共享的数据不会出现异常的情况。python中的一切都是对象,python解释器负责管理这些对象,包括对象的销毁并自动回收内存;python解释器确切的说Cpython解释器会给每一个对象记录一个引用计数,每多一次引用,计数就会+1,反之则-1,如果引用计数为0,Cpython解释器会回收这些资源。

在这种机制下,多线程执行时会出现什么情况?两个线程A和B同时引用一个对象obj,这个时候obj的引用计数为2;A打算撤销对obj的引用,完成第一步时引用计数减去1时,这时发生了线程切换,A挂起等待,还没判断是否需要执行销毁对象操作。B进入运行状态,这个时候B也对obj撤销引用,并完成引用计数减1,销毁对象,这个时候obj的引用数为0,释放内存。如果此时A重新唤醒,单判断obj引用计数为0,开始销毁对象,可是这个时候已经没有对象了。 所以为了保证不出现数据污染,才引入GIL。

也就是多线程情况下,Cpython解释器让每一个线程不断的获得GIL锁和释放锁,保证每一次只有一个线程在执行。

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(10, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
print(time.time()-ts)

# [23:58:06]   download done file = 5.png 
# [23:58:06]   download done file = 2.png 
# [23:58:06]   download done file = 4.png 
# [23:58:06]   download done file = 9.png 
# [23:58:06]   download done file = 1.png 
# [23:58:06]   download done file = 3.png 
# [23:58:06]   download done file = 8.png 
# [23:58:06]   download done file = 6.png 
# [23:58:06]   download done file = 7.png 
# [23:58:06]   download done file = 0.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png
# all cost is  1.0396032333374023

看上面例子,确实是并发执行了。这个任务并不是cpu计算型的。为什么?

当线程获得GIL锁时,Cpython解释器为线程设置一个check_interavl,满足条件时线程就会释放锁。
条件时check_interavl是当前线程遇见IO操作或者ticks计数达到100。上面time.sleep也可以是IO操作,所以python比较适合于IO密集型的任务(比如爬虫下载网络数据)。

需要真正利用多核cpu达到真正的并行,就需要多进程来实现。

4. python多进程

上一小节中分享了python并发的特性,如果想真正利用现在多核的cpu进行并行计算就需要利用多进程。

就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;

python的多进程方式, 有:

  • 有官方提供的concurrentProcessPoolExecutor
  • multiprocessing
  • 第三方的任务处理队列库celery(常见的中间服务机制是redis和RabbitMQ)
  • 分布式计算框架Ray(以redis为任务管理)

4.1 ProcessPoolExecutor

from concurrent import futures
from time import sleep, strftime
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    
    result_list = []
    ret = []
    with futures.ProcessPoolExecutor(workers) as executor:
        for file in sorted(files):
            future_result = executor.submit(cpu_task, file)
            result_list.append(future_result)
        for future in result_list:
            res = future.result()
            ret.append(res)
    return ret
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)
    print(res)

    print('all cost is ', time.time()-ts)

# one task time is  0.503483772277832
# one task time is  0.503483772277832
# one task time is  0.5034773349761963
# one task time is  0.5095388889312744
# one task time is  0.5119566917419434
# ['0.png', '1.png', '2.png', '3.png', '4.png']
# all cost is  0.6866297721862793

从上面的执行结果可知,每个cpu_task在0.5s,5个任务执行完只要0.68s,真正的并行计算。

从并发编程步骤来看:

  • 任务分解:cpu_task每次处理一个file
  • 通过submit并行提交任务
  • 每个submit会返回一个future对象
  • 通过future对象result()来获取进行的返回值,需要注意的是result()方法在没有获得返回值时阻塞进程

4.2 multiprocessing

multiprocessing库也提供了关于python多线程开发的基本功能和进阶功能(如资源共享和同步锁)。

我们看上面ProcessPoolExecutor同样的例子multiprocessing是怎么实现的。

from multiprocessing import Pool
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    
    result_list = []

    for file in sorted(files):
        res = p.apply_async(func=cpu_task, args=(file,), callback=end_call)
        result_list.append(res)
    p.close()
    p.join()

    for ret in result_list:
        print(ret.get())
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)
    
# one task time is  0.44005799293518066
# end_call 1.png
# one task time is  0.45325803756713867
# end_call 3.png
# one task time is  0.497267484664917
# end_call 2.png
# one task time is  0.4813237190246582
# end_call 4.png
# one task time is  0.5435652732849121
# end_call 0.png
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# all cost is  0.7456285953521729

multiprocessing是通过Pool来构建进程池,并通过apply_async异步提交任务,apply_async的返回值类型是ApplyResult, 该对象的get方法可以获取任务的返回值;close表示线程池在接受这些任务之后不再接收其他任务,而join是表示等待线程池所有任务执行完。

多进程之间的进程共享数据,multiprocessing通过multiprocessing.Manager来进行进程间通信。

一个multiprocessing.Manager对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。

Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。

from multiprocessing import Pool, Manager
import time

def cpu_task(file, l):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    l.append(file)
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    with Manager() as manager:
        l = manager.list()
        for file in sorted(files):
            res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call)
        p.close()
        p.join()

        print(list(l))
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)

# one task time is  0.5007572174072266
# end_call 2.png
# one task time is  0.5093979835510254
# end_call 3.png
# one task time is  0.5186748504638672
# end_call 4.png
# one task time is  0.518937349319458
# end_call 0.png
# one task time is  0.5269668102264404
# end_call 1.png
# ['2.png', '3.png', '4.png', '0.png', '1.png']
# all cost is  0.7396669387817383
目录
相关文章
|
25天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
1月前
|
算法 大数据 计算机视觉
Python中的并发编程技术探究
本文将深入探讨Python中的并发编程技术,包括多线程、多进程、协程等,并分析它们在提高程序性能和效率方面的应用场景和优势。通过比较不同并发编程方式的特点和适用场景,读者可以更好地理解如何利用Python强大的并发处理能力来优化程序设计。
|
24天前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
python并发编程:什么是并发编程?python对并发编程有哪些支持?
20 0
|
24天前
|
数据采集 安全 Python
python并发编程:Python实现生产者消费者爬虫
python并发编程:Python实现生产者消费者爬虫
25 0
python并发编程:Python实现生产者消费者爬虫
|
1月前
|
安全 Python
Python中的并发编程:多线程与多进程技术探究
本文将深入探讨Python中的并发编程技术,重点介绍多线程和多进程两种并发处理方式的原理、应用场景及优缺点,并结合实例分析如何在Python中实现并发编程,以提高程序的性能和效率。
|
1月前
|
数据采集 存储 Java
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
|
1月前
|
Python
Python中的并发编程与多线程
在当今高并发的网络应用环境中,如何充分利用计算资源来提高程序的执行效率是一个关键问题。本文将探讨Python中的并发编程技术,重点介绍了多线程的使用方法和注意事项,帮助读者更好地理解并发编程在Python中的应用。
|
1天前
|
API 调度 开发者
Python中的并发编程:使用asyncio库实现异步IO
传统的Python编程模式中,使用多线程或多进程实现并发操作可能存在性能瓶颈和复杂性问题。而随着Python 3.5引入的asyncio库,开发者可以利用异步IO来更高效地处理并发任务。本文将介绍如何利用asyncio库实现异步IO,提升Python程序的并发性能。
|
24天前
|
数据采集 Java API
python并发编程: Python使用线程池在Web服务中实现加速
python并发编程: Python使用线程池在Web服务中实现加速
18 3
python并发编程: Python使用线程池在Web服务中实现加速
|
1月前
|
并行计算 Python
Python中的并发编程:多线程与多进程的比较
在Python编程中,实现并发操作是提升程序性能的重要手段之一。本文将探讨Python中的多线程与多进程两种并发编程方式的优劣及适用场景,帮助读者更好地选择合适的方法来提高程序运行效率。