协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。
所谓用户态就是说协程是由用户来控制的,CPU不认识协程,协程是跑在线程中的。
协程拥有自己的寄存器上下文栈。协程调试切换时,将寄存器上下文栈保存到其他地方,在切回来时,恢复先前保存的寄存器上下文栈。
因此,协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,也就是进入上一次离开时所处逻辑流的位置。
线程切换时会将上下文和栈保存到CPU的寄存器中。
协程的标准定义,即符合以下所有条件就能称之为协程:
1.在单线程里实现并发
2.修改共享数据不需要加锁
3.用户程序里自己保存多个控制流的上下文栈
4.一个协程遇到IO操作自动切换到其它协程
协程的好处:
无需线程上下文切换的开销
无需原子操作锁定及同步的开销
原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都行,很适合用于高并发处理
协程的缺点:
无法利用多核资源:
协程的本质是个单线程,它不能同时将单个CPU的多个核用上
协程需要和进程配合才能运行在多CPU上。
进行阻塞(Blocking)操作(如IO)时会阻塞掉整个程序
使用yield实现协程的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#!/usr/bin/python
#Author:sean
import
time
def
consumer(name):
print
(
"--->start eating baozi..."
)
while
True
:
new_baozi
=
yield
print
(
"[%s] is eating baozi %s"
%
(name,new_baozi))
# time.sleep(2)
def
producter():
r
=
tom.__next__()
r
=
jerry.__next__()
n
=
0
while
n <
5
:
n
+
=
1
tom.send(n)
jerry.send(n)
print
(
"\033[32;1m[producter]\033[0m is making baozi %s"
%
n)
if
__name__
=
=
'__main__'
:
tom
=
consumer(
"tom"
)
jerry
=
consumer(
"jerry"
)
p
=
producter()
|
如何在单线程下实现并发效果?
答案是遇到IO操作就切换,因为IO操作耗时比较长
协程之所以能处理高并发,其实就是把IO操作给干掉了,就是一遇到IO操作就切换。
这样的话整个程序就变成了只有CPU在运算。
一遇到IO操作就切换,那么到底什么时候再切回去呢?
答案是当IO操作结束后就切回去。
那么问题又来了,python怎么来监测IO操作是否结束呢?带着这个问题先来看看几个例子
greenlet模块:
greenlet是一个封装好的协程,通过switch方法手动进行切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#!/usr/bin/python
#Author:sean
from
greenlet
import
greenlet
def
func1():
print
(
"haha11"
)
gr2.switch()
print
(
"haha22"
)
gr2.switch()
def
func2():
print
(
"haha33"
)
gr1.switch()
print
(
"haha44"
)
gr1
=
greenlet(func1)
gr2
=
greenlet(func2)
gr1.switch()
|
gevent模块:
gevent是一个第三方库,可以轻松实现并发同步或异步编程。
在gevent中用到的主要是greenlet,它是以C扩展模式形式接入python的轻量级协程。
greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度
gevent能够自动进行IO切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#!/usr/bin/python
#Author:sean
import
gevent
def
foo():
print
(
"Running in foo"
)
gevent.sleep(
0
)
#模仿IO操作
print
(
'Explicit context switch to foo again'
)
def
bar():
print
(
'Explicit context to bar'
)
gevent.sleep(
0
)
#模仿IO操作
print
(
'Implicit context switch back to bar'
)
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar)
])
|
同步与异步的区别:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#!/usr/bin/python
#Author:sean
import
gevent
def
task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(
0.5
)
print
(
'Task %s done'
%
pid)
def
synchronous():
for
i
in
range
(
1
,
10
):
task(i)
def
asynchronous():
threads
=
[gevent.spawn(task, i)
for
i
in
range
(
10
)]
gevent.joinall(threads)
print
(
'Synchronous:'
)
synchronous()
print
(
'Asynchronous:'
)
asynchronous()
|
用协程并发爬虫爬取网站:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#!/usr/bin/python
#Author:sean
from
urllib
import
request
import
gevent
#默认情况下,gevent并不知道urllib或者socket什么时候进行了IO操作
#默认情况下,gevent和urllib以及socket并没有任何关联,当然就无法提高效率,因为其实质上还是串行操作
#要想让gevent知道urllib或socket正在进行IO操作,需要给gevent打个补丁
from
gevent
import
monkey
monkey.patch_all()
#把当前程序的所有IO操作单独做上标记
def
f(url):
print
(
'GET: %s'
%
url)
resp
=
request.urlopen(url)
data
=
resp.read()
# f = open("url.html","wb")
# f.write(data)
# f.close()
print
(
'%d bytes received from %s.'
%
(
len
(data),url))
gevent.joinall([
gevent.spawn(f,
'https://www.python.org'
),
gevent.spawn(f,
'https://yahoo.com'
),
gevent.spawn(f,
'https://github.com'
)
])
|
用gevent协程写一个单线程高并发的socket:
服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#!/usr/bin/python
#Author:sean
import
sys
import
socket
import
time
import
gevent
from
gevent
import
socket,monkey
monkey.patch_all()
#把当前程序的所有IO操作单独做上标记
def
server(host,port):
s
=
socket.socket()
s.bind((host,port))
s.listen(
500
)
while
True
:
cli,addr
=
s.accept()
gevent.spawn(handle_request,cli)
def
handle_request(conn):
try
:
while
True
:
data
=
conn.recv(
1024
)
print
(
"recv: "
,data)
conn.send(data)
if
not
data:
conn.shutdown(socket.SHUT_WR)
except
Exception as e:
print
(e)
finally
:
conn.close()
if
__name__
=
=
'__main__'
:
server(
'0.0.0.0'
,
8001
)
|
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#!/usr/bin/python
#Author:sean
import
socket
HOST
=
'localhost'
#The remote host
PORT
=
8001
#The same port as used by the server
s
=
socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((HOST,PORT))
while
True
:
msg
=
bytes(
input
(
">>:"
),encoding
=
"utf-8"
)
s.sendall(msg)
data
=
s.recv(
1024
)
print
(
'Received'
,
repr
(data))
s.close()
|
并发100个sock连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#!/usr/bin/python
#Author:sean
import
socket
import
threading
def
sock_conn():
client
=
socket.socket()
client.connect((
"localhost"
,
8001
))
count
=
0
while
True
:
#msg = input(">>:").strip()
#if len(msg) == 0:continue
client.send( (
"hello %s"
%
count).encode(
"utf-8"
))
data
=
client.recv(
1024
)
print
(
"[%s]recv from server:"
%
threading.get_ident(),data.decode())
#结果
count
+
=
1
client.close()
for
i
in
range
(
100
):
t
=
threading.Thread(target
=
sock_conn)
t.start()
|
事件驱动与异步IO,请往这走
现在我们可以来回答下这个问题了,python如何监测IO操作是否结束?
IO操作是由操作系统进行处理的,当遇到IO操作时就切换
等IO操作完以后让其调用回调函数,回调函数会通知协程说这个IO操作完成了