干货:深入浅出讲解Python并发编程(三)

简介: 干货:深入浅出讲解Python并发编程

6.4 队列


在线程中队列的方法有三种


  • Queue
  • LifoQueue
  • PriorityQueue


上述三种方法里面,Queue的方法在进程并发中已经详细做了介绍,这里就不赘述了,而后边的LifoQueuePriorityQueue的对象属性和Queue是一样的,他们之间都是通用的,像什么qsize()empty()put()get()都是通用的


6.4.1 Queue


在线程中的Queue的用法和进程中的使用方法一样,而在线程中正是人们口中经常说的先进先出


其实很好理解,就是先进去队列的对象先行出来,引用方法名就是先put()进去的先get()出来。


import queue
q = queue.Queue(3)
q_list = []
for i in range(3):
    q.put(i)
    q_list.append(q)
for q in q_list:
    print(q.get())


640.png


6.4.2 LifoQueue


Queue正好相反,它是先进后出,这也就是著名的堆栈


还是上边代码稍作改动


import queue
q = queue.LifoQueue(3)
q_list = []
for i in range(3):
    q.put(i)
    q_list.append(q)
for q in q_list:
    print(q.get())


640.png


6.4.3 PriorityQueue


指定优先级,put()方法使用一样,指定优先级即可


import queue
q = queue.PriorityQueue(3)
q1 = q.put((2, 'Chancey'))
q2 = q.put((3, 'Waller'))
q3 = q.put((1, 'Mary'))
print(q.get())
print(q.get())
print(q.get())


640.png


可以看到mary是最后入队列的,但是其优先级高于所有,所以先行出队列。


判断优先级是看值的大小,值越小优先级就越高咯,灰常滴简单


四、并发池


对于线程池和进程池的构造和使用,在Python中也处于一种比较高阶的技术。这里会着重讲解并发池的使用以及注意事项


1. 进程池


如果一个项目里面只需要开启几个或者几十个进程,就可用Process手动创建或者for循环创建,但是如果进程量很高呢,这就用到了进程池,它可以减少进程创建和释放的开销,极大的降低了计算机资源的浪费。


举个例子:早上洗脸,如果水一滴滴的落在脸上洗脸,是不是很慢?而找个盆子把水装起来洗脸就会方便很多,进程池也一样,就是将诸多的进程装进容器,等待调用。


进程池,顾名思义,一种特殊的容器,用来存储进程,而在进程数量的选择上,并不是越多越好,应该综合计算机软硬件的条件来设置。


特点:非常清楚,一个进程创建和销毁是需要大量的时间和计算机资源,如果有十万个进程在这里重复的开辟内存空间、释放内存,那这个计算机估计濒临灭亡(我指的是普通的家用计算机,高性能服务器自行测试),而有了进程池,假设进程池限制3个进程,那么在运行的时候,只创建3个进程,然后循环利用,最后统一回收。所以说,进程池可以极大的减少计算机资源的浪费。


对象创建


multiprocessing.Pool(processes)


  • processes:允许入池的最大进程数


对象属性


  • apply():传递不定参数,主进程会被阻塞直到函数执行结束,在Python以后已经没有了该方法
  • apply_async():与上述apply()一样,但是非阻塞,且支持结果返回进行回调
  • map(func, iterable[, chunksize=None]):与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程
  • close():关闭进程池
  • terminate():结束进程池,未处理的任务也不会再处理
  • join():主进程阻塞等待子进程退出,该方法必须在close()或者terminate()之后


import multiprocessing
import time
import os
import random
def hello(name):
    start = time.time()
    print('%s 开始执行,进程号:%s' % (name, os.getpid()))
    time.sleep(random.random()*2)
    end = time.time()
    print('%s 结束执行,进程号:%s,耗时%0.2fS' % (name, os.getpid(), end-start))
if __name__ == '__main__':
    p = multiprocessing.Pool()
    for i in range(4):
        p.apply_async(hello, (i, ))
    print('*'*30, '程序开始', '*'*30)
    p.close()
    p.join()
    print('*'*30, '程序结束', '*'*30)


640.png


可以清楚的看到,同时进行的只有3个进程,而且在进程池中的某一个进程处理完任务后不会回收,而是新入池一个任务继续进行,知道所有的任务执行完毕,进程才统一回收。


2. 线程池


线程池的创建有三种方式


  • threadpool
  • concurrent.futures该方法也可用来创建进程池,后边会做介绍
  • 重写threadpool或者concurrent.futures.ThreadPoolExecutor


这里只介绍第一种,后边会详细介绍concurrent.futures来创建


在python 2.7 以上包括Python 3.x ,支持第三方库threadpool


注意:该库现已停止官方支持,仅作为旧项目的支持


网上对于threadpool的介绍少之又少,作为从来规矩上网的我,官方文档怎么也看不明白,所以就剖其源码研究了一番


源码介绍:


Easy to use object-oriented thread pool framework.
A thread pool is an object that maintains a pool of worker threads to perform
time consuming operations in parallel. It assigns jobs to the threads
by putting them in a work request queue, where they are picked up by the
next available thread. This then performs the requested operation in the
background and puts the results in another queue.
The thread pool object can then collect the results from all threads from
this queue as soon as they become available or after all threads have
finished their work. It's also possible, to define callbacks to handle
each result as it comes in.
The basic concept and some code was taken from the book "Python in a Nutshell,
2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
14.5 "Threaded Program Architecture". I wrapped the main program logic in the
ThreadPool class, added the WorkRequest class and the callback system and
tweaked the code here and there. Kudos also to Florent Aide for the exception
handling mechanism.
Basic usage::
    >>> pool = ThreadPool(poolsize)
    >>> requests = makeRequests(some_callable, list_of_args, callback)
    >>> [pool.putRequest(req) for req in requests]
    >>> pool.wait()
See the end of the module code for a brief, annotated usage example.
Website : http://chrisarndt.de/projects/threadpool/


大致意思就是介绍了该库的运行原理,和基本用法,就是Basic usage的内容


pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()


分行解释代码:


  • 定义线程池,参数为最大的线程数
  • 调用makeRequests()创建了要开启多线程的函数或者方法,后边的list_of_args为该函数的参数,默认为None,callback为回调函数。也就是说,只需要两个参数即可开启
  • 将线程扔进线程池。等同于


for item in requests:
    pool.putRequest(item)


  • 等待所有的线程完成任务后退出


import threadpool
import time
def hello(name):
    print('%s 说 hello' % name)
    time.sleep(1)
    print('%s 说 bye' % name)
if __name__ == '__main__':
    names = ['Chancey', 'Wanger', 'Mary', 'Alex', 'Guido']
    start = time.time()
    pool = threadpool.ThreadPool(3)
    requests = threadpool.makeRequests(hello, names)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('总共耗时:%0.2f' % (time.time() - start))


640.png


如果开启多线程的函数有比较多的参数的话,函数调用时第一个解包list,然后解包dict。这样的话就两种方法传参,一种是列表,一种字典。


  • 列表传参


# 多个参数
import threadpool
import time
def counts(a, b, c):
    print('%d+%d+%d=%d' % (a, b, c, a+b+c))
    time.sleep(2)
if __name__ == '__main__':
    # 构造参数
    number_one = [1, 2 ,3]
    number_two = [4, 5, 6]
    number_three = [7, 8, 9]
    params = [(number_one, None), (number_two, None), (number_three, None)]
    # 创建线程池
    start = time.time()
    pool = threadpool.ThreadPool(2)
    requests = threadpool.makeRequests(counts, params)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('总共耗时%0.2f' % (time.time() - start))


640.png


  • 字典传参


# 多个参数
import threadpool
import time
def counts(a, b, c):
    print('%d+%d+%d=%d' % (a, b, c, a+b+c))
    time.sleep(2)
if __name__ == '__main__':
    # 构造参数
    number_one = {'a':1, 'b':2, 'c':3}
    number_two = {'a':4, 'b':5, 'c':6}
    number_three = {'a':7, 'b':8, 'c':9}
    params = [(None, number_one), (None, number_two), (None, number_three)]
    # 创建线程池
    start = time.time()
    pool = threadpool.ThreadPool(2)
    requests = threadpool.makeRequests(counts, params)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('总共耗时%0.2f' % (time.time() - start))


依旧可以完美运行


不过threadpool并不建议在新项目中使用,官方是这样声明的:

This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO NOT USE IT FOR NEW PROJECTS!

该模块已经过时,但仍在PyPi中提供,以支持仍然使用它的旧项目。请勿用于新项目!


3. 并发池


这里命名并发池是我自己想的,因为concurrent支持多进程和多线程。这里也将是本文的亮点。


该模块是Python3自带包,而Python2.7以上也可安装使用。concurrent包下只有一个模块futures,模块下最常用的就是Executor类,它下边有两个子类,分别是ThreadPoolExecutorProcessPoolExecutor,顾名思义,分别支持多线程和多进程。


3.1 ThreadPoolExecutor


该类专为多线程提供支持


创建对象


class concurrent.futures.ThreadPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())


  • max_workes:指定线程数。在Python 3.5 以上的版本中,为None或者没有指定的时候开启和计算机CPU相同数量的线程,并且在Windows上必须小于61,附上源码


if max_workers is None:
    # Use this number because ThreadPoolExecutor is often
    # used to overlap I/O instead of CPU work.
    max_workers = (os.cpu_count() or 1) * 5
    if max_workers <= 0:
        raise ValueError("max_workers must be greater than 0")


  • map_context:允许用户控制由进程池创建给工作者进程的开始方法
  • initializerinitargs:在每个工作者线程开始处调用的一个可选可调用对象。initargs 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个 BrokenThreadPool。该功能在python 3.8 版本以上提供


对象属性


抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。


注意:下边介绍的对象方法是通用的


  • submit(fn, *args, **kwargs):异步提交,传参的方式依旧是元组
  • map(func, *iterables, timeout=None, chunksize=1)):类似于 map(),也就是将submit()for

循环了

  • shutdown(wait=True):类似于进程池中的pool.close()pool.join()的结合体。当wait为True时等待池子内所有任务完毕后释放,反之亦然,默认为True。

注意:不论wait为何值,整个程序都会等到所有任务执行完毕

  • result(timeout=None):获取结果
  • add_done_callback(fn):回调函数


from concurrent.futures import ThreadPoolExecutor
import os
import time
def say(name):
    print('%s 说 hello,我的PID:%s' % (name, os.getpid()))
    time.sleep(2)
    print('%s 说 bye,我的PID:%s' % (name, os.getpid()))
if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    names = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']
    start = time.time()
    for i in names:
        pool.submit(say, i)
    pool.shutdown(wait=True)
    print('耗时:%0.2f' % (time.time() - start))


640.png


3.2 ProcessPoolExecutor


用法相同,不再赘述。只不过换成ProcessPoolExecutor,里面所有创建对象和对象方法都是一样的。


3.3 提交任务


任务提交有两种方式:


  • 同步调用:提交任务后等待任务执行完毕,拿到结果后在执行下一步,这样下来的话,程序就变成了串行
  • 异步调用:提交任务后不用等待


前边在介绍ThreadPoolExecutor或者ProcessPoolExecutor时提到了add_done_callback(fn),这个就是回调机制。异步调用和回调机制都是提交任务的方式。

先看一下同步提交的方式:


from concurrent.futures import ThreadPoolExecutor
import time
def get_url(keyword):
    url = 'https://www.suxin.site/%s' % keyword
    time.sleep(1)
    print('%s URL构造成功' % keyword)
    return url
def get_html(url):
    html = '<html>%s</html>' % url
    time.sleep(2)
    print('%s HTML获取成功' % url)
    return html
if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    keyword_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']
    start = time.time()
    for keyword in keyword_list:
        msg = pool.submit(get_url, keyword).result()
        get_html(msg)
    pool.shutdown(wait=True)
    print('耗时:%0.2f' % (time.time() - start))


640.png


再看下异步提交


from concurrent.futures import ThreadPoolExecutor
import time
def get_url(keyword):
    url = 'https://www.suxin.site/%s' % keyword
    time.sleep(1)
    print('%s URL构造成功' % keyword)
    return url
def get_html(url):
    url = url.result()
    html = '<html>%s</html>' % url
    time.sleep(2)
    print('%s HTML获取成功' % url)
    return html
if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    keyword_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']
    start = time.time()
    for keyword in keyword_list:
        pool.submit(get_url, keyword).add_done_callback(get_html)
    pool.shutdown(wait=True)
    print('耗时:%0.2f' % (time.time() - start))


640.png


效率一目了然


4. 重写


本来这里是打算将concurrent.fautres单独拿出来说一说,但是发现本文已经够长了,所以在下篇详细讨论如何重写ThreadPoolExecutor


前边的线程池中有讲到可以通过重写concurrent或者fautres来使用并发池,这里需要详细的了解源码以及运行原理,建议对current源码有研究的朋友们可以琢磨一下,考虑到篇幅这里就不说了。


5. 自构并发池


网络上对自构并发池大多千篇一律,然并卵,所以这里介绍一下,不过上边的方法已经足够了项目的使用,极特别的需求可能用到,有兴趣可以看看,没兴趣的直接跳到协程去看。


5.1 构思


python里面的Queue类似于并发,可以说是低配版的并发


  • 在队列中加入任务
  • 创建队列
  • 设置大小
  • 真实创建的线程


  • 处理任务
  • 获取任务,每取出一个就剔除那个
  • 判断任务是否为空
  • 判断空闲线程的数量,等于0的时候不再创建
  • 线程数不能超过线程池大小
  • 根据任务的数量判断要创建线程的数量
  • 创建线程
  • 执行任务


5.2 实现


大致的思路就这些,接下来就是精彩的代码,里面有详细的注释,不必慌


import threading
import time
import queue
stop = object() # 这个是用来标志任务停止
class ThreadPoolChancey(object):
    def __init__(self, max_thread=None):
        self.queue = queue.Queue() # 创建的队列可以放无限制的任务
        self.max_thread = max_thread # 指定的最大线程数,默认为None
        self.terminal = False # 停止标志
        self.create_thread_list = [] # 真实创建的线程数,这里以列表的方式存储,方便判断线程数量
        self.free_thread_list = [] # 空闲线程数
    def run(self, function, args, callback=None):
        '''
        :param function : 执行函数
        :param args : 要执行的函数的参数,定义为元组传参
        :param callback : 回调函数,T or F 的返回值
        :return :
        '''
        # 判断是否创建真实线程
        if len(self.free_thread_list) == 0 and len(self.create_thread_list) < self.max_thread: # 如果空闲线程为0并且创建的真实线程没有达到最大限度就创建
            self.create_thread()
        task = (function, args, callback)
        self.queue.put(task)
    def callback(self):
        '''回调函数:用以循环获取任务并执行'''
        current_thread = threading.current_thread() # 获取当前线程
        self.create_thread_list.append(current_thread) # 添加到线程列表里面
        event = self.queue.get() # 获取一个任务并执行
        while event != stop: # 用以判断是否终止任务
            function, args, callback = event # 解开任务包,该包包含了执行函数、参数、回调函数
            try: # 执行函数运行的结果,该判断执行成功,故状态为True
                message = function(*args)
                state = True
            except Exception as err: # 执行异常,状态为False
                message = err
                state = False
            if callback is not None: # 不为空则表示执行完毕
                try:
                    callback(state, message) # 执行回调函数
                except Exception as err:
                    print(err) # 抛出异常
            else:
                pass
            if not self.terminal:
                self.free_thread_list.append(current_thread) # 有终止任务的时候就添加一个新任务
                event = self.queue.get()
                self.free_thread_list.remove(current_thread) # 这里添加了任务,线程有一个占用,剔除空闲
            else:
                event = stop # 停止put
        else:
            self.create_thread_list.remove(current_thread) # 剔除执行完毕的任务
    def create_thread(self):
        '''创建线程'''
        t = threading.Thread(target=self.callback, )
        t.start()
    def terminal(self):
        '''终止任务,无论队列是否还有任务'''
        self.terminal = True
    def close(self) :
        '''关闭线程池'''
        num = len(self.create_thread_list) # 将真实的线程全部添加进线程池
        self.queue.empty()
        while num:
            self.queue.put(stop)
            num -= 1


将其放置在同级目录下并作为第三方模块导入试用一下:


这里模拟连接数据库示例


from DiyThreadPool import ThreadPoolChancey
import time
import random
def connect(name):
    db = random.randint(10, 20)
    time.sleep(1)
    print('%s 连接到了数据库%s' % (name, db))
    return db
pool = ThreadPoolChancey(2)
name_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']
for name in name_list:
    pool.run(function=connect, args=(name, ))
# pool.terminal()
pool.close()


640.png

相关文章
|
3月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
在Python的并发编程世界中,没有万能的解决方案,只有最适合特定场景的方法。希望本文能够为你拨开迷雾,找到那条通往高效并发编程的光明大道。
48 2
|
4月前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
56 4
|
3月前
|
监控 并行计算 数据处理
构建高效Python应用:并发与异步编程的实战秘籍,IO与CPU密集型任务一网打尽!
在Python编程的征途中,面对日益增长的性能需求,如何构建高效的应用成为了每位开发者必须面对的课题。并发与异步编程作为提升程序性能的两大法宝,在处理IO密集型与CPU密集型任务时展现出了巨大的潜力。今天,我们将深入探讨这些技术的最佳实践,助你打造高效Python应用。
48 0
|
3天前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
并发编程能够显著提升程序的效率和响应速度。例如,网络爬虫通过并发下载将耗时从1小时缩短至20分钟;APP页面加载时间从3秒优化到200毫秒。Python支持多线程、多进程、异步I/O和协程等并发编程方式,适用于不同场景。线程通信方式包括共享变量、消息传递和同步机制,如Lock、Queue等。Python的并发编程特性使其在处理大规模数据和高并发访问时表现出色,成为许多领域的首选语言。
|
2月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
2月前
|
API 数据处理 Python
探秘Python并发新世界:asyncio库,让你的代码并发更优雅!
在Python编程中,随着网络应用和数据处理需求的增长,并发编程变得愈发重要。asyncio库作为Python 3.4及以上版本的标准库,以其简洁的API和强大的异步编程能力,成为提升性能和优化资源利用的关键工具。本文介绍了asyncio的基本概念、异步函数的定义与使用、并发控制和资源管理等核心功能,通过具体示例展示了如何高效地编写并发代码。
47 2
|
2月前
|
数据库 开发者 Python
“Python异步编程革命:如何从编程新手蜕变为并发大师,掌握未来技术的制胜法宝”
【10月更文挑战第25天】介绍了Python异步编程的基础和高级技巧。文章从同步与异步编程的区别入手,逐步讲解了如何使用`asyncio`库和`async`/`await`关键字进行异步编程。通过对比传统多线程,展示了异步编程在I/O密集型任务中的优势,并提供了最佳实践建议。
24 1
|
3月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
42 3
|
3月前
|
数据处理 Python
深入探索:Python中的并发编程新纪元——协程与异步函数解析
深入探索:Python中的并发编程新纪元——协程与异步函数解析
33 3
|
3月前
|
中间件 API 调度
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用
深入探究 Python 异步编程:利用 asyncio 和 aiohttp 构建高效并发应用
53 4