1. 什么是进程?
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。
一个进程至少包含一个线程。
2. 在python中有了多线程编程为何还需要多进程编程?
在python中由于有GIL(全局解释器锁)的存在,在任一时刻只有一个线程在运行(无论你的CPU是多少核),无法实现真正的多线程。那么该如何让python程序真正的并行运行呢?答案就是不要使用多线程,使用多进程。python标准库提供了multiprocessing模块(multiprocessing
是一个和threading
模块类似,提供API,生成进程的模块。multiprocessing
包提供本地和远程并发,通过使用子进程而不是线程有效地转移全局解释器锁。),它的API几乎复制了threading模块的API,当然它还有一行threading模块没有的API。
例一(multiprocessing模块的简单使用):
1 import multiprocessing,time 2 3 class Task(multiprocessing.Process): 4 def __init__(self): 5 super(Task, self).__init__() 6 7 def run(self): 8 print("Process---%s" % self.name) 9 time.sleep(2) 10 11 12 if __name__ == "__main__": 13 for i in range(1, 8+1): 14 t = Task() 15 t.start()
注:由于multiprocessing模块基本的API同threading模块,就不挨个演示了,本文主要讲解multiprocessing模块不同于threading模块的API的使用。要了解其他同threading模块相同的API的使用,可参见:http://www.cnblogs.com/God-Li/p/7732407.html
multiprocessing.Process源码:
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): self.name = '' self.daemon = False #守护进程标志,必须在start()之前设置 self.authkey = None #The process’s authentication key (a byte string). self.exitcode = None #The child’s exit code. This will be None if the process has not yet terminated. A negative value -N indicates that the child was terminated by signal N. self.ident = 0 self.pid = 0 #进程ID。在生成进程之前,这将是Non。 self.sentinel = None #A numeric handle of a system object which will become “ready” when the process ends. def run(self): pass def start(self): pass def terminate(self): """ Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used. Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned. :return: """ pass def join(self, timeout=None): pass def is_alive(self): return False
multiprocessing模块中的队列:
class multiprocessing.
Queue
([maxsize])实现除task_done()
和join()
之外的queue.Queue
的所有方法,下面列出queue.Queue中没有的方法:
class multiprocessing.Queue([maxsize]) close() """ 指示当前进程不会在此队列上放置更多数据。 The background thread will quit once it has flushed all buffered data to the pipe. 当队列被垃圾回收时,这被自动调用。 """ join_thread() """ 加入后台线程。这只能在调用close()之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到pipe。 默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。 该进程可以调用cancel_join_thread()使join_thread()不执行任何操作 """ cancel_join_thread() """ 使join_thread()不执行任何操作 """
class multiprocessing.
SimpleQueue是class
multiprocessing.
Queue
([maxsize])的简化,只有三个方法------empty(), get(), put()
class multiprocessing.
JoinableQueue
([maxsize])是class multiprocessing.
Queue
([maxsize])的子类,增加了take_done()和join()方法
注:由于进程之间内存空间不共享,所以必须将实例化后的queue对象当作参数传入其他进程,其他进程才能使用。而且,每传入一次相当于克隆一份,与原来的queue独立,只是python会同步queue中的数据,而不是像在多线程的queue数据只有一份。
进程之间的通信:
multiprocessing.
Pipe
([duplex]) --------------- 返回表示管道末端的Connection
对象(类似与socket中的连接可用于发送和接收数据)的(conn1, conn2)。
如果duplex是True
(默认值),则管道是双向的。如果duplex是False
,则管道是单向的:conn1
只能用于接收消息,conn2
用于发送消息。
例二(multiprocessing.Pipe使用演示):
1 import multiprocessing,time 2 3 class Processing_1(multiprocessing.Process): 4 def __init__(self, conn): 5 super(Processing_1, self).__init__() 6 self.conn = conn 7 def run(self): 8 send_data = "this message is from p1" 9 self.conn.send(send_data) #使用conn发送数据 10 time.sleep(0.8) 11 recv_data = self.conn.recv() #使用conn接收数据 12 print("p1 recv: " + recv_data) 13 self.conn.close() 14 15 16 class Processing_2(multiprocessing.Process): 17 def __init__(self, conn): 18 super(Processing_2, self).__init__() 19 self.conn = conn 20 21 def run(self): 22 send_data = "this message is from p2" 23 self.conn.send(send_data) 24 time.sleep(0.8) 25 recv_data = self.conn.recv() 26 print("p2 recv: " + recv_data) 27 self.conn.close() 28 29 if __name__ == "__main__": 30 conn1, conn2 = multiprocessing.Pipe() #实例化Pipe对象,conn1, conn2分别代表连接两端 31 32 p1 = Processing_1(conn1) #将连接对象当作参数传递给子进程 33 p2 = Processing_2(conn2) 34 35 p1.start() 36 p2.start() 37 38 p1.join() 39 p2.join()
进程之间的数据共享:
multiprocessing.
Manager
() ----------- 返回开始的SyncManager
对象,可用于在进程之间共享对象。返回的管理器对象对应于生成的子进程,并且具有将创建共享对象并返回相应代理的方法。管理器进程将在垃圾收集或其父进程退出时立即关闭。
例三(Manager的简单使用):
1 import multiprocessing,time 2 import os 3 4 class Processing(multiprocessing.Process): 5 def __init__(self, d, l): 6 super(Processing, self).__init__() 7 self.d = d 8 self.l = l 9 10 def run(self): 11 self.d[os.getpid()] = os.getpid() #当作正常dict使用即可 12 self.l.append(1) 13 print(self.l) 14 15 if __name__ == "__main__": 16 17 manager = multiprocessing.Manager() #生成Manager 对象 18 d = manager.dict() #生成共享dict 19 l = manager.list() #生成共享list 20 21 p_s = [] 22 for i in range(10): 23 p = Processing(d, l) 24 p.start() 25 p_s.append(p) 26 27 for p in p_s: 28 p.join() 29 30 print(d) 31 print(l)
manager可以生成以下共享数据对象(常用):
-
Event
() -
Create a shared
threading.Event
object and return a proxy for it.
-
Lock
() -
Create a shared
threading.Lock
object and return a proxy for it.
-
Namespace
() -
Create a shared
Namespace
object and return a proxy for it.
-
Queue
([maxsize]) -
Create a shared
queue.Queue
object and return a proxy for it.
-
RLock
() -
Create a shared
threading.RLock
object and return a proxy for it.
-
Semaphore
([value]) -
Create a shared
threading.Semaphore
object and return a proxy for it.
-
Array
(typecode, sequence) -
Create an array and return a proxy for it.
-
Value
(typecode, value)¶ -
Create an object with a writable
value
attribute and return a proxy for it.
-
dict
() -
dict
(mapping) -
dict
(sequence) -
Create a shared
dict
object and return a proxy for it.
-
list
() -
list
(sequence) -
Create a shared
list
object and return a proxy for it.
进程锁:
进程锁有两种multiprocessing.
Lock(非递归锁)和
multiprocessing.
RLock(递归锁)。
multiprocessing.
Lock(非递归锁):一旦进程或线程获得了锁,随后从任何进程或线程获取它的尝试将阻塞,直到它被释放;任何进程或线程都可以释放它。
multiprocessing.
RLock(递归锁): A recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.
这两种锁都只用两种方法:acquire
(block=True, timeout=None)和release
(),它们的使用基本和线程锁类似(只不是要把锁的示例对象当作参数传入其他的进程):http://www.cnblogs.com/God-Li/p/7732407.html
进程池:
为了便于对多进程的管理,通常使用进程池来进行多进程编程(而不是使用multiprocessing.Process)。
例:
1 import multiprocessing,os 2 import time 3 4 5 def run(): 6 print(str(os.getpid()) + "-----running") 7 time.sleep(2) 8 print(str(os.getpid()) + "-----done") 9 10 def done(): 11 print("done") 12 13 def error(): 14 print("error") 15 16 if __name__ == "__main__": 17 pool = multiprocessing.Pool(processes=4) #实力化进程池对象 18 19 for i in range(8): 20 # pool.apply(func=run) #进程池中的进程串行运行 21 pool.apply_async(func=run) 22 23 pool.close() 24 pool.join() 25 print("finish....")
Pool对象常用方法:
-
apply
(func[, args[, kwds]]) -
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks,
apply_async()
is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.将任务提交到进程池,只有一个进程在工作,其他进程处于阻塞状态(相当于串行运行)。
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]]) -
A variant of the
apply()
method which returns a result object.If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
将任务提交到进程池,多个进程(进程数量由之前实例化时的processes参数设置)同时运行,callback工作进程完成时(由当前进程的父进程)调用由此传入的任务,error_callback工作进程出错时(由当前进程的父进程)调用由此传入的任务。
-
close
()
-
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
调用此方法后进程池不能在提交新的任务
-
terminate
() -
Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected
terminate()
will be called immediately.立即停止工作进程,而不需要等待未完成的工作进程。
-
join
() -
Wait for the worker processes to exit. One must call
close()
orterminate()
before usingjoin()
.等待进程池中的工作进程结束(在此之前必须调用close()或者terminate())。
注:Pool对象在生成时进程内的进程(阻塞)就已经启动,使用apply(或者apply_async)方法只是将任务提交给线程池,不会再建立新进程。