在http请求中,浏览器向服务器发送一个HTTP请求, 保持长连接,服务器不断单向地向浏览器推送“信息”,这样的http请求就是sse (server send event),一般用于实时更新页面用。
一个最简单的sse服务器
这里我们只用用asgi服务搭建一个最简单的sse,对,除了一个asgi服务器什么都不用那种。
按照 ASGI规范[1] , 可以写出如下的ASGIapp
async def app(scope, receive, send): if scope["type"] == "http": await send({"type": "http.response.start", "status": 200, "headers": [(b"Content-Type", b"text/event-stream"), (b"Cache-Control", b"no-cache"), (b"X-Accel-Buffering", b"no")]}) for i in range(10): await send({"type": "http.response.body", "body": f"data: {i}\r\n".encode(), "more_body": True}) await asyncio.sleep(1) await send({"type": "http.response.body", "body": b""})
用一个ASGI服务器运行,浏览器你们打开就可以看到结果了。如果你的浏览器足够新的话
这里我们可以看出他的几个特点:
- 服务器不会立刻返回全部响应体,而是保持连接,一点点推送
- 客户端也保持连接,当服务器新一次数据到来时,浏览器做出相应动作,称之为一次
event
。 这是[2] 更加详细的开发者规范。
集成到Fastapi
当然,上面的代码还是显得太臃肿了。幸好py中已经有这样的第三方库了。这里我们使用sse-starlette[3]
import asyncio from fastapi import FastAPI, APIRouter from fastapi.staticfiles import StaticFiles from sse_starlette.sse import EventSourceResponse app = FastAPI(title=__name__) router = APIRouter(prefix="/sse") async def numbers(minimum, maximum): i = 0 for j in range(20): await asyncio.sleep(1) yield {"data":f"You\r\ncan see me and I'm the {i}"} i += 1 @router.get("") async def handle(): generator = numbers(1, 100) return EventSourceResponse(generator) app.include_router(router) app.mount("/", StaticFiles(directory="./statics")) if __name__ == "__main__": from hypercorn.asyncio import serve from hypercorn.config import Config try: import uvloop uvloop.install() except ImportError: pass config = Config() config.bind = ["0.0.0.0:9000"] config.accesslog = "-" config.loglevel = "DEBUG" # config.keyfile = ".\key.pem" # config.certfile = ".\cert.pem" asyncio.run(serve(app, config))
当然,在同一个目录里面还有个static文件夹,里面还有个index.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <script> var es = new EventSource('sse'); es.onmessage = function (e) { console.log(e.data); }; es.addEventListener('myevent', function (e) { console.log(e.data); }); </script> <h1>look at the console</h1> </body> </html>
运行以上代码,控制台一看,就知道sse的模样了。
http1.1下的抓取
了解了sse服务端,那么如何抓取呢?
既然SSE是一个数据流,那么,只要只要流式读取响应体就是了,而不用等待全部返回, 毕竟sse可以是无尽的。
这里用我最常用的aiohttp来示范下。
aiohttp的响应体为ClientResponse
类,他有一个content属性是StreamReader
对象, 这正是流式读取的关键。
import asyncio import aiohttp async def main(): async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session: async with session.get("http://127.0.0.1:9000/sse",ssl=False) as resp: while data := await resp.content.readline(): print(data) asyncio.run(main())
如此,就可以从流中一直读到数据了。设置超时为None
是因为这个流可能是无限的,因此不能加 超时时间设置。
现在看下结果:
可以成功抓取到了
至于为什么要用hypercorn呢?因为hypercorn支持http/2。事实上他还支持http/3, 以及这些extension[4], 是功能最全面的ASGI服务器。
接下来就把上面的改成http/2。http/2的sse,要抓取到就得换个库了
http/2下的抓取
搭建http/2服务器,需要一个证书。证书使用openssl生成。
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -config ".\openssl.cnf"
如果你有安装git的话,git的ssl目录就有那个配置文件。修改下服务器代码
import asyncio from fastapi import FastAPI, APIRouter from fastapi.staticfiles import StaticFiles from sse_starlette.sse import EventSourceResponse app = FastAPI(title=__name__) router = APIRouter(prefix="/sse") async def numbers(minimum, maximum): i = 0 for j in range(20): await asyncio.sleep(1) yield {"data":f"You\r\ncan see me and I'm the {i}"} i += 1 @router.get("") async def handle(): generator = numbers(1, 100) return EventSourceResponse(generator) app.include_router(router) app.mount("/", StaticFiles(directory="./statics")) if __name__ == "__main__": from hypercorn.asyncio import serve from hypercorn.config import Config try: import uvloop uvloop.install() except ImportError: pass config = Config() config.bind = ["0.0.0.0:9000"] config.accesslog = "-" config.loglevel = "DEBUG" config.keyfile = ".\key.pem" config.certfile = ".\cert.pem" asyncio.run(serve(app, config))
现在这应该是http/2的服务器了,可以打开浏览器验证下。至于证书错误忽略就好。
再用aiohttp试试:
但是,http/2被退回到了1.1,
比如,我是对面的后端工程师,我写了个ASGI中间件
class H1BanMiddleWare: def __init__(self, app, allow_version=("1.1", "2")): self.app = app self.allow_version = allow_version async def __call__(self, scope, receive, send): if scope["type"] == "http": if scope["http_version"] in self.allow_version: await self.app(scope, receive, send) else: await send({"type": "http.response.start", "status": 404, "headers": [(b"Content-Type", b"text/html")]}) await send({"type": "http.response.body", "body": b"<h1>You have no power here</h1>"}) else: await self.app(scope, receive, send) app.add_middleware(H1BanMiddleWare, allow_version=("2",))
再用aiohttp就不行了
得用支持http/2的库,比如httpx
那么httpx如何流式读取呢?
深入源码,我们可以看到AsyncClient
的send方法其实可以指定stream=True
, 可惜这个选项并没有暴露给上层逻辑。再看httpx的Response
的stream字段,这是 一个异步迭代器,显然是我们需要的。
书写如下逻辑
import asyncio import httpx async def sse(): async with httpx.AsyncClient(http2=True,verify=False) as c: req = c.build_request( method="GET", url="https://127.0.0.1:9000/sse" ) resp = await c.send(req,stream=True) async for chunk in resp.stream: print(chunk) asyncio.run(sse())
现在正常了,日志显示是以http/2抓取的。
参考文献
[1]ASGI规
范: https://asgi.readthedocs.io/en/latest/specs/index.html[2]https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events[3]sse-starlette: https://github.com/sysid/sse-starlette[4]extension: https://asgi.readthedocs.io/en/latest/extensions.html
请关注微信公众号【未闻Code】获取更多精彩文章。