ai对话---多线程并发处理问题

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: ai对话---多线程并发处理问题

ai对话—多线程并发处理问题


先简单回顾一下旧版本的对话接口如何实现


其实这里更多是涉及到多线程工作上的学习问题


在初代版本中 我自己以为的搞了一个线程池就能完成多线程的任务了

Java
public ThreadPoolExecutor pool=new ThreadPoolExecutor(13,13,8,
TimeUnit.MINUTES,new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

@RequestMapping("/get")
public Result get(@RequestParam(“question”) String question,@RequestParam(“id”) int id) throws Exception {

Future f1 = pool.submit(new callable(question,id));
String answer =f1.get();
return Result.ok(answer);
}

工作的代码就不放了 这里是重点 重点是我的接口中 每一次任务的执行都是new一个新的线程出来 去执行任务 但并没有主动的写关闭线程的语句 这就导致了 线程很容易堆满 每次执行完应该释放一个线程 而且这里并没有加多对异常的处理 如果对端那边的ai卡住了 就没有办法得知发生了什么事情


于是这里就有了下面的重写了的语句

Java
public ThreadPoolExecutor pool = new ThreadPoolExecutor(13, 13, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(6),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
@RequestMapping("/get")
public DeferredResult get(@RequestParam(“question”) String question, @RequestParam(“id”) String id) {

//创建了一个DeferredResult对象,并将其返回给前端。在异步任务执行完毕后,
// 通过调用deferredResult.setResult(result)方法将结果设置到DeferredResult对象中,从而实现异步返回结果给前端。
DeferredResult deferredResult = new DeferredResult<>();
CompletableFuture.supplyAsync(() -> {
try {
callable callable = new callable(question, id);
String answer = callable.call();
Result result = Result.ok(answer);
System.out.println(“接口:”+answer);
return result;
} catch (Exception e) {
Result result = Result.fail(e.getMessage());
return result;
}
}, pool).whenComplete((result, throwable) -> {
if (throwable != null) {
result = Result.fail(throwable.getMessage());
deferredResult.setResult(result);
} else {
deferredResult.setResult(result);
}
});
return deferredResult;
}
public class callable implements Callable{
private String id;
private String question;
public callable(String question,String id) {
//这里处理一下userId的长度 因为讯飞那边限制了
if (id.length() >= 30) {
id= id.substring(0, 30);
}
this.question = question;
this.id=id;
}
@Override
public String call() throws Exception {
String answer =main(question,id);
//System.out.println(answer);
answer = JSONUtil.toJsonStr(answer);
botText.content="";//清空
return answer;
}
}


再来讲这个和ai之间的对话的接口原理


实际上在每个main函数当中会构建一个WebSocket的服务区跟他进行对话 而当 每一个对话结束 实际上是没有把话说完的 是要进行n次回复 ai说的话才能被拼接好 这个过程就跟 ai一次性说完有比较大的区别 在于他的WebSocket每次都要新建这样的一个对象出来 来和对端的ai进行对话 并且要“等”ai说完


所以这里就遇到了几个问题:


  1. 主线程没办法精确的知道副线程当中 进行到什么地步了 容易没把话说完就回复给客户端了
  2. 如果进行了线程复用的话 很可能会串不同用户之间的对话历史记录
  3. 超时等待的时候 没有跳出 会直接让一个线程死在里面 如果并发线程量够大 足够造成死锁


下面就是解决的办法

Java
public String main(String newQuestion,String userid) throws Exception {
if(totalFlag){
totalFlag=false;
NewQuestion=newQuestion;
// 构建鉴权url
String authUrl = getAuthUrl(hostUrl, apiKey, apiSecret);
OkHttpClient client = new OkHttpClient.Builder().build();
String url = authUrl.toString().replace(“http://”, “ws://”).replace(“https://”, “wss://”);
Request request = new Request.Builder().url(url).build();
totalAnswer="";
//这里创建了大模型的新对象 实际上那些发送请求获取答案的操作都是在这个线程中做的
BigModelNew bigModelNew = null;
if (getHistory(userid)!=null){
bigModelNew=new BigModelNew(userid, false,getHistory(userid),stringRedisTemplate);
}
else {
bigModelNew=new BigModelNew(userid, false,historyList,stringRedisTemplate);
}
// 等待 WebSocket 的 run() 方法执行完毕
int maxWaitTime = 10000; // 最大等待时间,单位:毫秒
int currentWaitTime = 0; // 当前已等待的时间,单位:毫秒
int waitInterval = 1000;// 每次等待的时间间隔,单位:毫秒
WebSocket webSocket = client.newWebSocket(request, bigModelNew);
System.out.println(maxWaitTime);
while (currentWaitTime < maxWaitTime) {
if (bigModelNew.getBotContent().equals("")) {
// run() 方法还未执行完毕,可以进行一些其他操作或等待一段时间
Thread.sleep(waitInterval);
System.out.println(“正在执行线程”+Thread.currentThread().getName()+"…等待时间还剩:"+(maxWaitTime-currentWaitTime));
currentWaitTime += waitInterval;
} else {
return bigModelNew.getBotContent();
}
}
}
totalFlag=true;
return “网络开了点小差 试试重新发送你的消息吧”;
}


代码解释:


这是Spring提供的一种支持异步处理结果的类。在接口处理过程中,它会先返回一个空的DeferredResult对象给前端,然后在异步任务执行完毕后,通过调用deferredResult.setResult(result)方法将最终的结果设置到DeferredResult对象中,实现异步返回结果给前端。


在异步任务的实现中,使用CompletableFuture.supplyAsync()方法创建一个异步任务,并在其中执行具体的业务逻辑。这里使用了一个callable对象来处理问题和ID,并返回一个回答。


上方的代码解决了1和3 我们打印出来他的执行时间以及线程的名字 以便我们能够追踪到他


而超过了一定的时长 线程就会自动跳出 并且返回报错信息让用户重新发送


而线程2当中我们发现需要缓存历史记录 并且要对用户进行区分 所以在构造大模型对象的时候 我写了一个特殊的构造参数 (这里一定要记得 把Redis给注入进去 否则会爆空指针的错 大概原理是因为新的对象里面并没有被注入Redis 他作为一个新的Bean没有与这个Bean产生绑定的关系 这里涉及到Spring容器构成Bean的原理)

Java
// 构造函数
public BigModelNew(@org.springframework.beans.factory.annotation.Value("u s e r I d " ) S t r i n g u s e r I d < b r > ‘ ‘ , @ V a l u e ( " {userId}") String userId<br>` `,@Value("userId")StringuserId<br>‘‘,@Value("{wsCloseFlag}") Boolean wsCloseFlag
,@Value("H i s t o r y L i s t " ) L i s t < R o l e C o n t e n t > H i s t o r y L i s t < b r > ‘ ‘ , @ V a l u e ( " {HistoryList}")List<RoleContent> HistoryList<br>` `,@Value("HistoryList")List<RoleContent>HistoryList<br>‘‘,@Value("{stringRedisTemplate}") StringRedisTemplate stringRedisTemplate) {
this.userId = userId;
this.wsCloseFlag = wsCloseFlag;
this.historyList=HistoryList;
this.stringRedisTemplate = stringRedisTemplate;
}

而后 我们使用userId来进行区分 在每一个大模型对象当中 的静态变量中的userId给写死了,并且在初始化的时候 还要根据userId进行查询历史记录 如果有 就填充到其中的历史记录消息数组当中

Java
// 从 Redis 中获取对话历史
public List getHistory(String userId) {
String historyStr = stringRedisTemplate.opsForValue().get(“id:” + userId + “:history”);
if (historyStr==null){
return null;
}
return JSONUtil.toList(JSONUtil.parseArray(historyStr), RoleContent.class);
}

public String main(String newQuestion,String userid) throws Exception {
//…
//这里创建了大模型的新对象 实际上那些发送请求获取答案的操作都是在这个线程中做的
BigModelNew bigModelNew = null;
if (getHistory(userid)!=null){//这里进行了判断 这个用户有没有历史记录
bigModelNew=new BigModelNew(userid, false,getHistory(userid),stringRedisTemplate);
}
else {
bigModelNew=new BigModelNew(userid, false,historyList,stringRedisTemplate);
}
//…
}

这样我们就能异步的处理这些对话消息 并且把他们放在对应的缓存空间当中


这个是获取历史记录的方法

Java
@RequestMapping("/history")
public Result history(@RequestParam(“id”) String id) {

String history = stringRedisTemplate.opsForValue().get(“id:” + id + “:history”);
if (history == null) {
return Result.fail(“没有找到历史记录”);
}

JSONArray jsonObject = JSON.parseArray(history);
// String jsonString = JSON.toJSONString(jsonObject);
return Result.ok(jsonObject);
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
235 0
|
1月前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
198 59
|
1月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
41 6
|
1月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
58 6
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
4月前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
95 0
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
36 1
|
4月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
3月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。