基于ASGI的SSE服务器及其数据采集方法

简介: 基于ASGI的SSE服务器及其数据采集方法

在http请求中,浏览器向服务器发送一个HTTP请求, 保持长连接,服务器不断单向地向浏览器推送“信息”,这样的http请求就是sse (server send event),一般用于实时更新页面用。


一个最简单的sse服务器


这里我们只用用asgi服务搭建一个最简单的sse,对,除了一个asgi服务器什么都不用那种。


按照 ASGI规范[1] , 可以写出如下的ASGIapp


async def app(scope, receive, send):
    if scope["type"] == "http":
        await send({"type": "http.response.start", "status": 200,
                    "headers": [(b"Content-Type", b"text/event-stream"), (b"Cache-Control", b"no-cache"),
                                (b"X-Accel-Buffering", b"no")]})
        for i in range(10):
            await send({"type": "http.response.body", "body": f"data: {i}\r\n".encode(), "more_body": True})
            await asyncio.sleep(1)
        await send({"type": "http.response.body", "body": b""})


用一个ASGI服务器运行,浏览器你们打开就可以看到结果了。如果你的浏览器足够新的话

这里我们可以看出他的几个特点:


  1. 服务器不会立刻返回全部响应体,而是保持连接,一点点推送


  1. 客户端也保持连接,当服务器新一次数据到来时,浏览器做出相应动作,称之为一次event这是[2] 更加详细的开发者规范。


集成到Fastapi


当然,上面的代码还是显得太臃肿了。幸好py中已经有这样的第三方库了。这里我们使用sse-starlette[3]


import asyncio
from fastapi import FastAPI, APIRouter
from fastapi.staticfiles import StaticFiles
from sse_starlette.sse import EventSourceResponse
app = FastAPI(title=__name__)
router = APIRouter(prefix="/sse")
async def numbers(minimum, maximum):
    i = 0
    for j in range(20):
        await asyncio.sleep(1)
        yield {"data":f"You\r\ncan see me and I'm the {i}"}
        i += 1
@router.get("")
async def handle():
    generator = numbers(1, 100)
    return EventSourceResponse(generator)
app.include_router(router)
app.mount("/", StaticFiles(directory="./statics"))
if __name__ == "__main__":
    from hypercorn.asyncio import serve
    from hypercorn.config import Config
    try:
        import uvloop
        uvloop.install()
    except ImportError:
        pass
    config = Config()
    config.bind = ["0.0.0.0:9000"]
    config.accesslog = "-"
    config.loglevel = "DEBUG"
    # config.keyfile = ".\key.pem"
    # config.certfile = ".\cert.pem"
    asyncio.run(serve(app, config))


当然,在同一个目录里面还有个static文件夹,里面还有个index.html


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var es = new EventSource('sse');
    es.onmessage = function (e) {
        console.log(e.data);
    };
    es.addEventListener('myevent', function (e) {
        console.log(e.data);
    });
</script>
<h1>look at the console</h1>
</body>
</html>


运行以上代码,控制台一看,就知道sse的模样了。


http1.1下的抓取


了解了sse服务端,那么如何抓取呢?


既然SSE是一个数据流,那么,只要只要流式读取响应体就是了,而不用等待全部返回, 毕竟sse可以是无尽的。


这里用我最常用的aiohttp来示范下。


aiohttp的响应体为ClientResponse类,他有一个content属性是StreamReader对象, 这正是流式读取的关键。


import asyncio
import aiohttp
async def main():
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
        async with session.get("http://127.0.0.1:9000/sse",ssl=False) as resp:
            while data := await resp.content.readline():
                print(data)
asyncio.run(main())


如此,就可以从流中一直读到数据了。设置超时为None是因为这个流可能是无限的,因此不能加 超时时间设置。


现在看下结果:


1.jpg

可以成功抓取到了


至于为什么要用hypercorn呢?因为hypercorn支持http/2。事实上他还支持http/3, 以及这些extension[4], 是功能最全面的ASGI服务器。


接下来就把上面的改成http/2。http/2的sse,要抓取到就得换个库了


http/2下的抓取


搭建http/2服务器,需要一个证书。证书使用openssl生成。


openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -config ".\openssl.cnf"

如果你有安装git的话,git的ssl目录就有那个配置文件。修改下服务器代码

import asyncio
from fastapi import FastAPI, APIRouter
from fastapi.staticfiles import StaticFiles
from sse_starlette.sse import EventSourceResponse
app = FastAPI(title=__name__)
router = APIRouter(prefix="/sse")
async def numbers(minimum, maximum):
    i = 0
    for j in range(20):
        await asyncio.sleep(1)
        yield {"data":f"You\r\ncan see me and I'm the {i}"}
        i += 1
@router.get("")
async def handle():
    generator = numbers(1, 100)
    return EventSourceResponse(generator)
app.include_router(router)
app.mount("/", StaticFiles(directory="./statics"))
if __name__ == "__main__":
    from hypercorn.asyncio import serve
    from hypercorn.config import Config
    try:
        import uvloop
        uvloop.install()
    except ImportError:
        pass
    config = Config()
    config.bind = ["0.0.0.0:9000"]
    config.accesslog = "-"
    config.loglevel = "DEBUG"
    config.keyfile = ".\key.pem"
    config.certfile = ".\cert.pem"
    asyncio.run(serve(app, config))


现在这应该是http/2的服务器了,可以打开浏览器验证下。至于证书错误忽略就好。

再用aiohttp试试:

9.png

image.gif

但是,http/2被退回到了1.1,


比如,我是对面的后端工程师,我写了个ASGI中间件


class H1BanMiddleWare:
    def __init__(self, app, allow_version=("1.1", "2")):
        self.app = app
        self.allow_version = allow_version
    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            if scope["http_version"] in self.allow_version:
                await self.app(scope, receive, send)
            else:
                await send({"type": "http.response.start", "status": 404,
                            "headers": [(b"Content-Type", b"text/html")]})
                await send({"type": "http.response.body", "body": b"<h1>You have no power here</h1>"})
        else:
            await self.app(scope, receive, send)
app.add_middleware(H1BanMiddleWare, allow_version=("2",))


再用aiohttp就不行了


7.png


得用支持http/2的库,比如httpx


那么httpx如何流式读取呢?


深入源码,我们可以看到AsyncClient的send方法其实可以指定stream=True, 可惜这个选项并没有暴露给上层逻辑。再看httpx的Response的stream字段,这是 一个异步迭代器,显然是我们需要的。


书写如下逻辑


import asyncio
import httpx
async def sse():
    async with httpx.AsyncClient(http2=True,verify=False) as c:
        req = c.build_request(
            method="GET",
            url="https://127.0.0.1:9000/sse"
        )
        resp = await c.send(req,stream=True)
        async for chunk in resp.stream:
            print(chunk)
asyncio.run(sse())


2.png


现在正常了,日志显示是以http/2抓取的。


参考文献


[1]ASGI规

范: https://asgi.readthedocs.io/en/latest/specs/index.html[2]https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events[3]sse-starlette: https://github.com/sysid/sse-starlette[4]extension: https://asgi.readthedocs.io/en/latest/extensions.html


请关注微信公众号【未闻Code】获取更多精彩文章。

目录
相关文章
|
2天前
|
安全 网络安全 API
163邮箱IMAP服务器设置方法
```markdown 使用IMAP协议同步163邮箱:登录邮箱→设置→账户→IMAP/SMTP→开启服务→配置服务器(imap.163.com:993, SSL/TLS)→设置用户名和密码→保存并在邮件客户端添加账号。确保多设备邮件同步,定期更新设置。[≤240字符] ```
|
2天前
|
弹性计算 安全 Linux
新手一键开服!阿里云3分钟成功幻兽帕鲁联机服务器流程方法
新手一键开服!阿里云3分钟成功幻兽帕鲁联机服务器流程方法,如何自建幻兽帕鲁服务器?基于阿里云服务器搭建幻兽帕鲁palworld服务器教程来了,一看就懂系列。本文是利用OOS中幻兽帕鲁扩展程序来一键部署幻兽帕鲁服务器,阿里云百科分享官方基于阿里云服务器快速创建幻兽帕鲁服务器教程:
|
2天前
|
存储 缓存 负载均衡
优化服务器响应时间的方法如下
【4月更文挑战第25天】
31 5
|
2天前
|
缓存 监控 安全
有哪些搭建代理服务器的好方法?--代理IP小课堂
今天我们就来说一说,要如何搭建代理服务器,以此来帮助你快速入门代理服务器的搭建和使用。
|
2天前
|
弹性计算 Ubuntu Windows
阿里云自建《幻兽帕鲁Palworld》多人游戏专属服务器,搭建方法分享
对于《幻兽帕鲁》的忠实粉丝来说,与好友一同在游戏中探险、生存无疑增加了更多的乐趣。而为了实现这一愿望,搭建一个专属的多人游戏服务器就显得尤为重要。今天,我将为大家带来一篇极简教程,教您如何在三次点击内,轻松搭建《幻兽帕鲁》的专属服务器。
|
2天前
|
弹性计算 Ubuntu Linux
幻兽帕鲁Palworld服务器搭建教程,2024年阿里云部署帕鲁服务器保姆级方法
对于热爱《幻兽帕鲁》的玩家们来说,能够拥有属于自己的游戏服务器,无疑是提升了游戏的自由度和乐趣。那么,怎样才能部署幻兽帕鲁服务器呢?接下来,就让我们一起走进这个简单易懂、保姆级的教程吧!
120 0
|
2天前
|
弹性计算 Ubuntu Linux
2024年Palworld/幻兽帕鲁联机服务器搭建方法分享,详细步骤
想要和好友一起畅游幻兽帕鲁的世界吗?别再为联机而烦恼,因为搭建自己的服务器其实并不难!今天,就为大家带来一篇超详细的幻兽帕鲁服务器搭建教程,保证让你轻松上手!
23 0
|
2天前
|
弹性计算 Ubuntu Linux
Palworld/幻兽帕鲁游戏多人组队!Palworld/幻兽帕鲁联机服务器搭建方法分享,超容易
想要和好友一起畅游幻兽帕鲁的世界吗?别再为联机而烦恼,因为搭建自己的服务器其实并不难!今天,就为大家带来一篇超详细的幻兽帕鲁服务器搭建教程,保证让你轻松上手!
|
2天前
|
弹性计算 安全 关系型数据库
带你读《从基础到应用云上安全航行指南》——来上课!一文掌握守住ECS网络安全的最佳方法(1)
带你读《从基础到应用云上安全航行指南》——来上课!一文掌握守住ECS网络安全的最佳方法(1)
159 0
|
2天前
|
弹性计算 运维 安全
带你读《从基础到应用云上安全航行指南》——来上课!一文掌握守住ECS网络安全的最佳方法(2)
带你读《从基础到应用云上安全航行指南》——来上课!一文掌握守住ECS网络安全的最佳方法(2)
39 2

热门文章

最新文章