1. 并发基础概念
可以说,任何一种高级语言都会涉及到并发编程;我们都能说出并发的最大好处:可以提高程序的执行的效率;
并发简单的说就是利用多个资源同时执行多个小的任务,以提供大的任务的执行耗时; 这和分布式计算有点类似,分布式也是一种并发,只不过可利用的资源不只是单机的资源,而是有网络连接的计算机集群,当然这就增加了机器之间网络传输的开销,同时要处理更多有传输异常导致的异常容错处理机制。
并发通常会涉及到两个概念:线程和进程;
线程和进程有什么区别?划重点
- 首先,进程(Process)是操作系统中最小资源管理单元,一个进程拥有独立的代码、数据和内存空间,是应用程序启动的一个实例;你只要在系统中执行一个程序(比如py代码),操作系统都会分配相应的资源来执行,执行完成会回收资源, 如果进程使用资源威胁到整个操作系统(比如内存飙升到99%),操作系统会强杀它。也就是资源是收操作系统控制的。
- 其次,进程下面允许有多个任务执行单元,这个任务执行单元就是线程(thread);同时线程有不同的状态(如创建,可运行,运行中,阻塞,死亡)。线程的状态控制也是由操作系统控制。
- 当然还有一个协程的概念;进程和线程的资源控制是操作系统的分配,对于单核cpu来说,多个线程轮流的获取的cpu的使用权,达到并发的目的。协程的一个特点是资源是控制权在于设计者手上,协程在线程下面,也是并发编程的一个组成部分。
从上面的说明,我们可以总结下:
- 进程下面有线程,线程下面有协程
- 进程和进程之前的资源是独立的
- 而同一个进程下的多个线程是共享资源,比如数据和内存空间等
- 同时线程共享同一个资源,多个线程同时改变资源时,就会出现不一致性,这个时候考虑锁,独占资源
通常情况下,分布式计算都是多个进程的方式,就如上面总结的,进程之间资源是独立的,就会有一个共享资源的机制来负责进程之间数据的共享和传递(通常由master类的角色完成),这个在python多进程并行处理会涉及,区别是如何实现共享数据的处理。
我觉得,理解上面的基础概念是非常重要的,它们是并发编程的底层逻辑,是所有并发编程的基础。
2. 如何并发编程
用一个python的多线程的方式来说明如何并发编程?
from concurrent import futures
from time import sleep, strftime
def download_resource(file):
sleep(1)
print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
return file
def run(files):
workers = min(3, len(files))
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(download_resource, sorted(files))
return res
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)
print('='*10, 'return information')
for i in res:
print(i)
# [22:59:24][22:59:24] download done file = 0.png
# [22:59:24] download done file = 2.png
# download done file = 1.png
# [22:59:25][22:59:25][22:59:25] download done file = 4.png
# download done file = 3.png
# download done file = 5.png
# [22:59:26][22:59:26][22:59:26] download done file = 8.png
# download done file = 6.png download done file = 7.png
# [22:59:27] download done file = 9.png
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png
python 中的concurrent.futurest提供了ThreadPoolExecutor线程池来创建线程池
- 通过map把每个任务分配给执行函数执行
- 并返回结果
- with来管理线程池的关闭
从上面例子,我们需要知道:
- 并发编程的关键是编写一个任务的处理逻辑
- 更重要的是如何将一个大任务拆分从小任务,比如前面例子中的 files列表大任务,每个列表中的值就是一个小任务。
并发编程的关键:拆分任务,发起线程或者进程执行小任务,合并小任务结果大任务返回结果(如果需要返回任务的情况)。
希望今天的分享对你有帮助。
接下来的部分,我们将来了解python并发的特性,以及线程和多进程开发,敬请期待。
3. python的并发编程特性
我们先来看另一个例子
from concurrent import futures
from time import sleep, strftime
import time
def download_resource(file):
st = time.time()
t = 0
for i in range(1000000):
t += i**2
print('one task time is ',time.time()-st)
return file
def run(files):
workers = min(5, len(files))
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(download_resource, sorted(files))
return res
ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)
print('all cost is ', time.time()-ts)
# one task time is 3.5929086208343506
# one task time is 4.961361646652222
# one task time is 5.242505311965942
# one task time is 5.411738872528076
# one task time is 5.5458598136901855
# all cost is 5.8427581787109375
这个例子中,并发执行的任务时CPU计算型的。执行的结果和我们预期的并不一样,经测试单纯的执行download_resource
需要1.2s的时间,预期的5个并发执行5个任务耗时应该小于3s吧。实际的结果却是串行执行的结果(6s)差别不大。
这是为什么?
这就要说道,python线程的伪并发性;python的线程在处理cpu计算型的任务时其实是单线程执行的。这个原因是python的C解释器是单线程执行的,在执行多线程任务时,C解释器都会给线程上一个全局解释锁GIL(global interpreter lock),使得多个线程轮流使用cpu时间。
为什么需要GIL?
单线程的好处是线程安全的,共享的数据不会出现异常的情况。python中的一切都是对象,python解释器负责管理这些对象,包括对象的销毁并自动回收内存;python解释器确切的说Cpython解释器会给每一个对象记录一个引用计数,每多一次引用,计数就会+1,反之则-1,如果引用计数为0,Cpython解释器会回收这些资源。
在这种机制下,多线程执行时会出现什么情况?两个线程A和B同时引用一个对象obj,这个时候obj的引用计数为2;A打算撤销对obj的引用,完成第一步时引用计数减去1时,这时发生了线程切换,A挂起等待,还没判断是否需要执行销毁对象操作。B进入运行状态,这个时候B也对obj撤销引用,并完成引用计数减1,销毁对象,这个时候obj的引用数为0,释放内存。如果此时A重新唤醒,单判断obj引用计数为0,开始销毁对象,可是这个时候已经没有对象了。 所以为了保证不出现数据污染,才引入GIL。
也就是多线程情况下,Cpython解释器让每一个线程不断的获得GIL锁和释放锁,保证每一次只有一个线程在执行。
from concurrent import futures
from time import sleep, strftime
import time
def download_resource(file):
sleep(1)
print(strftime('[%H:%M:%S]'), ' ', 'download done file = {} '.format(file))
return file
def run(files):
workers = min(10, len(files))
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(download_resource, sorted(files))
return res
ts = time.time()
files = ['{}.png'.format(i) for i in range(10)]
res = run(files)
print('='*10, 'return information')
for i in res:
print(i)
print(time.time()-ts)
# [23:58:06] download done file = 5.png
# [23:58:06] download done file = 2.png
# [23:58:06] download done file = 4.png
# [23:58:06] download done file = 9.png
# [23:58:06] download done file = 1.png
# [23:58:06] download done file = 3.png
# [23:58:06] download done file = 8.png
# [23:58:06] download done file = 6.png
# [23:58:06] download done file = 7.png
# [23:58:06] download done file = 0.png
# ========== return information
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# 5.png
# 6.png
# 7.png
# 8.png
# 9.png
# all cost is 1.0396032333374023
看上面例子,确实是并发执行了。这个任务并不是cpu计算型的。为什么?
当线程获得GIL锁时,Cpython解释器为线程设置一个check_interavl,满足条件时线程就会释放锁。
条件时check_interavl是当前线程遇见IO操作或者ticks计数达到100。上面time.sleep也可以是IO操作,所以python比较适合于IO密集型的任务(比如爬虫下载网络数据)。
需要真正利用多核cpu达到真正的并行,就需要多进程来实现。
4. python多进程
上一小节中分享了python并发的特性,如果想真正利用现在多核的cpu进行并行计算就需要利用多进程。
就如前面所说的,进程是拥有独立数据和内存空间的,进程之间彼此独立;所以如果说进程之间如果要进行数据交互或者返回数据等交互操作,就需要利用中间的服务机制来协调;
python的多进程方式, 有:
- 有官方提供的
concurrent
的ProcessPoolExecutor
- multiprocessing
- 第三方的任务处理队列库celery(常见的中间服务机制是redis和RabbitMQ)
- 分布式计算框架Ray(以redis为任务管理)
4.1 ProcessPoolExecutor
from concurrent import futures
from time import sleep, strftime
import time
def cpu_task(file):
st = time.time()
t = 0
for i in range(1000000):
t += i**2
print('one task time is ',time.time()-st)
return file
def run(files):
workers = min(5, len(files))
result_list = []
ret = []
with futures.ProcessPoolExecutor(workers) as executor:
for file in sorted(files):
future_result = executor.submit(cpu_task, file)
result_list.append(future_result)
for future in result_list:
res = future.result()
ret.append(res)
return ret
if __name__ == "__main__":
ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)
print(res)
print('all cost is ', time.time()-ts)
# one task time is 0.503483772277832
# one task time is 0.503483772277832
# one task time is 0.5034773349761963
# one task time is 0.5095388889312744
# one task time is 0.5119566917419434
# ['0.png', '1.png', '2.png', '3.png', '4.png']
# all cost is 0.6866297721862793
从上面的执行结果可知,每个cpu_task在0.5s,5个任务执行完只要0.68s,真正的并行计算。
从并发编程步骤来看:
- 任务分解:cpu_task每次处理一个file
- 通过submit并行提交任务
- 每个submit会返回一个future对象
- 通过future对象result()来获取进行的返回值,需要注意的是result()方法在没有获得返回值时阻塞进程
4.2 multiprocessing
multiprocessing库也提供了关于python多线程开发的基本功能和进阶功能(如资源共享和同步锁)。
我们看上面ProcessPoolExecutor
同样的例子multiprocessing
是怎么实现的。
from multiprocessing import Pool
import time
def cpu_task(file):
st = time.time()
t = 0
for i in range(1000000):
t += i**2
print('one task time is ',time.time()-st)
return file
def end_call(arg):
print("end_call",arg)
def run(files):
workers = min(5, len(files))
p = Pool(workers)
result_list = []
for file in sorted(files):
res = p.apply_async(func=cpu_task, args=(file,), callback=end_call)
result_list.append(res)
p.close()
p.join()
for ret in result_list:
print(ret.get())
return ""
if __name__ == "__main__":
ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)
print('all cost is ', time.time()-ts)
# one task time is 0.44005799293518066
# end_call 1.png
# one task time is 0.45325803756713867
# end_call 3.png
# one task time is 0.497267484664917
# end_call 2.png
# one task time is 0.4813237190246582
# end_call 4.png
# one task time is 0.5435652732849121
# end_call 0.png
# 0.png
# 1.png
# 2.png
# 3.png
# 4.png
# all cost is 0.7456285953521729
multiprocessing
是通过Pool来构建进程池,并通过apply_async
异步提交任务,apply_async
的返回值类型是ApplyResult, 该对象的get方法可以获取任务的返回值;close
表示线程池在接受这些任务之后不再接收其他任务,而join
是表示等待线程池所有任务执行完。
多进程之间的进程共享数据,multiprocessing
通过multiprocessing.Manager来进行进程间通信。
一个multiprocessing.Manager对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。从而达到多进程间数据通信且安全。
Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array。
from multiprocessing import Pool, Manager
import time
def cpu_task(file, l):
st = time.time()
t = 0
for i in range(1000000):
t += i**2
l.append(file)
print('one task time is ',time.time()-st)
return file
def end_call(arg):
print("end_call",arg)
def run(files):
workers = min(5, len(files))
p = Pool(workers)
with Manager() as manager:
l = manager.list()
for file in sorted(files):
res = p.apply_async(func=cpu_task, args=(file,l, ), callback=end_call)
p.close()
p.join()
print(list(l))
return ""
if __name__ == "__main__":
ts = time.time()
files = ['{}.png'.format(i) for i in range(5)]
res = run(files)
print('all cost is ', time.time()-ts)
# one task time is 0.5007572174072266
# end_call 2.png
# one task time is 0.5093979835510254
# end_call 3.png
# one task time is 0.5186748504638672
# end_call 4.png
# one task time is 0.518937349319458
# end_call 0.png
# one task time is 0.5269668102264404
# end_call 1.png
# ['2.png', '3.png', '4.png', '0.png', '1.png']
# all cost is 0.7396669387817383