🎉🎉🎉欢迎来到我的博客,我是一名自学了2年半前端的大一学生,熟悉的技术是JavaScript与Vue.目前正在往全栈方向前进, 如果我的博客给您带来了帮助欢迎您关注我,我将会持续不断的更新文章!!!🙏🙏🙏
@[toc]
Python中的多进程
Unix/Linux操作系统提供了一个fork()
系统调用。fork()
调用一次,返回两次,因为操作系统自动把当前进程(父进程)复制一份(子进程),然后,分别在父进程和子进程内返回。子进程永远返回0
,而父进程返回子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID。
Python的os模块提供了fork()
函数。
由于Windows系统没有fork()
调用,,因此要实现跨平台的多进程编程,可以使用multiprocessing
模块的Process
类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool
)、用于进程间通信的队列(Queue
)和管道(Pipe
)等。
multiprocessing
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process
实例,用start()
方法启动。
join()
方法可以等待子进程结束后再继续往下运行,通常用于进程间的通信。
from multiprocessing import Process
import os
#子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
执行结果如下:
Parent process 16164.
Child process will start.
Run child process test(7652).....
Child process end.
Pool
对Pool
对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process
了。
Pool
的默认大小时CPU的核数,就是你的电脑最多同时执行几个进程。
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
执行结果如下:
Parent process 14880.
Waiting for all subprocess done...
Run task 0(12136)
Run task 1(11292)
Run task 2(6812)
Run task 3(9264)
Task 3 runs 1.01 second.
Run task 4(9264)
Task 1 runs 1.20 second.
Task 2 runs 2.03 second.
Task 0 runs 2.80 second.
Task 4 runs 2.25 second.
All subprocess done.
子进程
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。subprocess
模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。
import subprocess
print('$nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
执行结果如下:
$nslookup www.python.org
Non-authoritative answer:
Server: UnKnown
Address: 10.182.100.7
Name: dualstack.python.map.fastly.net
Addresses: 2a04:4e42:600::223
2a04:4e42:200::223
2a04:4e42::223
2a04:4e42:400::223
151.101.228.223
Aliases: www.python.org
Exit code: 0
如果子进程还需要输入,则可以通过communicate()
方法输入.
进程间通信
Process
之间肯定时需要通信的,操作系统提供了很多机制来实现进程间通信。Python的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。
from multiprocessing import Process, Queue
import os, time, random
#写数据进程执行的代码
def write(q):
print('Process to write:%s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
#读数据进程执行的代码
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__ == '__main__':
# 父进程创建Queue,并传给各个子进程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束
pw.join()
# pr进程时死循环,无法等待其结束,只能强制终止:
pr.terminate()
执行结果如下:
Process to write:13288
Put A to queue...
Process to read: 8396
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
小结
在Unix/Linux下,可以使用fork()调用实现多进程。
要实现跨平台的多进程,可以使用multiprocessing模块。
进程间通信是通过Queue、Pipes等实现的。
Python中的多线程
Python的标准库提供了两个模块:_thread
和threading
,_thread
是低级模块,threading
是高级模块,是对_thread
进行了封装。
启动一个线程就是把一个函数传入并创建Thread
实例,并调用start()
开始执行。
import time, threading
#新线程执行代码
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n += 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
执行结果如下:
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
由于任何进程默认启动一个线程,我们把该线程叫做主线程,主线程又可以启动新的线程,Python中的threading
模块有一个current_thread()
函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread
,子线程的名字在创建时指定,我们用LoopThread
命名子线程。名字仅仅在打印时用来显示,完全没有意义,如果不起名字Python就会自动给线程命名为Thread-1
,Thread-2
.......
Lock
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在与每个进程中,互不影响,而锁线程中,所用变量都有所用线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时该一个变量,把内容该乱了。
import threading
#假定这是你的银行存款
balance = 0
def change_it(n):
# 先存后取,结果应该为0
global balance
balance += n
balance -= n
def run_thread(n):
for i in range(100000):
change_it(n)
if __name__ == '__main__':
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
我们定义一个共享变量balace
,初始值为0
,并启动了两个线程,先存后取,理论上结果时0
,但是,由于线程的调度时有操作系统决定的,当t1、t2交替执行时,只要循环的次数足够多,balance
的结果就不一定为0
了。
原因是因为高级语言的一条语句在CPU执行时时若干条语句。
balance += n
也分两步:
- 计算
balance + n
,存入临时变量中; - 将临时变量的值赋给
balance
.
也就是可以看成:
由于t1和t2是交替运行的,如果操作系统以下面的顺序执行t1、t2:x = balance + n balance = x
~~~
初始值 balance = 0
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
结果 balance = -8
究其原因,是因为修改`balance`需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。
两个线程同时一存一取,就可能导致余额不对,你肯定不希望你的银行存款莫名其妙地变成了负数,所以,我们必须确保一个线程在修改`balance`的时候,别的线程一定不能改。
如果我们要确保`balance`计算正确,就要给`change_it()`上一把锁,当某个线程开始执行`change_it()`时,我们说,该线程因为获得了锁,因此其他线程不能同时执行`change_it()`,只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过`threading.Lock()`来实现:
~~~python
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()
当多个线程同时执行lock.acquire()
时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。
获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally
来确保锁一定会被释放。
锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
多核CPU
如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。
如果写一个死循环的话,会出现什么情况呢?
打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。
我们可以监控到一个死循环线程会100%占用一个CPU。
如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。
要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。
试试用Python写个死循环:
import threading, multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。
但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。
不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
小结
多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时,又要小心死锁的发生。
Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。多线程的并发在Python中就是一个美丽的梦。
ThreadLocal
在多线程环境中,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
但是局部变量也有问题,就是在函数调用时,传递起来很麻烦:
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
每个函数一层一层调用都这么传参数那还得了?用全局变量?也不行,因为每个线程处理不同的Student
对象,不能共享。
如果用一个全局dict
存放所有的Student对象,然后以thread
自身作为key
获得线程对应的Student
对象如何?
global_dict = {
}
def std_thread(name):
std = Student(name)
# 把std放到全局变量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()
def do_task_1():
# 不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2():
# 任何函数都可以查找出当前线程的std变量:
std = global_dict[threading.current_thread()]
...
这种方式理论上是可行的,它最大的优点是消除了std
对象在每层函数中的传递问题,但是,每个函数获取std
的代码有点丑。
有没有更简单的方式?
ThreadLocal
应运而生,不用查找dict
,ThreadLocal
帮你自动做这件事:
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
执行结果:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
全局变量local_school
就是一个ThreadLocal
对象,每个Thread
对它都可以读写student
属性,但互不影响。你可以把local_school
看成全局变量,但每个属性如local_school.student
都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal
内部会处理。
可以理解为全局变量local_school
是一个dict
,不但可以用local_school.student
,还可以绑定其他变量,如local_school.teacher
等等。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
小结
一个ThreadLocal
变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal
解决了参数在一个线程中各个函数之间互相传递的问题。
分布式进程
在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的multiprocessing
模块不但支持多进程,其中managers
子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers
模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
举个例子:如果我们已经有一个通过Queue
通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue
可以继续使用,但是,通过managers
模块把Queue
通过网络暴露出去,就可以让其他机器的进程访问Queue
了。
我们先看服务进程,服务进程负责启动Queue
,把Queue
注册到网络上,然后往Queue
里面写入任务:
Hi👋,这里是瑞雨溪一个喜欢JavaScript和Vue的大学生,如果我的文章给你带来的帮助,欢迎您关注我,我会持续不断的更新更多优质文章.你的关注就是我的动力!!!🎉🎉🎉