基于ASGI的SSE服务器及其数据采集方法

简介: 基于ASGI的SSE服务器及其数据采集方法

在http请求中,浏览器向服务器发送一个HTTP请求, 保持长连接,服务器不断单向地向浏览器推送“信息”,这样的http请求就是sse (server send event),一般用于实时更新页面用。


一个最简单的sse服务器


这里我们只用用asgi服务搭建一个最简单的sse,对,除了一个asgi服务器什么都不用那种。


按照 ASGI规范[1] , 可以写出如下的ASGIapp


async def app(scope, receive, send):
    if scope["type"] == "http":
        await send({"type": "http.response.start", "status": 200,
                    "headers": [(b"Content-Type", b"text/event-stream"), (b"Cache-Control", b"no-cache"),
                                (b"X-Accel-Buffering", b"no")]})
        for i in range(10):
            await send({"type": "http.response.body", "body": f"data: {i}\r\n".encode(), "more_body": True})
            await asyncio.sleep(1)
        await send({"type": "http.response.body", "body": b""})


用一个ASGI服务器运行,浏览器你们打开就可以看到结果了。如果你的浏览器足够新的话

这里我们可以看出他的几个特点:


  1. 服务器不会立刻返回全部响应体,而是保持连接,一点点推送


  1. 客户端也保持连接,当服务器新一次数据到来时,浏览器做出相应动作,称之为一次event这是[2] 更加详细的开发者规范。


集成到Fastapi


当然,上面的代码还是显得太臃肿了。幸好py中已经有这样的第三方库了。这里我们使用sse-starlette[3]


import asyncio
from fastapi import FastAPI, APIRouter
from fastapi.staticfiles import StaticFiles
from sse_starlette.sse import EventSourceResponse
app = FastAPI(title=__name__)
router = APIRouter(prefix="/sse")
async def numbers(minimum, maximum):
    i = 0
    for j in range(20):
        await asyncio.sleep(1)
        yield {"data":f"You\r\ncan see me and I'm the {i}"}
        i += 1
@router.get("")
async def handle():
    generator = numbers(1, 100)
    return EventSourceResponse(generator)
app.include_router(router)
app.mount("/", StaticFiles(directory="./statics"))
if __name__ == "__main__":
    from hypercorn.asyncio import serve
    from hypercorn.config import Config
    try:
        import uvloop
        uvloop.install()
    except ImportError:
        pass
    config = Config()
    config.bind = ["0.0.0.0:9000"]
    config.accesslog = "-"
    config.loglevel = "DEBUG"
    # config.keyfile = ".\key.pem"
    # config.certfile = ".\cert.pem"
    asyncio.run(serve(app, config))


当然,在同一个目录里面还有个static文件夹,里面还有个index.html


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var es = new EventSource('sse');
    es.onmessage = function (e) {
        console.log(e.data);
    };
    es.addEventListener('myevent', function (e) {
        console.log(e.data);
    });
</script>
<h1>look at the console</h1>
</body>
</html>


运行以上代码,控制台一看,就知道sse的模样了。


http1.1下的抓取


了解了sse服务端,那么如何抓取呢?


既然SSE是一个数据流,那么,只要只要流式读取响应体就是了,而不用等待全部返回, 毕竟sse可以是无尽的。


这里用我最常用的aiohttp来示范下。


aiohttp的响应体为ClientResponse类,他有一个content属性是StreamReader对象, 这正是流式读取的关键。


import asyncio
import aiohttp
async def main():
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
        async with session.get("http://127.0.0.1:9000/sse",ssl=False) as resp:
            while data := await resp.content.readline():
                print(data)
asyncio.run(main())


如此,就可以从流中一直读到数据了。设置超时为None是因为这个流可能是无限的,因此不能加 超时时间设置。


现在看下结果:


1.jpg

可以成功抓取到了


至于为什么要用hypercorn呢?因为hypercorn支持http/2。事实上他还支持http/3, 以及这些extension[4], 是功能最全面的ASGI服务器。


接下来就把上面的改成http/2。http/2的sse,要抓取到就得换个库了


http/2下的抓取


搭建http/2服务器,需要一个证书。证书使用openssl生成。


openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -config ".\openssl.cnf"

如果你有安装git的话,git的ssl目录就有那个配置文件。修改下服务器代码

import asyncio
from fastapi import FastAPI, APIRouter
from fastapi.staticfiles import StaticFiles
from sse_starlette.sse import EventSourceResponse
app = FastAPI(title=__name__)
router = APIRouter(prefix="/sse")
async def numbers(minimum, maximum):
    i = 0
    for j in range(20):
        await asyncio.sleep(1)
        yield {"data":f"You\r\ncan see me and I'm the {i}"}
        i += 1
@router.get("")
async def handle():
    generator = numbers(1, 100)
    return EventSourceResponse(generator)
app.include_router(router)
app.mount("/", StaticFiles(directory="./statics"))
if __name__ == "__main__":
    from hypercorn.asyncio import serve
    from hypercorn.config import Config
    try:
        import uvloop
        uvloop.install()
    except ImportError:
        pass
    config = Config()
    config.bind = ["0.0.0.0:9000"]
    config.accesslog = "-"
    config.loglevel = "DEBUG"
    config.keyfile = ".\key.pem"
    config.certfile = ".\cert.pem"
    asyncio.run(serve(app, config))


现在这应该是http/2的服务器了,可以打开浏览器验证下。至于证书错误忽略就好。

再用aiohttp试试:

9.png

image.gif

但是,http/2被退回到了1.1,


比如,我是对面的后端工程师,我写了个ASGI中间件


class H1BanMiddleWare:
    def __init__(self, app, allow_version=("1.1", "2")):
        self.app = app
        self.allow_version = allow_version
    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            if scope["http_version"] in self.allow_version:
                await self.app(scope, receive, send)
            else:
                await send({"type": "http.response.start", "status": 404,
                            "headers": [(b"Content-Type", b"text/html")]})
                await send({"type": "http.response.body", "body": b"<h1>You have no power here</h1>"})
        else:
            await self.app(scope, receive, send)
app.add_middleware(H1BanMiddleWare, allow_version=("2",))


再用aiohttp就不行了


7.png


得用支持http/2的库,比如httpx


那么httpx如何流式读取呢?


深入源码,我们可以看到AsyncClient的send方法其实可以指定stream=True, 可惜这个选项并没有暴露给上层逻辑。再看httpx的Response的stream字段,这是 一个异步迭代器,显然是我们需要的。


书写如下逻辑


import asyncio
import httpx
async def sse():
    async with httpx.AsyncClient(http2=True,verify=False) as c:
        req = c.build_request(
            method="GET",
            url="https://127.0.0.1:9000/sse"
        )
        resp = await c.send(req,stream=True)
        async for chunk in resp.stream:
            print(chunk)
asyncio.run(sse())


2.png


现在正常了,日志显示是以http/2抓取的。


参考文献


[1]ASGI规

范: https://asgi.readthedocs.io/en/latest/specs/index.html[2]https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events[3]sse-starlette: https://github.com/sysid/sse-starlette[4]extension: https://asgi.readthedocs.io/en/latest/extensions.html


请关注微信公众号【未闻Code】获取更多精彩文章。

目录
相关文章
|
1月前
|
弹性计算 小程序 容灾
2025购买阿里云服务器配置选择方法:企业+个人+学生攻略
2025年阿里云服务器购买省钱攻略,涵盖个人、中小企业及高性能配置推荐。个人用户优选38元轻量或99元ECS,企业用户选199元2核4G服务器,游戏用户适合4核16G或8核32G配置,详情请参考最新活动及攻略。
384 11
|
6月前
|
移动开发 JavaScript 前端开发
精通服务器推送事件(SSE)与 Python 和 Go 实现实时数据流 🚀
服务器推送事件(SSE)是HTML5规范的一部分,允许服务器通过HTTP向客户端实时推送更新。相比WebSocket,SSE更轻量、简单,适合单向通信场景,如实时股票更新或聊天消息。它基于HTTP协议,使用`EventSource` API实现客户端监听,支持自动重连和事件追踪。虽然存在单向通信与连接数限制,但其高效性使其成为许多轻量级实时应用的理想选择。文中提供了Python和Go语言的服务器实现示例,以及HTML/JavaScript的客户端代码,帮助开发者快速集成SSE功能,提升用户体验。
|
1月前
|
缓存 监控 前端开发
详述uniapp项目部署于Nginx服务器的配置优化方法。
综上所述,uniapp项目部署于Nginx的优化方法多种多样,应根据实际情况灵活地采取合适的策略。配置后持续监控和调试,适时调整配置以保持最佳性能,并确保随着应用需求和访问模式的变化,服务器配置得到适当的更新和优化。
117 0
|
2月前
|
安全 关系型数据库 网络安全
安全加固:启动PostgreSQL 14服务器SSL加密的方法指南在CentOS 7环境中
通过上述步骤,你可以为PostgreSQL数据库服务器设置SSL加密,从而增加数据在传输中的安全性。确保维持证书的有效性,并且定期更新和管理密钥,以防止未授权访问。
139 0
|
10月前
|
Java
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
208 9
|
5月前
|
缓存 PHP 数据库
WordPress网站服务器性能优化方法,站长必备。
最后,当你将这些方法组合起来并实施时,您将发现你的WordPress网站性能有了显著的提高。别忘了,这不是一次性的任务,要定期执行,保持你的车(网站)始终在轨道上飞驰。
200 21
|
9月前
|
SQL 存储 关系型数据库
MySQL/SqlServer跨服务器增删改查(CRUD)的一种方法
通过上述方法,MySQL和SQL Server均能够实现跨服务器的增删改查操作。MySQL通过联邦存储引擎提供了直接的跨服务器表访问,而SQL Server通过链接服务器和分布式查询实现了灵活的跨服务器数据操作。这些技术为分布式数据库管理提供了强大的支持,能够满足复杂的数据操作需求。
445 12
|
10月前
|
弹性计算 异构计算
2024年阿里云GPU服务器多少钱1小时?亲测价格查询方法
2024年阿里云GPU服务器每小时收费因实例规格不同而异。可通过阿里云GPU服务器页面选择“按量付费”查看具体价格。例如,NVIDIA A100的gn7e实例为34.742元/小时,NVIDIA A10的gn7i实例为12.710156元/小时。更多详情请访问阿里云官网。
1567 2
|
10月前
|
人工智能 弹性计算 关系型数据库
学生免费领取阿里云服务器一年的方法,以及各种活动
学生可以免费领取阿里云服务器一年,新人可获2核4G,非新人2核2G。访问链接注册并完成学生认证,领取300元无门槛优惠券,购买轻量应用服务器。此外,还有多项活动可赢取实物奖品。
3662 2
|
11月前
|
前端开发 Docker 容器
主机host服务器和Docker容器之间的文件互传方法汇总
Docker 成为前端工具,可实现跨设备兼容。本文介绍主机与 Docker 容器/镜像间文件传输的三种方法:1. 构建镜像时使用 `COPY` 或 `ADD` 指令;2. 启动容器时使用 `-v` 挂载卷;3. 运行时使用 `docker cp` 命令。每种方法适用于不同场景,如静态文件打包、开发时文件同步及临时文件传输。注意权限问题、容器停止后的文件传输及性能影响。
2846 1