一、 引言:从被动响应到主动执行的范式革命
传统AI系统主要扮演被动响应的角色,而AI智能体则能够主动感知环境、制定计划并执行复杂任务序列。这种范式转变带来了全新的能力:
任务分解:将复杂目标拆解为可执行步骤
工具使用:调用外部API、数据库和系统服务
自主决策:基于上下文动态调整策略
持续学习:从执行结果中积累经验
Java在企业级系统集成方面的优势使其成为构建生产级AI智能体的理想平台。本文将基于ReAct框架、工具调用模式和记忆系统,演示如何构建能够处理现实世界复杂任务的Java智能体系统。
二、 智能体系统架构设计
- 核心架构组件
text
感知层 → 推理引擎 → 行动执行 → 记忆系统
↓ ↓ ↓ ↓
环境观察 → 任务规划 → 工具调用 → 经验存储
↓ ↓ ↓ ↓
状态监控 → 策略决策 → 结果验证 → 学习优化
- 技术栈选择
推理引擎:Spring AI + 大语言模型
工具框架:自定义工具调用框架
记忆系统:Redis + 向量数据库
任务编排:Spring State Machine
监控观测:Micrometer + 结构化日志
- 项目依赖配置
xml
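<properties>
    <spring-ai.version>1.0.0-M5</spring-ai.version>
    <spring-boot.version>3.2.0</spring-boot.version>
</properties>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>${spring-boot.version}</version>
</dependency>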
<!-- Spring AI -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<!-- 状态机 -->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-starter</artifactId>
<version>3.2.0</version>
</dependency>
<!-- 向量存储 -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-redis-vector-store</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<!-- 工具集成 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
三、 智能体核心引擎实现
- 智能体基础架构
java
// IntelligentAgent.java
@Component
@Slf4j
public class IntelligentAgent {
private final ReasoningEngine reasoningEngine;
private final ToolExecutor toolExecutor;
private final MemorySystem memorySystem;
private final StateMachine<AgentState, AgentEvent> stateMachine;
private final ObservationSystem observationSystem;
public IntelligentAgent(ReasoningEngine reasoningEngine,
ToolExecutor toolExecutor,
MemorySystem memorySystem,
StateMachine<AgentState, AgentEvent> stateMachine,
ObservationSystem observationSystem) {
this.reasoningEngine = reasoningEngine;
this.toolExecutor = toolExecutor;
this.memorySystem = memorySystem;
this.stateMachine = stateMachine;
this.observationSystem = observationSystem;
}
/**
* 执行复杂任务
*/
public AgentResponse executeTask(AgentTask task) {
log.info("开始执行任务: {}", task.getDescription());
try {
// 启动状态机
stateMachine.sendEvent(AgentEvent.START);
// 主执行循环
AgentContext context = new AgentContext(task);
int maxIterations = 10;
int iteration = 0;
while (!context.isTaskComplete() && iteration < maxIterations) {
log.info("执行第 {} 轮迭代", iteration + 1);
// 1. 观察环境状态
EnvironmentObservation observation = observationSystem.observe(context);
context.addObservation(observation);
// 2. 推理和规划
ActionPlan plan = reasoningEngine.planNextAction(context);
context.setCurrentPlan(plan);
// 3. 执行行动
ActionResult result = executeActionPlan(plan, context);
context.addActionResult(result);
// 4. 学习和记忆
memorySystem.recordExperience(context, result);
iteration++;
// 检查终止条件
if (result.isTerminal() || context.isGoalAchieved()) {
break;
}
}
// 完成状态
stateMachine.sendEvent(AgentEvent.COMPLETE);
return buildResponse(context, iteration);
} catch (Exception e) {
log.error("任务执行失败", e);
stateMachine.sendEvent(AgentEvent.ERROR);
return AgentResponse.failure(e.getMessage());
}
}
private ActionResult executeActionPlan(ActionPlan plan, AgentContext context) {
List<ActionStep> steps = plan.getSteps();
List<StepResult> stepResults = new ArrayList<>();
for (ActionStep step : steps) {
try {
log.info("执行步骤: {}", step.getDescription());
// 执行单个步骤
StepResult result = toolExecutor.executeTool(step.getToolName(),
step.getParameters());
stepResults.add(result);
// 更新上下文
context.addStepResult(step, result);
// 如果步骤失败,停止执行
if (!result.isSuccess()) {
log.warn("步骤执行失败: {}", result.getError());
break;
}
} catch (Exception e) {
log.error("步骤执行异常", e);
stepResults.add(StepResult.failure(e.getMessage()));
break;
}
}
return new ActionResult(stepResults, context);
}
private AgentResponse buildResponse(AgentContext context, int iterations) {
AgentResponse response = new AgentResponse();
response.setIterations(iterations);
response.setSuccess(context.isGoalAchieved());
response.setFinalResult(context.getFinalResult());
response.setExecutionTrace(context.getExecutionTrace());
response.setLessonsLearned(memorySystem.extractLessons(context));
return response;
}
/**
* 流式执行 - 用于长任务
*/
public Flux<AgentUpdate> executeTaskStreaming(AgentTask task) {
return Flux.create(sink -> {
try {
AgentContext context = new AgentContext(task);
sink.next(AgentUpdate.started(task));
int iteration = 0;
while (!context.isTaskComplete() && iteration < 15) {
// 推理
ActionPlan plan = reasoningEngine.planNextAction(context);
sink.next(AgentUpdate.planning(plan));
// 执行
ActionResult result = executeActionPlan(plan, context);
sink.next(AgentUpdate.action(result));
context.addActionResult(result);
iteration++;
if (result.isTerminal()) {
break;
}
}
sink.next(AgentUpdate.completed(context));
sink.complete();
} catch (Exception e) {
sink.error(e);
}
});
}
}
- ReAct推理引擎
java
// ReasoningEngine.java
/**
 * Builds ReAct prompts, calls the chat model and parses its
 * {@code Thought:} / {@code Action:} / {@code Final Answer:} reply into an
 * {@link ActionPlan}.
 */
@Component
@Slf4j
public class ReasoningEngine {
    private static final String THOUGHT_PREFIX = "Thought:";
    private static final String ACTION_PREFIX = "Action:";
    private static final String FINAL_ANSWER_PREFIX = "Final Answer:";

    private final ChatClient chatClient;
    private final ToolRegistry toolRegistry;
    private final PromptTemplate reactPromptTemplate;

    public ReasoningEngine(ChatClient chatClient, ToolRegistry toolRegistry) {
        this.chatClient = chatClient;
        this.toolRegistry = toolRegistry;
        this.reactPromptTemplate = new ReactPromptTemplate();
    }

    /**
     * Asks the model for the next step and parses the answer.
     *
     * @param context current task context; its final result is set when the
     *                model replies with a final answer
     * @return the parsed plan, or the fallback plan when parsing throws
     */
    public ActionPlan planNextAction(AgentContext context) {
        String prompt = buildReactPrompt(context);
        String reasoning = chatClient.prompt(prompt)
                .call()
                .content();
        return parseReasoningResponse(reasoning, context);
    }

    /** Concatenates format instructions, tool list, history and current goal. */
    private String buildReactPrompt(AgentContext context) {
        StringBuilder prompt = new StringBuilder();
        // System instructions describing the expected reply format.
        prompt.append("你是一个智能助手,使用ReAct框架解决问题。\n");
        prompt.append("请按照以下格式响应:\n");
        prompt.append("Thought: 你的思考过程\n");
        prompt.append("Action: 工具名称(参数)\n");
        prompt.append("或者\n");
        prompt.append("Thought: 最终结论\n");
        prompt.append("Final Answer: 最终答案\n\n");
        // Available tools.
        prompt.append("可用工具:\n");
        for (ToolDefinition tool : toolRegistry.getAvailableTools()) {
            prompt.append("- ").append(tool.getName())
                    .append(": ").append(tool.getDescription())
                    .append("\n");
        }
        // Task history.
        prompt.append("\n任务历史:\n");
        for (AgentHistory history : context.getHistory()) {
            prompt.append(history.format()).append("\n");
        }
        // Current goal.
        prompt.append("\n当前目标: ").append(context.getCurrentGoal());
        prompt.append("\n\n请继续:");
        return prompt.toString();
    }

    /**
     * Parses the model reply line by line.
     *
     * <p>Each line is trimmed first, so indented lines and Windows line endings
     * are still recognised. Everything after {@code Final Answer:} up to the end
     * of the reply is taken as the answer, so multi-line answers are kept whole.
     */
    private ActionPlan parseReasoningResponse(String response, AgentContext context) {
        try {
            String[] lines = response.split("\n");
            List<ActionStep> steps = new ArrayList<>();
            String currentThought = "";
            for (int i = 0; i < lines.length; i++) {
                String line = lines[i].trim();
                if (line.startsWith(THOUGHT_PREFIX)) {
                    currentThought = line.substring(THOUGHT_PREFIX.length()).trim();
                } else if (line.startsWith(ACTION_PREFIX)) {
                    ActionStep step = parseActionLine(line, currentThought);
                    if (step != null) {
                        steps.add(step);
                    }
                } else if (line.startsWith(FINAL_ANSWER_PREFIX)) {
                    StringBuilder answer =
                            new StringBuilder(line.substring(FINAL_ANSWER_PREFIX.length()));
                    for (int j = i + 1; j < lines.length; j++) {
                        answer.append('\n').append(lines[j]);
                    }
                    context.setFinalResult(answer.toString().trim());
                    break;
                }
            }
            return new ActionPlan(steps, currentThought);
        } catch (Exception e) {
            // A null or otherwise unusable reply ends up here.
            log.error("解析推理响应失败", e);
            return ActionPlan.fallbackPlan(context);
        }
    }

    /**
     * Parses {@code Action: toolName(param1=value1, param2=value2)}.
     *
     * @return the step, or {@code null} when the line has no well-formed
     *         parenthesised argument list
     */
    private ActionStep parseActionLine(String actionLine, String thought) {
        try {
            String actionPart = actionLine.substring(ACTION_PREFIX.length()).trim();
            int parenStart = actionPart.indexOf('(');
            if (parenStart == -1) {
                return null;
            }
            int parenEnd = actionPart.lastIndexOf(')');
            if (parenEnd < parenStart) {
                log.warn("解析行动步骤失败: {}", actionLine);
                return null;
            }
            String toolName = actionPart.substring(0, parenStart).trim();
            String paramsStr = actionPart.substring(parenStart + 1, parenEnd);
            Map<String, Object> parameters = parseParameters(paramsStr);
            return new ActionStep(toolName, parameters, thought);
        } catch (Exception e) {
            log.warn("解析行动步骤失败: {}", actionLine, e);
            return null;
        }
    }

    /**
     * Parses a {@code key=value, key=value} list into a map of String values.
     *
     * <p>Commas inside double quotes do not separate parameters. A segment that
     * does not start with {@code identifier=} is treated as the continuation of
     * the previous value, so {@code sql=SELECT a, b FROM t} stays one parameter.
     * One pair of matching surrounding quotes is removed from each value.
     */
    private Map<String, Object> parseParameters(String paramsStr) {
        Map<String, Object> parameters = new HashMap<>();
        if (paramsStr == null || paramsStr.trim().isEmpty()) {
            return parameters;
        }
        String currentKey = null;
        StringBuilder currentValue = new StringBuilder();
        for (String segment : splitOutsideDoubleQuotes(paramsStr)) {
            if (startsNewParameter(segment)) {
                if (currentKey != null) {
                    parameters.put(currentKey, unquote(currentValue.toString()));
                }
                int eq = segment.indexOf('=');
                currentKey = segment.substring(0, eq).trim();
                currentValue.setLength(0);
                currentValue.append(segment.substring(eq + 1));
            } else if (currentKey != null) {
                // The comma belonged to the previous value; put it back.
                currentValue.append(',').append(segment);
            }
            // A leading segment without a key carries no usable information.
        }
        if (currentKey != null) {
            parameters.put(currentKey, unquote(currentValue.toString()));
        }
        return parameters;
    }

    /** Splits on commas that are not enclosed in double quotes. */
    private List<String> splitOutsideDoubleQuotes(String text) {
        List<String> segments = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '"') {
                quoted = !quoted;
            } else if (c == ',' && !quoted) {
                segments.add(current.toString());
                current.setLength(0);
                continue;
            }
            current.append(c);
        }
        segments.add(current.toString());
        return segments;
    }

    /** True when the segment begins with {@code name=} where name is a plain identifier. */
    private boolean startsNewParameter(String segment) {
        int eq = segment.indexOf('=');
        if (eq <= 0) {
            return false;
        }
        String key = segment.substring(0, eq).trim();
        if (key.isEmpty()) {
            return false;
        }
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            if (!Character.isLetterOrDigit(c) && c != '_' && c != '-') {
                return false;
            }
        }
        return true;
    }

    /**
     * Trims the value and removes one pair of surrounding quotes, but only when
     * the quote character does not also occur inside the value.
     */
    private String unquote(String raw) {
        String value = raw.trim();
        if (value.length() >= 2) {
            char first = value.charAt(0);
            boolean isQuote = first == '"' || first == '\'';
            if (isQuote && value.indexOf(first, 1) == value.length() - 1) {
                return value.substring(1, value.length() - 1);
            }
        }
        return value;
    }

    /**
     * Asks the model to analyse a failed result and stores the lesson on the
     * context. Successful results and empty reflections add nothing.
     */
    public void reflectOnExperience(AgentContext context, ActionResult result) {
        if (!result.isSuccess()) {
            String reflectionPrompt = buildReflectionPrompt(context, result);
            String reflection = chatClient.prompt(reflectionPrompt).call().content();
            if (reflection == null || reflection.isBlank()) {
                // Nothing to learn from an empty reply; avoid storing a null lesson.
                return;
            }
            String lesson = extractLesson(reflection);
            context.addLearnedLesson(lesson);
        }
    }

    /** Formats the failure-analysis prompt from task, trace and error. */
    private String buildReflectionPrompt(AgentContext context, ActionResult result) {
        return String.format(
                "任务执行失败,请分析原因并提出改进建议:\n" +
                "任务:%s\n" +
                "执行步骤:%s\n" +
                "错误:%s\n" +
                "请指出问题所在和后续应该如何避免:",
                context.getTask().getDescription(),
                context.getExecutionTrace(),
                result.getError()
        );
    }

    /** Simplified extraction: keeps at most the first 200 characters. */
    private String extractLesson(String reflection) {
        return reflection.length() > 200 ?
                reflection.substring(0, 200) + "..." : reflection;
    }
}
四、 工具调用框架
- 工具注册与执行
java
// ToolExecutor.java
@Component
@Slf4j
public class ToolExecutor {
private final Map<String, Tool> toolRegistry;
private final ToolValidationService validationService;
public ToolExecutor(List<Tool> tools, ToolValidationService validationService) {
this.validationService = validationService;
this.toolRegistry = new HashMap<>();
// 注册所有工具
for (Tool tool : tools) {
toolRegistry.put(tool.getName(), tool);
log.info("注册工具: {}", tool.getName());
}
}
public StepResult executeTool(String toolName, Map<String, Object> parameters) {
try {
Tool tool = toolRegistry.get(toolName);
if (tool == null) {
return StepResult.failure("未知工具: " + toolName);
}
// 验证参数
ValidationResult validation = validationService.validateParameters(tool, parameters);
if (!validation.isValid()) {
return StepResult.failure("参数验证失败: " + validation.getError());
}
// 执行工具
long startTime = System.currentTimeMillis();
Object result = tool.execute(parameters);
long duration = System.currentTimeMillis() - startTime;
log.info("工具执行成功: {} ({}ms)", toolName, duration);
return StepResult.success(result, duration);
} catch (Exception e) {
log.error("工具执行异常: {}", toolName, e);
return StepResult.failure(e.getMessage());
}
}
public List<ToolDefinition> getAvailableTools() {
return toolRegistry.values().stream()
.map(Tool::getDefinition)
.collect(Collectors.toList());
}
}
// 工具接口
/**
 * A capability the agent can invoke by name.
 */
public interface Tool {
    /** @return the name the tool is registered and looked up under */
    String getName();

    /** @return a human-readable description, including the accepted parameters */
    String getDescription();

    /** @return the structured definition (name, description, parameters) */
    ToolDefinition getDefinition();

    /**
     * Runs the tool.
     *
     * <p>The parameter map is typed {@code Map<String, Object>} so that the
     * implementations, which all declare that type, actually override this
     * method; a raw {@code Map} here produces a name clash at compile time.
     *
     * @param parameters argument values keyed by parameter name
     * @return the tool output
     * @throws Exception when the tool cannot complete
     */
    Object execute(Map<String, Object> parameters) throws Exception;
}
// 基础工具实现
/**
 * Tool that posts a search query to the configured search API and returns the
 * decoded response body.
 */
@Component
@Slf4j
public class WebSearchTool implements Tool {
    /** Result limit used when {@code max_results} is not supplied. */
    private static final int DEFAULT_MAX_RESULTS = 5;

    private final WebClient webClient;

    public WebSearchTool(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.baseUrl("https://api.search.com").build();
    }

    @Override
    public String getName() {
        return "web_search";
    }

    @Override
    public String getDescription() {
        return "搜索网络信息,参数: query(搜索查询), max_results(最大结果数)";
    }

    @Override
    public ToolDefinition getDefinition() {
        return new ToolDefinition(getName(), getDescription(),
                List.of(
                        new ParameterDefinition("query", "string", "搜索查询", true),
                        new ParameterDefinition("max_results", "number", "最大结果数", false)
                ));
    }

    /**
     * Executes the search and blocks until the response arrives.
     *
     * @param parameters {@code query} (required) and {@code max_results}
     *                   (optional; a number or a numeric string)
     * @return the response body decoded as a map
     * @throws IllegalArgumentException when {@code query} is missing or blank,
     *                                  or {@code max_results} is not an integer
     */
    @Override
    public Object execute(Map<String, Object> parameters) throws Exception {
        Object rawQuery = parameters.get("query");
        if (rawQuery == null || rawQuery.toString().isBlank()) {
            // Map.of below rejects null values with a bare NullPointerException.
            throw new IllegalArgumentException("缺少必需参数: query");
        }
        String query = rawQuery.toString();
        int maxResults = resolveMaxResults(parameters.get("max_results"));

        Map<String, Object> request = Map.of(
                "query", query,
                "limit", maxResults
        );
        WebClient.ResponseSpec response = webClient.post()
                .uri("/search")
                .bodyValue(request)
                .retrieve();
        return response.bodyToMono(Map.class).block();
    }

    /**
     * Accepts a number or a numeric string. Parameters parsed from model output
     * arrive as strings, so a plain {@code (Integer)} cast would fail.
     */
    private int resolveMaxResults(Object raw) {
        if (raw == null) {
            return DEFAULT_MAX_RESULTS;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        try {
            return Integer.parseInt(raw.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("max_results 必须是整数: " + raw, e);
        }
    }
}
// 数据库查询工具
/**
 * Tool that runs a single read-only SQL statement through {@code JdbcTemplate}.
 *
 * <p>The statement text comes from model output, so it is screened before it
 * is executed. The screening is a text filter, not a SQL parser.
 * NOTE(review): it should be backed by a database account without write
 * privileges.
 */
@Component
@Slf4j
public class DatabaseQueryTool implements Tool {
    /** Statement must begin with SELECT or WITH as a whole word. */
    private static final String READ_ONLY_START = "(?s)^(select|with)\\b.*";
    /** Whole-word keywords that modify data, schema or privileges. */
    private static final String FORBIDDEN_KEYWORDS =
            "(?s).*\\b(delete|update|insert|drop|truncate|alter|create|grant|revoke"
                    + "|merge|call|exec|execute|into)\\b.*";

    private final JdbcTemplate jdbcTemplate;

    public DatabaseQueryTool(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public String getName() {
        return "query_database";
    }

    @Override
    public String getDescription() {
        return "执行数据库查询,参数: sql(SQL查询语句)";
    }

    @Override
    public ToolDefinition getDefinition() {
        return new ToolDefinition(getName(), getDescription(),
                List.of(new ParameterDefinition("sql", "string", "SQL查询语句", true)));
    }

    /**
     * Runs the query and returns its rows.
     *
     * @param parameters must contain {@code sql}
     * @return the rows as returned by {@code JdbcTemplate.queryForList}
     * @throws SecurityException when the statement is missing or is not a
     *                           single read-only query
     */
    @Override
    public Object execute(Map<String, Object> parameters) throws Exception {
        String sql = (String) parameters.get("sql");
        if (!isSafeQuery(sql)) {
            throw new SecurityException("不安全的SQL查询");
        }
        return jdbcTemplate.queryForList(sql);
    }

    /**
     * Accepts only one statement that starts with SELECT or WITH and contains
     * no modifying keyword as a whole word.
     *
     * <p>Whole-word matching keeps identifiers such as {@code updated_at}
     * usable. Any semicolon other than a single trailing one is rejected, which
     * blocks stacked statements; it also rejects semicolons inside string
     * literals, which is accepted as the conservative choice.
     */
    private boolean isSafeQuery(String sql) {
        if (sql == null) {
            return false;
        }
        String normalized = sql.trim().toLowerCase(java.util.Locale.ROOT);
        if (normalized.endsWith(";")) {
            normalized = normalized.substring(0, normalized.length() - 1).trim();
        }
        if (normalized.isEmpty() || normalized.contains(";")) {
            return false;
        }
        return normalized.matches(READ_ONLY_START)
                && !normalized.matches(FORBIDDEN_KEYWORDS);
    }
}
// 文件操作工具
/**
 * Tool offering {@code read}, {@code write} and {@code list} on file-system
 * paths.
 *
 * <p>NOTE(review): the path is used exactly as supplied; this class applies no
 * directory restriction of its own.
 */
@Component
@Slf4j
public class FileOperationTool implements Tool {
    @Override
    public String getName() {
        return "file_operations";
    }

    @Override
    public String getDescription() {
        return "文件操作,参数: operation(操作类型), path(文件路径), content(内容-可选)";
    }

    @Override
    public ToolDefinition getDefinition() {
        return new ToolDefinition(getName(), getDescription(),
                List.of(
                        new ParameterDefinition("operation", "string", "操作类型: read/write/list", true),
                        new ParameterDefinition("path", "string", "文件路径", true),
                        new ParameterDefinition("content", "string", "写入内容", false)
                ));
    }

    /**
     * Dispatches on {@code operation}.
     *
     * @param parameters {@code operation} and {@code path} are required;
     *                   {@code content} is required for {@code write}
     * @return file text for {@code read}, {@code true} for {@code write}, the
     *         entry names for {@code list}
     * @throws IllegalArgumentException for a missing parameter or an
     *                                  unsupported operation
     * @throws IOException              when the file-system call fails
     */
    @Override
    public Object execute(Map<String, Object> parameters) throws Exception {
        String operation = (String) parameters.get("operation");
        String path = (String) parameters.get("path");
        // switch on a null String and Paths.get(null) both fail with a bare
        // NullPointerException; report the missing argument instead.
        if (operation == null) {
            throw new IllegalArgumentException("缺少必需参数: operation");
        }
        if (path == null) {
            throw new IllegalArgumentException("缺少必需参数: path");
        }
        switch (operation) {
            case "read":
                return readFile(path);
            case "write":
                String content = (String) parameters.get("content");
                if (content == null) {
                    throw new IllegalArgumentException("缺少必需参数: content");
                }
                return writeFile(path, content);
            case "list":
                return listFiles(path);
            default:
                throw new IllegalArgumentException("不支持的操作: " + operation);
        }
    }

    private String readFile(String path) throws IOException {
        return Files.readString(Paths.get(path));
    }

    private Boolean writeFile(String path, String content) throws IOException {
        Files.writeString(Paths.get(path), content);
        return true;
    }

    /** Lists the entry names of a directory, closing the directory stream afterwards. */
    private List<String> listFiles(String path) throws IOException {
        // Files.list keeps the directory open until the stream is closed.
        try (var entries = Files.list(Paths.get(path))) {
            return entries
                    .map(p -> p.getFileName().toString())
                    .collect(Collectors.toList());
        }
    }
}
五、 记忆与学习系统
- 向量记忆存储
java
// VectorMemorySystem.java
/**
 * Stores agent experiences in a vector store (for similarity search) and in
 * Redis (by session id), and derives recurring lessons from them.
 */
@Component
@Slf4j
public class VectorMemorySystem {
    /** How long a computed pattern list is served from Redis before it is rebuilt. */
    private static final java.time.Duration PATTERN_CACHE_TTL = java.time.Duration.ofMinutes(10);

    private final VectorStore vectorStore;
    private final EmbeddingModel embeddingModel;
    private final RedisTemplate<String, String> redisTemplate;
    /** Shared instance; building an ObjectMapper per call is needlessly expensive. */
    private final ObjectMapper objectMapper = new ObjectMapper();

    public VectorMemorySystem(VectorStore vectorStore,
                              EmbeddingModel embeddingModel,
                              RedisTemplate<String, String> redisTemplate) {
        this.vectorStore = vectorStore;
        this.embeddingModel = embeddingModel;
        this.redisTemplate = redisTemplate;
    }

    /**
     * Stores one experience in both stores. Failures are logged and not
     * propagated.
     */
    public void storeExperience(AgentExperience experience) {
        try {
            // Map.of rejects null values; a null metadata value ends in the catch below.
            Document document = new Document(
                    experience.toTextForm(),
                    Map.of(
                            "task_type", experience.getTaskType(),
                            "success", experience.isSuccess(),
                            "timestamp", experience.getTimestamp(),
                            "lessons", experience.getLessonsLearned()
                    )
            );
            vectorStore.add(List.of(document));
            // Also keep a copy in Redis for direct lookup by session id.
            String key = "agent:experience:" + experience.getSessionId();
            redisTemplate.opsForValue().set(key, experience.toJson());
            log.info("存储经验记忆: {}", experience.getSessionId());
        } catch (Exception e) {
            log.error("存储经验失败", e);
        }
    }

    /**
     * Finds experiences similar to the given task text.
     *
     * @return the convertible matches; an empty list when the search fails
     */
    public List<AgentExperience> retrieveRelevantExperiences(String currentTask, int maxResults) {
        try {
            // NOTE(review): confirm this (String, int) overload exists on the
            // VectorStore version in use.
            List<Document> similarDocs = vectorStore.similaritySearch(currentTask, maxResults);
            return similarDocs.stream()
                    .map(this::documentToExperience)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("检索经验失败", e);
            return List.of();
        }
    }

    /**
     * Returns the lessons that occur at least {@code minOccurrences} times in
     * experiences similar to {@code taskType}.
     *
     * <p>Results are cached in Redis. The key includes {@code minOccurrences}
     * so calls with different thresholds do not read each other's result, and
     * the entry expires so newly stored experiences are eventually reflected.
     */
    public List<String> extractPatterns(String taskType, int minOccurrences) {
        String patternKey = "agent:patterns:" + taskType + ":" + minOccurrences;
        String patternsJson = redisTemplate.opsForValue().get(patternKey);
        if (patternsJson != null) {
            List<String> cached = parsePatterns(patternsJson);
            if (cached != null) {
                return cached;
            }
            // Unreadable cache entry: fall through and rebuild it.
        }
        List<Document> allExperiences = vectorStore.similaritySearch(taskType, 100);
        List<String> patterns = analyzePatterns(allExperiences, minOccurrences);
        String json = toJson(patterns);
        if (json != null) {
            redisTemplate.opsForValue().set(patternKey, json, PATTERN_CACHE_TTL);
        }
        return patterns;
    }

    /** Rebuilds an experience from document metadata; {@code null} when that fails. */
    private AgentExperience documentToExperience(Document doc) {
        try {
            Map<String, Object> metadata = doc.getMetadata();
            @SuppressWarnings("unchecked") // written as a List<String> by storeExperience
            List<String> lessons = (List<String>) metadata.get("lessons");
            return new AgentExperience(
                    (String) metadata.get("task_type"),
                    (Boolean) metadata.get("success"),
                    doc.getText(),
                    lessons
            );
        } catch (Exception e) {
            log.warn("转换文档到经验失败", e);
            return null;
        }
    }

    /** Counts each lesson across the documents and keeps the frequent ones. */
    private List<String> analyzePatterns(List<Document> experiences, int minOccurrences) {
        Map<String, Integer> patternCounts = new HashMap<>();
        for (Document doc : experiences) {
            @SuppressWarnings("unchecked") // written as a List<String> by storeExperience
            List<String> lessons = (List<String>) doc.getMetadata().get("lessons");
            if (lessons != null) {
                for (String lesson : lessons) {
                    patternCounts.merge(lesson, 1, Integer::sum);
                }
            }
        }
        return patternCounts.entrySet().stream()
                .filter(entry -> entry.getValue() >= minOccurrences)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /** @return the JSON text, or {@code null} when serialization fails */
    private String toJson(List<String> patterns) {
        try {
            return objectMapper.writeValueAsString(patterns);
        } catch (Exception e) {
            log.warn("Failed to serialize pattern list; result will not be cached", e);
            return null;
        }
    }

    /** @return the parsed list, or {@code null} when the JSON cannot be read */
    private List<String> parsePatterns(String json) {
        try {
            return objectMapper.readValue(json, new TypeReference<List<String>>() {});
        } catch (Exception e) {
            log.warn("Failed to parse cached pattern list; it will be recomputed", e);
            return null;
        }
    }
}
- 状态管理与监控
java
// AgentStateManager.java
/**
 * Publishes Micrometer metrics for state transitions and task outcomes.
 */
@Component
@Slf4j
public class AgentStateManager {
    private final StateMachine<AgentState, AgentEvent> stateMachine;
    private final MeterRegistry meterRegistry;
    private final Counter successCounter;
    private final Counter failureCounter;
    private final Timer executionTimer;

    public AgentStateManager(StateMachine<AgentState, AgentEvent> stateMachine,
                             MeterRegistry meterRegistry) {
        this.stateMachine = stateMachine;
        this.meterRegistry = meterRegistry;
        // One counter name, split by a status tag.
        this.successCounter = Counter.builder("agent.tasks.completed")
                .tag("status", "success")
                .register(meterRegistry);
        this.failureCounter = Counter.builder("agent.tasks.completed")
                .tag("status", "failure")
                .register(meterRegistry);
        this.executionTimer = Timer.builder("agent.task.duration")
                .register(meterRegistry);
    }

    /**
     * Logs a state change, counts the transition and counts completed or
     * failed tasks.
     *
     * <p>NOTE(review): confirm the event type really exposes
     * {@code getSource()} / {@code getTarget()} as {@code AgentState}.
     */
    @EventListener
    public void onStateChange(StateMachineEvent<AgentState, AgentEvent> event) {
        AgentState source = event.getSource();
        AgentState target = event.getTarget();
        log.info("智能体状态变更: {} -> {}", source, target);
        // presumably a transition can arrive without a source state (initial
        // entry) — TODO confirm; guarded so tagging cannot throw.
        meterRegistry.counter("agent.state.transitions",
                        "from", stateName(source),
                        "to", stateName(target))
                .increment();
        if (target == AgentState.COMPLETED) {
            successCounter.increment();
        } else if (target == AgentState.ERROR) {
            failureCounter.increment();
        }
    }

    /** Tag value for a possibly absent state. */
    private static String stateName(AgentState state) {
        return state == null ? "NONE" : state.name();
    }

    /**
     * Records duration and iteration count of a finished task.
     *
     * <p>Iterations go into a distribution summary. A gauge bound to a boxed
     * number is only weakly referenced and is not updated by later calls with
     * the same name and tags, so it cannot carry per-task values.
     *
     * @param duration   task duration in milliseconds
     * @param iterations number of loop iterations the task used
     * @param success    outcome, used as a tag on the iteration metric
     */
    public void recordTaskMetrics(long duration, int iterations, boolean success) {
        executionTimer.record(duration, TimeUnit.MILLISECONDS);
        meterRegistry.summary("agent.task.iterations",
                        Tags.of("success", String.valueOf(success)))
                .record(iterations);
    }
}
// 状态机配置
@Configuration
@EnableStateMachine
public class AgentStateMachineConfig extends StateMachineConfigurerAdapter {
@Override
public void configure(StateMachineStateConfigurer<AgentState, AgentEvent> states)
throws Exception {
states
.withStates()
.initial(AgentState.IDLE)
.state(AgentState.PLANNING)
.state(AgentState.EXECUTING)
.state(AgentState.LEARNING)
.state(AgentState.COMPLETED)
.state(AgentState.ERROR);
}
@Override
public void configure(StateMachineTransitionConfigurer<AgentState, AgentEvent> transitions)
throws Exception {
transitions
.withExternal()
.source(AgentState.IDLE).target(AgentState.PLANNING)
.event(AgentEvent.START)
.and()
.withExternal()
.source(AgentState.PLANNING).target(AgentState.EXECUTING)
.event(AgentEvent.PLAN_READY)
.and()
.withExternal()
.source(AgentState.EXECUTING).target(AgentState.LEARNING)
.event(AgentEvent.ACTION_COMPLETE)
.and()
.withExternal()
.source(AgentState.LEARNING).target(AgentState.COMPLETED)
.event(AgentEvent.COMPLETE)
.and()
.withExternal()
.source(AgentState.LEARNING).target(AgentState.PLANNING)
.event(AgentEvent.CONTINUE)
.and()
.withExternal()
.source(AgentState.ERROR).target(AgentState.IDLE)
.event(AgentEvent.RESET);
}
}
// 状态枚举
/** Life-cycle states of the agent state machine, in declaration order. */
public enum AgentState {
    /** Initial state. */
    IDLE,
    /** Entered from IDLE on START and from LEARNING on CONTINUE. */
    PLANNING,
    /** Entered from PLANNING on PLAN_READY. */
    EXECUTING,
    /** Entered from EXECUTING on ACTION_COMPLETE. */
    LEARNING,
    /** Entered from LEARNING on COMPLETE. */
    COMPLETED,
    /** Failure state; left again on RESET. */
    ERROR
}
/** Events that drive the agent state machine, in declaration order. */
public enum AgentEvent {
    /** Begins a task. */
    START,
    /** A plan has been produced. */
    PLAN_READY,
    /** The planned action has been executed. */
    ACTION_COMPLETE,
    /** The task is finished. */
    COMPLETE,
    /** Another planning round is required. */
    CONTINUE,
    /** The task failed. */
    ERROR,
    /** Return to the idle state. */
    RESET
}
六、 智能体编排与API
- 智能体编排服务
java
// AgentOrchestrationService.java
/**
 * Routes tasks to the primary agent or to a specialised agent, and runs the
 * sub-tasks of a complex task in parallel.
 */
@Service
@Slf4j
public class AgentOrchestrationService {
    /** Upper bound, in seconds, for a collaborative execution (5 minutes). */
    private static final long COLLABORATION_TIMEOUT_SECONDS = 300;

    private final IntelligentAgent primaryAgent;
    private final Map<String, SpecializedAgent> specializedAgents;
    private final TaskRouter taskRouter;

    public AgentOrchestrationService(IntelligentAgent primaryAgent,
                                     List<SpecializedAgent> agents,
                                     TaskRouter taskRouter) {
        this.primaryAgent = primaryAgent;
        this.taskRouter = taskRouter;
        this.specializedAgents = new HashMap<>();
        // Index the specialised agents by the specialisation they report.
        for (SpecializedAgent agent : agents) {
            specializedAgents.put(agent.getSpecialization(), agent);
            log.info("注册专业智能体: {}", agent.getSpecialization());
        }
    }

    /**
     * Executes the task on the agent chosen by the router; falls back to the
     * primary agent when the chosen specialisation is not registered.
     */
    public AgentResponse routeAndExecute(AgentTask task) {
        String agentType = taskRouter.determineBestAgent(task);
        log.info("任务路由到: {}", agentType);
        if ("primary".equals(agentType)) {
            return primaryAgent.executeTask(task);
        } else {
            SpecializedAgent agent = specializedAgents.get(agentType);
            if (agent != null) {
                return agent.executeTask(task);
            } else {
                log.warn("未找到专业智能体: {}, 使用主智能体", agentType);
                return primaryAgent.executeTask(task);
            }
        }
    }

    /**
     * Decomposes the task, runs every sub-task asynchronously through
     * {@link #routeAndExecute(AgentTask)} and merges the results.
     *
     * <p>NOTE(review): the sub-tasks run on the default async pool of
     * {@code CompletableFuture}; consider a dedicated executor, since agent
     * execution blocks on model and tool calls.
     *
     * @return the merged response, or a failure response on timeout,
     *         interruption or any sub-task exception
     */
    public AgentResponse collaborativeExecution(ComplexTask complexTask) {
        List<Subtask> subtasks = complexTask.decompose();
        List<CompletableFuture<AgentResponse>> futures = new ArrayList<>();
        for (Subtask subtask : subtasks) {
            // Convert once and reuse the single routing implementation.
            futures.add(CompletableFuture.supplyAsync(
                    () -> routeAndExecute(subtask.toAgentTask())));
        }
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
        );
        try {
            allFutures.get(COLLABORATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            List<AgentResponse> results = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            return integrateResults(results, complexTask);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            cancelAll(futures);
            log.error("协作执行失败", e);
            return AgentResponse.failure("协作执行超时或失败");
        } catch (Exception e) {
            cancelAll(futures);
            log.error("协作执行失败", e);
            return AgentResponse.failure("协作执行超时或失败");
        }
    }

    /** Cancels the unfinished futures so sub-tasks not yet started are skipped. */
    private void cancelAll(List<CompletableFuture<AgentResponse>> futures) {
        for (CompletableFuture<AgentResponse> future : futures) {
            future.cancel(true);
        }
    }

    /**
     * Merges sub-task responses: success only if all succeeded, results keyed
     * {@code subtask_<index>}.
     */
    private AgentResponse integrateResults(List<AgentResponse> results, ComplexTask task) {
        AgentResponse integrated = new AgentResponse();
        integrated.setSuccess(results.stream().allMatch(AgentResponse::isSuccess));
        Map<String, Object> integratedResult = new HashMap<>();
        for (int i = 0; i < results.size(); i++) {
            integratedResult.put("subtask_" + i, results.get(i).getFinalResult());
        }
        integrated.setFinalResult(integratedResult);
        return integrated;
    }
}
// 专业智能体示例
@Component
@Slf4j
public class DataAnalysisAgent implements SpecializedAgent {
private final IntelligentAgent baseAgent;
private final List<Tool> dataTools;
public DataAnalysisAgent(IntelligentAgent baseAgent,
@Qualifier("dataTools") List<Tool> dataTools) {
this.baseAgent = baseAgent;
this.dataTools = dataTools;
}
@Override
public String getSpecialization() {
return "data_analysis";
}
@Override
public AgentResponse executeTask(AgentTask task) {
// 数据分智能体有专门的工具和提示
log.info("数据分智能体执行任务: {}", task.getDescription());
// 可以在这里添加数据分特有的预处理逻辑
AgentTask enhancedTask = enhanceTaskForDataAnalysis(task);
return baseAgent.executeTask(enhancedTask);
}
private AgentTask enhanceTaskForDataAnalysis(AgentTask original) {
// 为数据分任务添加特定上下文
Map<String, Object> enhancedContext = new HashMap<>(original.getContext());
enhancedContext.put("specialization", "data_analysis");
enhancedContext.put("preferred_tools",
dataTools.stream().map(Tool::getName).collect(Collectors.toList()));
return new AgentTask(
original.getDescription(),
enhancedContext,
original.getPriority()
);
}
}
- REST API接口
java
// AgentController.java
@RestController
@RequestMapping("/api/agents")
@Slf4j
public class AgentController {
private final AgentOrchestrationService orchestrationService;
private final IntelligentAgent primaryAgent;
public AgentController(AgentOrchestrationService orchestrationService,
IntelligentAgent primaryAgent) {
this.orchestrationService = orchestrationService;
this.primaryAgent = primaryAgent;
}
@PostMapping("/execute")
public ResponseEntity<AgentResponse> executeTask(@RequestBody AgentTaskRequest request) {
try {
AgentTask task = request.toAgentTask();
AgentResponse response = orchestrationService.routeAndExecute(task);
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("任务执行失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(AgentResponse.failure(e.getMessage()));
}
}
@PostMapping("/execute/complex")
public ResponseEntity<AgentResponse> executeComplexTask(@RequestBody ComplexTaskRequest request) {
try {
AgentResponse response = orchestrationService.collaborativeExecution(request.toComplexTask());
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("复杂任务执行失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(AgentResponse.failure(e.getMessage()));
}
}
@GetMapping(value = "/execute/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<AgentUpdate> executeTaskStreaming(@RequestParam String taskDescription,
@RequestParam Map<String, Object> context) {
AgentTask task = new AgentTask(taskDescription, context, 1);
return primaryAgent.executeTaskStreaming(task);
}
@GetMapping("/tools")
public ResponseEntity<List<ToolDefinition>> getAvailableTools() {
// 返回可用工具列表
return ResponseEntity.ok(List.of()); // 简化实现
}
@GetMapping("/health")
public ResponseEntity<Map<String, Object>> healthCheck() {
Map<String, Object> health = new HashMap<>();
health.put("status", "healthy");
health.put("timestamp", System.currentTimeMillis());
health.put("agents", List.of("primary", "data_analysis", "research"));
return ResponseEntity.ok(health);
}
// DTO类
@Data
public static class AgentTaskRequest {
private String description;
private Map<String, Object> context;
private Integer priority = 1;
public AgentTask toAgentTask() {
return new AgentTask(description, context != null ? context : Map.of(), priority);
}
}
@Data
public static class ComplexTaskRequest {
private String description;
private List<String> components;
private Map<String, Object> requirements;
public ComplexTask toComplexTask() {
return new ComplexTask(description, components, requirements);
}
}
}
七、 生产环境配置与优化
- 应用配置
yaml
# application.yml
spring:
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      chat:
        options:
          model: gpt-4
          temperature: 0.1
  redis:
    host: localhost
    port: 6379
  datasource:
    url: jdbc:postgresql://localhost:5432/agentdb
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
agent:
  execution:
    max-iterations: 15
    timeout-ms: 300000
    enable-memory: true
  tools:
    web-search:
      enabled: true
      api-url: https://api.search.com
    database:
      enabled: true
      read-only: true
    file-operations:
      enabled: false # 生产环境谨慎开启
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info
  endpoint:
    health:
      show-details: always
- 安全配置
java
// SecurityConfiguration.java
/**
 * Web security setup: JWT resource server, with the tool listing and health
 * endpoints open and everything else requiring authentication.
 *
 * <p>NOTE(review): {@code ToolSecurityManager} is also annotated
 * {@code @Component} in this file; one of the two registrations is redundant.
 */
@Configuration
@EnableWebSecurity
public class SecurityConfiguration {
    /** Builds the filter chain with the URL rules and JWT validation. */
    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http.authorizeHttpRequests(rules -> {
            rules.requestMatchers("/api/agents/execute/**").authenticated();
            rules.requestMatchers("/api/agents/tools").permitAll();
            rules.requestMatchers("/api/agents/health").permitAll();
            rules.anyRequest().authenticated();
        });
        http.oauth2ResourceServer(resourceServer -> resourceServer.jwt(Customizer.withDefaults()));
        return http.build();
    }

    /** Exposes the tool permission checker as a bean. */
    @Bean
    public ToolSecurityManager toolSecurityManager() {
        final ToolSecurityManager manager = new ToolSecurityManager();
        return manager;
    }
}
/**
 * Decides whether a caller may use a tool and whether the tool's parameters
 * pass basic safety checks.
 */
@Component
@Slf4j
public class ToolSecurityManager {
    private final Set<String> dangerousTools = Set.of("file_operations", "system_command");

    /**
     * Tools in the dangerous set require the {@code ROLE_ADMIN} authority; all
     * other tools are allowed.
     *
     * @param authentication the caller; {@code null} is treated as "no
     *                       authorities" and therefore denied for dangerous tools
     */
    public boolean isToolAllowed(String toolName, Authentication authentication) {
        if (dangerousTools.contains(toolName)) {
            return hasAdminRole(authentication);
        }
        return true;
    }

    /**
     * Runs the tool-specific parameter check. Tools without a specific check,
     * and a {@code null} tool name, pass.
     */
    public boolean validateParameters(String toolName, Map<String, Object> parameters) {
        if (toolName == null) {
            // switch on a null String would throw; no specific check applies.
            return true;
        }
        switch (toolName) {
            case "file_operations":
                return validateFileOperation(parameters);
            case "query_database":
                return validateDatabaseQuery(parameters);
            default:
                return true;
        }
    }

    /** Accepts only string paths under {@code /allowed/path/} without {@code ..}. */
    private boolean validateFileOperation(Map<String, Object> parameters) {
        Object raw = parameters.get("path");
        if (!(raw instanceof String)) {
            // Missing or non-string value: reject instead of failing on a cast.
            return false;
        }
        String path = (String) raw;
        return !path.contains("..") && path.startsWith("/allowed/path/");
    }

    /**
     * Rejects statements containing a modifying keyword. {@code drop} is
     * included so this check agrees with {@code DatabaseQueryTool.isSafeQuery}.
     */
    private boolean validateDatabaseQuery(Map<String, Object> parameters) {
        Object raw = parameters.get("sql");
        if (!(raw instanceof String)) {
            return false;
        }
        String lowerSql = ((String) raw).toLowerCase();
        return !lowerSql.contains("delete")
                && !lowerSql.contains("update")
                && !lowerSql.contains("insert")
                && !lowerSql.contains("drop");
    }

    /** True when the caller holds {@code ROLE_ADMIN}; false for a null caller. */
    private boolean hasAdminRole(Authentication authentication) {
        if (authentication == null || authentication.getAuthorities() == null) {
            return false;
        }
        return authentication.getAuthorities().stream()
                .anyMatch(auth -> "ROLE_ADMIN".equals(auth.getAuthority()));
    }
}
八、 应用场景与总结
- 典型应用场景
自动化工作流:处理复杂的多步骤业务流程
数据分析报告:自动收集数据、分析趋势并生成报告
客户服务:处理复杂的客户咨询和问题解决
系统运维:监控系统状态并执行维护任务
研究辅助:收集信息、分析文献并生成综述
- 系统优势
自主性:能够独立完成复杂任务序列
适应性:根据环境反馈调整策略
可扩展性:易于添加新工具和专业智能体
安全性:内置权限控制和参数验证
可观测性:完整的执行追踪和监控
- 总结
通过本文的实践,我们成功构建了一个基于Java的AI智能体系统,具备以下核心能力:
推理规划:使用ReAct框架进行任务分解和规划
工具调用:安全地执行外部操作和系统交互
记忆学习:从经验中积累知识并改进策略
状态管理:可靠的任务执行和错误恢复
多智能体协作:复杂任务的并行处理和结果整合
这种架构将大语言模型的推理能力与Java系统的稳定性和企业级集成能力相结合,为构建真正自主的智能系统提供了坚实的技术基础。随着智能体技术的不断发展,这种基于Java的智能体架构将在企业自动化、智能决策和复杂系统管理等领域发挥越来越重要的作用。