关于协程
协程 coroutine 微线程,一种用户态的轻量级线程
好处:
无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展+低成本,一个cup支持上万的协程都不是问题
缺点:
无法利用多核资源,协程的本质是单线程,
进程阻塞blocking操作如io时会阻塞整个程序
单线程下实现并发效果:遇到io就切换
服务器处理模型:
1.一个进程处理一个请求
2.一个线程处理一个请求
3.主进程处理事件队列的请求
事件驱动模型:
多个事件 -> 消息队列 -> 处理线程
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。
它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。
另外两种常见的编程范式是(单线程)同步以及多线程编程。
异步asynchronous
用户空间application 和 内核空间kernel
进程控制块(Processing Control Block)
参考:
http://www.cnblogs.com/alex3714/articles/5876749.html
io操作两个阶段:内核缓冲区数据准备,拷贝到应用程序地址空间
阻塞io blocking 一直等待
非阻塞io nonblocking 准备数据立刻返回
多路复用 select,poll,epoll
异步io 没有被阻塞
twisted框架
手动切换协程
from greenlet import greenlet def foo1(): print("foo11") gr2.switch() print("foo12") gr2.switch() def foo2(): print("foo21") gr1.switch() print("foo22") gr1 = greenlet(foo1) # 创建协程 gr2 = greenlet(foo2) print("main") gr1.switch() # 切换协程 print("done") """ main foo11 foo21 foo12 foo22 done """
自动切换协程
import gevent # 第三方库 def foo1(): print("foo11") gevent.sleep(2) print("foo12") def foo2(): print("foo21") gevent.sleep(2) print("foo22") def foo3(): print("foo31") gevent.sleep(0) print("foo32") gevent.joinall([ gevent.spawn(foo1), gevent.spawn(foo2), gevent.spawn(foo3) ]) """ foo11 foo21 foo31 foo32 foo12 foo22 """
gevent 多并发socket
import socket import gevent from gevent import monkey monkey.patch_all() # 猴子补丁 def server(port): s = socket.socket() s.bind(("0.0.0.0", port)) s.listen(500) print("服务已启动") while True: conn, addr = s.accept() print(addr) gevent.spawn(handle_request, conn) def handle_request(conn): # 处理请求的协程 try: while True: data = conn.recv(1024) print(conn, data.decode()) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as e: print(e) finally: print("关闭", conn) conn.close() if __name__ == "__main__": server(6969)
select 多并发socket
# fd 文件描述符 import select import socket import queue server = socket.socket() server.setblocking(False) # 不阻塞 server.bind(("localhost", 6969)) server.listen(5) print("服务已开启") inputs = [server, ] # server本身也是一个文件描述符 outputs = [] message_dict = {} while True: # 如果没有任何 fd 就绪,就会阻塞在这里 readable, writeable, exceptional = select.select(inputs, outputs, inputs) print("readable", readable) print("writeable", writeable) print("exceptional", exceptional) for r in readable: if r is server: # server就绪,新连接来了 conn, addr = r.accept() print("已连接", conn) conn.setblocking(False) inputs.append(conn) # 为了不阻塞程序,收到连接对象后放入列表,如果接收到信息,fd就绪 message_dict[conn] = queue.Queue() # 新建消息队列,不立刻返回,逐个处理 else: # 如果不是服务器,就是客户端 data = r.recv(1024) if not data: print("连接断开") else: print("收到数据", data.decode("utf-8")) message_dict[r].put(data) # 放入消息队列 outputs.append(r) # 不影响其他客户端连接,后续处理发送信息 for s in outputs: # 要返回给客户端的连接表 data = message_dict[s].get() s.send(data) outputs.remove(s) # 处理完就删除 for e in exceptional: # 删除异常连接 if e in outputs: outputs.remove(e) inputs.remove(e) del message_dict[e]
selectors 多并发socket
import selectors import socket def accept(server, mask): # 接受连接 conn, addr = server.accept() print("conn:", conn, "addr:", addr, "mask:", mask) conn.setblocking(False) selector.register(conn, selectors.EVENT_READ, action) def action(conn, mask): # 接收数据 data = conn.recv(1024) if data: print("conn:", conn, "data:", data.decode("utf-8")) conn.send(data) else: print("断开连接") conn.close() selector.unregister(conn) server = socket.socket() server.setblocking(False) address = ("localhost", 6969) server.bind(address) server.listen(1000) selector = selectors.DefaultSelector() selector.register(server, selectors.EVENT_READ, accept) # accept回调函数 print("服务启动") while True: events = selector.select() # 默认阻塞,有活动则返回活动列表 for key, mask in events: callback = key.data # accept callback(key.fileobj, mask) # fileobj 文件句柄
异步爬虫(并行)
from urllib import request from gevent import monkey import time, gevent monkey.patch_all() # 把当前程序所有io操作都做上标记 def get_html(url): response = request.urlopen(url) html = response.read() print("receive:", len(html)) urls = [ "https://www.python.org/", "https://www.yahoo.com/", "https://github.com/" ] start_time = time.time() for url in urls: html = get_html(url) end_time = time.time() print("串行:", end_time - start_time) async_start_time = time.time() gevent.joinall([ gevent.spawn(get_html, urls[0]), gevent.spawn(get_html, urls[1]), gevent.spawn(get_html, urls[2]) ]) async_end_time = time.time() print("并行:", async_end_time - async_start_time) """ receive: 48893 receive: 511654 receive: 52225 串行: 4.339118003845215 receive: 48893 receive: 502566 receive: 52223 并行: 1.4489901065826416 """