通过okhttp调用SSE流式接口,并将消息返回给客户端

简介: 通过okhttp调用SSE流式接口,并将消息返回给客户端

通过一个完整的java示例来演示如何通过okhttp来调用远程的sse流式接口

背景:我们有一个智能AI的聊天界面,需要调用三方厂商的大模型chat接口,返回答案(因为AI去理解并检索你的问题的时候这个是比较耗时的,这个时候客户端需要同步的在等待最终结果),所以我们的方案是通过流的方式把结果陆续的返回给客户端,这样能极大的提高用户的体验

1.引入相关依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp-sse</artifactId>
            <version>4.2.0</version>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>

2. controller

package com.demo.controller;
import com.alibaba.fastjson.JSON;
import com.demo.listener.SSEListener;
import com.demo.params.req.ChatGlmDto;
import com.demo.utils.ApiTokenUtil;
import com.demo.utils.ExecuteSSEUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletResponse;
@RestController
@Slf4j
public class APITestController {
    private static final String API_KEY = "xxxx";
    
    private static final String URL = "xxx";
    @PostMapping(value = "/sse-invoke", produces = "text/event-stream;charset=UTF-8")
    public void sse(@RequestBody ChatGlmDto chatGlmDto, HttpServletResponse rp) {
        try {
            String token = ApiTokenUtil.generateClientToken(API_KEY);
            SSEListener sseListener = new SSEListener(chatGlmDto, rp);
            ExecuteSSEUtil.executeSSE(URL, token, sseListener, JSON.toJSONString(chatGlmDto));
        } catch (Exception e) {
            log.error("请求SSE错误处理", e);
        }
    }
}

3. 监听器

监听器里的事件可以自己定义,然后自己去实现自己相关的业务逻辑,onEvent主要用来接收消息

package com.demo.listener;
import com.alibaba.fastjson.JSON;
import com.demo.params.req.ChatGlmDto;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import javax.servlet.http.HttpServletResponse;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Data
public class SSEListener extends EventSourceListener {
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    private ChatGlmDto chatGlmDto;
    private HttpServletResponse rp;
    private StringBuffer output = new StringBuffer();
    public SSEListener(ChatGlmDto chatGlmDto, HttpServletResponse response) {
        this.chatGlmDto = chatGlmDto;
        this.rp = response;
    }
    /**
     * {@inheritDoc}
     * 建立sse连接
     */
    @Override
    public void onOpen(final EventSource eventSource, final Response
            response) {
        if (rp != null) {
            rp.setContentType("text/event-stream");
            rp.setCharacterEncoding("UTF-8");
            rp.setStatus(200);
            log.info("建立sse连接..." + JSON.toJSONString(chatGlmDto));
        } else {
            log.info("客户端非sse推送" + JSON.toJSONString(chatGlmDto));
        }
    }
    /**
     * 事件
     *
     * @param eventSource
     * @param id
     * @param type
     * @param data
     */
    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        try {
            output.append(data);
            if ("finish".equals(type)) {
                log.info("请求结束{} {}", chatGlmDto.getMessageId(), output.toString());
            }
            if ("error".equals(type)) {
                log.info("{}: {}source {}", chatGlmDto.getMessageId(), data, JSON.toJSONString(chatGlmDto));
            }
            if (rp != null) {
                if ("\n".equals(data)) {
                    rp.getWriter().write("event:" + type + "\n");
                    rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n");
                    rp.getWriter().write("data:\n\n");
                    rp.getWriter().flush();
                } else {
                    String[] dataArr = data.split("\\n");
                    for (int i = 0; i < dataArr.length; i++) {
                        if (i == 0) {
                            rp.getWriter().write("event:" + type + "\n");
                            rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n");
                        }
                        if (i == dataArr.length - 1) {
                            rp.getWriter().write("data:" + dataArr[i] + "\n\n");
                            rp.getWriter().flush();
                        } else {
                            rp.getWriter().write("data:" + dataArr[i] + "\n");
                            rp.getWriter().flush();
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("消息错误[" + JSON.toJSONString(chatGlmDto) + "]", e);
            countDownLatch.countDown();
            throw new RuntimeException(e);
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void onClosed(final EventSource eventSource) {
        log.info("sse连接关闭:{}", chatGlmDto.getMessageId());
        log.info("结果输出:{}" + output.toString());
        countDownLatch.countDown();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void onFailure(final EventSource eventSource, final Throwable t, final Response response) {
        log.error("使用事件源时出现异常... [响应:{}]...", chatGlmDto.getMessageId());
        countDownLatch.countDown();
    }
    public CountDownLatch getCountDownLatch() {
        return this.countDownLatch;
    }
}

4. 相关工具类

获取token ApiTokenUtil类,这个根据自己的业务需求看是否需要,我这里为了程序能跑起来,就保留了

package com.demo.utils;
import com.alibaba.fastjson.JSON;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class ApiTokenUtil {
    public static String generateClientToken(String apikey) {
        String[] apiKeyParts = apikey.split("\\.");
        String api_key = apiKeyParts[0];
        String secret = apiKeyParts[1];
        Map<String, Object> header = new HashMap<>();
        header.put("alg", SignatureAlgorithm.HS256);
        header.put("sign_type", "SIGN");
        Map<String, Object> payload = new HashMap<>();
        payload.put("api_key", api_key);
        payload.put("exp", System.currentTimeMillis() + 5 * 600 * 1000);
        payload.put("timestamp", System.currentTimeMillis());
        String token = null;
        try {
            token = Jwts.builder().setHeader(header)
                    .setPayload(JSON.toJSONString(payload))
                    .signWith(SignatureAlgorithm.HS256, secret.getBytes(StandardCharsets.UTF_8))
                    .compact();
        } catch (Exception e) {
            System.out.println();
        }
        return token;
    }
}

ExecuteSSEUtil 类

package com.demo.utils;
import com.demo.listener.SSEListener;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSources;
@Slf4j
public class ExecuteSSEUtil {
    public static void executeSSE(String url, String authToken, SSEListener eventSourceListener, String chatGlm) throws Exception {
        RequestBody formBody = RequestBody.create(chatGlm, MediaType.parse("application/json; charset=utf-8"));
        Request.Builder requestBuilder = new Request.Builder();
        requestBuilder.addHeader("Authorization", authToken);
        Request request = requestBuilder.url(url).post(formBody).build();
        EventSource.Factory factory = EventSources.createFactory(OkHttpUtil.getInstance());
        //创建事件
        factory.newEventSource(request, eventSourceListener);
        eventSourceListener.getCountDownLatch().await();
    }
}

OkHttpUtil 类

package com.demo.utils;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import java.net.Proxy;
import java.util.concurrent.TimeUnit;
public class OkHttpUtil {
    private static OkHttpClient okHttpClient;
    public static ConnectionPool connectionPool = new ConnectionPool(10, 5, TimeUnit.MINUTES);
    public static OkHttpClient getInstance() {
        if (okHttpClient == null) { //加同步安全
            synchronized (OkHttpClient.class) {
                if (okHttpClient == null) { //okhttp可以缓存数据....指定缓存路径
                    okHttpClient = new OkHttpClient.Builder()//构建器
                            .proxy(Proxy.NO_PROXY) //来屏蔽系统代理
                            .connectionPool(connectionPool)
                            .connectTimeout(600, TimeUnit.SECONDS)//连接超时
                            .writeTimeout(600, TimeUnit.SECONDS)//写入超时
                            .readTimeout(600, TimeUnit.SECONDS)//读取超时
                            .build();
                    okHttpClient.dispatcher().setMaxRequestsPerHost(200);
                    okHttpClient.dispatcher().setMaxRequests(200);
                }
            }
        }
        return okHttpClient;
    }
}

ChatGlmDto 请求实体类

package com.demo.params.req;
import lombok.Data;
/**
 * Created by WeiRan on  2023.03.20 19:19
 */
@Data
public class ChatGlmDto {
    private String messageId;
    private Object prompt;
    private String requestTaskNo;
    private boolean incremental = true;
    private boolean notSensitive = true;
}

5. 接口调用调试

我这里就直接使用curl命令来调用了

curl 'http://localhost:8080/sse-invoke' --data '{"prompt":[{"role":"user","content":"泰山有多高?"}]}' -H 'Content-Type: application/json'

返回结果:

分割线---------------------------------------------------------------------------------------------------------------------------------

创作不易,三连支持一下吧 👍

最后送大家一句话白驹过隙,沧海桑田

相关文章
|
28天前
|
设计模式 IDE API
C# 一分钟浅谈:GraphQL 客户端调用
本文介绍了如何在C#中调用GraphQL API,涵盖基本步骤、常见问题及解决方案。首先,通过安装`GraphQL.Client`库并创建客户端实例,连接到GraphQL服务器。接着,展示了如何编写查询和突变,以及处理查询语法错误、变量类型不匹配等常见问题。最后,通过具体案例(如管理用户和订单)演示了如何在实际项目中应用这些技术,帮助开发者更高效地利用GraphQL。
70 38
C# 一分钟浅谈:GraphQL 客户端调用
|
1月前
|
Java UED Spring
Springboot通过SSE实现实时消息返回
通过Spring Boot实现SSE,可以简单高效地将实时消息推送给客户端。虽然SSE有其限制,但对于许多实时消息推送场景而言,它提供了一种简洁而强大的解决方案。在实际开发中,根据具体需求选择合适的技术,可以提高系统的性能和用户体验。希望本文能帮助你深入理解Spring Boot中SSE的实现和应用。
128 1
|
1月前
|
XML Java Maven
WebService客户端调用的5种常见方式
本文介绍了在Java中创建和调用WebService的方法,包括服务端的搭建、配置类的添加以及客户端的多种调用方式(如使用JDK原生代码、wsimport命令、动态调用、代理工厂及HttpClient)。文中详细展示了每种方法的实现步骤和示例代码,强调了服务端与客户端参数实体类字段的兼容性,并推荐使用代理工厂方式进行调用。
102 0
WebService客户端调用的5种常见方式
|
2月前
|
机器学习/深度学习 移动开发 前端开发
解密 SSE,像 ChatGPT 一样返回流式响应
解密 SSE,像 ChatGPT 一样返回流式响应
147 1
|
3月前
|
Python
8. 如何解决 Tornado 检测到了有事件(events)被发送到一个已经关闭的流(stream)。在 Tornado 中,一个流代表一个请求或响应的数据流。这个警告可能意味着在请求处理的过程中,
8. 如何解决 Tornado 检测到了有事件(events)被发送到一个已经关闭的流(stream)。在 Tornado 中,一个流代表一个请求或响应的数据流。这个警告可能意味着在请求处理的过程中,
|
Android开发
【OkHttp】OkHttp Get 和 Post 请求 ( 同步 Get 请求 | 异步 Get 请求 | 同步 Post 请求 | 异步 Post 请求 )(一)
【OkHttp】OkHttp Get 和 Post 请求 ( 同步 Get 请求 | 异步 Get 请求 | 同步 Post 请求 | 异步 Post 请求 )(一)
1095 0
|
6月前
|
C++
gRPC 四模式之 服务器端流RPC模式
gRPC 四模式之 服务器端流RPC模式
141 0
|
6月前
|
安全 C++
gRPC 四模式之 客户端流RPC模式
gRPC 四模式之 客户端流RPC模式
67 0
|
7月前
gRPC三种流和消息格式(一)
gRPC三种流和消息格式
258 0
|
网络协议 算法 Java
gRPC 客户端调用服务端需要连接池吗?
gRPC 客户端调用服务端需要连接池吗?