protected void processStream(Flux<Event> generator, Sinks.Many<ServerSentEvent<String>> sink) {
generator
.doOnNext(output -> logger().info("output = {}", output))
.filter(event -> !event.isLast())
.map(event -> event.getMessage())
.flatMap(msg -> Flux.fromIterable(msg.getContent()))
.flatMap(block -> {
if (block instanceof TextBlock) {
String content = ((TextBlock) block).getText();
ServerSentEvent<String> sse = ServerSentEvent.builder(content)
.event("text") // 文本回答事件
.build();
return Flux.just(sse);
} else if (block instanceof ThinkingBlock) {
String thinking = ((ThinkingBlock) block).getThinking();
ServerSentEvent<String> sse = ServerSentEvent.builder(thinking)
.event("thinking") // 思考过程事件
.build();
return Flux.just(sse);
} else if (block instanceof ToolUseBlock) {
ToolUseBlock toolBlock = (ToolUseBlock) block;
ServerSentEvent<String> sse = ServerSentEvent.builder(
JSON.toJSONString(toolBlock))
.event("tool_call") // 工具调用事件
.build();
return Flux.just(sse);
} else if (block instanceof ToolResultBlock) {
ToolResultBlock toolBlock = (ToolResultBlock) block;
ServerSentEvent<String> sse = ServerSentEvent.builder(
JSON.toJSONString(toolBlock))
.event("tool_result") // 工具响应事件
.build();
// TODO在这里返回工具调用结果,供前端展示 如搜索了xxx个网页
return Flux.just(sse);
} else {
return Flux.empty();
}
})
.doOnNext(sink::tryEmitNext)
.doOnError(e -> {
logger().error("Unexpected error in stream processing: {}", e.getMessage(), e);
ServerSentEvent<String> errorSse = ServerSentEvent.builder(
"System processing error, please try again later.")
.event("error") // 错误事件
.build();
sink.tryEmitNext(errorSse);
})
.doOnComplete(() -> {
logger().info("Stream processing completed successfully");
// 发送完成事件
ServerSentEvent<String> endSse = ServerSentEvent.builder("")
.event("end") // 结束事件
.build();
sink.tryEmitNext(endSse);
sink.tryEmitComplete();
})
.subscribe(
null,
e -> {
logger().error("Stream processing failed: {}", e.getMessage(), e);
sink.tryEmitError(e);
}
);
}
在调用时可以获取到ToolUseBlock事件,但是无法获取到结果事件
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。