第四章 流式输出与响应式编程
s01 > s02 > s03 > [ s04 ] s05 > s06 > s07 > s08 > s09 > s10 > s11 > s12 > s13 > s14 > s15 > s16 > s17 > s18
"让结果边生成边返回, 体验就完全不一样" -- 流式输出解决的不是能不能答, 而是等得久不久。
一、为什么需要流式输出?
1.1 传统请求 vs 流式输出
在传统的 HTTP 请求中,流程是这样的:
客户端 ───────────────────────────────> 服务器
请求(Request)
处理中...
处理中...
处理中...
响应(Response)────────────── 客户端
(完整内容一次性返回)
而流式输出(Streaming)的流程是:
客户端 ───────────────────────────────> 服务器
请求
处理ing...
获─────────────────────────> 客户端 (第一部分)
得 处理ing...
流 第二部分 ─────────> 客户端
式 第三部分 ─────────> 客户端
数据 ...
处理完成
1.2 生活化比喻
传统方式(等完整答案):
你问 AI:"请帮我写一篇 5000 字的文章"
AI 思考了 30 秒,然后一次性把 5000 字全部给你
体验:等了很久,然后突然一大坨内容出现
流式输出(打字机效果):
同样问 AI 写 5000 字文章
AI 思考完后,开始一点一点输出
第1秒给你100字...
第2秒再给200字...
体验:AI 正在努力工作,感觉响应很快
1.3 流式输出的优势
| 优势 | 说明 |
|---|---|
| 首字延迟低 | 不需要等 AI 生成完整答案,就能开始显示 |
| 用户体验好 | 看着文字一点点出现,有互动感 |
| 资源利用率高 | 服务器不需要等全部生成完再响应 |
| 支持长内容 | 生成几万字的内容也不需要长时间等待 |
二、核心技术概念
2.1 Flux 是什么?
Flux 是 Project Reactor(响应式编程库)的核心类型之一,它代表一个异步的、0到N个元素的数据流。
简单理解:
String= 一个单独的字符串List<String>= 一组字符串(一次全部返回)Flux<String>= 一个随时间推移逐步产生的字符串序列
2.2 响应式编程科普
响应式编程(Reactive Programming) 是一种编程范式,它的核心思想是:
"数据是流动的,程序要响应数据的变化"
在传统编程中:
// 你叫我,我才做
String result = chatModel.call(msg);
System.out.println(result);
在响应式编程中:
// 我订阅了这个消息,数据来了我就自动处理
chatModel.stream(msg).subscribe(result -> {
System.out.println(result); // 收到一部分就显示一部分
});
2.3 Spring AI 中的流式输出
// ChatModel 提供了两个核心方法
// 一次性返回(同步,等全部生成完)
ChatResponse call(Prompt prompt);
// 流式返回(异步,边生成边返回)
Flux<ChatResponse> stream(Prompt prompt);
三、项目代码详解
3.1 项目结构
SAA-04StreamingOutput/
├── pom.xml
├── src/main/java/com/atguigu/study/
│ ├── config/
│ │ └── SaaLLMConfig.java
│ ├── controller/
│ │ └── StreamOutputController.java # 流式输出控制器
│ └── ...
3.2 配置分析
package com.atguigu.study.config;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.ollama.OllamaChatModel;
import org.springframework.ai.ollama.OllamaOptions;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置类:同时注册阿里云和 Ollama 两种 ChatModel
*/
@Configuration
public class SaaLLMConfig
{
/**
* 阿里云百炼 DashScope ChatModel
* 通过 @Qualifier 指定 Bean 名称,便于后续注入选择
*/
@Bean
@Qualifier("dashScopeChatModel")
public ChatModel dashScopeChatModel()
{
return null; // 由 Spring AI 自动配置完成
}
/**
* Ollama 本地 ChatModel
*/
@Bean
@Qualifier("ollamaChatModel")
public ChatModel ollamaChatModel()
{
return null; // 由 Spring AI 自动配置完成
}
/**
* 基于 DashScope 的 ChatClient
*/
@Bean("dashScopeChatClient")
public ChatClient dashScopeChatClient(@Qualifier("dashScopeChatModel") ChatModel chatModel)
{
return ChatClient.builder(chatModel).build();
}
}
3.3 流式输出控制器
package com.atguigu.study.controller;
import jakarta.annotation.Resource;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model flux.ChatResponseFlux;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.prompt.SystemPromptTemplate;
import org.springframework.ai.chat.prompt.UserPromptTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Map;
/**
* 流式输出控制器
* 展示多种流式调用的实现方式
*/
@RestController
public class StreamOutputController
{
// 注入两个不同的 ChatModel
@Resource
@Qualifier("dashScopeChatModel")
private ChatModel dashScopeChatModel;
@Resource
@Qualifier("dashScopeChatModel")
private ChatModel chatModel;
// 注入 ChatClient
@Resource(name = "dashScopeChatClient")
private ChatClient chatClient;
/**
* 方式一:使用 ChatModel 的 stream 方法(最底层)
*
* 接口:http://localhost:8004/stream/chat?msg=今天天气怎么样
*
* @param msg 用户消息
* @return Flux<String> 流式返回的字符串
*/
@GetMapping("/stream/chat")
public Flux<String> streamChat(@RequestParam(name = "msg", defaultValue = "你是谁") String msg)
{
// chatModel.stream() 返回 Flux<ChatResponse>
// ChatResponse 是完整的响应对象,包含元数据
// 我们通过 map 提取文本内容
return chatModel.stream(msg)
.map(chatResponse ->
// getResults() 获取生成的所有片段
// get(0) 取第一个结果(因为是一段一段生成的)
// getOutput().getText() 获取文本内容
chatResponse.getResults().get(0).getOutput().getText()
);
}
/**
* 方式二:使用 ChatClient 的 stream 方法(更推荐)
*
* 接口:http://localhost:8004/stream/chat2?msg=用Java写个冒泡排序
*
* ChatClient 已经封装好了,直接返回字符串的 Flux
*/
@GetMapping("/stream/chat2")
public Flux<String> streamChat2(@RequestParam(name = "msg", defaultValue = "你是谁") String msg)
{
// 直接返回字符串类型的 Flux,无需额外转换
return chatClient.prompt()
.user(msg)
.stream() // 开启流式输出
.content(); // 直接获取文本内容
}
/**
* 方式三:带系统提示词的流式输出
*
* 接口:http://localhost:8004/stream/chat3?msg=什么是Spring
*
* 使用 SystemPromptTemplate 设置 AI 的角色和行为
*/
@GetMapping("/stream/chat3")
public Flux<String> streamChat3(@RequestParam(name = "msg", defaultValue = "你是谁") String msg)
{
// 1. 创建系统提示词模板
// 你是一个技术作家,用通俗易懂的语言解释概念
SystemPromptTemplate systemPromptTemplate = new SystemPromptTemplate(
"你是一个技术作家,用通俗易懂的语言解释概念,"
+ "回答控制在300字以内,并且用HTML格式输出。"
);
// 2. 构建完整的 Prompt(系统消息 + 用户消息)
Prompt prompt = new Prompt(
systemPromptTemplate.createMessage(), // 系统消息
new org.springframework.ai.chat.messages.UserMessage(msg) // 用户消息
);
// 3. 流式调用
return chatModel.stream(prompt)
.map(response -> response.getResults().get(0).getOutput().getText());
}
/**
* 方式四:使用模板变量的流式输出
*
* 接口:http://localhost:8004/stream/chat4?topic=AI&style=幽默
*
* PromptTemplate 支持占位符,类似 String.format()
*/
@GetMapping("/stream/chat4")
public Flux<String> streamChat4(
@RequestParam(name = "topic", defaultValue = "Java") String topic,
@RequestParam(name = "style", defaultValue = "专业") String style)
{
// 1. 创建带占位符的模板
// {topic} 和 {style} 是占位符,会被 param 替换
UserPromptTemplate userTemplate = new UserPromptTemplate(
"用{style}的风格介绍{topic},控制在200字以内"
);
// 2. 填充变量(map 中的 key 对应模板中的占位符)
Prompt prompt = userTemplate.createMessage(Map.of(
"topic", topic,
"style", style
));
// 3. 流式调用
return chatClient.prompt(prompt)
.stream()
.content();
}
}
四、前端展示流式输出
4.1 后端返回 vs 前端接收
这里先澄清一个很容易混淆的问题:
SSE、WebSocket、Fetch + ReadableStream这些并不完全是"前端框架技术",更准确地说,它们是浏览器和服务端之间进行实时/流式通信的 Web 技术。
也就是说:
SSE、WebSocket更像是通信机制/协议方案EventSource、fetch、ReadableStream是浏览器提供的 Web APIaxios是前端常用的 HTTP 请求库,但它在浏览器里对流式读取的支持并不如fetch直接
后端通过 Flux 返回流式数据后,前端可以用不同方式接收:
| 方式 | 类型 | 是否双向 | 适合场景 | 说明 |
|---|---|---|---|---|
| SSE | 服务端推送 | 否 | AI 打字机输出、进度通知 | 浏览器原生支持,最适合单向流式文本 |
| WebSocket | 长连接通信 | 是 | 聊天室、协同编辑、双向实时交互 | 功能更强,但实现更复杂 |
| Fetch + ReadableStream | HTTP 响应流读取 | 否 | 自定义流式解析、LLM 文本分块展示 | 现代前端最常见的流式处理方式 |
| axios | HTTP 请求库 | 否 | 普通请求 | 在浏览器中不太适合做真正的流式文本消费 |
4.2 这些方式分别是什么?
1)SSE 是什么?
SSE 全称是 Server-Sent Events,中文一般叫服务端发送事件。
它的特点是:
- 浏览器发起一次普通 HTTP 请求
- 服务端不马上关闭连接,而是持续不断地往下推送数据
- 浏览器端通过
EventSource持续接收
它非常适合 AI 对话这种场景,因为:
- 用户发一个问题给后端
- 后端持续把模型生成的文本往前端推
- 前端一边接收一边拼接显示
前端示例:
const eventSource = new EventSource("/stream/sse?msg=你好");
eventSource.onmessage = function (event) {
console.log("收到服务端推送:", event.data);
// 把 event.data 追加到页面上
};
eventSource.onerror = function () {
eventSource.close();
};
优点:简单、浏览器原生支持、特别适合单向文本流输出。
缺点:只能服务端推给客户端,客户端不能在同一个连接里反向持续发送数据。
2)WebSocket 是什么?
WebSocket 是一种全双工通信协议,建立连接后,前后端都可以随时主动发消息。
它和 SSE 的区别可以理解为:
SSE:像广播喇叭,服务端说,前端听WebSocket:像打电话,双方都能随时说话
前端示例:
const socket = new WebSocket("ws://localhost:8004/ws/chat");
socket.onopen = function () {
socket.send("你好,请介绍一下 Spring AI Alibaba");
};
socket.onmessage = function (event) {
console.log("收到消息:", event.data);
};
优点:支持双向通信,实时性强。
缺点:开发和维护成本更高,如果只是展示 AI 生成中的文本,往往有点"大材小用"。
3)Fetch + ReadableStream 是什么?
这是现代前端里处理流式 HTTP 响应最常见的一种方式。
浏览器调用 fetch() 后,如果服务端返回的是流式响应体,前端可以通过 response.body.getReader() 一块一块读取数据。
前端示例:
async function streamChat() {
const response = await fetch("/stream/chat2?msg=你好");
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
while (true) {
const {
done, value } = await reader.read();
if (done) {
break;
}
const chunk = decoder.decode(value, {
stream: true });
console.log("收到分块数据:", chunk);
// 把 chunk 追加到页面
}
}
这种方式的本质是:
- 后端还是走普通 HTTP
- 但响应不是一次性读完
- 而是前端主动按块读取
优点:灵活、现代、适合自定义解析。
缺点:前端代码比 SSE 稍复杂一些。
4)axios 能不能做流式接收?
很多同学会自然想到 axios,因为它平时写接口请求最常用。
但要注意:
- 在 Node.js 环境 下,
axios对流支持较好 - 在 浏览器环境 下,
axios对真正的流式文本消费不如fetch + ReadableStream直接
所以如果你是在浏览器里做 AI 打字机效果,通常更推荐:
- 简单场景:
SSE - 自定义流解析:
fetch + ReadableStream - 强双向交互:
WebSocket
4.3 到底算不算前端技术?
严格来说,它们不全是传统意义上的"前端页面样式技术",而是 前端工程中负责网络通信和实时交互的一部分技术栈。
你可以这样理解:
HTML/CSS/JavaScript负责页面长什么样、怎么交互SSE/WebSocket/fetch负责页面怎么从后端持续拿到数据
所以把它们放在"前端展示流式输出"这一节是合理的,但如果说它们全部都是"纯前端技术",就不够准确。更准确的说法应该是:
它们是前端接收后端流式数据时常用的通信技术。
4.4 最简单的测试方式
在浏览器地址栏直接访问流式接口是不会看到流式效果的,因为浏览器会把请求完整加载完才显示。
正确的测试方式:
# 使用 curl 的 streaming 模式
curl -N http://localhost:8004/stream/chat2?msg=你好
# -N 参数表示不缓存,实时显示服务器的响应
或者使用 Postman(勾选 "Send without waiting for response")。
如果你是要真正做页面展示,推荐优先级如下:
- AI 文本逐字输出:优先考虑
SSE - 需要自己解析流式分块:优先考虑
fetch + ReadableStream - 需要双向实时通信:选择
WebSocket
五、响应式编程补充
5.1 为什么用 Flux/Mono?
在 AI 场景中,响应时间可能很长(几秒到几十秒),如果用传统方式:
// 同步等待:用户要一直转圈圈等待
String result = chatModel.call(msg); // 30秒后才能拿到结果
用响应式:
// 流式返回:开始显示第一个字的时候就展示给用户
Flux<String> flux = chatModel.stream(msg); // 立即返回,用户体验好
5.2 Flux 的特点
// Flux 可以发出 0 个、1 个、或 N 个元素
// 就像是一个水管,可以流出一个个水滴
Flux.empty(); // 0个元素(完成)
Flux.just("a"); // 1个元素(完成)
Flux.just("a","b","c"); // 3个元素(逐步流出)
// AI 生成的过程就是典型的 Flux:逐步产出内容
chatModel.stream(msg)
.map(response -> response.getResults().get(0).getOutput().getText());
// 比如AI生成了"今天天气很好",会分多次流出:
// 第一次:"今"
// 第二次:"今天"
// 第三次:"今天天"
// ...
六、本章小结
6.1 核心知识点
| 概念 | 说明 |
|---|---|
| Flux | 响应式编程中的异步流类型 |
| 流式输出 | 边生成边返回,提升用户体验 |
| SSE | Server-Sent Events,服务端推送技术 |
| ChatModel.stream() | 底层流式调用方法 |
| ChatClient.prompt().stream() | 高级流式调用封装 |
6.2 实战要点
使用场景:
- 单次生成内容超过几百字 → 建议流式
-客户需要实时看到进展 → 必须流式 - 长对话、生成报告 → 流式体验更好
注意事项:
- 流式输出在前端需要特殊处理(SSE/WebSocket)
- 测试时不要用浏览器地址栏测试,用 curl 或 Postman
本章重点:
- 理解为什么需要流式输出(用户体验)
- 掌握 Flux 响应式流的使用
- 学会多种流式调用的代码写法
下章剧透(s05):
了解了流式输出的实现后,下一章我们将深入学习 Prompt(提示词)的艺术——如何写出高质量的提示词让 AI 输出更准确的结果。
📝 编辑者:Flittly
📅 更新时间:2026年3月
🔗 相关资源:Spring WebFlux 响应式编程 | Reactor 官方文档