进程:资源的集合
线程:操作CPU的最小调度单位
最简单的多线程实例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
#!/usr/bin/python
#Author:sean
# Threads can be started in two ways, demonstrated below:
# 1) direct call: pass a plain function as Thread(target=...)
import threading
import time

def run(n):
    """Worker body: print the task label, then sleep 2 seconds."""
    print("task ", n)
    time.sleep(2)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))  # create a thread instance
    t2 = threading.Thread(target=run, args=("t2",))  # create another thread instance
    t1.start()  # start the thread
    t2.start()  # start the other thread
    # FIX: getName() is deprecated (since Python 3.10); use the .name attribute
    print(t1.name)  # get the thread name
    print(t2.name)
    # plain synchronous calls for comparison (run in the main thread)
    run("t1")
    run("t2")

# 2) subclassing call: inherit threading.Thread and override run()
class MyThread(threading.Thread):
    def __init__(self, num, sleep_time):
        super().__init__()  # modernized from super(MyThread, self).__init__()
        self.num = num              # label printed by run()
        self.sleep_time = sleep_time  # seconds this thread sleeps

    def run(self):
        """Define the work each thread performs (invoked by .start())."""
        print("running on number:%s" % self.num)
        time.sleep(self.sleep_time)
        print("task done,", self.num)

if __name__ == '__main__':
    t1 = MyThread("t1", 2)
    t2 = MyThread("t2", 4)
    t1.start()
    t2.start()
    t1.join()   # wait for each thread to finish before continuing
    t2.join()
    print("The main")
    print(t1.name)  # FIX: .name instead of deprecated getName()
    print(t2.name)
|
进程与线程的执行速度没有可比性
进程至少包含一个线程
python中的线程和进程均是使用的操作系统的原生线程与进程
原生进程与原生线程是由操作系统维护与管理的
python中的多线程是伪多线程,实际上同一时间只有一个线程在运行
python中的多线程实质上就是一个单线程中上下文不停切换的效果表现形式
IO操作不占用CPU,如从硬盘中读取数据
计算占用CPU,如1+1
什么时候会用到伪多线程呢?如何提高效率?
python多线程不适合CPU密集操作型的任务
python多线程适合于IO操作密集型的任务,如socket
线程间内存是共享的
线程同一时间修改同一份数据时必须加锁(mutex互斥锁)
递归锁:锁中有锁,进2道门的例子
join:等待一个线程执行结束
1
2
3
4
5
6
|
def run(n):
    """Thread body for the join demo."""
    print("run thread...")

# `n` comes from the surrounding article's context.
worker = threading.Thread(target=run, args=(n,))
worker.start()
worker.join()  # block here until the thread has finished
|
信号量:同一时间允许多个线程来操作
线程间切换时会将上下文和栈保存到CPU的寄存器中
守护线程:服务于非守护线程(皇帝与宦官的关系,皇帝死了,宦官要殉葬)
队列的作用:
实现程序的解耦(使程序之间实现松耦合)
提高处理效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#!/usr/bin/python
#Author:sean
# Demo of the three queue flavours in the stdlib `queue` module.
import queue

# FIFO: first in, first out
q1 = queue.Queue()
q1.put('d1')
q1.put('d2')
q1.put('d3')
print(q1.qsize())
q1.get()
q1.get()
print(q1.qsize())
print(q1.get_nowait())
# FIX: the queue is empty here, so a bare get_nowait() raised queue.Empty
# and aborted the script; catch the exception to demonstrate it safely.
try:
    q1.get_nowait()
except queue.Empty:
    pass

# LIFO: last in, first out (the fruit-stand example)
q2 = queue.LifoQueue()  # Lifo = Last in, first out
q2.put('d1')
q2.put('d2')
q2.put('d3')
print(q2.qsize())
q2.get()
q2.get()
print(q2.qsize())
print(q2.get_nowait())
# FIX: same crash as above — q2 is empty at this point.
try:
    q2.get_nowait()
except queue.Empty:
    pass

# Priority order: items are (priority, data); lowest priority value first
q3 = queue.PriorityQueue()  # a queue whose items carry a priority
q3.put((-1, "haha"))
q3.put((5, "tom"))
q3.put((10, "sean"))
q3.put((0, "jerry"))
print(q3.get())
print(q3.get())
print(q3.get())
print(q3.get())
|
列表与队列区别:
从列表中取一个数据,相当于是复制了一份列表中的数据,列表中的元数据并没有被改动
从队列中取一个数据,取走后队列中就没有了这个被取的数据
生产者消费者模型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#!/usr/bin/python
#Author:sean
# Producer/consumer demo: one producer feeds a bounded queue, four consumers drain it.
import threading
import time
import queue

# Bounded queue: put() blocks once 10 items are waiting (natural back-pressure).
q = queue.Queue(maxsize=10)

def Producer(name):
    """Endlessly put numbered bones onto the shared queue, ten per second."""
    count = 1
    while True:
        q.put("骨头%s" % count)
        print("生产了骨头", count)
        count += 1
        time.sleep(0.1)

def Consumer(name):
    """Endlessly take one bone per second from the queue and eat it."""
    # while q.qsize() > 0:
    while True:
        print("[%s] 取到[%s] 并且吃了它..." % (name, q.get()))
        time.sleep(1)

if __name__ == '__main__':
    # FIX: guard added so importing this module does not spawn the endless
    # demo threads (consistent with the other examples in this file).
    p = threading.Thread(target=Producer, args=("Sean",))
    c = threading.Thread(target=Consumer, args=("Jason",))
    c1 = threading.Thread(target=Consumer, args=("小王",))
    c2 = threading.Thread(target=Consumer, args=("Jerry",))
    c3 = threading.Thread(target=Consumer, args=("Tom",))
    p.start()
    c.start()
    c1.start()
    c2.start()
    c3.start()
|
事件:event
红绿灯例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
#!/usr/bin/python
#Author:sean
# Event demo: a traffic-light thread toggles an Event; car threads wait on it.
import time
import threading

event = threading.Event()

def lighter():
    """Cycle the light: event set = green, event cleared = red."""
    count = 0
    event.set()  # start with a green light
    while True:
        if count > 5 and count < 10:
            # switch to red
            event.clear()  # clear the flag
            print("\033[41;1mred light is on...\033[0m")
        elif count >= 10:
            # FIX: was `count > 10`, so at count == 10 the else-branch printed
            # "green light" while the event was still cleared (cars kept waiting).
            event.set()  # back to green
            count = 0
        else:
            print("\033[42;1mgreen light is on...\033[0m")
        time.sleep(1)
        count += 1

def car(name):
    """Drive while the light is green; block on the event when it is red."""
    while True:
        if event.is_set():  # currently green
            print("[%s] running..." % name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..." % name)
            event.wait()
            print("\033[34;1m[%s] green light is on,start going...\033[0m" % name)

if __name__ == '__main__':
    # FIX: guard added so importing this module does not start the endless
    # demo threads (consistent with the other examples in this file).
    light = threading.Thread(target=lighter,)
    light.start()
    car1 = threading.Thread(target=car, args=('Tesla',))
    car1.start()
|
python中的多进程:
进程之间是独立的,内存是独享
多进程例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#!/usr/bin/python
#Author:sean
# Spawn ten child processes; each greets on stdout after a short pause.
import multiprocessing
import time

def f(name):
    """Sleep two seconds, then print a greeting for `name`."""
    time.sleep(2)
    print("hello", name)

if __name__ == '__main__':
    for idx in range(10):
        child = multiprocessing.Process(target=f, args=('bob %s' % idx,))
        child.start()
        # child.join()
|
获取进程ID:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#!/usr/bin/python
#Author:sean
# Show that every child process knows its own PID and its parent's PID.
from multiprocessing import Process
import os

def info(title):
    """Print `title` followed by the module name, parent PID and own PID."""
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")

def f(name):
    """Child entry point: report process info, then greet `name`."""
    info('\033[31;1mcalled from child process function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    child = Process(target=f, args=('bob',))
    child.start()
    # child.join()
|
在python中,每个子进程都是由其父进程启动的
进程间通讯:
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以使用以下方法:
1、Queues
使用方法与threading里的queue差不多
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#!/usr/bin/python
#Author:sean
# Hand data from a child process back to the parent through a shared Queue.
from multiprocessing import Process, Queue

def f(q):
    """Child-side worker: drop one payload onto the shared queue."""
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    worker = Process(target=f, args=(q,))
    worker.start()
    print(q.get())  # prints "[42,None,'hello']"
    worker.join()
|
2、Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex(two-way).For example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#!/usr/bin/python
#Author:sean
# A Pipe() gives two connection objects; by default the link is duplex.
from multiprocessing import Process, Pipe

def f(conn):
    """Child-side worker: send one payload down the pipe, then close it."""
    conn.send([42, None, 'hello from child'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    worker = Process(target=f, args=(child_conn,))
    worker.start()
    print(parent_conn.recv())  # prints "[42,None,'hello']"
    worker.join()
|
上面所说的Queues和Pipes只是实现了进程之间的数据交换,并没有实现进程间的内存共享,要想实现进程之间内存共享,则要使用Managers
3、Managers
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support following types:
list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value and Array.For example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#!/usr/bin/python
#Author:sean
# Manager() proxies let several processes mutate one shared dict and list.
from multiprocessing import Process, Manager
import os

def f(d, l):
    """Each worker writes fixed keys plus its own PID into the shared dict,
    and appends 1 to the shared list."""
    d[1] = '1'
    d['2'] = 2
    d["pid%s" % os.getpid()] = os.getpid()
    l.append(1)
    print(l, d)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        workers = []
        for _ in range(10):
            proc = Process(target=f, args=(d, l))
            proc.start()
            workers.append(proc)
        for proc in workers:
            proc.join()
        l.append("from parent")
        print(d)
        print(l)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#!/usr/bin/python
#Author:sean
# Second Manager() demo: every worker records its PID in the shared structures.
from multiprocessing import Process, Manager
import os

def f(d, l):
    """Store this worker's PID in the shared dict and append it to the list."""
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        # a dict that can be shared and passed between processes
        d = manager.dict()
        # a list that can be shared and passed between processes
        l = manager.list(range(5))
        workers = []
        for _ in range(10):
            proc = Process(target=f, args=(d, l))
            proc.start()
            workers.append(proc)
        # wait for every worker's result
        for proc in workers:
            proc.join()
        print(d)
        print(l)
|
进程同步
without using the lock output from the different processes is liable to get all mixed up.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
#!/usr/bin/python
#Author:sean
# Serialize printing from several processes with one shared Lock.
from multiprocessing import Process, Lock

def f(l, i):
    """Print under the lock so concurrent output never interleaves."""
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
|
进程间也是有锁的概念的。
进程之间内存是独享的,独立的,那为什么还要锁呢?
对于python中的进程而言,进程间的内存是独享的,但是进程的标准输出是一致的,说白了屏幕是共享的。那么如果同一时间多个进程都争着打印信息到屏幕,如果没有锁的话就会出错,就会出现进程1还没打印完的时候进程2插进来了,导致最后的输出结果不正确。
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池有两个方法:
1、apply(同步执行、串行)
2、apply_async(异步执行、并行)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#!/usr/bin/env python
#author:sean
from multiprocessing import Process, Pool, freeze_support
# freeze_support is only needed on Windows
import time
import os

def Foo(i):
    """Pool worker: pause, report the worker PID, return i + 100."""
    time.sleep(2)
    print("in process", os.getpid())
    return i + 100

def Bar(arg):
    """Callback: runs in the MAIN process with Foo's return value."""
    print('-->exec done:', arg, os.getpid())

if __name__ == '__main__':
    # freeze_support()  # only required on Windows
    pool = Pool(processes=5)  # at most 5 worker processes alive at once
    print("主进程", os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
        # The callback executes in the main process — useful when the parent
        # owns a resource (e.g. a DB connection) results must be written through.
        # pool.apply(func=Foo, args=(i,))        # synchronous / serial
        # pool.apply_async(func=Foo, args=(i,))  # asynchronous / parallel
    print('end')
    pool.close()
    # Wait for the pooled workers to finish before exiting; without join()
    # the program would simply terminate.
    pool.join()
|