ai对话—多线程并发处理问题
先简单回顾一下旧版本的对话接口如何实现
其实这里更多是涉及到多线程工作上的学习问题
在初代版本中 我自己以为的搞了一个线程池就能完成多线程的任务了
Java public ThreadPoolExecutor pool=new ThreadPoolExecutor(13,13,8, TimeUnit.MINUTES,new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); @RequestMapping("/get") public Result get(@RequestParam(“question”) String question,@RequestParam(“id”) int id) throws Exception { Future f1 = pool.submit(new callable(question,id)); String answer =f1.get(); return Result.ok(answer); }
工作的代码就不放了 这里是重点 重点是我的接口中 每一次任务的执行都是new一个新的线程出来 去执行任务 但并没有主动的写关闭线程的语句 这就导致了 线程很容易堆满 每次执行完应该释放一个线程 而且这里并没有加多对异常的处理 如果对端那边的ai卡住了 就没有办法得知发生了什么事情
于是这里就有了下面的重写了的语句
Java public ThreadPoolExecutor pool = new ThreadPoolExecutor(13, 13, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); @RequestMapping("/get") public DeferredResult get(@RequestParam(“question”) String question, @RequestParam(“id”) String id) { //创建了一个DeferredResult对象,并将其返回给前端。在异步任务执行完毕后, // 通过调用deferredResult.setResult(result)方法将结果设置到DeferredResult对象中,从而实现异步返回结果给前端。 DeferredResult deferredResult = new DeferredResult<>(); CompletableFuture.supplyAsync(() -> { try { callable callable = new callable(question, id); String answer = callable.call(); Result result = Result.ok(answer); System.out.println(“接口:”+answer); return result; } catch (Exception e) { Result result = Result.fail(e.getMessage()); return result; } }, pool).whenComplete((result, throwable) -> { if (throwable != null) { result = Result.fail(throwable.getMessage()); deferredResult.setResult(result); } else { deferredResult.setResult(result); } }); return deferredResult; } public class callable implements Callable{ private String id; private String question; public callable(String question,String id) { //这里处理一下userId的长度 因为讯飞那边限制了 if (id.length() >= 30) { id= id.substring(0, 30); } this.question = question; this.id=id; } @Override public String call() throws Exception { String answer =main(question,id); //System.out.println(answer); answer = JSONUtil.toJsonStr(answer); botText.content="";//清空 return answer; } }
再来讲这个和ai之间的对话的接口原理
实际上在每个main函数当中会构建一个WebSocket的服务区跟他进行对话 而当 每一个对话结束 实际上是没有把话说完的 是要进行n次回复 ai说的话才能被拼接好 这个过程就跟 ai一次性说完有比较大的区别 在于他的WebSocket每次都要新建这样的一个对象出来 来和对端的ai进行对话 并且要“等”ai说完
所以这里就遇到了几个问题:
- 主线程没办法精确的知道副线程当中 进行到什么地步了 容易没把话说完就回复给客户端了
- 如果进行了线程复用的话 很可能会串不同用户之间的对话历史记录
- 超时等待的时候 没有跳出 会直接让一个线程死在里面 如果并发线程量够大 足够造成死锁
下面就是解决的办法
Java public String main(String newQuestion,String userid) throws Exception { if(totalFlag){ totalFlag=false; NewQuestion=newQuestion; // 构建鉴权url String authUrl = getAuthUrl(hostUrl, apiKey, apiSecret); OkHttpClient client = new OkHttpClient.Builder().build(); String url = authUrl.toString().replace(“http://”, “ws://”).replace(“https://”, “wss://”); Request request = new Request.Builder().url(url).build(); totalAnswer=""; //这里创建了大模型的新对象 实际上那些发送请求获取答案的操作都是在这个线程中做的 BigModelNew bigModelNew = null; if (getHistory(userid)!=null){ bigModelNew=new BigModelNew(userid, false,getHistory(userid),stringRedisTemplate); } else { bigModelNew=new BigModelNew(userid, false,historyList,stringRedisTemplate); } // 等待 WebSocket 的 run() 方法执行完毕 int maxWaitTime = 10000; // 最大等待时间,单位:毫秒 int currentWaitTime = 0; // 当前已等待的时间,单位:毫秒 int waitInterval = 1000;// 每次等待的时间间隔,单位:毫秒 WebSocket webSocket = client.newWebSocket(request, bigModelNew); System.out.println(maxWaitTime); while (currentWaitTime < maxWaitTime) { if (bigModelNew.getBotContent().equals("")) { // run() 方法还未执行完毕,可以进行一些其他操作或等待一段时间 Thread.sleep(waitInterval); System.out.println(“正在执行线程”+Thread.currentThread().getName()+"…等待时间还剩:"+(maxWaitTime-currentWaitTime)); currentWaitTime += waitInterval; } else { return bigModelNew.getBotContent(); } } } totalFlag=true; return “网络开了点小差 试试重新发送你的消息吧”; }
代码解释:
这是Spring提供的一种支持异步处理结果的类。在接口处理过程中,它会先返回一个空的DeferredResult对象给前端,然后在异步任务执行完毕后,通过调用deferredResult.setResult(result)方法将最终的结果设置到DeferredResult对象中,实现异步返回结果给前端。
在异步任务的实现中,使用CompletableFuture.supplyAsync()方法创建一个异步任务,并在其中执行具体的业务逻辑。这里使用了一个callable对象来处理问题和ID,并返回一个回答。
上方的代码解决了1和3 我们打印出来他的执行时间以及线程的名字 以便我们能够追踪到他
而超过了一定的时长 线程就会自动跳出 并且返回报错信息让用户重新发送
而线程2当中我们发现需要缓存历史记录 并且要对用户进行区分 所以在构造大模型对象的时候 我写了一个特殊的构造参数 (这里一定要记得 把Redis给注入进去 否则会爆空指针的错 大概原理是因为新的对象里面并没有被注入Redis 他作为一个新的Bean没有与这个Bean产生绑定的关系 这里涉及到Spring容器构成Bean的原理)
Java // 构造函数 public BigModelNew(@org.springframework.beans.factory.annotation.Value("u s e r I d " ) S t r i n g u s e r I d < b r > ‘ ‘ , @ V a l u e ( " {userId}") String userId<br>` `,@Value("userId")StringuserId<br>‘‘,@Value("{wsCloseFlag}") Boolean wsCloseFlag ,@Value("H i s t o r y L i s t " ) L i s t < R o l e C o n t e n t > H i s t o r y L i s t < b r > ‘ ‘ , @ V a l u e ( " {HistoryList}")List<RoleContent> HistoryList<br>` `,@Value("HistoryList")List<RoleContent>HistoryList<br>‘‘,@Value("{stringRedisTemplate}") StringRedisTemplate stringRedisTemplate) { this.userId = userId; this.wsCloseFlag = wsCloseFlag; this.historyList=HistoryList; this.stringRedisTemplate = stringRedisTemplate; }
而后 我们使用userId来进行区分 在每一个大模型对象当中 的静态变量中的userId给写死了,并且在初始化的时候 还要根据userId进行查询历史记录 如果有 就填充到其中的历史记录消息数组当中
Java // 从 Redis 中获取对话历史 public List getHistory(String userId) { String historyStr = stringRedisTemplate.opsForValue().get(“id:” + userId + “:history”); if (historyStr==null){ return null; } return JSONUtil.toList(JSONUtil.parseArray(historyStr), RoleContent.class); } public String main(String newQuestion,String userid) throws Exception { //… //这里创建了大模型的新对象 实际上那些发送请求获取答案的操作都是在这个线程中做的 BigModelNew bigModelNew = null; if (getHistory(userid)!=null){//这里进行了判断 这个用户有没有历史记录 bigModelNew=new BigModelNew(userid, false,getHistory(userid),stringRedisTemplate); } else { bigModelNew=new BigModelNew(userid, false,historyList,stringRedisTemplate); } //… }
这样我们就能异步的处理这些对话消息 并且把他们放在对应的缓存空间当中
这个是获取历史记录的方法
Java @RequestMapping("/history") public Result history(@RequestParam(“id”) String id) { String history = stringRedisTemplate.opsForValue().get(“id:” + id + “:history”); if (history == null) { return Result.fail(“没有找到历史记录”); } JSONArray jsonObject = JSON.parseArray(history); // String jsonString = JSON.toJSONString(jsonObject); return Result.ok(jsonObject); }