MCP协议重大升级,Spring AI Alibaba联合Higress发布业界首个Streamable HTTP实现方案

本文涉及的产品
云原生网关 MSE Higress,422元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 本文由Spring AI Alibaba Contributor刘军、张宇撰写,探讨MCP官方引入的全新Streamable HTTP传输层对原有HTTP+SSE机制的重大改进。文章解析Streamable HTTP的设计思想与技术细节,并介绍Spring AI Alibaba开源框架提供的Java实现,包含无状态服务器模式、流式进度反馈模式等多种场景的应用示例。同时,文章还展示了Spring AI Alibaba + Higress的完整可运行示例,分析当前实现限制及未来优化方向,为开发者提供参考。

1.gif

本文作者:刘军、张宇,Spring AI Alibaba Contributor


文章摘要


MCP 官方引入了全新的 Streamable HTTP 传输层,对原有 HTTP+SSE 传输机制有重大改进。本文将:


1. 详细解析 Streamable HTTP 的设计思想、技术细节以及实际应用。


2. 详解 Spring AI Alibaba 开源框架提供的 Streamable HTTP Java 实现。


3. 提供 Spring AI Alibaba + Higress 的 Streamable HTTP 完整可运行示例与讲解。


HTTP+SSE 原理及缺陷


1.png


在原有的 MCP 实现中,客户端和服务器通过两个主要通道通信:


  • HTTP 请求/响应:客户端通过标准 HTTP 请求发送消息到服务器
  • 服务器发送事件(SSE):服务器通过专门的 /sse 端点向客户端推送消息


主要问题


这种设计虽然简单直观,但存在几个关键问题:


不支持断线重连/恢复


当 SSE 连接断开时,所有会话状态丢失,客户端必须重新建立连接并初始化整个会话。例如,正在执行的大型文档分析任务会因 WiFi 不稳定而完全中断,迫使用户重新开始整个过程。


服务器需维护长连接


服务器必须为每个客户端维护一个长时间的 SSE 连接,大量并发用户会导致资源消耗剧增。当服务器需要重启或扩容时,所有连接都会中断,影响用户体验和系统可靠性。


服务器消息只能通过 SSE 传递


即使是简单的请求-响应交互,服务器也必须通过 SSE 通道返回信息,造成不必要的复杂性和开销。对于某些环境(如云函数)不适合长时间保持 SSE 连接。


基础设施兼容性限制


许多现有的 Web 基础设施如 CDN、负载均衡器、API 网关等可能不能正确处理长时间的 SSE 连接,企业防火墙可能会强制关闭超时连接,导致服务不可靠。


Streamable HTTP 原理与改进


相比原有 HTTP+SSE 机制,Streamable HTTP 引入了几项关键改进:


1. 统一 Endoint:移除专门的 /sse 端点,所有通信通过单一端点(当前官方 sdk 实现为 /mcp)进行


2. 按需流式传输:服务器可灵活选择是返回普通 HTTP 响应还是升级为 SSE 流


3. 会话标识:引入会话 ID 机制,支持状态管理和恢复


4. 灵活初始化:客户端可通过空 GET 请求主动初始化 SSE 流


Streamable 工作原理


Streamable HTTP 的工作流程如下:


  1. 会话初始化(非强制,适用于有状态实现场景):
  • 客户端发送初始化请求到 /mcp 端点
  • 服务器可选择生成会话 ID 返回给客户端
  • 会话 ID 用于后续请求中标识会话


2. 客户端向服务器通信:

  • 所有消息通过 HTTP POST 请求发送到 /mcp 端点
  • 如果有会话 ID,则包含在请求中


3. 服务器响应方式:

  • 普通响应:直接返回 HTTP 响应,适合简单交互
  • 流式响应:升级连接为 SSE,发送一系列事件后关闭
  • 长连接:维持 SSE 连接持续发送事件


4. 主动建立 SSE 流:

  • 客户端可发送 GET 请求到 /mcp 端点主动建立 SSE 流
  • 服务器可通过该流推送通知或请求


5. 连接恢复:

  • 连接中断时,客户端可使用之前的会话 ID 重新连接
  • 服务器可恢复会话状态继续之前的交互


Streamable 请求示例


无状态服务器模式


场景简单工具 API 服务,如数学计算、文本处理等。


实现



客户端                                 服务器
   |                                    |
   |-- POST /message (计算请求) -------->|
   |                                    |-- 执行计算
   |<------- HTTP 200 (计算结果) -------|
   |                                    |


优势极简部署,无需状态管理,适合无服务器架构和微服务。


流式进度反馈模式


场景:长时间运行的任务,如大文件处理、复杂 AI 生成等。


实现

客户端                                 服务器
   |                                    |
   |-- POST /message (处理请求) -------->|
   |                                    |-- 启动处理任务
   |<------- HTTP 200 (SSE开始) --------|
   |                                    |
   |<------- SSE: 进度10% ---------------|
   |<------- SSE: 进度30% ---------------|
   |<------- SSE: 进度70% ---------------|
   |<------- SSE: 完成 + 结果 ------------|
   |                                    |


优势提供实时反馈,但不需要永久保持连接状态。


复杂 AI 会话模式


场景多轮对话 AI 助手,需要维护上下文。


实现


客户端                                 服务器
   |                                    |
   |-- POST /message (初始化) ---------->|
   |<-- HTTP 200 (会话ID: abc123) ------|
   |                                    |
   |-- GET /message (会话ID: abc123) --->|
   |<------- SSE流建立 -----------------|
   |                                    |
   |-- POST /message (问题1, abc123) --->|
   |<------- SSE: 思考中... -------------|
   |<------- SSE: 回答1 ----------------|
   |                                    |
   |-- POST /message (问题2, abc123) --->|
   |<------- SSE: 思考中... -------------|
   |<------- SSE: 回答2 ----------------|


优势维护会话上下文,支持复杂交互,同时允许水平扩展。


断线恢复模式


场景不稳定网络环境下的 AI 应用使用。


实现


客户端                                 服务器
   |                                    |
   |-- POST /message (处理请求) -------->|
   |                                    |-- 启动处理任务
   |<------- HTTP 200 (SSE开始) --------|
   |                                    |
   |<------- SSE: 进度10% ---------------|
   |<------- SSE: 进度30% ---------------|
   |<------- SSE: 进度70% ---------------|
   |<------- SSE: 完成 + 结果 ------------|
   |                                    |


优势:提供实时反馈,但不需要永久保持连接状态。


复杂 AI 会话模式


场景:多轮对话 AI 助手,需要维护上下文。


实现



客户端                                 服务器
   |                                    |
   |-- POST /message (初始化) ---------->|
   |<-- HTTP 200 (会话ID: xyz789) ------|
   |                                    |
   |-- GET /message (会话ID: xyz789) --->|
   |<------- SSE流建立 -----------------|
   |                                    |
   |-- POST /message (长任务, xyz789) -->|
   |<------- SSE: 进度30% ---------------|
   |                                    |
   |     [网络中断]                      |
   |                                    |
   |-- GET /message (会话ID: xyz789) --->|
   |<------- SSE流重新建立 --------------|
   |<------- SSE: 进度60% ---------------|
   |<------- SSE: 完成 ------------------|


优势提高弱网环境下的可靠性,改善用户体验。


Spring AI Alibaba 社区的 Streamable HTTP 实现


在前文中,我们从理论上列举了 HTTP+SSE 与 Streamable 两种模式各自的优劣。在实际落地应用中,由于 HTTP+SSE 模式割裂的请求与响应模式,导致它在架构实现与扩展性上存在一个非常棘手的问题:它强制要求在 client 与 server 之间维持粘性会话连接(Sticky Session),即使只是维持无状态 Stateless 通信,我们也需要维护一个 session id,并确保具备相同 session id 的请求发送到相同的服务端机器,这对于 client 和 server 端实现都是非常重的负担,对于 server 水平扩展是个挑战。


因此,前期的很多 MCP server 都是 stdio 模式,包括 C 端的很多通用智能体如 Claude 支持的也是 stdio 模式居多。stdio 模式下对于安装在个人电脑上的 Claude 来说不是一个大问题,而对于部署在服务器上的企业级业务 client 来说则是一个很重的动作,因此 stdio 模式也并不适合企业级业务落地。


对于 Streamable  HTTP 模式,如果只是要维持 Stateless 通信的话,是完全不需要维护 sticky session 的。而考虑到 90% 以上的 MCP 服务可能都是无状态的,这对于整体架构的可扩展性是一个非常大的提升。


当然,如果要实现 Stateful 通信的话,Streamable HTTP 模式同样也还是需要维护 session id 的。


Streamable HTTP Java 版实现方案


当前 MCP 和 Spring AI 官方并没有给出 Streamable 目前我们只给出了 Stream HTTP Client 实现,且只支持 Stateless 模式,可以调通官方的 Typescript server 实现、Higress 社区的 server 实现。

完整可运行示例可参考:

https://github.com/springaialibaba/spring-ai-alibaba-examples


由于 streamable http 方案的 MCP java sdk 实现还在开发中,因此该示例仓库中包含如下两个仓库的定制源码:


1. MCP java-sdk,在项目 io.modelcontextprotocol 包。
2. Spring AI,在项目 org.springframework.ai.mcp.client.autoconfigure 包。

示例集成了支持 MCP Streamable HTTP 协议实现的 Higress 网关,该实现还有很多限制,如不支持 GET 请求、不支持 session-id 管理等。


新增 StreamHttpClientTransport


GET 请求(空请求体),主动建立 SSE 连接


客户端可以通过主动发送 GET 请求到 /mcp 端点建立 SSE 连接,并用作后续请求响应通道。


return Mono.defer(() -> Mono.fromFuture(() -> {
        final HttpRequest.Builder builder = requestBuilder.copy().GET().uri(uri);
        final String lastId = lastEventId.get();
        if (lastId != null) {
            builder.header("Last-Event-ID", lastId);
        }
        return httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.ofInputStream());
    }).flatMap(response -> {
        if (response.statusCode() == 405 || response.statusCode() == 404) {
            // .....
        }
        return handleStreamingResponse(response, handler);
    })
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(err -> err instanceof IllegalStateException))
    .doOnSuccess(v -> state.set(TransportState.CONNECTED))
    .doOnTerminate(() -> state.set(TransportState.CLOSED))
    .onErrorResume(e -> {
        System.out.println("Ignore GET connection error.");
        LOGGER.error("Streamable transport connection error", e);
        state.set(TransportState.CONNECTED);
        return Mono.just("Streamable transport connection error").then();
    }));


POST 请求,服务端可以普通 response 响应或者升级 SSE 响应


对等的示例 http 请求,listTool 与 callTool 是类似的请求。



curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -H "Accept: text/event-stream"     -d '{
  "jsonrpc" : "2.0",   
  "method" : "initialize",   
  "id" : "9afdedcc-0",
  "params" : {             
    "protocolVersion" : "2024-11-05",   
    "capabilities" : { 
      "roots" : {        
        "listChanged" : true
      }  
    },  
    "clientInfo" : {
      "name" : "Java SDK MCP Client",                                                 
      "version" : "1.0.0"
    }
  }
}' -i http://localhost:3000/mcp 


可以启动并使用官方 typescript-sdk 提供的 Streamable Server 配合当前 client 实现进行测试。

Java代码实现


// 发送 POST 请求到 /mcp,包括
public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
            final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
    // ... 
    return sentPost(message, handler).onErrorResume(e -> {
        LOGGER.error("Streamable transport sendMessage error", e);
        return Mono.error(e);
    });
}

// 实际发送 POST 请求并处理响应
private Mono<Void> sentPost(final Object msg,
        final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
    return serializeJson(msg).flatMap(json -> {
        final HttpRequest request = requestBuilder.copy()
            .POST(HttpRequest.BodyPublishers.ofString(json))
            .uri(uri)
            .build();
        return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()))
            .flatMap(response -> {
                // If the response is 202 Accepted, there's no body to process
                if (response.statusCode() == 202) {
                    return Mono.empty();
                }

                if (response.statusCode() == 405 || response.statusCode() == 404) {
                 // ...
                }

                if (response.statusCode() >= 400) {
                 // ...
                }

                return handleStreamingResponse(response, handler);
            });
    });

}

// 处理服务端可能发回的不同类型响应
private Mono<Void> handleStreamingResponse(final HttpResponse<InputStream> response,
            final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
    final String contentType = response.headers().firstValue("Content-Type").orElse("");
    if (contentType.contains("application/json-seq")) {
        return handleJsonStream(response, handler);
    }
    else if (contentType.contains("text/event-stream")) {
        return handleSseStream(response, handler);
    }
    else if (contentType.contains("application/json")) {
        return handleSingleJson(response, handler);
    }
    else {
        return Mono.error(new UnsupportedOperationException("Unsupported Content-Type: " + contentType));
    }
}


集成到 Spring AI 框架


@AutoConfiguration
@ConditionalOnClass({ McpSchema.class, McpSyncClient.class })
@EnableConfigurationProperties({ McpStreamableClientProperties.class, McpClientCommonProperties.class })
@ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "enabled", havingValue = "true",
        matchIfMissing = true)
public class StreamableHttpClientTransportAutoConfiguration {
    @Bean
    public List<NamedClientMcpTransport> mcpHttpClientTransports(McpStreamableClientProperties streamableProperties,
            ObjectProvider<ObjectMapper> objectMapperProvider) {

        ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);

        List<NamedClientMcpTransport> sseTransports = new ArrayList<>();

        for (Map.Entry<String, McpStreamableClientProperties.StreamableParameters> serverParameters : streamableProperties.getConnections().entrySet()) {

            var transport = StreamableHttpClientTransport.builder(serverParameters.getValue().url()).withObjectMapper(objectMapper).build();
            sseTransports.add(new NamedClientMcpTransport(serverParameters.getKey(), transport));
        }

        return sseTransports;
    }

}


完整 Spring AI Alibaba + Higress Streamable HTTP 示例


通过配置如下,可以开启 Streamable HTTP Transport。配置如下 Higress 提供的 MCP Server 地址(支持有限的 Streamable HTTP Server 实现)。


spring:
  ai:
    mcp:
      client:
        toolcallback:
          enabled: true
        streamable:
          connections:
            server1:
              url: (虚拟地址需自行替换)http://env-xxxxxjem1hkjat42sxxx-ap-southeast-1.alicloudapi.com/mcp-quark


@SpringBootApplication(exclude = {
        org.springframework.ai.mcp.client.autoconfigure.SseHttpClientTransportAutoConfiguration.class,
})
@ComponentScan(basePackages = "org.springframework.ai.mcp.client")
public class Application {
    @Bean
    public CommandLineRunner predefinedQuestions(ChatClient.Builder chatClientBuilder, ToolCallbackProvider tools,
            ConfigurableApplicationContext context) {
        return args -> {
            var chatClient = chatClientBuilder
                    .defaultTools(tools)
                    .build();

            System.out.println("\n>>> QUESTION: " + "阿里巴巴西溪园区");
            System.out.println("\n>>> ASSISTANT: " + chatClient.prompt("阿里巴巴西溪园区").call().content());

            System.out.println("\n>>> QUESTION: " + "黄金价格走势");
            System.out.println("\n>>> ASSISTANT: " + chatClient.prompt("黄金价格走势").call().content());

        };
    }
}


运行示例后,看到成功连接 MCP Server 并执行 list tool,Higress 示例内置了两个 tool。


{
    "jsonrpc": "2.0",
    "id": "32124bd9-1",
    "result": {
        "nextCursor": "",
        "tools": [{
            "description": "Performs a web search using the Quark Search API, ideal for general queries, news, articles, and online content.\nUse this for broad information gathering, recent events, or when you need diverse web sources.\nBecause Quark search performs poorly for English searches, please use Chinese for the query parameters.",
            "inputSchema": {
                "additionalProperties": false,
                "properties": {
                    "contentMode": {
                        "default": "summary",
                        "description": "Return the level of content detail, choose to use summary or full text",
                        "enum": ["full", "summary"],
                        "type": "string"
                    },
                    "number": {
                        "default": 5,
                        "description": "Number of results",
                        "type": "integer"
                    },
                    "query": {
                        "description": "Search query, please use Chinese",
                        "examples": ["黄金价格走势"],
                        "type": "string"
                    }
                },
                "required": ["query"],
                "type": "object"
            },
            "name": "web_search"
        }]
    }
}


示例发起 chat 会话,模型会引导智能体调用 web_search tool 并返回结果。


当前实现限制


当前只是在官方 java sdk 的基础上新增了 Streamable HTTP 模式的 McpClientTransport 实现,但其实这种改造方法没办法很好的完全支持 Streamable HTTP,因为它的工作流程在很多地方和 HTTP+SSE 不一致,而之前的 java sdk 很多流程是强绑定 HTTP+SSE 模式设计的,导致当前的 sdk 实现上需要做一些结构化修改。


如,有如下几点当前的实现就是受限的:


1. Initialization 过程在 Streamable HTTP 中并不是必须的,只有实现有状态管理时才需要,并且Initializated之后,后续的所有请求都需要带上 mcp-session-id。而当前的 java-sdk 设计是强制检查 Initialization 状态,并且在 initiialization 后没有管理 mcp-session-id
2. /mcp GET 请求在协议中是被约束为 client 主动发起 SSE 请求时使用,而当前实现是在每次 connect 时发起 GET 请求并建立 SSE 会话,后续的 POST 请求通过也依赖这里发回响应,这可以通过 pendingResponses 属性的操作看出来。

当前 Spring AI Alibaba 社区的多位核心贡献者在积极参与 MCP 官方 sdk 建设,包括问题修复、Streamable HTTP 方案实现等,目前我们已经将相关 PR 提交官方社区。以下是社区的 Streamable 方案 PR:


1. https://github.com/modelcontextprotocol/java-sdk/pull/144


2. https://github.com/modelcontextprotocol/java-sdk/pull/163


官方实现与参考资料


1. Spring AI Alibaba 官方网站:
https://java2ai.com/

2. Spring AI Alibaba 开源项目源码仓库:https://github.com/alibaba/spring-ai-alibaba

3. mcp-streamable-http:https://www.claudemcp.com/blog/mcp-streamable-http

4. MCP java-sdk:https://github.com/modelcontextprotocol/java-sdk

5. Streamable HTTP:https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http

相关文章
|
1月前
|
存储 人工智能 NoSQL
阿里云表格存储 Tablestore 全面升级 AI 能力,存储成本直降 30%
近日,阿里云表格存储 Tablestore 宣布全面升级 AI 场景支持能力,正式推出 AI Agent 记忆存储功能,在保障高性能与高可用的同时,整体存储成本降低 30%,标志着 Tablestore 在构建 AI 数据处理和存储的技术内核能力上,迈出关键一步。
183 5
|
2月前
|
消息中间件 存储 人工智能
Apache RocketMQ for AI 战略升级,开启 AI MQ 新时代
Apache RocketMQ 顺应AIGC浪潮,针对长时会话、稀缺算力调度及AI Agent协作等挑战,推出专为AI时代打造的消息引擎。通过“会话即主题”的Lite-Topic机制,实现百万级队列动态管理,保障会话连续性与断点续传;结合智能资源调度能力,如定速消费与优先级队列,提升算力利用率与服务公平性;同时构建高效异步通信枢纽,支撑Agent-to-Agent及AI工作流的非阻塞协同。已在阿里集团与阿里云多个AI产品中大规模验证,助力开发者构建稳定、高效、可扩展的AI应用基础设施。
|
2月前
|
人工智能 安全 Cloud Native
Nacos 3.0 架构升级,AI 时代更安全的 Registry
随着Nacos3.0的发布,定位由“更易于构建云原生应用的动态服务发现、配置管理和服务管理平台”升级至“ 一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台 ”。
|
2月前
|
人工智能 供应链 安全
AI驱动攻防升级,API安全走到关键档口
在AI与数字化转型加速背景下,API已成为企业连接内外业务的核心枢纽,但其面临的安全威胁也日益严峻。瑞数信息发布的《API安全趋势报告》指出,2024年API攻击流量同比增长162%,占所有网络攻击的78%。攻击呈现规模化、智能化、链式扩散等新特征,传统防护手段已难应对。报告建议企业构建覆盖API全生命周期的安全体系,强化资产梳理、访问控制、LLM防护、供应链管控等七大能力,提升动态防御水平,保障AI时代下的业务安全与稳定。
119 0
|
1月前
|
人工智能 分布式计算 大数据
ODPS重磅升级!全面支撑AI应用爆发
阿里云全面升级自研大数据平台ODPS架构,旗下MaxCompute、Hologres和DataWorks等核心产品全面融合AI技术,提升数据处理能力与多模态计算支持,推动企业智能化转型。
110 0
ODPS重磅升级!全面支撑AI应用爆发
|
30天前
|
存储 人工智能 NoSQL
阿里云表格存储 Tablestore 全面升级 AI 能力,存储成本直降 30%
让 AI 记得久、找得快、用得上,表格存储加速智能体记忆进化。
|
2月前
|
存储 机器学习/深度学习 人工智能
还在为释放医疗数据潜能,驱动智慧医联体升级 ——AI赋能的病历全流程智能管理解决方案
AI赋能病历管理,破解录入低效、存储难、数据沉睡等痛点。实现病历数字化、结构化、智能化,降本增效,助力医院智慧升级。
80 0
|
人工智能 缓存 NoSQL
【深度】企业 AI 落地实践(四):如何构建端到端的 AI 应用观测体系
本文探讨了AI应用在实际落地过程中面临的三大核心问题:如何高效使用AI模型、控制成本以及保障输出质量。文章详细分析了AI应用的典型架构,并提出通过全栈可观测体系实现从用户端到模型推理层的端到端监控与诊断。结合阿里云的实践经验,介绍了基于OpenTelemetry的Trace全链路追踪、关键性能指标(如TTFT、TPOT)采集、模型质量评估与MCP工具调用观测等技术手段,帮助企业在生产环境中实现AI应用的稳定、高效运行。同时,针对Dify等低代码平台的应用部署与优化提供了具体建议,助力企业构建可扩展、可观测的AI应用体系。