Chapter 11: Autonomous Agents
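"This column is based on the official documentation of the open-source project learn-claude-code. The original docs are quite dense, so to make them easier for beginners like me, I read them line by line and added many annotations, plain-language explanations, and notes on pitfalls I ran into. I hope this 'pre-chewed' tutorial helps you open the door to AI agent development."
"Teammates check the board themselves and claim whatever work is there": the lead does not hand out tasks one by one; the team self-organizes.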
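1. Problem
In s09-s10, teammates act only when they are explicitly assigned work. The lead has to write a prompt for every teammate, and 10 unclaimed tasks on the board have to be assigned by hand. This does not scale.
True autonomy: teammates scan the task board themselves, claim tasks nobody is working on, and look for the next one when they finish.
One detail: after context compression (s06), an agent may forget who it is. Identity re-injection addresses this.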
2. Solution
Teammate lifecycle with idle cycle:
+-------+
| spawn |  <- the lead spawns a teammate
+---+---+
    |
    v
+-------+     tool_use      +-------+
|  LLM  | ----------------> | WORK  |  <- work phase: execute tool calls
+-------+                   +---+---+
                                |
                                | stop_reason != tool_use
                                | (the LLM stops calling tools)
                                | or the idle tool is called
                                v
                            +--------+
                            |  IDLE  |  <- idle phase: actively look for work
                            +---+----+
                                |
                                | poll every 5s for up to 60s
                                v
                +-------------------------------+
                | 1. check inbox                |
                |    message? -> back to WORK   |
                +-------------------------------+
                | 2. scan .tasks/               |
                |    unclaimed? -> claim -> WORK|
                +-------------------------------+
                | 3. 60s timeout                |
                |    no new work -> SHUTDOWN    |
                +-------------------------------+
                                |
                                v
                          +-----------+
                          | SHUTDOWN  |  <- teammate stops, resources are released
                          +-----------+
Identity re-injection after compression:
if len(messages) <= 3:  # 3 or fewer messages suggests the history was compressed
    messages.insert(0, identity_block)  # put the identity block at the front
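3. How It Works
3.1 Overall design philosophy: using the file system as a database
In s11, the task board is not a real database but a directory of files. Each task is a standalone JSON file stored under .tasks/.
Why a file system instead of a database?
- Simple and reliable: no extra database service, zero dependencies
- Persistent: task data lives on disk and survives a restart
- Easy to debug: tasks can be viewed and edited in any text editor
- Cross-platform: every operating system provides a file system
Core idea: files instead of a database, file names instead of an index, a directory instead of a table, glob instead of a query.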
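To make the "directory as table, glob as query" idea concrete, here is a minimal sketch. The helper names `create_task` and `query_tasks` are hypothetical and not part of s11; the task fields (`id`, `subject`, `status`, `owner`, `blockedBy`) are assumed from the fields this chapter reads.

```python
import json
import tempfile
from pathlib import Path


def create_task(tasks_dir: Path, task_id: int, subject: str) -> Path:
    """Write one task as its own JSON file (one file plays the role of one row)."""
    tasks_dir.mkdir(parents=True, exist_ok=True)
    task = {"id": task_id, "subject": subject, "status": "pending",
            "owner": "", "blockedBy": []}
    path = tasks_dir / f"task_{task_id}.json"  # the file name doubles as the index
    path.write_text(json.dumps(task, indent=2))
    return path


def query_tasks(tasks_dir: Path, status: str) -> list:
    """A 'SELECT ... WHERE status = ?' done with glob plus a filter."""
    tasks = [json.loads(f.read_text()) for f in tasks_dir.glob("task_*.json")]
    return sorted((t for t in tasks if t["status"] == status),
                  key=lambda t: t["id"])


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        board = Path(tmp) / ".tasks"
        create_task(board, 1, "Set up project")
        create_task(board, 2, "Write tests")
        print([t["subject"] for t in query_tasks(board, "pending")])
```

The trade-off is that every query reads every file, which is fine for a board with tens of tasks but would not replace a real database at scale.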
3.2 Teammate lifecycle: the WORK and IDLE phases
The teammate lifecycle in s11 has two core phases:
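def _loop(self, name, role, prompt):
    """
    Main loop of a teammate: alternates between the WORK and IDLE phases.
    Simplified excerpt: sys_prompt, tools, and tool execution are elided.

    Args:
        name: teammate name (e.g. "alice", "coder")
        role: teammate role (e.g. "backend", "frontend")
        prompt: initial task prompt
    """
    # Start the history with the initial task prompt. It is created once,
    # before the loop, so messages injected during IDLE survive into the
    # next WORK phase.
    messages = [{"role": "user", "content": prompt}]

    while True:  # runs until the teammate shuts down
        # ==================== WORK PHASE ====================
        # At most 50 LLM round trips per WORK phase, to prevent runaway loops
        for _ in range(50):
            # Ask the LLM for the next step
            response = client.messages.create(
                model=MODEL,
                system=sys_prompt,
                messages=messages,
                tools=tools,
                max_tokens=8000,
            )
            # A stop reason other than tool_use means the LLM is done for now
            if response.stop_reason != "tool_use":
                break
            # Execute tool calls...
            if idle_requested:  # the LLM called the idle tool
                break

        # ==================== IDLE PHASE ====================
        # Mark the teammate as idle
        self._set_status(name, "idle")
        # Poll for new messages or new tasks
        resume = self._idle_poll(name, messages)
        # Polling timed out (no new work within 60 seconds): shut down
        if not resume:
            self._set_status(name, "shutdown")
            return  # leave the loop; the thread ends
        # New work arrived: go back to the WORK phase
        self._set_status(name, "working")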
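Why two phases?
- WORK phase: does the actual work, calling tools and handling tasks
- IDLE phase: actively looks for new work instead of waiting passively
Key parameters:
- range(50): each WORK phase runs at most 50 LLM round trips, which guards against runaway loops
- IDLE_TIMEOUT = 60: idle timeout; the teammate shuts down after 60 seconds without new work
- POLL_INTERVAL = 5: polling interval; check once every 5 seconds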
3.3 Idle polling: how a teammate finds work on its own
def _idle_poll(self, name, messages):
    """
    Idle polling: actively look for new work during the IDLE phase.

    Args:
        name: teammate name
        messages: message history list (mutated in place)
    Returns:
        True: new work was found, resume the WORK phase
        False: timed out, the teammate should shut down
    """
    # Number of polls: IDLE_TIMEOUT // POLL_INTERVAL = 60 // 5 = 12
    for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):
        # Wait 5 seconds between checks to avoid polling too often
        time.sleep(POLL_INTERVAL)

        # Step 1: check the inbox (highest priority)
        inbox = BUS.read_inbox(name)
        if inbox:  # new messages arrived
            # Inject the messages into the history
            messages.append({
                "role": "user",
                "content": f"<inbox>{inbox}</inbox>"
            })
            return True  # resume work

        # Step 2: scan the task board for unclaimed tasks
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            # Auto-claim the first task in scan order
            claim_task(unclaimed[0]["id"], name)
            # Inject the task into the history
            messages.append({
                "role": "user",
                "content": f"<auto-claimed>Task #{unclaimed[0]['id']}: "
                           f"{unclaimed[0]['subject']}</auto-claimed>"
            })
            return True  # resume work

    # Step 3: timeout
    return False  # no new work within 60 seconds; triggers shutdown
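Polling logic:
- Check the inbox first: messages are handled immediately (highest priority)
- Then scan the task board: an unclaimed task is claimed automatically
- Shut down on timeout: with no new work for 60 seconds, the teammate stops to save resources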
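The priority order above can be isolated into a small, testable function. This is a sketch rather than the s11 implementation: `idle_poll` and its callback parameters are hypothetical, and the `sleep` argument is injectable only so the loop can be exercised without actually waiting 60 seconds.

```python
import time
from typing import Callable, Optional

POLL_INTERVAL = 5   # seconds between checks (value used in this chapter)
IDLE_TIMEOUT = 60   # total idle budget in seconds (value used in this chapter)


def idle_poll(check_inbox: Callable[[], list],
              check_board: Callable[[], list],
              sleep: Callable[[float], None] = time.sleep) -> Optional[str]:
    """Return "inbox" or "task" as soon as work appears, or None on timeout."""
    for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):  # 60 // 5 = 12 attempts
        sleep(POLL_INTERVAL)
        if check_inbox():   # priority 1: messages
            return "inbox"
        if check_board():   # priority 2: unclaimed tasks
            return "task"
    return None             # nothing arrived within the budget


if __name__ == "__main__":
    naps = []  # records each requested sleep instead of sleeping
    result = idle_poll(lambda: [], lambda: [], sleep=naps.append)
    print(result, len(naps))
```

Because the sleep happens before each check, a teammate that goes idle always waits at least one full interval before it can pick up work.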
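3.4 Task scanning: scan_unclaimed_tasks() in detail
def scan_unclaimed_tasks() -> list:
    """
    Scan the task board and return every task that can be claimed.

    Returns:
        list: claimable tasks, each one a dict

    A task is claimable when all three conditions hold:
        1. status == "pending"
        2. owner is empty (nobody has claimed it)
        3. blockedBy is empty (no other task blocks it)
    """
    # Make sure the task directory exists; exist_ok=True means an
    # already existing directory is not an error
    TASKS_DIR.mkdir(exist_ok=True)
    # Collected claimable tasks
    unclaimed = []
    # Walk every task_*.json file in the directory.
    # sorted() gives a deterministic order by file name. The order is
    # lexicographic, so task_10.json comes before task_2.json.
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        # Read and parse the JSON file
        task = json.loads(f.read_text())
        # All three conditions must hold.
        # get() is used instead of [] to avoid KeyError on missing fields.
        if (task.get("status") == "pending"       # condition 1: still pending
                and not task.get("owner")         # condition 2: not claimed yet
                and not task.get("blockedBy")):   # condition 3: not blocked
            unclaimed.append(task)
    return unclaimed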
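3.4.1 Why the task_*.json naming pattern?
for f in sorted(TASKS_DIR.glob("task_*.json")):
Design rationale:
- Consistent prefix: the task_ prefix keeps all task files together in a directory listing, which makes them easy to manage
- Wildcard matching: the task_*.json pattern lets glob() find every task file in one call
- Ordered traversal: sorted() gives a deterministic order by file name. The order is lexicographic, so task_10.json sorts before task_2.json; it matches numeric ID order only while all IDs have the same number of digits
What if we did not use this pattern?
If the directory also held files such as config.json or README.md, we would have to check each file to decide whether it is a task, which makes the code more complex and error-prone.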
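If strict numeric ID order matters, one option is to sort by the integer embedded in the file name. The sketch below is an illustration, not part of s11; `task_id_from_path` and `sort_task_files` are hypothetical helpers.

```python
import re
from pathlib import Path


def task_id_from_path(path: Path) -> int:
    """Extract the integer ID from a file name like task_12.json."""
    match = re.fullmatch(r"task_(\d+)\.json", path.name)
    if match is None:
        raise ValueError(f"Not a task file: {path.name}")
    return int(match.group(1))


def sort_task_files(paths) -> list:
    """Order task files by numeric ID rather than by string comparison."""
    return sorted(paths, key=task_id_from_path)


if __name__ == "__main__":
    files = [Path("task_10.json"), Path("task_2.json"), Path("task_1.json")]
    print([p.name for p in sorted(files)])           # lexicographic order
    print([p.name for p in sort_task_files(files)])  # numeric order
```

With fewer than ten tasks the two orders are identical, which is why the difference is easy to miss in small experiments.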
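3.4.2 Why check three conditions?
if (task.get("status") == "pending"
        and not task.get("owner")
        and not task.get("blockedBy")):
These are the three necessary conditions for a task to be claimable; all of them must hold:
| Condition | What it checks | Why it matters |
|---|---|---|
| status == "pending" | The task is still pending | A task that is in_progress or completed must not be claimed again |
| not task.get("owner") | The task has no owner yet | Keeps a teammate from picking a task that already belongs to someone |
| not task.get("blockedBy") | No other task blocks this one | If the task has dependencies, those must be completed first |
Why task.get() instead of task[]?
- get() returns None when the key is missing instead of raising an exception
- This makes the code more robust: it still works when a task file lacks some fields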
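The three conditions can be pulled into a predicate and checked against a few edge cases. `is_claimable` is a hypothetical helper name; the logic mirrors the condition shown above.

```python
def is_claimable(task: dict) -> bool:
    """True only if the task is pending, unowned, and unblocked."""
    return (task.get("status") == "pending"
            and not task.get("owner")
            and not task.get("blockedBy"))


if __name__ == "__main__":
    samples = [
        {"status": "pending"},                      # owner/blockedBy missing
        {"status": "pending", "owner": "alice"},    # already owned
        {"status": "pending", "blockedBy": [1]},    # waiting on task 1
        {"status": "completed", "owner": ""},       # finished
        {},                                         # no fields at all
    ]
    for task in samples:
        print(task, "->", is_claimable(task))
```

Note that a missing `owner` key, an empty string, and `None` are all treated as "unowned", and a missing or empty `blockedBy` list is treated as "unblocked". That is exactly what `not task.get(...)` buys over `task[...]`.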
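3.5 Task claiming: claim_task() in detail
def claim_task(task_id: int, owner: str) -> str:
    """
    Claim the task with the given ID.

    Args:
        task_id: task ID (maps to the file task_{task_id}.json)
        owner: name of the claiming teammate
    Returns:
        str: a message describing the result
    Thread safety:
        _claim_lock ensures only one thread runs the claim at a time.
    """
    # Hold the lock so two teammates' read-modify-write cycles cannot interleave.
    # Note: ownership is not re-checked here, so a later caller overwrites
    # an earlier claim on the same task.
    with _claim_lock:
        # Build the path of the task file
        path = TASKS_DIR / f"task_{task_id}.json"
        # Make sure the task file exists
        if not path.exists():
            return f"Error: Task {task_id} not found"
        # Read and parse the task file
        task = json.loads(path.read_text())
        # Record the claiming teammate as owner
        task["owner"] = owner
        # Move the status from "pending" to "in_progress"
        task["status"] = "in_progress"
        # Write the task back; indent=2 pretty-prints the JSON for
        # easier reading and debugging
        path.write_text(json.dumps(task, indent=2))
    return f"Claimed task #{task_id} for {owner}"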
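3.5.1 Why _claim_lock?
with _claim_lock:
Design rationale:
- Concurrency: in s11, several teammates (threads) may run scan_unclaimed_tasks() at the same time
- Race condition: without coordination, two teammates can both see the same unclaimed task and both decide to claim it
- What the lock does: only one thread at a time can run the claim, so the read-modify-write of a task file is never interleaved
Example of the race:
Timeline:
T1: teammate A calls scan_unclaimed_tasks() and sees task #5 unclaimed
T2: teammate B calls scan_unclaimed_tasks() and also sees task #5 unclaimed
T3: teammate A calls claim_task(5, "A")
T4: teammate B calls claim_task(5, "B")  # wrong! task #5 already belongs to A
Note that the lock alone does not reject the call at T4: claim_task as written never re-checks owner, so B's call succeeds and overwrites A as the owner. Preventing the double claim also requires checking, inside the lock, that the task is still pending and unowned.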
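One way to close the gap in the T1-T4 timeline is to re-validate the task inside the lock before writing. The following is a sketch of that variant, not the code shipped in s11: `claim_task_checked` and `race` are hypothetical names, the function returns a bool instead of a message, and the directory is passed in so the example can run against a temporary folder.

```python
import json
import tempfile
import threading
from pathlib import Path

_claim_lock = threading.Lock()


def claim_task_checked(tasks_dir: Path, task_id: int, owner: str) -> bool:
    """Claim a task only if it is still pending and unowned. Returns success."""
    with _claim_lock:
        path = tasks_dir / f"task_{task_id}.json"
        if not path.exists():
            return False
        task = json.loads(path.read_text())
        # Re-check inside the lock: the caller's earlier scan may be stale.
        if task.get("owner") or task.get("status") != "pending":
            return False
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
        return True


def race(tasks_dir: Path, task_id: int, names: list) -> list:
    """Let several threads try to claim the same task; return the winners."""
    winners = []

    def worker(name: str) -> None:
        if claim_task_checked(tasks_dir, task_id, name):
            winners.append(name)

    threads = [threading.Thread(target=worker, args=(n,)) for n in names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        board = Path(tmp)
        (board / "task_5.json").write_text(
            json.dumps({"id": 5, "subject": "demo", "status": "pending", "owner": ""}))
        print(race(board, 5, ["A", "B"]))  # exactly one name wins
```

A caller that loses the race would simply go back to scanning for the next unclaimed task.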
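3.5.2 Why change owner and status together?
task["owner"] = owner
task["status"] = "in_progress"
Design rationale:
- Atomicity: the file write is not truly atomic, but the two changes form one logical unit
- Consistent state: the task's status and owner always stay in sync
- in_progress rather than claimed: it describes the current state of the task more accurately (work is under way)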
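Since a plain `write_text` is not atomic, a reader could in principle observe a half-written task file. A common pattern, shown here as a sketch and not something s11 does, is to write to a temporary file in the same directory and then swap it in with `os.replace`. The helper name `write_json_atomic` is hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomic(path: Path, data: dict) -> None:
    """Write JSON to a temp file next to the target, then swap it into place."""
    # The temp file gets a .tmp suffix, so a task_*.json glob never matches it.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        # Rename over the target; readers see either the old or the new file.
        os.replace(tmp_name, path)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "task_1.json"
        write_json_atomic(target, {"id": 1, "owner": "alice", "status": "in_progress"})
        print(target.read_text())
```

This protects readers from torn writes; it does not replace the lock, which is still needed to serialize the read-modify-write between threads.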
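3.5.3 Why indent=2?
path.write_text(json.dumps(task, indent=2))
Design rationale:
- Readability: indented JSON is easier for people to read and debug
- Version-control friendly: with Git, diffs of formatted JSON are much clearer
- Common practice: two-space indentation is a widely used convention for JSON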
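3.6 Identity re-injection: keeping the agent from "forgetting" itself
# A history of 3 or fewer messages suggests context compression happened
if len(messages) <= 3:
    # First message: inject the identity.
    # The <identity> tag makes it easy for the LLM to recognize.
    messages.insert(0, {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>"
    })
    # Second message: the agent's acknowledgement.
    # It confirms the identity and completes the user/assistant exchange.
    messages.insert(1, {
        "role": "assistant",
        "content": f"I am {name}. Continuing."
    })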
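Why is identity re-injection needed?
In s06 we covered context compression: when the conversation history grows too long, the system compresses it automatically to save tokens. After compression, though, the agent may forget its identity (name, role, team).
Trigger condition for re-injection: len(messages) <= 3
This means the history has shrunk to at most 3 messages (typically the initial prompt, a response, and the current query). At that point the agent may have "lost its memory" and needs its identity injected again. The check is a heuristic: a freshly spawned teammate whose history is still short satisfies it too.
How the injection works:
- First message: the identity ("who you are")
- Second message: the agent's acknowledgement ("I know who I am")
This way the agent can quickly recover its sense of identity even after the context has been compressed.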
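The re-injection step can be written as a small function and tested without calling an LLM. This is a sketch: `reinject_identity` is a hypothetical name, and the guard that skips a history already starting with an `<identity>` block is an addition of this example, not something s11 does.

```python
def reinject_identity(messages: list, name: str, role: str, team_name: str,
                      threshold: int = 3) -> bool:
    """Prepend an identity exchange when the history looks compressed.

    Returns True if the identity was injected, False otherwise.
    """
    if len(messages) > threshold:
        return False  # history is long enough; assume identity is still in context
    first = messages[0].get("content") if messages else None
    if isinstance(first, str) and first.startswith("<identity>"):
        return False  # assumption of this sketch: do not stack identity blocks
    messages.insert(0, {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>",
    })
    messages.insert(1, {
        "role": "assistant",
        "content": f"I am {name}. Continuing.",
    })
    return True


if __name__ == "__main__":
    history = [{"role": "user", "content": "<summary>earlier work...</summary>"}]
    print(reinject_identity(history, "alice", "backend", "default"))
    for message in history:
        print(message["role"], ":", message["content"])
```

Inserting the pair at the front keeps the user/assistant alternation intact when the existing history starts with a user message.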
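4. Changes Relative to s10
| Component | Before (s10) | After (s11) |
|---|---|---|
| Tools | 12 | 14 (+idle, +claim_task) |
| Autonomy | Assigned by the lead | Self-organizing |
| Idle phase | None | Polls the inbox + task board |
| Task claiming | Manual only | Auto-claims unassigned tasks |
| Identity | System prompt | + re-injection after compression |
| Timeout | None | 60 s idle -> automatic shutdown |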
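Summary of the core improvement
The core value of s11 is that teammates become genuinely autonomous. With the idle cycle and task-board scanning, teammates move from "waiting passively for instructions" to "actively looking for work", which improves the team's parallelism and scalability.
Design highlights:
- Scanning and claiming are separate: first discover the opportunity, then act on it
- Fairness: sorted() processes tasks in a stable file-name order, so earlier tasks are offered first and are not starved
- Simple and efficient: no complex scheduling algorithm is needed; the file system serves as the "database"
- Thread safety: a lock guards against concurrent conflicts
- Identity retention: identity is re-injected automatically after context compression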
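5. Try It
cd learn-claude-code
python agents/s11_autonomous_agents.py
Try these prompts (English prompts tend to work better with LLMs, but Chinese works too):
- Create 3 tasks on the board, then spawn alice and bob. Watch them auto-claim.
- Spawn a coder teammate and let it find work from the task board itself
- Create tasks with dependencies. Watch teammates respect the blocked order.
- Type /tasks to view the task board with owners
- Type /team to see who is working and who is idle
What to watch for:
- After a teammate enters the IDLE state, does it scan the task board automatically?
- Do multiple teammates compete for the same task?
- Are tasks with dependencies handled correctly?
- Does identity re-injection take effect after context compression?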
6. Full Code
#!/usr/bin/env python3
import json
import os
import subprocess
import threading
import time
import uuid
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TEAM_DIR = WORKDIR / ".team"
INBOX_DIR = TEAM_DIR / "inbox"
TASKS_DIR = WORKDIR / ".tasks"

POLL_INTERVAL = 5
IDLE_TIMEOUT = 60

SYSTEM = f"You are a team lead at {WORKDIR}. Teammates are autonomous -- they find work themselves."

VALID_MSG_TYPES = {
    "message",
    "broadcast",
    "shutdown_request",
    "shutdown_response",
    "plan_approval_response",
}

# -- Request trackers --
shutdown_requests = {}
plan_requests = {}
_tracker_lock = threading.Lock()
_claim_lock = threading.Lock()


# -- MessageBus: JSONL inbox per teammate --
class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)


# -- Task board scanning --
def scan_unclaimed_tasks() -> list:
    TASKS_DIR.mkdir(exist_ok=True)
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed


def claim_task(task_id: int, owner: str) -> str:
    with _claim_lock:
        path = TASKS_DIR / f"task_{task_id}.json"
        if not path.exists():
            return f"Error: Task {task_id} not found"
        task = json.loads(path.read_text())
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
    return f"Claimed task #{task_id} for {owner}"


# -- Identity re-injection after compression --
def make_identity_block(name: str, role: str, team_name: str) -> dict:
    return {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, team: {team_name}. Continue your work.</identity>",
    }


# -- Autonomous TeammateManager --
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}

    def _load_config(self) -> dict:
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def _set_status(self, name: str, status: str):
        member = self._find_member(name)
        if member:
            member["status"] = status
            self._save_config()

    def spawn(self, name: str, role: str, prompt: str) -> str:
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _loop(self, name: str, role: str, prompt: str):
        team_name = self.config["team_name"]
        sys_prompt = (
            f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. "
            f"Use idle tool when you have no more work. You will auto-claim new tasks."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()

        while True:
            # -- WORK PHASE: standard agent loop --
            for _ in range(50):
                inbox = BUS.read_inbox(name)
                for msg in inbox:
                    if msg.get("type") == "shutdown_request":
                        self._set_status(name, "shutdown")
                        return
                    messages.append({"role": "user", "content": json.dumps(msg)})
                try:
                    response = client.messages.create(
                        model=MODEL,
                        system=sys_prompt,
                        messages=messages,
                        tools=tools,
                        max_tokens=8000,
                    )
                except Exception:
                    self._set_status(name, "idle")
                    return
                messages.append({"role": "assistant", "content": response.content})
                if response.stop_reason != "tool_use":
                    break
                results = []
                idle_requested = False
                for block in response.content:
                    if block.type == "tool_use":
                        if block.name == "idle":
                            idle_requested = True
                            output = "Entering idle phase. Will poll for new tasks."
                        else:
                            output = self._exec(name, block.name, block.input)
                        print(f" [{name}] {block.name}: {str(output)[:120]}")
                        results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": str(output),
                        })
                messages.append({"role": "user", "content": results})
                if idle_requested:
                    break

            # -- IDLE PHASE: poll for inbox messages and unclaimed tasks --
            self._set_status(name, "idle")
            resume = False
            polls = IDLE_TIMEOUT // max(POLL_INTERVAL, 1)
            for _ in range(polls):
                time.sleep(POLL_INTERVAL)
                inbox = BUS.read_inbox(name)
                if inbox:
                    for msg in inbox:
                        if msg.get("type") == "shutdown_request":
                            self._set_status(name, "shutdown")
                            return
                        messages.append({"role": "user", "content": json.dumps(msg)})
                    resume = True
                    break
                unclaimed = scan_unclaimed_tasks()
                if unclaimed:
                    task = unclaimed[0]
                    claim_task(task["id"], name)
                    task_prompt = (
                        f"<auto-claimed>Task #{task['id']}: {task['subject']}\n"
                        f"{task.get('description', '')}</auto-claimed>"
                    )
                    if len(messages) <= 3:
                        messages.insert(0, make_identity_block(name, role, team_name))
                        messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})
                    messages.append({"role": "user", "content": task_prompt})
                    messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. Working on it."})
                    resume = True
                    break

            if not resume:
                self._set_status(name, "shutdown")
                return
            self._set_status(name, "working")

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        # these base tools are unchanged from s02
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        if tool_name == "shutdown_response":
            req_id = args["request_id"]
            with _tracker_lock:
                if req_id in shutdown_requests:
                    shutdown_requests[req_id]["status"] = "approved" if args["approve"] else "rejected"
            BUS.send(
                sender, "lead", args.get("reason", ""),
                "shutdown_response", {"request_id": req_id, "approve": args["approve"]},
            )
            return f"Shutdown {'approved' if args['approve'] else 'rejected'}"
        if tool_name == "plan_approval":
            plan_text = args.get("plan", "")
            req_id = str(uuid.uuid4())[:8]
            with _tracker_lock:
                plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"}
            BUS.send(
                sender, "lead", plan_text, "plan_approval_response",
                {"request_id": req_id, "plan": plan_text},
            )
            return f"Plan submitted (request_id={req_id}). Waiting for approval."
        if tool_name == "claim_task":
            return claim_task(args["task_id"], sender)
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        # these base tools are unchanged from s02
        return [
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "shutdown_response", "description": "Respond to a shutdown request.",
             "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"}}, "required": ["request_id", "approve"]}},
            {"name": "plan_approval", "description": "Submit a plan for lead approval.",
             "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}},
            {"name": "idle", "description": "Signal that you have no more work. Enters idle polling phase.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "claim_task", "description": "Claim a task from the task board by ID.",
             "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
        ]

    def list_all(self) -> str:
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f" {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        return [m["name"] for m in self.config["members"]]


TEAM = TeammateManager(TEAM_DIR)


# -- Base tool implementations (these base tools are unchanged from s02) --
def _safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def _run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def _run_read(path: str, limit: int = None) -> str:
    try:
        lines = _safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def _run_write(path: str, content: str) -> str:
    try:
        fp = _safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def _run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = _safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# -- Lead-specific protocol handlers --
def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    with _tracker_lock:
        shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send(
        "lead", teammate, "Please shut down gracefully.",
        "shutdown_request", {"request_id": req_id},
    )
    return f"Shutdown request {req_id} sent to '{teammate}'"


def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    with _tracker_lock:
        req = plan_requests.get(request_id)
    if not req:
        return f"Error: Unknown plan request_id '{request_id}'"
    with _tracker_lock:
        req["status"] = "approved" if approve else "rejected"
    BUS.send(
        "lead", req["from"], feedback, "plan_approval_response",
        {"request_id": request_id, "approve": approve, "feedback": feedback},
    )
    return f"Plan {req['status']} for '{req['from']}'"


def _check_shutdown_status(request_id: str) -> str:
    with _tracker_lock:
        return json.dumps(shutdown_requests.get(request_id, {"error": "not found"}))


# -- Lead tool dispatch (14 tools) --
TOOL_HANDLERS = {
    "bash": lambda **kw: _run_bash(kw["command"]),
    "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: _run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates": lambda **kw: TEAM.list_all(),
    "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
    "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
    "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
    "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")),
    "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")),
    "idle": lambda **kw: "Lead does not idle.",
    "claim_task": lambda **kw: claim_task(kw["task_id"], "lead"),
}

# these base tools are unchanged from s02
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "spawn_teammate", "description": "Spawn an autonomous teammate.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
    {"name": "list_teammates", "description": "List all teammates.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "send_message", "description": "Send a message to a teammate.",
     "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
    {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "broadcast", "description": "Send a message to all teammates.",
     "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
    {"name": "shutdown_request", "description": "Request a teammate to shut down.",
     "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}},
    {"name": "shutdown_response", "description": "Check shutdown request status.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}},
    {"name": "plan_approval", "description": "Approve or reject a teammate's plan.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}},
    {"name": "idle", "description": "Enter idle state (for lead -- rarely used).",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "claim_task", "description": "Claim a task from the board by ID.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
]


def agent_loop(messages: list):
    while True:
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({
                "role": "user",
                "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
            })
            messages.append({
                "role": "assistant",
                "content": "Noted inbox messages.",
            })
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms11 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        if query.strip() == "/team":
            print(TEAM.list_all())
            continue
        if query.strip() == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        if query.strip() == "/tasks":
            TASKS_DIR.mkdir(exist_ok=True)
            for f in sorted(TASKS_DIR.glob("task_*.json")):
                t = json.loads(f.read_text())
                marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
                owner = f" @{t['owner']}" if t.get("owner") else ""
                print(f" {marker} #{t['id']}: {t['subject']}{owner}")
            continue
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()