第十五章 Python多进程与多线程

简介:
15.1 multiprocessing
multiprocessing是多进程模块,多进程提供了任务并发性,能充分利用多核处理器。避免了GIL(全局解释锁)对资源的影响。
有以下常用类:
描述
Process(group=None, target=None, name=None, args=(), kwargs={})
派生一个进程对象,然后调用start()方法启动

Pool(processes=None, initializer=None, initargs=())
返回一个进程池对象,processes进程池进程数量
Pipe(duplex=True)
返回两个连接对象由管道连接
Queue(maxsize=0)
返回队列对象,操作方法跟Queue.Queue一样
multiprocessing.dummy
这个库是用于实现多线程
Process()类有以下些方法:
run()

start()
启动进程对象
join([timeout])
等待子进程终止,才返回结果。可选超时。
name
进程名字
is_alive()
返回进程是否存活
daemon
进程的守护标记,一个布尔值
pid
返回进程ID
exitcode
子进程退出状态码
terminate()
终止进程。在unix上使用SIGTERM信号,在windows上使用TerminateProcess()。
Pool()类有以下些方法:
apply(func, args=(), kwds={})
等效内建函数apply()
apply_async(func, args=(), kwds={}, callback=None)
异步,等效内建函数apply()
map(func, iterable, chunksize=None)
等效内建函数map()
map_async(func, iterable, chunksize=None, callback=None)
异步,等效内建函数map()
imap(func, iterable, chunksize=1)
等效内建函数itertools.imap()
imap_unordered(func, iterable, chunksize=1)
像imap()方法,但结果顺序是任意的
close()
关闭进程池
terminate()
终止工作进程,垃圾收集连接池对象
join()
等待工作进程退出。必须先调用close()或terminate()
Pool.apply_async()和Pool.map_aysnc()又提供了以下几个方法:
get([timeout])
获取结果对象里的结果。如果超时没有,则抛出TimeoutError异常
wait([timeout])
等待可用的结果或超时
ready()
返回调用是否已经完成
successful()


博客地址:http://lizhenliang.blog.51cto.com and https://yq.aliyun.com/u/lizhenliang
QQ群:323779636(Shell/Python运维开发群)

举例:
1)简单的例子,用子进程处理函数
from multiprocessing import Process
import os

def worker(name):
    print name
    print 'parent process id:', os.getppid()
    print 'process id:', os.getpid()

if __name__ == '__main__':
    p = Process(target=worker, args=('function worker.',))
    p.start()
    p.join()
    print p.name

# python test.py
function worker.
parent process id: 9079
process id: 9080
Process-1
Process实例传入worker函数作为派生进程执行的任务,用start()方法启动这个实例。
2)加以说明join()方法
from multiprocessing import Process
import os

def worker(n):
    print 'hello world', n

if __name__ == '__main__':
    print 'parent process id:', os.getppid()
    for n in range(5):
        p = Process(target=worker, args=(n,))
        p.start()
        p.join()
        print 'child process id:', p.pid
        print 'child process name:', p.name

# python test.py
parent process id: 9041
hello world 0
child process id: 9132
child process name: Process-1
hello world 1
child process id: 9133
child process name: Process-2
hello world 2
child process id: 9134
child process name: Process-3
hello world 3
child process id: 9135
child process name: Process-4
hello world 4
child process id: 9136
child process name: Process-5

# 把p.join()注释掉再执行
# python test.py
parent process id: 9041
child process id: 9125
child process name: Process-1
child process id: 9126
child process name: Process-2
child process id: 9127
child process name: Process-3
child process id: 9128
child process name: Process-4
hello world 0
hello world 1
hello world 3
hello world 2
child process id: 9129
child process name: Process-5
hello world 4
可以看出,在使用join()方法时,输出的结果都是顺序排列的。相反是乱序的。因此join()方法是堵塞父进程,要等待当前子进程执行完后才会继续执行下一个子进程。否则会一直生成子进程去执行任务。
在要求输出的情况下使用join()可保证每个结果是完整的。
3)给子进程命名,方便管理
from multiprocessing import Process
import os, time

def worker1(n):
    print 'hello world', n
def worker2():
    print 'worker2...'

if __name__ == '__main__':
    print 'parent process id:', os.getppid()
    for n in range(3):
        p1 = Process(name='worker1', target=worker1, args=(n,))
        p1.start()
        p1.join()
        print 'child process id:', p1.pid
        print 'child process name:', p1.name
    p2 = Process(name='worker2', target=worker2)
    p2.start()
    p2.join()
    print 'child process id:', p2.pid
    print 'child process name:', p2.name

# python test.py
parent process id: 9041
hello world 0
child process id: 9248
child process name: worker1
hello world 1
child process id: 9249
child process name: worker1
hello world 2
child process id: 9250
child process name: worker1
worker2...
child process id: 9251
child process name: worker2
4)设置守护进程,父进程退出也不影响子进程运行
from multiprocessing import Process

def worker1(n):
    print 'hello world', n
def worker2():
    print 'worker2...'

if __name__ == '__main__':
    for n in range(3):
        p1 = Process(name='worker1', target=worker1, args=(n,))
        p1.daemon = True
        p1.start()
        p1.join()
    p2 = Process(target=worker2)
    p2.daemon = False
    p2.start()
    p2.join()
5)使用进程池
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Pool, current_process
import os, time, sys

def worker(n):
    print 'hello world', n
    print 'process name:', current_process().name  # 获取当前进程名字
    time.sleep(1)    # 休眠用于执行时有时间查看当前执行的进程

if __name__ == '__main__':
    p = Pool(processes=3)
    for i in range(8):
        r = p.apply_async(worker, args=(i,))
        r.get(timeout=5)  # 获取结果中的数据
    p.close()

# python test.py
hello world 0
process name: PoolWorker-1
hello world 1
process name: PoolWorker-2
hello world 2
process name: PoolWorker-3
hello world 3
process name: PoolWorker-1
hello world 4
process name: PoolWorker-2
hello world 5
process name: PoolWorker-3
hello world 6
process name: PoolWorker-1
hello world 7
process name: PoolWorker-2
进程池生成了3个子进程,通过循环执行8次worker函数,进程池会从子进程1开始去处理任务,当到达最大进程时,会继续从子进程1开始。
在运行此程序同时,再打开一个终端窗口会看到生成的子进程:
# ps -ef |grep python
root      40244   9041  4 16:43 pts/3    00:00:00 python test.py
root      40245  40244  0 16:43 pts/3    00:00:00 python test.py
root      40246  40244  0 16:43 pts/3    00:00:00 python test.py
root      40247  40244  0 16:43 pts/3    00:00:00 python test.py
6)进程池map()方法
map()方法是将序列中的元素通过函数处理返回新列表。
from multiprocessing import Pool

def worker(url):
    return 'http://%s' % url

urls = ['www.baidu.com', 'www.jd.com']
p = Pool(processes=2)
r = p.map(worker, urls)
p.close()
print r

# python test.py
['http://www.baidu.com', 'http://www.jd.com']
7)Queue进程间通信
multiprocessing支持两种类型进程间通信:Queue和Pipe。
Queue库已经封装到multiprocessing库中,在第十章 Python常用标准库已经讲解到Queue库使用,有需要请查看以前博文。
例如:一个子进程向队列写数据,一个子进程读取队列数据
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
# 写数据到队列
def write(q):
    for n in range(5):
        q.put(n)
        print 'Put %s to queue.' % n
# 从队列读数据
def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print 'Get %s from queue.' % value
        else:
            break
if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))   
    pw.start()
    pw.join()
    pr.start()
    pr.join()

# python test.py
Put 0 to queue.
Put 1 to queue.
Put 2 to queue.
Put 3 to queue.
Put 4 to queue.
Get 0 from queue.
Get 1 from queue.
Get 2 from queue.
Get 3 from queue.
Get 4 from queue.
8)Pipe进程间通信
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv() 
    p.join()

# python test.py
[42, None, 'hello']
Pipe()创建两个连接对象,每个链接对象都有send()和recv()方法,
9)进程间对象共享
Manager类返回一个管理对象,它控制服务端进程。提供一些共享方式:Value()、Array()、list()、dict()、Event()等
创建Manger对象存放资源,其他进程通过访问Manager获取。
from multiprocessing import Process, Manager

def f(v, a, l, d):
    v.value = 100
    a[0] = 123
    l.append('Hello')
    d['a'] = 1

mgr = Manager()
v = mgr.Value('v', 0)
a = mgr.Array('d', range(5))
l = mgr.list()
d = mgr.dict()

p = Process(target=f, args=(v, a, l, d))
p.start()
p.join()

print(v)
print(a)
print(l)
print(d)

# python test.py
Value('v', 100)
array('d', [123.0, 1.0, 2.0, 3.0, 4.0])
['Hello']
{'a': 1}
10)写一个多进程的例子
比如:多进程监控URL是否正常
from multiprocessing import Pool, current_process
import urllib2

urls = [
    'http://www.baidu.com',
    'http://www.jd.com',
    'http://www.sina.com',
    'http://www.163.com',
]

def status_code(url):
    print 'process name:', current_process().name
    try:
        req = urllib2.urlopen(url, timeout=5)
        return req.getcode()
    except urllib2.URLError:
        return

p = Pool(processes=4)
for url in urls:
    r = p.apply_async(status_code, args=(url,))
    if r.get(timeout=5) == 200:
        print "%s OK" %url
    else:
        print "%s NO" %url

# python test.py
process name: PoolWorker-1
http://www.baidu.com OK
process name: PoolWorker-2
http://www.jd.com OK
process name: PoolWorker-3
http://www.sina.com OK
process name: PoolWorker-4
http://www.163.com OK

15.2 threading
threading模块类似于multiprocessing多进程模块,使用方法也基本一样。threading库是对thread库进行二次封装,我们主要用到Thread类,用Thread类派生线程对象。
1)使用Thread类实现多线程
from threading import Thread, current_thread

def worker(n):
    print 'thread name:', current_thread().name
    print 'hello world', n
    
for n in range(5):
    t = Thread(target=worker, args=(n, ))
    t.start()
    t.join()  # 等待主进程结束

# python test.py
thread name: Thread-1
hello world 0
thread name: Thread-2
hello world 1
thread name: Thread-3
hello world 2
thread name: Thread-4
hello world 3
thread name: Thread-5
hello world 4
2)还有一种方式继承Thread类实现多线程,子类可以重写__init__和run()方法实现功能逻辑。
#!/usr/bin/python
# -*- coding: utf-8 -*-
from threading import Thread, current_thread

class Test(Thread):
    # 重写父类构造函数,那么父类构造函数将不会执行
    def __init__(self, n):
        Thread.__init__(self)
        self.n = n
    def run(self):
        print 'thread name:', current_thread().name
        print 'hello world', self.n

if __name__ == '__main__':
    for n in range(5):
        t = Test(n)
        t.start()
        t.join()

# python test.py
thread name: Thread-1
hello world 0
thread name: Thread-2
hello world 1
thread name: Thread-3
hello world 2
thread name: Thread-4
hello world 3
thread name: Thread-5
hello world 4
3)Lock
from threading import Thread, Lock, current_thread

lock = Lock()
class Test(Thread):
    # 重写父类构造函数,那么父类构造函数将不会执行
    def __init__(self, n):
        Thread.__init__(self)
        self.n = n
    def run(self):
        lock.acquire()  # 获取锁
        print 'thread name:', current_thread().name
        print 'hello world', self.n
        lock.release()  # 释放锁

if __name__ == '__main__':
    for n in range(5):
        t = Test(n)
        t.start()
        t.join()
众所周知,Python多线程有GIL全局锁,意思是把每个线程执行代码时都上了锁,执行完成后会自动释放GIL锁,意味着同一时间只有一个线程在运行代码。由于所有线程共享父进程内存、变量、资源,很容易多个线程对其操作,导致内容混乱。
当你在写多线程程序的时候如果输出结果是混乱的,这时你应该考虑到在不使用锁的情况下,多个线程运行时可能会修改原有的变量,导致输出不一样。
由此看来Python多线程是不能利用多核CPU提高处理性能,但在IO密集情况下,还是能提高一定的并发性能。也不必担心,多核CPU情况可以使用多进程实现多核任务。Python多进程是复制父进程资源,互不影响,有各自独立的GIL锁,保证数据不会混乱。能用多进程就用吧!
目录
相关文章
|
16天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
21天前
|
Linux 调度 C语言
深入理解操作系统:进程和线程的管理
【10月更文挑战第32天】本文旨在通过浅显易懂的语言和实际代码示例,带领读者探索操作系统中进程与线程的奥秘。我们将从基础知识出发,逐步深入到它们在操作系统中的实现和管理机制,最终通过实践加深对这一核心概念的理解。无论你是编程新手还是希望复习相关知识的资深开发者,这篇文章都将为你提供有价值的见解。
|
18天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
24 1
|
23天前
深入理解操作系统:进程与线程的管理
【10月更文挑战第30天】操作系统是计算机系统的核心,它负责管理计算机硬件资源,为应用程序提供基础服务。本文将深入探讨操作系统中进程和线程的概念、区别以及它们在资源管理中的作用。通过本文的学习,读者将能够更好地理解操作系统的工作原理,并掌握进程和线程的管理技巧。
37 2
|
24天前
|
调度 Python
深入浅出操作系统:进程与线程的奥秘
【10月更文挑战第28天】在数字世界的幕后,操作系统悄无声息地扮演着关键角色。本文将拨开迷雾,深入探讨操作系统中的两个基本概念——进程和线程。我们将通过生动的比喻和直观的解释,揭示它们之间的差异与联系,并展示如何在实际应用中灵活运用这些知识。准备好了吗?让我们开始这段揭秘之旅!
|
28天前
|
Java Unix 调度
python多线程!
本文介绍了线程的基本概念、多线程技术、线程的创建与管理、线程间的通信与同步机制,以及线程池和队列模块的使用。文章详细讲解了如何使用 `_thread` 和 `threading` 模块创建和管理线程,介绍了线程锁 `Lock` 的作用和使用方法,解决了多线程环境下的数据共享问题。此外,还介绍了 `Timer` 定时器和 `ThreadPoolExecutor` 线程池的使用,最后通过一个具体的案例展示了如何使用多线程爬取电影票房数据。文章还对比了进程和线程的优缺点,并讨论了计算密集型和IO密集型任务的适用场景。
49 4
|
10天前
|
数据采集 Java Python
爬取小说资源的Python实践:从单线程到多线程的效率飞跃
本文介绍了一种使用Python从笔趣阁网站爬取小说内容的方法,并通过引入多线程技术大幅提高了下载效率。文章首先概述了环境准备,包括所需安装的库,然后详细描述了爬虫程序的设计与实现过程,包括发送HTTP请求、解析HTML文档、提取章节链接及多线程下载等步骤。最后,强调了性能优化的重要性,并提醒读者遵守相关法律法规。
45 0
|
5月前
|
安全 Python
告别低效编程!Python线程与进程并发技术详解,让你的代码飞起来!
【7月更文挑战第9天】Python并发编程提升效率:**理解并发与并行,线程借助`threading`模块处理IO密集型任务,受限于GIL;进程用`multiprocessing`实现并行,绕过GIL限制。示例展示线程和进程创建及同步。选择合适模型,注意线程安全,利用多核,优化性能,实现高效并发编程。
75 3
|
2月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
2月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
31 3