什么是流式输出?

简介: 流式输出在阿里内部已经遍地开花,大家耳熟能详却又好奇不已。清楚的是知道它是性能利器从而提升业务转化,不清楚的是到底什么样的技术才算是流式输出?支撑流式输出的技术理论又有哪些?流式输出适合什么样的应用场景?今天我们就来揭开这层面纱,让大家雾里看花但又能清清楚楚地看到“花”。

image.png

作者 | 幽霄
来源 | 阿里技术公众号

一 名词理解

1 流式

流式(Stream)亦称响应式,是一种基于异步数据流研发框架,是一种概念和编程模型,并非一种技术架构,目前在各技术栈都有响应式的技术框架,前端的React.js、RxJs,服务端以RxJava、Reactor,Android端的RXJava。由此而来的即是响应式编程。

2 反应式/响应式编程

反应式编程/响应式编程(Reactive Programming)是一种基于事件模型编程范式,众所周知异步编程模式中通常有两种获得上一个任务执行结果的方式,一个就是主动轮训,我们把它称为Proactive方式。另一个就是被动接收反馈,我们称为Reactive。简单来说,在Reactive方式中,上一个任务的结果的反馈就是一个事件,这个事件的到来将会触发下一个任务的执行。

这也就是Reactive的内涵。我们把处理和发出事件的主体称为Reactor,它可以接收事件并处理,也可以在处理完事件后,发出下一个事件给其他Reactor。

下面是一个Reactive模型的示意图:

image.png

当然一种新的编码模式,它的RunTime会减少上下文切流从而提升性能,减少内存消耗,与之相反带来的是代码的可维护性降低。衡量优劣需要根据场景带来的收益来衡量。

3 流式输出

流式输出就比较神奇,源自于团队内部在一次性能大赛结束后的总结中产生,是基于流式的理论基础在页面渲染以及渲染的HTML在网络传输中的具体应用而诞生,也有人也简单的称之为流式渲染。即:将页面拆分成独立的几部分模块,每个模块有单独的数据源和单独的页面模板,在server端流式的操作每个模块进行业务逻辑处理和页面模板的渲染,然后流式的将渲染出来的HTML输出到网络中,接着分块的HTML数据在网络中传输,接着流式的分块的HTML在浏览器逐个渲染展示。具体流程如下:

image.png

针对HTML可以如上所述进行流式输出,衍生出针对json数据的流式输出,其实也是如出一辙,无非少了一层渲染的逻辑,数据流式输出流程跟上图类似,不再赘述。这里可以把客户端的请求当做响应式的一个事件,所以总结就是客户端主动发出请求,服务端流式返回数据,即流式输出。

4 端到端响应式

基于流式输出,我们再深入一点,可以发现其实不只是用户端和web server之间的数据可以在网络上进行流式输出,微服务的各个server之间的数据其实也可以在网络上进行流式输出,如下图所示:

image.png

数据可以在网络之间的流式传输,再进一步来看,数据在整条请求响应链路上的流式传输会是什么样子,见下图所示:

image.png

综上所述我们定义:端到端响应式=流式输出+响应式编程。

二 流式输出理论基础

是什么基础技术理论,支撑我们能够像上述流程那样对数据进行流式输出和接收,下面有几个核心的技术点:

1 HTTP分块传输协议

分块传输编码(Chunked transfer encoding)是超文本传输协议(HTTP)中的一种数据传输机制,允许HTTP由网页服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在HTTP协议1.1版本(HTTP/1.1)中提供。

如果需要使用分块传输编码的响应格式,我们需要在HTTP响应中设置响应头Transfer-Encoding: chunked。它的具体传输格式是这样的(注意HTTP响应中换行符是\r\n):

HTTP/1.1 200 OK\r\n
\r\n
Transfer-Encoding: chunked\r\n
...\r\n
\r\n
<chunked 1 length>\r\n
<chunked 1 content>\r\n
<chunked 2 length>\r\n
<chunked 2 content>\r\n
...\r\n
0\r\n
\r\n
\r\n

具体流程见流式输出名词理解部分,分块传输编码例子:

func handleChunkedHttpResp(conn net.Conn) {
    buffer := make([]byte, 1024)
    n, err := conn.Read(buffer)
    if err != nil {
        log.Fatalln(err)
    }
    fmt.Println(n, string(buffer))

    conn.Write([]byte("HTTP/1.1 200 OK\r\n"))
    conn.Write([]byte("Transfer-Encoding: chunked\r\n"))
    conn.Write([]byte("\r\n"))

    conn.Write([]byte("6\r\n"))
    conn.Write([]byte("hello,\r\n"))

    conn.Write([]byte("8\r\n"))
    conn.Write([]byte("chunked!\r\n"))

    conn.Write([]byte("0\r\n"))
    conn.Write([]byte("\r\n"))
}

这里需要注意的是HTTP分块传输对同步HTML输出比较适合(对于浏览器来讲),因为在很多web页面涉及SEO,SEO的TDK元素必须同步输出,所以这种方式比较适合,针对于JSON数据的流式输出通过SSE来实现,具体如下。

2 HTTP SSE协议

sse(Server Send Events)是HTTP的标准协议,是服务端向客户端发送事件流式的方式。在客户端中为一些事件类型绑定监听函数,从而做业务逻辑处理。这里要注意的是SEE是单向的,只能服务器向客户端发送事件流,具体流程如下:

image.png

SSE协议中约束了下面几个字段类型

1)event

事件类型。如果指定了该字段,则在客户端接收到该条消息时,会在当前的EventSource对象上触发一个事件,事件类型就是该字段的字段值,你可以使用addEventListener()方法在当前EventSource对象上监听任意类型的命名事件,如果该条消息没有event字段,则会触发onmessage属性上的事件处理函数。

2)data

消息的数据字段。如果该条消息包含多个data字段,则客户端会用换行符把它们连接成一个字符串来作为字段值。

3)id

事件ID,会成为当前EventSource对象的内部属性"最后一个事件ID"的属性值。

4)retry

一个整数值,指定了重新连接的时间(单位为毫秒),如果该字段值不是整数,则会被忽略。

客户端代码示例

// 客户端初始化事件源
const evtSource = new EventSource("//api.example.com/ssedemo.php", { withCredentials: true } );

// 对 message 事件添加一个处理函数开始监听从服务器发出的消息
evtSource.onmessage = function(event) {
  const newElement = document.createElement("li");
  const eventList = document.getElementById("list");

  newElement.innerHTML = "message: " + event.data;
  eventList.appendChild(newElement);
}

服务器代码示例

date_default_timezone_set("America/New_York");
header("Cache-Control: no-cache");
header("Content-Type: text/event-stream");
$counter = rand(1, 10);
while (true) {
  // Every second, send a "ping" event.
  echo "event: ping\n";
  $curDate = date(DATE_ISO8601);
  echo 'data: {"time": "' . $curDate . '"}';
  echo "\n\n";
  // Send a simple message at random intervals.
  $counter--;
  if (!$counter) {
    echo 'data: This is a message at time ' . $curDate . "\n\n";
    $counter = rand(1, 10);
  }
  ob_end_flush();
  flush();
  sleep(1);
}

效果示例

event: userconnect
data: {"username": "bobby", "time": "02:33:48"}
event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}
event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}

这里需要注意下,在未通过http2使用SSE时,SSE会收到最大连接数限制,此时默认的最大连接数只有6,即同一时间只能建立6个SSE连接,不过这里的限制是对同域名的,跨域的域名可以再建立6个SSE连接。通过HTTP2使用SSE时默认的最大连接数是100。

目前SSE已集成到spring5,Springboot2的webflux其实就是通过SSE的方式进行数据的流式输出。

3 WebSocket

Websocket就比较老生常谈了,这里主要介绍下它与SSE的区别:

  • Websocket是区别于HTTP的另外一种协议,是全双工通信,协议相对来说比较中,对代码侵入度比较高。
  • SSE是标准的HTTP协议,是半双工通信,支持断线重连和自定义事件和数据类型,相对轻便灵活。

4 RSocket

在微服务架构中,不同服务之间通过应用协议进行数据传输。典型的传输方式包括基于 HTTP 协议的 REST 或 SOAP API 和基于 TCP 字节流的 RPC 等。但是对于HTTP只支持请求响应模式,如果客户端需要获取最新的推送消息,就必须使用轮询,这无疑造成了大量的资源浪费。再者如果某个请求的响应时间过长,会阻塞之后的其他请求的处理;虽然服务器发送事件(Server-Sent Events,SSE)可以用来推送消息,不过 SSE 是一个简单的文本协议,仅提供有限的功能;而WebSocket 可以进行双向数据传输,不过它没有提供应用层协议支持,Rsocket很好的解决了已有协议面临的各种问题。

image.png

Rsocket是一个面向反应式应用程序的新型应用网络协议,它工作在网络七层模型中 5/6 层的协议,是 TCP/IP 之上的应用层协议,RSocket 可以使用不同的底层传输层,包括 TCP、WebSocket 和 Aeron。TCP 适用于分布式系统的各个组件之间交互,WebSocket 适用于浏览器和服务器之间的交互,Aeron 是基于 UDP 协议的传输方式,这就保证了 RSocket 可以适应于不同的场景,见上图。然后RSocket 使用二进制格式,保证了传输的高效,节省带宽。而且,通过基于反应式流控保证了消息传输中的双方不会因为请求的压力过大而崩溃。更多详细资料请移步RSocket[1]。雷卷也开源了alibaba-rsocket-broker[2],感兴趣可以去深入了解请教。

Rsocket提供了四种不同的交互模式满足所有场景:

image.png

RSocket 提供了不同语言的实现,包括Java、Kotlin、JavaScript、Go、.NET和C++ 等,如下为仅供学习了解的简单Java实现:

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Mono;

public class RequestResponseExample {

  public static void main(String[] args) {
    RSocketFactory.receive()
        .acceptor(((setup, sendingSocket) -> Mono.just(
            new AbstractRSocket() {
              @Override
              public Mono<Payload> requestResponse(Payload payload) {
                return Mono.just(DefaultPayload.create("ECHO >> " + payload.getDataUtf8()));
              }
            }
        )))
        .transport(TcpServerTransport.create("localhost", 7000)) //指定传输层实现
        .start() //启动服务器
        .subscribe();

    RSocket socket = RSocketFactory.connect()
        .transport(TcpClientTransport.create("localhost", 7000)) //指定传输层实现
        .start() //启动客户端
        .block();

    socket.requestResponse(DefaultPayload.create("hello"))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .block();

    socket.dispose();
  }
}

5 响应式编程框架

如果要在全链路实现响应式,那响应式编程框架是支撑这个技术的核心技术,这对于开发者来说是一种编程模式的变革,通过使用异步数据流进行编程对于原流程化的编程模式来说变化还很大。

简单示例如下:

@Override
public Single<Integer> remaining() {
    return Flowable.fromIterable(LotteryEnum.EFFECTIVE_LOTTERY_TYPE_LIST)
        .flatMap(lotteryType -> tairMCReactive.get(generateLotteryKey(lotteryType)))
        .filter(Result::isSuccess)
        .filter(result -> !ResultCode.DATANOTEXSITS.equals(result.getRc()))
        .map(result -> (Integer) result.getValue().getValue())
        .reduce((acc, lotteryRemaining) -> acc + lotteryRemaining)
        .toSingle(0);
}

总的来说通过HTTP分块传输协议和HTTP SSE协议以及RSocket我们可以实现流式输出,通过流式输出和响应式编程端到端的响应式才得以实现。

三 流式输出应用场景

性能、体验和数据是我们日常工作中抓的最紧的三件事情。对于性能来说也一直是我们追求极致和永无止境的核心点,流式输出也是在解决性能体验这个问题而诞生,那是不是所有的场景都适合流式输出呢?当然不是,我们来康康哪些场景适合?

image.png

以上为Resource Timing API规范提供的请求生命周期包含的主要阶段,通过上述来看下一下几个场景对于请求生命周期的影响。

1 页面流式输出场景

对于动态页面来说(相对于静态页面)主要由页面样式、页面交互的JS以及页面的动态数据构成,除了上述请求生命周期的各阶段耗时,还有页面渲染耗时阶段。浏览器拿到HTML会先进行DOM树构建、预加载扫描器、CSSOM树构建,Javascript编译执行,在过程中CSS文件的加载和JS文件的加载阻塞页面渲染过程。如果我们将页面按照以下方式进行拆分进行流式输出将会在性能上有很大的收益。

单接口动态页面

对于某些场景比如SEO,页面需要同步渲染输出,此时页面通常是单接口动态页面,就可以将页面拆分成body以上部分和body以下的部分,例如:

<!-- 模块1 -->
<html>
  <head>
  <meta  />
    <link  />  
  <style></style>  
  <script src=""></script>
  </head>
  <body>

<!-- 模块2 -->
        <div>xxx</div>
        <div>yyy</div>  
        <div>zzz</div> 
    </body>
</html>

当模块1到达页面模块2未到达时,模块1渲染后在等待模块2到来的同时可以进行CSS和JS的加载,在几个方面进行了性能提升:

  • 到达浏览器的首字节时间(TTFB)
  • 数据包到达浏览器下载HTML的时间
  • CSS和JS的加载及执行时间
  • 拆成模块之后网络传输的时间会有一定的降低

单接口多楼层页面

<!-- 模块1 -->
<html>
  <head>
  <meta  />
    <link  />  
  <style></style>  
  <script src=""></script>
  </head>
  <body>

<!-- 模块2 -->
        <div>xxx1</div>
        <div>yyy1</div>  
        <div>zzz1</div> 
    
<!-- 模块3 -->
        <div>xxx2</div>
        <div>yyy2</div>  
        <div>zzz2</div>
    
<!-- 模块4 -->
        <div>xxx3</div>
        <div>yyy3</div>  
        <div>zzz3</div>
    </body>
</html>

很多场景是一个页面展现多个楼层、譬如首页的四大金刚以及各种导购楼层,detail的信息楼层等等,甚至后面楼层依赖前面楼层的数据,类似这种情况可以将页面楼层拆分成多个模块进行输出,在上述几个方面进行了性能提升之外还有额外的性能提升:楼层之间数据相互依赖的数据处理时间。

多接口多楼层页面

一般情况下大部分页面都是由同步SSR渲染和异步CSR渲染进行,这时会涉及到JS异步加载异步楼层,如果同步渲染部分按照单接口多楼层进行拆分会在上述基础上提前加载运行异步楼层的渲染。

总的来说基于HTTP分块传输协议的流式输出几乎覆盖所有页面场景,供所有页面提升性能体验。

2 数据流式输出场景

单接口大数据

对于APP或者单页面系统更多的是通过异步加载数据的方式进行页面渲染,单个接口会造成单个接口的RT时间较长,以及数据包体太大导致在网络中拆包粘包的损耗较大。如果通过多个异步接口会因网络带宽受限而导致数据请求的延时较高以及网络IO的带来的CPU占有率较高,因此可以通过业务场景进行分析将单接口拆分成多个相互独立或者有一定耦合关系的业务模块,将这些模块的数据进行流式输出,以此带来以下性能体验上的提升。

  • 到达浏览器的首字节时间(TTFB)
  • 数据包到达端侧下载数据的时间
  • 数据在网络传输的时间

多相互依赖接口

但是在大部分场景中我们遇到的业务场景是相互耦合关联的,比方说榜单模块数据依赖它上面的新品模块的数据进行业务逻辑处理,这种情况在服务器侧处理完新品模块数据后对数据进行输出,再接着处理榜单模块数据进行输出,这里接节省了相互依赖等待的时间。

当然日常的业务场景会相对复杂的多,但是通过流式输出都会页面性能和体验会有很大的提升和助力。

四 小结

  • 流式输出的前世为流式渲染,今生为端到端的响应式,这些虽然带来了性能体验上的提升,但对研发模式变革的接受程度和运维成本的增加需要加以权衡。
  • 简单介绍了几种流式输出的技术方案,适合不同的业务场景。
  • 提出了流式输出适合的几种场景,以及对页面和数据进行拆分的方法。
相关链接
[1] https://rsocket.io/
[2] https://github.com/alibaba/alibaba-rsocket-broker
相关文章
|
2月前
|
安全 Java API
Stream流式编程,让代码变优雅
Stream流式编程,让代码变优雅
|
3月前
|
人工智能 前端开发
ProChat 1.0 使用问题之在 ProChat 中实现流式输出,如何操作
ProChat 1.0 使用问题之在 ProChat 中实现流式输出,如何操作
|
4月前
|
存储 数据处理 流计算
流模式vs批模式:你选对了吗?
本文由阿里云 Flink 团队刘文聪老师在撰写。文章分析了 Flink 的流批模式在不同维度存在的特点与差异,帮助开发者朋友们更好地理解 Flink 的流批模式。
584 12
流模式vs批模式:你选对了吗?
|
3月前
|
传感器 PyTorch 数据处理
流式数据处理:DataLoader 在实时数据流中的作用
【8月更文第29天】在许多现代应用中,数据不再是以静态文件的形式存在,而是以持续生成的流形式出现。例如,传感器数据、网络日志、社交媒体更新等都是典型的实时数据流。对于这些动态变化的数据,传统的批处理方式可能无法满足低延迟和高吞吐量的要求。因此,开发能够处理实时数据流的系统变得尤为重要。
83 0
|
4月前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用问题之处理包含时间戳并且希望在摄取、转换或输出时考虑特定时区的CDC数据,该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 关系型数据库 MySQL
[flink 实时流基础] 输出算子(Sink)
[flink 实时流基础] 输出算子(Sink)
143 1
|
6月前
|
负载均衡 算法 大数据
[flink 实时流基础] 转换算子
[flink 实时流基础] 转换算子
|
6月前
|
消息中间件 网络协议 大数据
[flink 实时流基础]源算子和转换算子
[flink 实时流基础]源算子和转换算子
|
6月前
|
前端开发 API
18_管道——转换
18_管道——转换
36 0
|
6月前
|
分布式计算 Hadoop 数据格式
MapReduce的输入和输出数据格式有哪些?请举例说明。
MapReduce的输入和输出数据格式有哪些?请举例说明。
113 0