6.4 队列
在线程中队列的方法有三种
Queue
LifoQueue
PriorityQueue
上述三种方法里面,Queue
的方法在进程并发中已经详细做了介绍,这里就不赘述了,而后边的LifoQueue
和PriorityQueue
的对象属性和Queue
是一样的,他们之间都是通用的,像什么qsize()
、empty()
、put()
、get()
都是通用的
6.4.1 Queue
在线程中的Queue
的用法和进程中的使用方法一样,而在线程中正是人们口中经常说的先进先出。
其实很好理解,就是先进去队列的对象先行出来,引用方法名就是先put()
进去的先get()
出来。
import queue q = queue.Queue(3) q_list = [] for i in range(3): q.put(i) q_list.append(q) for q in q_list: print(q.get())
6.4.2 LifoQueue
与Queue
正好相反,它是先进后出,这也就是著名的堆栈
还是上边代码稍作改动
import queue q = queue.LifoQueue(3) q_list = [] for i in range(3): q.put(i) q_list.append(q) for q in q_list: print(q.get())
6.4.3 PriorityQueue
指定优先级,put()
方法使用一样,指定优先级即可
import queue q = queue.PriorityQueue(3) q1 = q.put((2, 'Chancey')) q2 = q.put((3, 'Waller')) q3 = q.put((1, 'Mary')) print(q.get()) print(q.get()) print(q.get())
可以看到mary是最后入队列的,但是其优先级高于所有,所以先行出队列。
判断优先级是看值的大小,值越小优先级就越高咯,灰常滴简单
四、并发池
对于线程池和进程池的构造和使用,在Python中也处于一种比较高阶的技术。这里会着重讲解并发池的使用以及注意事项
1. 进程池
如果一个项目里面只需要开启几个或者几十个进程,就可用Process
手动创建或者for
循环创建,但是如果进程量很高呢,这就用到了进程池,它可以减少进程创建和释放的开销,极大的降低了计算机资源的浪费。
举个例子:早上洗脸,如果水一滴滴的落在脸上洗脸,是不是很慢?而找个盆子把水装起来洗脸就会方便很多,进程池也一样,就是将诸多的进程装进容器,等待调用。
进程池,顾名思义,一种特殊的容器,用来存储进程,而在进程数量的选择上,并不是越多越好,应该综合计算机软硬件的条件来设置。
特点:非常清楚,一个进程创建和销毁是需要大量的时间和计算机资源,如果有十万个进程在这里重复的开辟内存空间、释放内存,那这个计算机估计濒临灭亡(我指的是普通的家用计算机,高性能服务器自行测试),而有了进程池,假设进程池限制3个进程,那么在运行的时候,只创建3个进程,然后循环利用,最后统一回收。所以说,进程池可以极大的减少计算机资源的浪费。
对象创建
multiprocessing.Pool(processes)
processes
:允许入池的最大进程数
对象属性
apply()
:传递不定参数,主进程会被阻塞直到函数执行结束,在Python以后已经没有了该方法apply_async()
:与上述apply()
一样,但是非阻塞,且支持结果返回进行回调map(func, iterable[, chunksize=None])
:与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程close()
:关闭进程池terminate()
:结束进程池,未处理的任务也不会再处理join()
:主进程阻塞等待子进程退出,该方法必须在close()
或者terminate()
之后
import multiprocessing import time import os import random def hello(name): start = time.time() print('%s 开始执行,进程号:%s' % (name, os.getpid())) time.sleep(random.random()*2) end = time.time() print('%s 结束执行,进程号:%s,耗时%0.2fS' % (name, os.getpid(), end-start)) if __name__ == '__main__': p = multiprocessing.Pool() for i in range(4): p.apply_async(hello, (i, )) print('*'*30, '程序开始', '*'*30) p.close() p.join() print('*'*30, '程序结束', '*'*30)
可以清楚的看到,同时进行的只有3个进程,而且在进程池中的某一个进程处理完任务后不会回收,而是新入池一个任务继续进行,知道所有的任务执行完毕,进程才统一回收。
2. 线程池
线程池的创建有三种方式
threadpool
concurrent.futures
该方法也可用来创建进程池,后边会做介绍- 重写
threadpool
或者concurrent.futures.ThreadPoolExecutor
这里只介绍第一种,后边会详细介绍concurrent.futures
来创建
在python 2.7 以上包括Python 3.x ,支持第三方库threadpool
注意:该库现已停止官方支持,仅作为旧项目的支持
网上对于
threadpool
的介绍少之又少,作为从来规矩上网的我,官方文档怎么也看不明白,所以就剖其源码研究了一番
源码介绍:
Easy to use object-oriented thread pool framework. A thread pool is an object that maintains a pool of worker threads to perform time consuming operations in parallel. It assigns jobs to the threads by putting them in a work request queue, where they are picked up by the next available thread. This then performs the requested operation in the background and puts the results in another queue. The thread pool object can then collect the results from all threads from this queue as soon as they become available or after all threads have finished their work. It's also possible, to define callbacks to handle each result as it comes in. The basic concept and some code was taken from the book "Python in a Nutshell, 2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section 14.5 "Threaded Program Architecture". I wrapped the main program logic in the ThreadPool class, added the WorkRequest class and the callback system and tweaked the code here and there. Kudos also to Florent Aide for the exception handling mechanism. Basic usage:: >>> pool = ThreadPool(poolsize) >>> requests = makeRequests(some_callable, list_of_args, callback) >>> [pool.putRequest(req) for req in requests] >>> pool.wait() See the end of the module code for a brief, annotated usage example. Website : http://chrisarndt.de/projects/threadpool/
大致意思就是介绍了该库的运行原理,和基本用法,就是Basic usage的内容
pool = ThreadPool(poolsize) requests = makeRequests(some_callable, list_of_args, callback) [pool.putRequest(req) for req in requests] pool.wait()
分行解释代码:
- 定义线程池,参数为最大的线程数
- 调用
makeRequests()
创建了要开启多线程的函数或者方法,后边的list_of_args
为该函数的参数,默认为None,callback
为回调函数。也就是说,只需要两个参数即可开启 - 将线程扔进线程池。等同于
for item in requests: pool.putRequest(item)
- 等待所有的线程完成任务后退出
import threadpool import time def hello(name): print('%s 说 hello' % name) time.sleep(1) print('%s 说 bye' % name) if __name__ == '__main__': names = ['Chancey', 'Wanger', 'Mary', 'Alex', 'Guido'] start = time.time() pool = threadpool.ThreadPool(3) requests = threadpool.makeRequests(hello, names) [pool.putRequest(req) for req in requests] pool.wait() print('总共耗时:%0.2f' % (time.time() - start))
如果开启多线程的函数有比较多的参数的话,函数调用时第一个解包list
,然后解包dict
。这样的话就两种方法传参,一种是列表,一种字典。
- 列表传参
# 多个参数 import threadpool import time def counts(a, b, c): print('%d+%d+%d=%d' % (a, b, c, a+b+c)) time.sleep(2) if __name__ == '__main__': # 构造参数 number_one = [1, 2 ,3] number_two = [4, 5, 6] number_three = [7, 8, 9] params = [(number_one, None), (number_two, None), (number_three, None)] # 创建线程池 start = time.time() pool = threadpool.ThreadPool(2) requests = threadpool.makeRequests(counts, params) [pool.putRequest(req) for req in requests] pool.wait() print('总共耗时%0.2f' % (time.time() - start))
- 字典传参
# 多个参数 import threadpool import time def counts(a, b, c): print('%d+%d+%d=%d' % (a, b, c, a+b+c)) time.sleep(2) if __name__ == '__main__': # 构造参数 number_one = {'a':1, 'b':2, 'c':3} number_two = {'a':4, 'b':5, 'c':6} number_three = {'a':7, 'b':8, 'c':9} params = [(None, number_one), (None, number_two), (None, number_three)] # 创建线程池 start = time.time() pool = threadpool.ThreadPool(2) requests = threadpool.makeRequests(counts, params) [pool.putRequest(req) for req in requests] pool.wait() print('总共耗时%0.2f' % (time.time() - start))
依旧可以完美运行
不过
threadpool
并不建议在新项目中使用,官方是这样声明的:This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO NOT USE IT FOR NEW PROJECTS!
该模块已经过时,但仍在PyPi中提供,以支持仍然使用它的旧项目。请勿用于新项目!
3. 并发池
这里命名并发池是我自己想的,因为concurrent
支持多进程和多线程。这里也将是本文的亮点。
该模块是Python3自带包,而Python2.7以上也可安装使用。concurrent
包下只有一个模块futures
,模块下最常用的就是Executor
类,它下边有两个子类,分别是ThreadPoolExecutor
和ProcessPoolExecutor
,顾名思义,分别支持多线程和多进程。
3.1 ThreadPoolExecutor
该类专为多线程提供支持
创建对象
class concurrent.futures.ThreadPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
max_workes
:指定线程数。在Python 3.5 以上的版本中,为None或者没有指定的时候开启和计算机CPU相同数量的线程,并且在Windows上必须小于61,附上源码
if max_workers is None: # Use this number because ThreadPoolExecutor is often # used to overlap I/O instead of CPU work. max_workers = (os.cpu_count() or 1) * 5 if max_workers <= 0: raise ValueError("max_workers must be greater than 0")
map_context
:允许用户控制由进程池创建给工作者进程的开始方法initializer
和initargs
:在每个工作者线程开始处调用的一个可选可调用对象。initargs 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个 BrokenThreadPool。该功能在python 3.8 版本以上提供
对象属性
抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。
注意:下边介绍的对象方法是通用的
submit(fn, *args, **kwargs)
:异步提交,传参的方式依旧是元组map(func, *iterables, timeout=None, chunksize=1))
:类似于map()
,也就是将submit()
for
循环了
shutdown(wait=True)
:类似于进程池中的pool.close()
和pool.join()
的结合体。当wait为True时等待池子内所有任务完毕后释放,反之亦然,默认为True。
注意:不论wait为何值,整个程序都会等到所有任务执行完毕
result(timeout=None)
:获取结果add_done_callback(fn)
:回调函数
from concurrent.futures import ThreadPoolExecutor import os import time def say(name): print('%s 说 hello,我的PID:%s' % (name, os.getpid())) time.sleep(2) print('%s 说 bye,我的PID:%s' % (name, os.getpid())) if __name__ == '__main__': pool = ThreadPoolExecutor(2) names = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex'] start = time.time() for i in names: pool.submit(say, i) pool.shutdown(wait=True) print('耗时:%0.2f' % (time.time() - start))
3.2 ProcessPoolExecutor
用法相同,不再赘述。只不过换成ProcessPoolExecutor
,里面所有创建对象和对象方法都是一样的。
3.3 提交任务
任务提交有两种方式:
- 同步调用:提交任务后等待任务执行完毕,拿到结果后在执行下一步,这样下来的话,程序就变成了串行
- 异步调用:提交任务后不用等待
前边在介绍ThreadPoolExecutor
或者ProcessPoolExecutor
时提到了add_done_callback(fn)
,这个就是回调机制。异步调用和回调机制都是提交任务的方式。
先看一下同步提交的方式:
from concurrent.futures import ThreadPoolExecutor import time def get_url(keyword): url = 'https://www.suxin.site/%s' % keyword time.sleep(1) print('%s URL构造成功' % keyword) return url def get_html(url): html = '<html>%s</html>' % url time.sleep(2) print('%s HTML获取成功' % url) return html if __name__ == '__main__': pool = ThreadPoolExecutor(2) keyword_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex'] start = time.time() for keyword in keyword_list: msg = pool.submit(get_url, keyword).result() get_html(msg) pool.shutdown(wait=True) print('耗时:%0.2f' % (time.time() - start))
再看下异步提交
from concurrent.futures import ThreadPoolExecutor import time def get_url(keyword): url = 'https://www.suxin.site/%s' % keyword time.sleep(1) print('%s URL构造成功' % keyword) return url def get_html(url): url = url.result() html = '<html>%s</html>' % url time.sleep(2) print('%s HTML获取成功' % url) return html if __name__ == '__main__': pool = ThreadPoolExecutor(2) keyword_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex'] start = time.time() for keyword in keyword_list: pool.submit(get_url, keyword).add_done_callback(get_html) pool.shutdown(wait=True) print('耗时:%0.2f' % (time.time() - start))
效率一目了然
4. 重写
本来这里是打算将concurrent.fautres
单独拿出来说一说,但是发现本文已经够长了,所以在下篇详细讨论如何重写ThreadPoolExecutor
前边的线程池中有讲到可以通过重写concurrent
或者fautres
来使用并发池,这里需要详细的了解源码以及运行原理,建议对current
源码有研究的朋友们可以琢磨一下,考虑到篇幅这里就不说了。
5. 自构并发池
网络上对自构并发池大多千篇一律,然并卵,所以这里介绍一下,不过上边的方法已经足够了项目的使用,极特别的需求可能用到,有兴趣可以看看,没兴趣的直接跳到协程去看。
5.1 构思
python里面的Queue
类似于并发,可以说是低配版的并发
- 在队列中加入任务
- 创建队列
- 设置大小
- 真实创建的线程
- 处理任务
- 获取任务,每取出一个就剔除那个
- 判断任务是否为空
- 判断空闲线程的数量,等于0的时候不再创建
- 线程数不能超过线程池大小
- 根据任务的数量判断要创建线程的数量
- 创建线程
- 执行任务
5.2 实现
大致的思路就这些,接下来就是精彩的代码,里面有详细的注释,不必慌
import threading import time import queue stop = object() # 这个是用来标志任务停止 class ThreadPoolChancey(object): def __init__(self, max_thread=None): self.queue = queue.Queue() # 创建的队列可以放无限制的任务 self.max_thread = max_thread # 指定的最大线程数,默认为None self.terminal = False # 停止标志 self.create_thread_list = [] # 真实创建的线程数,这里以列表的方式存储,方便判断线程数量 self.free_thread_list = [] # 空闲线程数 def run(self, function, args, callback=None): ''' :param function : 执行函数 :param args : 要执行的函数的参数,定义为元组传参 :param callback : 回调函数,T or F 的返回值 :return : ''' # 判断是否创建真实线程 if len(self.free_thread_list) == 0 and len(self.create_thread_list) < self.max_thread: # 如果空闲线程为0并且创建的真实线程没有达到最大限度就创建 self.create_thread() task = (function, args, callback) self.queue.put(task) def callback(self): '''回调函数:用以循环获取任务并执行''' current_thread = threading.current_thread() # 获取当前线程 self.create_thread_list.append(current_thread) # 添加到线程列表里面 event = self.queue.get() # 获取一个任务并执行 while event != stop: # 用以判断是否终止任务 function, args, callback = event # 解开任务包,该包包含了执行函数、参数、回调函数 try: # 执行函数运行的结果,该判断执行成功,故状态为True message = function(*args) state = True except Exception as err: # 执行异常,状态为False message = err state = False if callback is not None: # 不为空则表示执行完毕 try: callback(state, message) # 执行回调函数 except Exception as err: print(err) # 抛出异常 else: pass if not self.terminal: self.free_thread_list.append(current_thread) # 有终止任务的时候就添加一个新任务 event = self.queue.get() self.free_thread_list.remove(current_thread) # 这里添加了任务,线程有一个占用,剔除空闲 else: event = stop # 停止put else: self.create_thread_list.remove(current_thread) # 剔除执行完毕的任务 def create_thread(self): '''创建线程''' t = threading.Thread(target=self.callback, ) t.start() def terminal(self): '''终止任务,无论队列是否还有任务''' self.terminal = True def close(self) : '''关闭线程池''' num = len(self.create_thread_list) # 将真实的线程全部添加进线程池 self.queue.empty() while num: self.queue.put(stop) num -= 1
将其放置在同级目录下并作为第三方模块导入试用一下:
这里模拟连接数据库示例
from DiyThreadPool import ThreadPoolChancey import time import random def connect(name): db = random.randint(10, 20) time.sleep(1) print('%s 连接到了数据库%s' % (name, db)) return db pool = ThreadPoolChancey(2) name_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex'] for name in name_list: pool.run(function=connect, args=(name, )) # pool.terminal() pool.close()