开发者社区> AIweker> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

python 小知识 - 并发编程

简介: 并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。
+关注继续查看

1. 并发基础概念

可以说,任何一种高级语言都会涉及到并发编程;我们都能说出并发的最大好处:可以提高程序的执行的效率;

并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。

并发通常会涉及到两个概念:线程和进程;

线程和进程有什么区别?划重点

  • 首先,进程(Process)是操作系统中最小资源管理单元,一个进程拥有独立的代码、数据和内存空间,是应用程序启动的一个实例;你只要在系统中执行一个程序(比如py代码),操作系统都会分配相应的资源来执行,执行完成会回收资源, 如果进程使用资源威胁到整个操作系统(比如内存飙升到99%),操作系统会强杀它。也就是资源是收操作系统控制的。
  • 其次,进程下面允许有多个任务执行单元,这个任务执行单元就是线程(thread);同时线程有不同的状态(如创建,可运行,运行中,阻塞,死亡)。线程的状态控制也是由操作系统控制。
  • 当然还有一个协程的概念;进程和线程的资源控制是操作系统的分配,对于单核cpu来说,多个线程轮流的获取的cpu的使用权,达到并发的目的。协程的一个特点是资源是控制权在于设计者手上,协程在线程下面,也是并发编程的一个组成部分。

从上面的说明,我们可以总结下:

  • 进程下面有线程,线程下面有协程
  • 进程和进程之前的资源是独立的
  • 而同一个进程下的多个线程是共享资源,比如数据和内存空间等
  • 同时线程共享同一个资源,多个线程同时改变资源时,就会出现不一致性,这个时候考虑,独占资源

通常情况下,分布式计算都是多个进程的方式,就如上面总结的,进程之间资源是独立的,就会有一个共享资源的机制来负责进程之间数据的共享和传递(通常由master类的角色完成),这个在python多进程并行处理会涉及,区别是如何实现共享数据的处理。

我觉得,理解上面的基础概念是非常重要的,它们是并发编程的底层逻辑,是所有并发编程的基础。

2. 如何并发编程

用一个python的多线程的方式来说明如何并发编程?

from concurrent import futures
from time import sleep, strftime

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(3, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        
        
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
# [22:59:24][22:59:24]   download done file = 0.png 
# [22:59:24]   download done file = 2.png 
#    download done file = 1.png 
# [22:59:25][22:59:25][22:59:25]   download done file = 4.png 
#    download done file = 3.png 
#    download done file = 5.png 
# [22:59:26][22:59:26][22:59:26]     download done file = 8.png 
#    download done file = 6.png  download done file = 7.png 

# [22:59:27]   download done file = 9.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png

python 中的concurrent.futurest提供了ThreadPoolExecutor线程池来创建线程池

  • 通过map把每个任务分配给执行函数执行
  • 并返回结果
  • with来管理线程池的关闭

从上面例子,我们需要知道:

  • 并发编程的关键是编写一个任务的处理逻辑
  • 更重要的是如何将一个大任务拆分从小任务,比如前面例子中的 files列表大任务,每个列表中的值就是一个小任务。

并发编程的关键:拆分任务,发起线程或者进程执行小任务,合并小任务结果大任务返回结果(如果需要返回任务的情况)。

希望今天的分享对你有帮助。

接下来的部分,我们将来了解python并发的特性,以及线程和多进程开发,敬请期待。

3. python的并发编程特性

我们先来看另一个例子

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)

print('all cost is ', time.time()-ts)

# one task time is  3.5929086208343506
# one task time is  4.961361646652222
# one task time is  5.242505311965942
# one task time is  5.411738872528076
# one task time is  5.5458598136901855
# all cost is  5.8427581787109375

这个例子中,并发执行的任务时CPU计算型的。执行的结果和我们预期的并不一样,经测试单纯的执行download_resource需要1.2s的时间,预期的5个并发执行5个任务耗时应该小于3s吧。实际的结果却是串行执行的结果(6s)差别不大。

这是为什么?

这就要说道,python线程的伪并发性;python的线程在处理cpu计算型的任务时其实是单线程执行的。这个原因是python的C解释器是单线程执行的,在执行多线程任务时,C解释器都会给线程上一个全局解释锁GIL(global interpreter lock),使得多个线程轮流使用cpu时间。

为什么需要GIL?
单线程的好处是线程安全的,共享的数据不会出现异常的情况。python中的一切都是对象,python解释器负责管理这些对象,包括对象的销毁并自动回收内存;python解释器确切的说Cpython解释器会给每一个对象记录一个引用计数,每多一次引用,计数就会+1,反之则-1,如果引用计数为0,Cpython解释器会回收这些资源。

在这种机制下,多线程执行时会出现什么情况?两个线程A和B同时引用一个对象obj,这个时候obj的引用计数为2;A打算撤销对obj的引用,完成第一步时引用计数减去1时,这时发生了线程切换,A挂起等待,还没判断是否需要执行销毁对象操作。B进入运行状态,这个时候B也对obj撤销引用,并完成引用计数减1,销毁对象,这个时候obj的引用数为0,释放内存。如果此时A重新唤醒,单判断obj引用计数为0,开始销毁对象,可是这个时候已经没有对象了。 所以为了保证不出现数据污染,才引入GIL。

也就是多线程情况下,Cpython解释器让每一个线程不断的获得GIL锁和释放锁,保证每一次只有一个线程在执行。

from concurrent import futures
from time import sleep, strftime
import time

def download_resource(file):
    sleep(1)
    print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
    return file
    
def run(files):
    workers = min(10, len(files))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_resource, sorted(files))
    return res
        

ts = time.time()
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)

print('='*10, 'return information')
for i in res:
    print(i)
print(time.time()-ts)

# [23:58:06]   download done file = 5.png 
# [23:58:06]   download done file = 2.png 
# [23:58:06]   download done file = 4.png 
# [23:58:06]   download done file = 9.png 
# [23:58:06]   download done file = 1.png 
# [23:58:06]   download done file = 3.png 
# [23:58:06]   download done file = 8.png 
# [23:58:06]   download done file = 6.png 
# [23:58:06]   download done file = 7.png 
# [23:58:06]   download done file = 0.png 
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png
# all cost is  1.0396032333374023

看上面例子,确实是并发执行了。这个任务并不是cpu计算型的。为什么?

当线程获得GIL锁时,Cpython解释器为线程设置一个check_interavl,满足条件时线程就会释放锁。
条件时check_interavl是当前线程遇见IO操作或者ticks计数达到100。上面time.sleep也可以是IO操作,所以python比较适合于IO密集型的任务(比如爬虫下载网络数据)。

需要真正利用多核cpu达到真正的并行,就需要多进程来实现。

4. python多进程

上一小节中分享了python并发的特性,如果想真正利用现在多核的cpu进行并行计算就需要利用多进程。

就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;

python的多进程方式, 有:

  • 有官方提供的concurrentProcessPoolExecutor
  • multiprocessing
  • 第三方的任务处理队列库celery(常见的中间服务机制是redis和RabbitMQ)
  • 分布式计算框架Ray(以redis为任务管理)

4.1 ProcessPoolExecutor

from concurrent import futures
from time import sleep, strftime
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file
    
def run(files):
    workers = min(5, len(files))
    
    result_list = []
    ret = []
    with futures.ProcessPoolExecutor(workers) as executor:
        for file in sorted(files):
            future_result = executor.submit(cpu_task, file)
            result_list.append(future_result)
        for future in result_list:
            res = future.result()
            ret.append(res)
    return ret
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)
    print(res)

    print('all cost is ', time.time()-ts)

# one task time is  0.503483772277832
# one task time is  0.503483772277832
# one task time is  0.5034773349761963
# one task time is  0.5095388889312744
# one task time is  0.5119566917419434
# ['0.png', '1.png', '2.png', '3.png', '4.png']
# all cost is  0.6866297721862793

从上面的执行结果可知,每个cpu_task在0.5s,5个任务执行完只要0.68s,真正的并行计算。

从并发编程步骤来看:

  • 任务分解:cpu_task每次处理一个file
  • 通过submit并行提交任务
  • 每个submit会返回一个future对象
  • 通过future对象result()来获取进行的返回值,需要注意的是result()方法在没有获得返回值时阻塞进程

4.2 multiprocessing

multiprocessing库也提供了关于python多线程开发的基本功能和进阶功能(如资源共享和同步锁)。

我们看上面ProcessPoolExecutor同样的例子multiprocessing是怎么实现的。

from multiprocessing import Pool
import time

def cpu_task(file):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    
    result_list = []

    for file in sorted(files):
        res = p.apply_async(func=cpu_task, args=(file,), callback=end_call)
        result_list.append(res)
    p.close()
    p.join()

    for ret in result_list:
        print(ret.get())
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)
    
# one task time is  0.44005799293518066
# end_call 1.png
# one task time is  0.45325803756713867
# end_call 3.png
# one task time is  0.497267484664917
# end_call 2.png
# one task time is  0.4813237190246582
# end_call 4.png
# one task time is  0.5435652732849121
# end_call 0.png
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# all cost is  0.7456285953521729

multiprocessing是通过Pool来构建进程池,并通过apply_async异步提交任务,apply_async的返回值类型是ApplyResult, 该对象的get方法可以获取任务的返回值;close表示线程池在接受这些任务之后不再接收其他任务,而join是表示等待线程池所有任务执行完。

多进程之间的进程共享数据,multiprocessing通过multiprocessing.Manager来进行进程间通信。

一个multiprocessing.Manager对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。

Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。

from multiprocessing import Pool, Manager
import time

def cpu_task(file, l):
    st = time.time()
    t = 0
    for i in range(1000000):
        t += i**2
    l.append(file)
    print('one task time is ',time.time()-st)
    return file

def end_call(arg):
    print("end_call",arg)


def run(files):
    workers = min(5, len(files))
    p = Pool(workers)
    with Manager() as manager:
        l = manager.list()
        for file in sorted(files):
            res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call)
        p.close()
        p.join()

        print(list(l))
            
    return ""
        
if __name__ == "__main__":
    ts = time.time()
    files = ['{}.png'.format(i) for i in range(5)]
    res = run(files)

    print('all cost is ', time.time()-ts)

# one task time is  0.5007572174072266
# end_call 2.png
# one task time is  0.5093979835510254
# end_call 3.png
# one task time is  0.5186748504638672
# end_call 4.png
# one task time is  0.518937349319458
# end_call 0.png
# one task time is  0.5269668102264404
# end_call 1.png
# ['2.png', '3.png', '4.png', '0.png', '1.png']
# all cost is  0.7396669387817383

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
python-并发编程之多进程
一、操作系统基础: 进程的概念起源于操作系统,操作系统其它所有概念都是围绕进程来的,所以我们了解进程之前先来了解一下操作系统 操作系统位于计算机硬件与应用软件之间,本质也是一个软件。操作系统由操作系统的内核(运行于内核态,管理硬件资源)以及系统调用(运行于用户态,为应用程序员写的应用程序提供系统调用接口)两部分组成 两大功能:   1.
1165 0
《 Python树莓派编程》——导读
在2006年,当Eben Upton和其他树莓派基金会的创办人看到大学计算机专业学生的编程状况时,他们感到无比沮丧。在美国,计算机专业的编程课程被缩减为“CS 101:如何使用Word程序”和“CS 203:优化你的Facebook主页”。
4105 0
《树莓派Python编程入门与实战(第2版)》——2.4 LXDE图形界面
本节书摘来自异步社区《树莓派Python编程入门与实战(第2版)》一书中的第2章,第2.4节,作者[美] Richard Blum Christine Bresnahan,陈晓明 马立新 译,更多章节内容可以访问云栖社区“异步社区”公众号查看。
5116 0
Spark机器学习1·编程入门(scala/java/python)
Spark机器学习1·编程入门
5976 0
Spark RDD编程(Python和Scala版本)
Spark中的RDD就是一个不可变的分布式对象集合,是一种具有兼容性的基于内存的集群计算抽象方法,Spark则是这个方法的抽象。 Spark的RDD操作分为转化操作(transformation)和行动操作(action),两者的区别在于:        a.
814 0
Python编程-数据库-利用PyMysql访问windows下的MySql数据库
1. 下载PyMysql并且安装 下载地址 下载zip包后解压到目录,进入该目录,执行以下命令安装 python setup.py install   2. 编写一个简单的数据库访问程序 simple_mysql.
904 0
+关注
AIweker
人工智能微客(aiweker)长期跟踪和分享人工智能前沿技术、应用、领域知识,不定期的发布相关产品和应用,欢迎关注和转发
文章
问答
文章排行榜
最热
最新
相关电子书
更多
Python系列直播第一讲——Python中的一切皆对象
立即下载
Python 脚本速查手册
立即下载
Python 系列直播——深入Python与日志服务,玩转大规模数据分析处理实战第二讲
立即下载