[Writing ClaudeCode from Scratch: learn-claude-code Hands-On Project Notes] (11) Autonomous Agents

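Summary: This article walks through an implementation of autonomous agents, focusing on the problem of agents that sit and wait for instructions. With a two-phase WORK/IDLE lifecycle, an agent can scan the task board on its own and claim unassigned tasks. The system uses the file system as a stand-in for a database and stores each task as its own JSON file. The key ideas are idle polling, identity re-injection (so the agent does not forget who it is after context compression), and lock-guarded task claiming.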

Chapter 11: Autonomous Agents

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > [ s11 ] s12

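“This column is based on the official documentation of the open-source project learn-claude-code. The original docs are quite dense, so to make them easier for beginners like me, I read them line by line and added annotations, plain-language explanations, and notes on the pitfalls I ran into. I hope this 'pre-chewed' tutorial helps you open the door to AI Agent development.”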

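Project: shareAI-lab/learn-claude-code: Bash is all you need - A nano Claude Code–like agent, built from 0 to 1

"Teammates check the board themselves and claim whatever work is there" -- no lead handing out tasks one by one; the team self-organizes.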

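1. Problem

In s09-s10, teammates only act when explicitly told to. The lead has to write a prompt for every teammate, and 10 unclaimed tasks on the board have to be assigned by hand. That does not scale.

True autonomy: teammates scan the task board themselves, claim tasks nobody is working on, and look for the next one when they finish.

One detail: after context compression (s06), an agent may forget who it is. Identity re-injection solves this.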

2. Solution

Teammate lifecycle with idle cycle:

        +-------+
        | spawn |  <- the lead spawns a teammate
        +---+---+
            |
            v
    +-------+     tool_use      +-------+
    |  LLM  | ----------------> |  WORK |  <- work phase: run tool calls
    +-------+                   +---+---+
                                    |
                                    | stop_reason != tool_use
                                    | (the LLM stops calling tools)
                                    | or the idle tool is called
                                    v
                                +--------+
                                |  IDLE  |  <- idle phase: look for work
                                +---+----+
                                    |
                                    | poll every 5s for up to 60s
                                    v
                    +-------------------------------+
                    | 1. check inbox                |
                    |    message? -> back to WORK   |
                    +-------------------------------+
                    | 2. scan .tasks/               |
                    |    unclaimed? -> claim -> WORK|
                    +-------------------------------+
                    | 3. 60s timeout                |
                    |    no new work -> SHUTDOWN    |
                    +-------------------------------+
                                    |
                                    v
                                +-----------+
                                | SHUTDOWN  |  <- teammate stops, resources are freed
                                +-----------+

Identity re-injection after compression:
  if len(messages) <= 3:  # a history this short suggests compression has happened
    messages.insert(0, identity_block)  # put the identity block back at the front

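3. How It Works

3.1 Overall design philosophy: the file system as a database

In s11, the task board is not a real database but a directory of files. Each task is a separate JSON file stored under .tasks/.

Why the file system instead of a database?

  1. Simple and dependable: no extra database service, zero dependencies
  2. Persistent: task data lives on disk and survives a restart
  3. Easy to debug: tasks can be inspected and edited with any text editor
  4. Cross-platform: every operating system provides a file system

Core idea: files instead of a database, file names instead of indexes, a directory instead of a table, glob instead of a query.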
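
To make the "directory as table, glob as query" idea concrete, here is a minimal sketch. The helpers create_task() and query_by_status() are hypothetical names used only for illustration, and the field names (id, subject, status, owner, blockedBy) are taken from the code later in this chapter. The demo writes into a temporary directory, not a real .tasks/ folder.

```python
import json
import tempfile
from pathlib import Path


def create_task(tasks_dir: Path, task_id: int, subject: str) -> Path:
    """Write one task as its own JSON file (one file plays the role of one row)."""
    tasks_dir.mkdir(parents=True, exist_ok=True)
    task = {"id": task_id, "subject": subject, "status": "pending",
            "owner": "", "blockedBy": []}
    path = tasks_dir / f"task_{task_id}.json"
    path.write_text(json.dumps(task, indent=2))
    return path


def query_by_status(tasks_dir: Path, status: str) -> list:
    """A 'SELECT ... WHERE status = ?' done with glob plus a filter."""
    tasks = [json.loads(f.read_text()) for f in tasks_dir.glob("task_*.json")]
    return sorted((t for t in tasks if t.get("status") == status),
                  key=lambda t: t["id"])


# Demo in a throwaway directory so nothing touches a real task board.
demo_dir = Path(tempfile.mkdtemp()) / ".tasks"
create_task(demo_dir, 1, "write parser")
create_task(demo_dir, 2, "write tests")
pending_subjects = [t["subject"] for t in query_by_status(demo_dir, "pending")]
print(pending_subjects)
```

Every "query" here re-reads all files, which is fine for a handful of tasks but is the price paid for having no index.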

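3.2 Teammate lifecycle: the WORK and IDLE phases

The teammate lifecycle in s11 has two core phases:

def _loop(self, name, role, prompt):
    """
    Main loop of a teammate: alternates between the WORK and IDLE phases.

    Args:
        name: teammate name (e.g. "alice", "coder")
        role: teammate role (e.g. "backend", "frontend")
        prompt: initial task prompt

    Simplified excerpt: sys_prompt and tools are set up earlier in the
    full method (see the full code at the end of this chapter).
    """
    # Build the history once, before the loop, so that messages injected
    # during the IDLE phase are still present when WORK resumes.
    messages = [{"role": "user", "content": prompt}]

    while True:  # runs until the teammate shuts down
        # ==================== WORK PHASE ====================
        # At most 50 LLM rounds per WORK phase, to avoid a runaway loop
        for _ in range(50):
            # Ask the LLM for the next step
            response = client.messages.create(
                model=MODEL,
                system=sys_prompt,
                messages=messages,
                tools=tools,
                max_tokens=8000,
            )

            # Any stop reason other than tool_use means the LLM is done for now
            if response.stop_reason != "tool_use":
                break

            # ... execute the tool calls (elided here); idle_requested is
            # set to True when the LLM calls the idle tool
            if idle_requested:
                break

        # ==================== IDLE PHASE ====================
        # Mark the teammate as idle
        self._set_status(name, "idle")

        # Poll for new inbox messages or unclaimed tasks
        resume = self._idle_poll(name, messages)

        # Poll timed out (no new work within 60 seconds): shut down
        if not resume:
            self._set_status(name, "shutdown")
            return  # leave the loop; the thread ends

        # New work found: back to the WORK phase
        self._set_status(name, "working")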
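Why two phases?

  • WORK phase: does the actual work, calling tools and handling the task
  • IDLE phase: actively looks for new work instead of passively waiting

Key parameters

  • range(50): at most 50 LLM rounds (tool-call turns) per WORK phase, which prevents a runaway loop
  • IDLE_TIMEOUT = 60: idle timeout; the teammate shuts down after 60 seconds without new work
  • POLL_INTERVAL = 5: polling interval; check every 5 seconds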
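
The two-phase control flow can be isolated from the LLM entirely. The sketch below is an illustration only: run_lifecycle() is a hypothetical function, and the work and idle_poll callables are stand-ins for the real WORK loop and idle polling. It records the sequence of statuses a teammate passes through.

```python
from typing import Callable, List


def run_lifecycle(work: Callable[[], None],
                  idle_poll: Callable[[], bool]) -> List[str]:
    """Alternate WORK and IDLE until idle_poll reports a timeout.

    Returns the sequence of statuses the teammate went through.
    """
    trace = ["working"]
    while True:
        work()                   # WORK phase: stand-in for the LLM/tool loop
        trace.append("idle")     # IDLE phase begins
        if not idle_poll():      # False means nothing was found before the timeout
            trace.append("shutdown")
            return trace
        trace.append("working")  # new work found, go around again


# Stand-in poller: finds work on the first two polls, then times out.
poll_results = iter([True, True, False])
statuses = run_lifecycle(work=lambda: None, idle_poll=lambda: next(poll_results))
print(statuses)
```

The only exit from the loop is a failed idle poll, which matches the diagram: a teammate never shuts down straight from WORK unless a shutdown request arrives.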

3.3 Idle polling: how a teammate finds work on its own

def _idle_poll(self, name, messages):
    """
    Idle polling: look for new work during the IDLE phase.

    Args:
        name: teammate name
        messages: message history list

    Returns:
        True: new work was found, resume the WORK phase
        False: timed out, the teammate should shut down
    """
    # Number of polls: IDLE_TIMEOUT // POLL_INTERVAL = 60 // 5 = 12
    for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):
        # Wait 5 seconds between polls so we do not poll too often
        time.sleep(POLL_INTERVAL)

        # Step 1: check the inbox (highest priority)
        inbox = BUS.read_inbox(name)
        if inbox:  # new messages arrived
            # Inject them into the message history
            messages.append({
                "role": "user",
                "content": f"<inbox>{inbox}</inbox>"
            })
            return True  # resume work

        # Step 2: scan the task board for unclaimed tasks
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            # Auto-claim the first task in scan order
            claim_task(unclaimed[0]["id"], name)
            # Inject the task into the message history
            messages.append({
                "role": "user",
                "content": f"<auto-claimed>Task #{unclaimed[0]['id']}: "
                           f"{unclaimed[0]['subject']}</auto-claimed>"
            })
            return True  # resume work

    # Step 3: timed out
    return False  # no new work within 60 seconds; triggers shutdown

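Polling logic

  1. Check the inbox first: messages are handled immediately (highest priority)
  2. Then scan the task board: an unclaimed task is claimed automatically
  3. Time out and shut down: with no new work for 60 seconds, the teammate stops to save resources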
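
The three steps above can be sketched as a standalone function with the inbox check, the task scan, and the sleep passed in as parameters. This is an illustrative variant, not the chapter's _idle_poll(): the callback parameters and the string return values are assumptions made so the logic can be exercised without waiting 60 real seconds. It assumes interval is greater than zero.

```python
import time
from typing import Callable, Optional


def idle_poll(check_inbox: Callable[[], list],
              find_task: Callable[[], Optional[dict]],
              timeout: float = 60.0,
              interval: float = 5.0,
              sleep: Callable[[float], None] = time.sleep) -> str:
    """Return 'inbox', 'task', or 'timeout', whichever happens first."""
    polls = int(timeout // interval)
    for _ in range(polls):
        sleep(interval)
        if check_inbox():             # step 1: messages take priority
            return "inbox"
        if find_task() is not None:   # step 2: then unclaimed tasks
            return "task"
    return "timeout"                  # step 3: nothing arrived in time


# Demo with a fake sleep that only records the requested delays.
slept = []
outcome = idle_poll(check_inbox=lambda: [],
                    find_task=lambda: None,
                    sleep=slept.append)
print(outcome, len(slept))
```

Because the sleep happens before the first check, a teammate that goes idle waits one full interval before it can pick up anything, even if a task is already on the board.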

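3.4 Task scanning: scan_unclaimed_tasks() in detail

def scan_unclaimed_tasks() -> list:
    """
    Scan the task board and return every claimable task.

    Returns:
        list: claimable tasks, each one a dict

    A task is claimable when all three conditions hold:
        1. status == "pending"
        2. owner is empty (nobody has claimed it yet)
        3. blockedBy is empty (no other task is blocking it)
    """
    # Make sure the task directory exists; create it if it does not.
    # exist_ok=True means no error is raised when it already exists.
    TASKS_DIR.mkdir(exist_ok=True)

    # Collected claimable tasks
    unclaimed = []

    # Walk every task_*.json file in the task directory.
    # sorted() gives a stable file-name order (lexicographic, so
    # task_10.json sorts before task_2.json).
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        # Read and parse the JSON file
        task = json.loads(f.read_text())

        # All three conditions must hold.
        # get() is used instead of [] to avoid KeyError on missing fields.
        if (task.get("status") == "pending"      # condition 1: still pending
                and not task.get("owner")        # condition 2: not claimed yet
                and not task.get("blockedBy")):  # condition 3: not blocked
            unclaimed.append(task)

    return unclaimed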

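3.4.1 Why the task_*.json naming pattern?

for f in sorted(TASKS_DIR.glob("task_*.json")):

Design rationale

  • Common prefix: the task_ prefix keeps all task files grouped together in the directory
  • Wildcard matching: the task_*.json pattern lets glob() find every task file in one call
  • Ordered traversal: sorted() processes files in file-name order, so tasks are picked in a predictable order instead of an arbitrary one. Note that the order is lexicographic: task_10.json comes before task_2.json.

What if we did not use this pattern?
If the directory also held files such as config.json or README.md, we would have to decide file by file which ones are tasks, and the code would become more complex and error-prone.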
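
A quick check of both points: the pattern filters out unrelated files, and the default sort is by name, not by number. The helper task_number() is a hypothetical addition showing one way to get numeric order if that matters; the chapter's code does not do this.

```python
import tempfile
from pathlib import Path

demo_dir = Path(tempfile.mkdtemp())
for name in ["task_1.json", "task_2.json", "task_10.json",
             "config.json", "README.md"]:
    (demo_dir / name).write_text("{}")

# Only files matching the pattern are returned; the others are ignored.
by_name = [f.name for f in sorted(demo_dir.glob("task_*.json"))]


def task_number(path: Path) -> int:
    """Extract 10 from 'task_10.json' (hypothetical helper)."""
    return int(path.stem.split("_", 1)[1])


by_number = [f.name for f in sorted(demo_dir.glob("task_*.json"), key=task_number)]
print(by_name)
print(by_number)
```

With fewer than ten tasks the two orders are identical, which is why the difference is easy to miss in small experiments.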

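3.4.2 Why check three conditions?

if (task.get("status") == "pending"
        and not task.get("owner")
        and not task.get("blockedBy")):

These are the three necessary conditions for a task to be claimable; all of them must hold:

Condition                 | What it checks                     | Why it matters
--------------------------+------------------------------------+---------------------------------------------------------------
status == "pending"       | The task is still pending          | A task that is in_progress or completed cannot be claimed again
not task.get("owner")     | The task has no owner yet          | Keeps several teammates from claiming the same task
not task.get("blockedBy") | The task is not blocked by another | Dependencies must be completed first

Why task.get() instead of task[]?

  • get() returns None when the key is missing instead of raising an exception
  • That makes the code more robust: a task file that lacks some fields still works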
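
The predicate is easy to test in isolation. In this sketch, is_claimable() is a hypothetical helper that wraps the same three-part condition, and the sample tasks are made up to cover missing keys, empty values, and each failing condition.

```python
def is_claimable(task: dict) -> bool:
    """True only when all three conditions hold."""
    return (task.get("status") == "pending"
            and not task.get("owner")
            and not task.get("blockedBy"))


samples = [
    {"id": 1, "status": "pending"},                           # owner/blockedBy keys missing
    {"id": 2, "status": "pending", "owner": "alice"},         # already owned
    {"id": 3, "status": "pending", "blockedBy": [1]},         # waiting on task 1
    {"id": 4, "status": "completed", "owner": ""},            # wrong status
    {"id": 5, "status": "pending", "owner": "", "blockedBy": []},
]
claimable_ids = [t["id"] for t in samples if is_claimable(t)]
print(claimable_ids)
```

Task 1 shows why get() matters: with task["owner"] the check would raise KeyError instead of treating the missing field as "no owner".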

3.5 Task claiming: claim_task() in detail

def claim_task(task_id: int, owner: str) -> str:
    """
    Claim the task with the given ID.

    Args:
        task_id: task ID (maps to the file task_{task_id}.json)
        owner: name of the claiming teammate

    Returns:
        str: a message describing the result

    Thread safety:
        _claim_lock ensures only one thread runs the claim at a time
    """
    # Hold the lock so concurrent claims do not interleave
    with _claim_lock:
        # Build the path of the task file
        path = TASKS_DIR / f"task_{task_id}.json"

        # Make sure the task file exists
        if not path.exists():
            return f"Error: Task {task_id} not found"

        # Read and parse the task file
        task = json.loads(path.read_text())

        # Record the claiming teammate as the owner
        task["owner"] = owner

        # Move the status from "pending" to "in_progress"
        task["status"] = "in_progress"

        # Write the updated task back to disk.
        # indent=2 pretty-prints the JSON for easier reading and debugging.
        path.write_text(json.dumps(task, indent=2))

    return f"Claimed task #{task_id} for {owner}"

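3.5.1 Why _claim_lock?

with _claim_lock:

Design rationale

  • Concurrency: in s11, several teammates (threads) may run scan_unclaimed_tasks() at the same time
  • Race condition: without coordination, two teammates can both see the same unclaimed task and both believe they may claim it
  • What the lock does: only one thread at a time can run the read-modify-write inside claim_task(), so two writers never interleave on the same file. Note that claim_task() as shown does not re-check owner inside the lock, so the lock by itself does not reject a second claim; the later caller overwrites the earlier one.

Example of the race

Timeline:
T1: teammate A calls scan_unclaimed_tasks() and sees task #5 unclaimed
T2: teammate B calls scan_unclaimed_tasks() and also sees task #5 unclaimed
T3: teammate A calls claim_task(5, "A")
T4: teammate B calls claim_task(5, "B")  # wrong: task #5 was already claimed by A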
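
The claim_task() shown in this chapter sets owner without first checking whether someone else already holds the task, so in the timeline above B's call would overwrite A's claim. One possible tightening, sketched here as an assumption and not as the project's code, is to re-check the task inside the lock. claim_if_free() is a hypothetical name.

```python
import json
import tempfile
import threading
from pathlib import Path

_lock = threading.Lock()


def claim_if_free(path: Path, owner: str) -> bool:
    """Claim the task only if nobody owns it yet; return True on success."""
    with _lock:
        task = json.loads(path.read_text())
        if task.get("owner") or task.get("status") != "pending":
            return False          # someone else got there first
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
        return True


# Demo: eight threads race for one task; only one claim should succeed.
task_path = Path(tempfile.mkdtemp()) / "task_5.json"
task_path.write_text(json.dumps({"id": 5, "status": "pending", "owner": ""}))

winners = []


def worker(name: str) -> None:
    if claim_if_free(task_path, name):
        winners.append(name)


threads = [threading.Thread(target=worker, args=(f"agent{i}",)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(winners))
```

A threading.Lock only coordinates threads inside one process. If teammates ran as separate processes, a file-level mechanism would be needed instead.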

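3.5.2 Why update owner and status together?

task["owner"] = owner
task["status"] = "in_progress"

Design rationale

  • Atomicity: the file write itself is not truly atomic, but logically the two changes form one unit
  • Consistent state: the task's status and owner always stay in sync
  • in_progress rather than claimed: describes the task's current state more accurately (work is under way)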
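
Since a plain write_text() can leave a half-written file if the process dies mid-write, a common way to get closer to real atomicity is to write a temporary file and rename it over the target. This is a sketch of that general technique, not something the chapter's code does; write_json_atomic() is a hypothetical helper.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomic(path: Path, data: dict) -> None:
    """Write to a temp file in the same directory, then rename it over the target."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        # os.replace swaps the file in one step when both paths
        # are on the same filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        os.unlink(tmp_name)   # clean up the temp file on failure
        raise


target = Path(tempfile.mkdtemp()) / "task_1.json"
write_json_atomic(target, {"id": 1, "owner": "alice", "status": "in_progress"})
saved = json.loads(target.read_text())
print(saved["owner"], saved["status"])
```

The temp file does not match task_*.json, so a concurrent scan would not pick up the partially written copy.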

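3.5.3 Why indent=2?

path.write_text(json.dumps(task, indent=2))

Design rationale

  • Readability: indented JSON is easier for people to read and debug
  • Version-control friendly: with Git, diffs of formatted JSON are much clearer
  • Common practice: two-space indentation is a widely used convention for JSON files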

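3.6 Identity re-injection: keeping the agent from "forgetting" itself

# A history of 3 messages or fewer suggests context compression has happened
if len(messages) <= 3:
    # First message: inject the identity.
    # The <identity> tag makes it easy for the LLM to recognize.
    messages.insert(0, {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>"
    })

    # Second message: the agent's acknowledgement.
    # It confirms the identity and completes the user/assistant exchange.
    messages.insert(1, {
        "role": "assistant",
        "content": f"I am {name}. Continuing."
    })

Why is identity re-injection needed?

In s06 we covered context compression. When the conversation history grows too long, the system compresses it automatically to save tokens. After compression, however, the agent may forget its identity (name, role, team).

Trigger condition: len(messages) <= 3

This means the history has been compressed down to at most 3 messages (typically: initial prompt + model response + current query). At that point the agent may already have "lost its memory" and needs its identity injected again.

How it is injected

  1. First message: the identity ("who you are")
  2. Second message: the agent's acknowledgement ("I know who I am")

This way, even after the context is compressed, the agent quickly recovers its sense of identity.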
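
The same logic can be wrapped in a small function and exercised on plain lists. reinject_identity() and its threshold parameter are hypothetical names for illustration; the message contents mirror the snippet above.

```python
def reinject_identity(messages: list, name: str, role: str,
                      team_name: str, threshold: int = 3) -> bool:
    """Prepend an identity exchange when the history looks compressed.

    Returns True if the two messages were inserted.
    """
    if len(messages) > threshold:
        return False
    messages.insert(0, {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>",
    })
    messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})
    return True


short_history = [{"role": "user", "content": "summary of earlier work"}]
long_history = [{"role": "user", "content": f"msg {i}"} for i in range(10)]

print(reinject_identity(short_history, "alice", "backend", "default"))
print(reinject_identity(long_history, "alice", "backend", "default"))
print(len(short_history), len(long_history))
```

Message count is only a heuristic for "compression happened". A one-message history grows to three after injection, which still satisfies the threshold, so a second call would inject again; checking for an existing <identity> block would be one way to avoid that.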

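4. Changes Relative to s10

Component     | Before (s10)  | After (s11)
--------------+---------------+---------------------------------
Tools         | 12            | 14 (+idle, +claim_task)
Autonomy      | Lead-assigned | Self-organizing
Idle phase    | (none)        | Poll inbox + task board
Task claiming | Manual only   | Auto-claim unassigned tasks
Identity      | System prompt | + re-injection after compression
Timeout       | (none)        | 60s idle -> auto shutdown

Summary of the core improvement

The core value of s11 is giving teammates real autonomy. With the idle loop and task-board scanning, teammates go from "passively waiting for instructions" to "actively looking for work", which lets the team run more work in parallel without the lead assigning each task.

Design highlights

  1. Scan and claim are separate steps: first discover the opportunity, then act on it
  2. Fairness: sorted() walks tasks in a fixed file-name order, so no task is skipped arbitrarily and left to starve
  3. Simple and efficient: no complex scheduling algorithm; the file system is "database" enough
  4. Thread safety: a lock guards against concurrent conflicts
  5. Identity preservation: identity is re-injected automatically after context compression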

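5. Try It

cd learn-claude-code
python agents/s11_autonomous_agents.py

Try these prompts (English prompts tend to work better with the LLM, but Chinese works too):

  1. Create 3 tasks on the board, then spawn alice and bob. Watch them auto-claim.
  2. Spawn a coder teammate and let it find work from the task board itself
  3. Create tasks with dependencies. Watch teammates respect the blocked order.
  4. Type /tasks to see the task board with owners
  5. Type /team to monitor who is working and who is idle

What to watch for

  • After a teammate enters IDLE, does it scan the task board on its own?
  • Do several teammates compete for the same task?
  • Are tasks with dependencies handled correctly?
  • Does identity re-injection take effect after context compression?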

6. Full Code

#!/usr/bin/env python3
import json
import os
import subprocess
import threading
import time
import uuid
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TEAM_DIR = WORKDIR / ".team"
INBOX_DIR = TEAM_DIR / "inbox"
TASKS_DIR = WORKDIR / ".tasks"

POLL_INTERVAL = 5
IDLE_TIMEOUT = 60

SYSTEM = f"You are a team lead at {WORKDIR}. Teammates are autonomous -- they find work themselves."

VALID_MSG_TYPES = {
    "message",
    "broadcast",
    "shutdown_request",
    "shutdown_response",
    "plan_approval_response",
}

# -- Request trackers --
shutdown_requests = {}
plan_requests = {}
_tracker_lock = threading.Lock()
_claim_lock = threading.Lock()


# -- MessageBus: JSONL inbox per teammate --
class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)


# -- Task board scanning --
def scan_unclaimed_tasks() -> list:
    TASKS_DIR.mkdir(exist_ok=True)
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed


def claim_task(task_id: int, owner: str) -> str:
    with _claim_lock:
        path = TASKS_DIR / f"task_{task_id}.json"
        if not path.exists():
            return f"Error: Task {task_id} not found"
        task = json.loads(path.read_text())
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
    return f"Claimed task #{task_id} for {owner}"


# -- Identity re-injection after compression --
def make_identity_block(name: str, role: str, team_name: str) -> dict:
    return {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, team: {team_name}. Continue your work.</identity>",
    }


# -- Autonomous TeammateManager --
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}

    def _load_config(self) -> dict:
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def _set_status(self, name: str, status: str):
        member = self._find_member(name)
        if member:
            member["status"] = status
            self._save_config()

    def spawn(self, name: str, role: str, prompt: str) -> str:
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _loop(self, name: str, role: str, prompt: str):
        team_name = self.config["team_name"]
        sys_prompt = (
            f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. "
            f"Use idle tool when you have no more work. You will auto-claim new tasks."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()

        while True:
            # -- WORK PHASE: standard agent loop --
            for _ in range(50):
                inbox = BUS.read_inbox(name)
                for msg in inbox:
                    if msg.get("type") == "shutdown_request":
                        self._set_status(name, "shutdown")
                        return
                    messages.append({"role": "user", "content": json.dumps(msg)})
                try:
                    response = client.messages.create(
                        model=MODEL,
                        system=sys_prompt,
                        messages=messages,
                        tools=tools,
                        max_tokens=8000,
                    )
                except Exception:
                    self._set_status(name, "idle")
                    return
                messages.append({"role": "assistant", "content": response.content})
                if response.stop_reason != "tool_use":
                    break
                results = []
                idle_requested = False
                for block in response.content:
                    if block.type == "tool_use":
                        if block.name == "idle":
                            idle_requested = True
                            output = "Entering idle phase. Will poll for new tasks."
                        else:
                            output = self._exec(name, block.name, block.input)
                        print(f"  [{name}] {block.name}: {str(output)[:120]}")
                        results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": str(output),
                        })
                messages.append({"role": "user", "content": results})
                if idle_requested:
                    break

            # -- IDLE PHASE: poll for inbox messages and unclaimed tasks --
            self._set_status(name, "idle")
            resume = False
            polls = IDLE_TIMEOUT // max(POLL_INTERVAL, 1)
            for _ in range(polls):
                time.sleep(POLL_INTERVAL)
                inbox = BUS.read_inbox(name)
                if inbox:
                    for msg in inbox:
                        if msg.get("type") == "shutdown_request":
                            self._set_status(name, "shutdown")
                            return
                        messages.append({"role": "user", "content": json.dumps(msg)})
                    resume = True
                    break
                unclaimed = scan_unclaimed_tasks()
                if unclaimed:
                    task = unclaimed[0]
                    claim_task(task["id"], name)
                    task_prompt = (
                        f"<auto-claimed>Task #{task['id']}: {task['subject']}\n"
                        f"{task.get('description', '')}</auto-claimed>"
                    )
                    if len(messages) <= 3:
                        messages.insert(0, make_identity_block(name, role, team_name))
                        messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})
                    messages.append({"role": "user", "content": task_prompt})
                    messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. Working on it."})
                    resume = True
                    break

            if not resume:
                self._set_status(name, "shutdown")
                return
            self._set_status(name, "working")

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        # these base tools are unchanged from s02
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        if tool_name == "shutdown_response":
            req_id = args["request_id"]
            with _tracker_lock:
                if req_id in shutdown_requests:
                    shutdown_requests[req_id]["status"] = "approved" if args["approve"] else "rejected"
            BUS.send(
                sender, "lead", args.get("reason", ""),
                "shutdown_response", {"request_id": req_id, "approve": args["approve"]},
            )
            return f"Shutdown {'approved' if args['approve'] else 'rejected'}"
        if tool_name == "plan_approval":
            plan_text = args.get("plan", "")
            req_id = str(uuid.uuid4())[:8]
            with _tracker_lock:
                plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"}
            BUS.send(
                sender, "lead", plan_text, "plan_approval_response",
                {"request_id": req_id, "plan": plan_text},
            )
            return f"Plan submitted (request_id={req_id}). Waiting for approval."
        if tool_name == "claim_task":
            return claim_task(args["task_id"], sender)
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        # these base tools are unchanged from s02
        return [
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "shutdown_response", "description": "Respond to a shutdown request.",
             "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"}}, "required": ["request_id", "approve"]}},
            {"name": "plan_approval", "description": "Submit a plan for lead approval.",
             "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}},
            {"name": "idle", "description": "Signal that you have no more work. Enters idle polling phase.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "claim_task", "description": "Claim a task from the task board by ID.",
             "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
        ]

    def list_all(self) -> str:
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f"  {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        return [m["name"] for m in self.config["members"]]


TEAM = TeammateManager(TEAM_DIR)


# -- Base tool implementations (these base tools are unchanged from s02) --
def _safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def _run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def _run_read(path: str, limit: int = None) -> str:
    try:
        lines = _safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def _run_write(path: str, content: str) -> str:
    try:
        fp = _safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def _run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = _safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# -- Lead-specific protocol handlers --
def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    with _tracker_lock:
        shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send(
        "lead", teammate, "Please shut down gracefully.",
        "shutdown_request", {"request_id": req_id},
    )
    return f"Shutdown request {req_id} sent to '{teammate}'"


def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    with _tracker_lock:
        req = plan_requests.get(request_id)
    if not req:
        return f"Error: Unknown plan request_id '{request_id}'"
    with _tracker_lock:
        req["status"] = "approved" if approve else "rejected"
    BUS.send(
        "lead", req["from"], feedback, "plan_approval_response",
        {"request_id": request_id, "approve": approve, "feedback": feedback},
    )
    return f"Plan {req['status']} for '{req['from']}'"


def _check_shutdown_status(request_id: str) -> str:
    with _tracker_lock:
        return json.dumps(shutdown_requests.get(request_id, {"error": "not found"}))


# -- Lead tool dispatch (14 tools) --
TOOL_HANDLERS = {
    "bash":              lambda **kw: _run_bash(kw["command"]),
    "read_file":         lambda **kw: _run_read(kw["path"], kw.get("limit")),
    "write_file":        lambda **kw: _run_write(kw["path"], kw["content"]),
    "edit_file":         lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "spawn_teammate":    lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates":    lambda **kw: TEAM.list_all(),
    "send_message":      lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
    "read_inbox":        lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast":         lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
    "shutdown_request":  lambda **kw: handle_shutdown_request(kw["teammate"]),
    "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")),
    "plan_approval":     lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")),
    "idle":              lambda **kw: "Lead does not idle.",
    "claim_task":        lambda **kw: claim_task(kw["task_id"], "lead"),
}

# these base tools are unchanged from s02
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "spawn_teammate", "description": "Spawn an autonomous teammate.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
    {"name": "list_teammates", "description": "List all teammates.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "send_message", "description": "Send a message to a teammate.",
     "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
    {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "broadcast", "description": "Send a message to all teammates.",
     "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
    {"name": "shutdown_request", "description": "Request a teammate to shut down.",
     "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}},
    {"name": "shutdown_response", "description": "Check shutdown request status.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}},
    {"name": "plan_approval", "description": "Approve or reject a teammate's plan.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}},
    {"name": "idle", "description": "Enter idle state (for lead -- rarely used).",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "claim_task", "description": "Claim a task from the board by ID.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
]


def agent_loop(messages: list):
    while True:
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({
                "role": "user",
                "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
            })
            messages.append({
                "role": "assistant",
                "content": "Noted inbox messages.",
            })
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms11 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        if query.strip() == "/team":
            print(TEAM.list_all())
            continue
        if query.strip() == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        if query.strip() == "/tasks":
            TASKS_DIR.mkdir(exist_ok=True)
            for f in sorted(TASKS_DIR.glob("task_*.json")):
                t = json.loads(f.read_text())
                marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
                owner = f" @{t['owner']}" if t.get("owner") else ""
                print(f"  {marker} #{t['id']}: {t['subject']}{owner}")
            continue
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()