AI Agent在投资领域比较火爆,代码助手的投资也比较多,单AI Agent,多角色AI Agent逐渐会成为AI应用的方向。最近一个10岁孩子直接使用豆包和两个ChatGPT进行愚公该移山还是该搬家的辩论活动吸引了大家的眼球,用户作为主持人角色指导2个GPT进行正反方辩友进行辩论和1个GPT作为评委进行评价,这个就是典型的多角色AI代理的应用场景,使用了3个GPT,其实一个也可以的,虽然这个活动最终没有达到预期,Chat GPT作为评委的代理,虽然没有评论,但都记录了它“听”到的内容。这个场景是直接使用GPT作为代理的例子,多角色AI Agent打开走向AGI的想象空间。下面以我比较熟悉的领域,尝试实现一个多角色AI代理编写代码的例子。涉及的角色为:
用户UserInterface:输入任务指令,并在任务无法完成时,进行指导,如:修改代码。
需求分析RequirementAnalyzer:针对用户输入的指令进行需求分析。
代码结构ProgramStructure:针对需求,提出合理的代码结构。
编写代码CodeGenerator:根据需求及代码结构和语言设置,编写代码。
代码审查CodeReviewer:针对生成的代码进行代码审查,提出改进意见。
代码执行CodeExecutor:执行审查完成的代码,输出测试结果。
协作交互Coordinator:协作交互逻辑,通过多次迭代完成任务。
本文默认使用的ollama版本的qwen2.5 7b模型。
完整代码:
import gradio as gr
import requests
import subprocess
import logging
import json
from io import StringIO
import threading
import time
配置日志记录
log_stream = StringIO()
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # 输出到控制台
logging.StreamHandler(log_stream) # 输出到 StringIO 流
]
)
定义RequirementAnalyzer角色
class RequirementAnalyzer:
def init(self, api_url, model):
self.api_url = api_url
self.model = model
def analyze_instruction(self, instruction):
messages = [
{"role": "system", "content": "你是一个有帮助的助手。"},
{"role": "user", "content": f"分析以下指令: {instruction}"}
]
response = requests.post(self.api_url, json={"messages": messages, "model": self.model, "stream": True}, stream=True)
analysis = self.process_stream(response)
logging.info(f"[RequirementAnalyzer] 分析的需求: {analysis}")
return analysis
def process_stream(self, response):
content = ""
for chunk in response.iter_content(chunk_size=None):
if chunk:
chunk_data = chunk.decode('utf-8')
if "done" in chunk_data:
chunk_data = chunk_data.split("data: ")[-1]
chunk_json = json.loads(chunk_data)
if chunk_json["done"]:
break
else:
content += chunk_json["message"]["content"]
return content
定义ProgramStructure角色
class ProgramStructure:
def init(self, api_url, model):
self.api_url = api_url
self.model = model
def determine_structure(self, requirement):
messages = [
{"role": "system", "content": "你是一个有帮助的助手。"},
{"role": "user", "content": f"确定以下需求的类结构: {requirement}"}
]
response = requests.post(self.api_url, json={"messages": messages, "model": self.model, "stream": True}, stream=True)
structure = self.process_stream(response)
logging.info(f"[ProgramStructure] 确定的结构: {structure}")
return structure
def process_stream(self, response):
content = ""
for chunk in response.iter_content(chunk_size=None):
if chunk:
chunk_data = chunk.decode('utf-8')
if "done" in chunk_data:
chunk_data = chunk_data.split("data: ")[-1]
chunk_json = json.loads(chunk_data)
if chunk_json["done"]:
break
else:
content += chunk_json["message"]["content"]
return content
定义CodeGenerator角色
class CodeGenerator:
def init(self, api_url, model):
self.api_url = api_url
self.model = model
def generate_code(self, requirement, structure, language):
messages = [
{"role": "system", "content": "你是一个有帮助的助手。"},
{"role": "user", "content": f"根据以下需求和结构编写{language}代码: {requirement}, {structure}"}
]
response = requests.post(self.api_url, json={"messages": messages, "model": self.model, "stream": True}, stream=True)
code = self.process_stream(response)
code = self.extract_code(code) # 提取代码部分
logging.info(f"[CodeGenerator] 生成的代码: {code}")
return code
def process_stream(self, response):
content = ""
for chunk in response.iter_content(chunk_size=None):
if chunk:
chunk_data = chunk.decode('utf-8')
if "done" in chunk_data:
chunk_data = chunk_data.split("data: ")[-1]
chunk_json = json.loads(chunk_data)
if chunk_json["done"]:
break
else:
content += chunk_json["message"]["content"]
return content
def extract_code(self, content):
# 假设代码部分被包裹在 ``` 代码块中,并去掉打头的语言标识
import re
match = re.search(r'```.*?\n(.*?)```', content, re.DOTALL)
if match:
return match.group(1).strip()
return content.strip()
定义CodeReviewer角色
class CodeReviewer:
def init(self, api_url, model):
self.api_url = api_url
self.model = model
def review_code(self, code, language):
messages = [
{"role": "system", "content": "你是一个有帮助的助手。"},
{"role": "user", "content": f"审查以下{language}代码并提出改进建议,并给出结论,如:整体来说,这段代码已经很完善: {code}"}
]
response = requests.post(self.api_url, json={"messages": messages, "model": self.model, "stream": True}, stream=True)
review = self.process_stream(response)
logging.info(f"[CodeReviewer] 代码审查: {review}")
return review
def process_stream(self, response):
content = ""
for chunk in response.iter_content(chunk_size=None):
if chunk:
chunk_data = chunk.decode('utf-8')
if "done" in chunk_data:
chunk_data = chunk_data.split("data: ")[-1]
chunk_json = json.loads(chunk_data)
if chunk_json["done"]:
break
else:
content += chunk_json["message"]["content"]
return content
定义CodeExecutor角色
class CodeExecutor:
def execute_code(self, code, language):
if language == "Python":
with open("temp.py", "w") as f:
f.write(code)
try:
result = subprocess.run(["python", "temp.py"], capture_output=True, text=True)
logging.info(f"[CodeExecutor] 执行结果: {result.stdout}")
return result.stdout
except Exception as e:
logging.error(f"[CodeExecutor] 执行错误: {e}")
return str(e)
elif language == "JavaScript":
with open("temp.js", "w") as f:
f.write(code)
try:
result = subprocess.run(["node", "temp.js"], capture_output=True, text=True)
logging.info(f"[CodeExecutor] 执行结果: {result.stdout}")
return result.stdout
except Exception as e:
logging.error(f"[CodeExecutor] 执行错误: {e}")
return str(e)
else:
logging.error(f"[CodeExecutor] 不支持的语言")
return "不支持的语言"
定义UserInterface角色
class UserInterface:
def init(self, coordinator):
self.coordinator = coordinator
self.log_output = None
def ai_code_assistant(self, instruction, language, max_attempts):
return self.coordinator.process_instruction(instruction, language, max_attempts)
def update_logs(self):
while True:
time.sleep(1)
if self.log_output:
self.log_output.update(value=log_stream.getvalue())
def launch(self):
with gr.Blocks() as iface:
gr.Markdown("# AI代码助手")
with gr.Row():
with gr.Column():
instruction = gr.Textbox(label="指令")
language = gr.Dropdown(["Python", "JavaScript"], label="语言")
max_attempts = gr.Number(label="最大交互次数", value=5)
submit_btn = gr.Button("提交")
with gr.Column():
with gr.Tabs():
with gr.TabItem("生成的代码"):
generated_code = gr.Textbox(label="生成的代码", lines=10)
with gr.TabItem("程序结构"):
program_structure = gr.Textbox(label="程序结构", lines=10)
with gr.TabItem("审查结果"):
review_result = gr.Textbox(label="审查结果", lines=10)
with gr.TabItem("执行结果"):
execution_result = gr.Textbox(label="执行结果", lines=10)
with gr.TabItem("日志输出"):
self.log_output = gr.Textbox(label="日志输出", lines=10)
with gr.TabItem("修改代码"):
modified_code = gr.Textbox(label="修改代码", lines=10)
modify_btn = gr.Button("提交修改")
submit_btn.click(
fn=self.ai_code_assistant,
inputs=[instruction, language, max_attempts],
outputs=[generated_code, program_structure, review_result, execution_result, self.log_output]
)
modify_btn.click(
fn=self.coordinator.process_modified_code,
inputs=[modified_code, language],
outputs=[generated_code, program_structure, review_result, execution_result, self.log_output]
)
gr.Examples(
examples=[
["计算一个数的阶乘", "Python", 5],
["对一个数字列表按升序排序", "JavaScript", 5]
],
inputs=[instruction, language, max_attempts],
outputs=[generated_code, program_structure, review_result, execution_result, self.log_output]
)
# 启动日志更新线程
threading.Thread(target=self.update_logs, daemon=True).start()
iface.launch()
定义Coordinator角色
class Coordinator:
def init(self, requirement_analyzer, program_structure, code_generator, code_reviewer, code_executor):
self.requirement_analyzer = requirement_analyzer
self.program_structure = program_structure
self.code_generator = code_generator
self.code_reviewer = code_reviewer
self.code_executor = code_executor
self.interaction_count = 0
self.max_attempts = 5
self.current_code = None
self.current_language = None
self.continue_after_max_attempts = False
def process_instruction(self, instruction, language, max_attempts):
self.max_attempts = max_attempts
self.current_language = language
self.continue_after_max_attempts = False
# 分析用户指令
requirement = self.requirement_analyzer.analyze_instruction(instruction)
# 确定程序结构
structure = self.program_structure.determine_structure(requirement)
# 循环生成和审查代码,直到审查通过或达到最大交互次数
while self.interaction_count < self.max_attempts or self.continue_after_max_attempts:
self.interaction_count += 1
logging.info(f"[Coordinator] 第 {self.interaction_count} 次尝试")
# 生成代码
code = self.code_generator.generate_code(requirement, structure, language)
self.current_code = code
# 审查代码
review = self.code_reviewer.review_code(code, language)
# 如果审查通过,跳出循环
if "代码已经很完善" in review:
break
# 否则,重新生成代码
requirement = f"修复以下代码: {code}"
# 如果达到最大交互次数,提示用户修改代码
if self.interaction_count >= self.max_attempts and not self.continue_after_max_attempts:
logging.info("[Coordinator] 已达到最大交互次数,任务结束。")
return code, structure, review, "已达到最大交互次数,任务结束。", self.get_logs()
# 执行代码
result = self.code_executor.execute_code(code, language)
# 检查执行结果,如果发现问题,驱动相关角色行动
if "Error" in result:
self.handle_error(code, result)
return code, structure, review, result, self.get_logs()
def handle_error(self, code, error_message):
# 这里可以实现错误处理逻辑,例如重新生成代码或提示用户修改代码
logging.error(f"[Coordinator] 检测到错误: {error_message}")
logging.info("[Coordinator] 驱动相关角色采取行动...")
# 提示用户修改代码
self.current_code = code
return code, "", "", f"检测到错误,请修改代码: {error_message}", self.get_logs()
def process_modified_code(self, modified_code, language):
self.current_code = modified_code
self.continue_after_max_attempts = True
# 执行修改后的代码
result = self.code_executor.execute_code(modified_code, language)
# 检查执行结果,如果发现问题,驱动相关角色行动
if "Error" in result:
self.handle_error(modified_code, result)
return modified_code, "", "", result, self.get_logs()
def get_logs(self):
# 获取日志输出
return log_stream.getvalue()
主程序
def main():
# 初始化API URL和模型
api_url = "http://localhost:11434/api/chat"
model = "qwen2.5"
# 创建角色实例
requirement_analyzer = RequirementAnalyzer(api_url, model)
program_structure = ProgramStructure(api_url, model)
code_generator = CodeGenerator(api_url, model)
code_reviewer = CodeReviewer(api_url, model)
code_executor = CodeExecutor()
coordinator = Coordinator(requirement_analyzer, program_structure, code_generator, code_reviewer, code_executor)
user_interface = UserInterface(coordinator)
# 启动用户界面
user_interface.launch()
if name == "main":
main()
首次发表地址:https://blog.csdn.net/wxl781227/article/details/143475737