【从零手写 ClaudeCode:learn-claude-code 项目实战笔记】(12)Worktree + Task Isolation (Worktree 任务隔离)

简介: 本章引入 Git worktree 实现任务隔离:为每个任务分配独立目录与分支,通过任务 ID 双向绑定控制平面(`.tasks/`)与执行平面(`.worktrees/`),配合事件总线记录全生命周期,解决多任务并发修改冲突问题,提升可恢复性与可观测性。

第十二章 Worktree + Task Isolation (Worktree 任务隔离)

s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > [ s12 ]

“本专栏基于开源项目 learn-claude-code 的官方文档。原文档非常硬核,为了方便像我一样的新手小白理解,我对文档进行了逐行精读,并加入了很多中文注释、大白话解释和踩坑记录。希望这套‘咀嚼版’教程能帮你推开 AI Agent 开发的大门。”

项目地址:shareAI-lab/learn-claude-code: Bash is all you need - A nano Claude Code–like agent, built from 0 to 1

"各干各的目录, 互不干扰" -- 任务管目标, worktree 管目录, 按 ID 绑定。

一、问题

到 s11, 智能体已经能自主认领和完成任务。但所有任务共享一个目录。两个智能体同时重构不同模块 -- A 改 config.py, B 也改 config.py, 未提交的改动互相污染, 谁也没法干净回滚。

任务板管 "做什么" 但不管 "在哪做"。解法: 给每个任务一个独立的 git worktree 目录, 用任务 ID 把两边关联起来。

二、解决方案

双平面架构 (Dual-Plane Architecture):
任务管目标,worktree 管目录,按 ID 绑定

控制平面 (.tasks/)                    执行平面 (.worktrees/)
(管理目标: 做什么)                    (管理环境: 在哪做)

+--------------------+                +-------------------------+
| task_1.json        |                | auth-refactor/          |
|   id: 1            |                |   branch: wt/auth-refactor
|   status: in_progress |             |   task_id: 1            |
|   worktree: "auth-refactor" |       |   path: .worktrees/auth-refactor
+--------------------+                +-------------------------+
        ^           |                         ^
        |           |                         |
        |           +------- 任务 ID ---------+
        |           |      (绑定桥梁)
        |           v
+--------------------+                +-------------------------+
| task_2.json        |                | ui-login/               |
|   id: 2            |                |   branch: wt/ui-login   |
|   status: pending  |                |   task_id: 2            |
|   worktree: ""     |                |   path: .worktrees/ui-login
+--------------------+                +-------------------------+
        |                                       |
        v                                       v
+--------------------+                +-------------------------+
| index.json         |                | events.jsonl            |
| (worktree 注册表)   |                | (生命周期日志)           |
| - name: "auth-refactor" |            | - worktree.create.after |
| - status: "active" |                | - task.completed        |
+--------------------+                +-------------------------+


状态机 (State Machines):

任务状态机 (Task):
  pending (待处理)
      |
      | 绑定 worktree
      v
  in_progress (进行中)
      |
      | 删除 worktree (complete_task=True)
      v
  completed (已完成)


Worktree 状态机:
  absent (不存在)
      |
      | 创建 worktree
      v
  active (活跃) --------+--------+
      |                  |        |
      | 删除 worktree    |        | 保留 worktree
      v                  |        v
  removed (已删除)       |    kept (已保留)
                         |
                         | 运行命令
                         v
                      (继续工作)

三、工作原理

3.1 整体设计哲学:双平面架构

s12 引入了双平面架构,将任务管理和执行环境分离:

控制平面(Control Plane).tasks/ 目录

  • 管理任务的目标(做什么)
  • 存储任务状态、所有者、worktree 绑定关系
  • 使用 task_*.json 文件存储

执行平面(Execution Plane).worktrees/ 目录

  • 提供任务的执行环境(在哪做)
  • 每个 worktree 是一个独立的 Git 分支和目录
  • 使用 index.json 管理 worktree 索引

核心思想:用任务 ID 作为桥梁,连接控制平面和执行平面。

3.2 事件系统:生命周期可观测性

class EventBus:
    """
    事件总线:记录所有生命周期事件

    设计原因:
    1. 可观测性:所有操作都有记录,便于调试和审计
    2. 崩溃恢复:可以从事件流重建状态
    3. 审计追踪:谁在什么时候做了什么
    """
    def __init__(self, event_log_path: Path):
        self.path = event_log_path
        # 确保日志目录存在
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # 如果日志文件不存在,创建空文件
        if not self.path.exists():
            self.path.write_text("")

    def emit(
        self,
        event: str,              # 事件名称(如 "worktree.create.after")
        task: dict | None = None,  # 相关任务信息
        worktree: dict | None = None,  # 相关 worktree 信息
        error: str | None = None,  # 错误信息(如果有)
    ):
        """
        发射一个事件,追加到日志文件

        使用 JSONL 格式(每行一个 JSON),便于流式处理
        """
        payload = {
   
            "event": event,
            "ts": time.time(),  # 时间戳
            "task": task or {
   },
            "worktree": worktree or {
   },
        }
        if error:
            payload["error"] = error

        # 追加到日志文件(append-only)
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(payload) + "\n")

    def list_recent(self, limit: int = 20) -> str:
        """
        获取最近的事件

        参数:
            limit: 返回的事件数量,默认 20 条
        """
        n = max(1, min(int(limit or 20), 200))  # 限制在 1-200 之间
        lines = self.path.read_text(encoding="utf-8").splitlines()
        recent = lines[-n:]  # 取最后 n 行
        items = []
        for line in recent:
            try:
                items.append(json.loads(line))
            except Exception:
                items.append({
   "event": "parse_error", "raw": line})
        return json.dumps(items, indent=2)

为什么使用 JSONL 格式?

  • 流式处理:每行一个 JSON,可以逐行读取,不需要加载整个文件
  • 追加友好:新事件直接追加到文件末尾,不需要修改已有内容
  • 崩溃安全:即使程序崩溃,已写入的事件不会丢失

3.3 任务管理器:控制平面的核心

class TaskManager:
    """
    任务管理器:管理任务的创建、更新、绑定 worktree

    设计特点:
    1. 文件存储:每个任务一个 JSON 文件
    2. 自增 ID:自动分配任务 ID
    3. 双向绑定:任务和 worktree 互相引用
    """
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(parents=True, exist_ok=True)
        # 计算下一个可用的 ID
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        """
        找出当前最大的任务 ID

        为什么需要这个函数?
        - 程序重启后,需要知道从哪个 ID 开始分配
        - 避免 ID 冲突
        """
        ids = []
        for f in self.dir.glob("task_*.json"):
            try:
                # 从文件名 "task_123.json" 中提取 ID
                ids.append(int(f.stem.split("_")[1]))
            except Exception:
                pass
        return max(ids) if ids else 0

    def create(self, subject: str, description: str = "") -> str:
        """
        创建新任务

        参数:
            subject: 任务标题
            description: 任务描述

        返回:
            JSON 格式的任务详情
        """
        task = {
   
            "id": self._next_id,
            "subject": subject,
            "description": description,
            "status": "pending",  # 初始状态:待处理
            "owner": "",  # 任务所有者(队友名称)
            "worktree": "",  # 绑定的 worktree 名称
            "blockedBy": [],  # 依赖的任务 ID 列表
            "created_at": time.time(),
            "updated_at": time.time(),
        }
        self._save(task)
        self._next_id += 1  # 递增 ID
        return json.dumps(task, indent=2)

    def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str:
        """
        将任务绑定到 worktree

        这是连接控制平面和执行平面的关键操作!

        参数:
            task_id: 任务 ID
            worktree: worktree 名称
            owner: 任务所有者

        副作用:
        1. 任务状态从 "pending" 改为 "in_progress"
        2. 任务的 worktree 字段被设置
        """
        task = self._load(task_id)
        task["worktree"] = worktree  # 绑定 worktree
        if owner:
            task["owner"] = owner
        # 关键:绑定 worktree 时自动将状态改为 in_progress
        if task["status"] == "pending":
            task["status"] = "in_progress"
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def unbind_worktree(self, task_id: int) -> str:
        """
        解除任务与 worktree 的绑定

        当 worktree 被删除时调用
        """
        task = self._load(task_id)
        task["worktree"] = ""  # 清空 worktree 字段
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def list_all(self) -> str:
        """
        列出所有任务,显示状态、所有者、worktree 绑定

        输出示例:
        [ ] #1: Implement auth owner=alice wt=auth-refactor
        [>] #2: Fix login bug owner=bob
        [x] #3: Update docs
        """
        tasks = []
        for f in sorted(self.dir.glob("task_*.json")):
            tasks.append(json.loads(f.read_text()))
        if not tasks:
            return "No tasks."
        lines = []
        for t in tasks:
            # 状态标记:待处理 [ ],进行中 [>],已完成 [x]
            marker = {
   
                "pending": "[ ]",
                "in_progress": "[>]",
                "completed": "[x]",
            }.get(t["status"], "[?]")
            owner = f" owner={t['owner']}" if t.get("owner") else ""
            wt = f" wt={t['worktree']}" if t.get("worktree") else ""
            lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}")
        return "\n".join(lines)

关键设计点

  1. bind_worktree 的副作用:绑定 worktree 时自动将任务状态改为 in_progress,因为绑定意味着开始工作
  2. unbind_worktree:解除绑定时只清空 worktree 字段,不改变任务状态
  3. list_all 的输出格式:使用 [ ][>][x] 标记状态,一目了然

3.4 Worktree 管理器:执行平面的核心

class WorktreeManager:
    """
    Worktree 管理器:管理 git worktree 的创建、运行命令、删除

    核心功能:
    1. 创建 worktree(自动创建 Git 分支)
    2. 在 worktree 中运行命令
    3. 删除或保留 worktree
    4. 维护 worktree 索引
    """
    def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus):
        self.repo_root = repo_root  # Git 仓库根目录
        self.tasks = tasks  # 任务管理器引用
        self.events = events  # 事件总线引用
        self.dir = repo_root / ".worktrees"  # worktree 目录
        self.dir.mkdir(parents=True, exist_ok=True)
        self.index_path = self.dir / "index.json"  # 索引文件
        # 如果索引文件不存在,创建空索引
        if not self.index_path.exists():
            self.index_path.write_text(json.dumps({
   "worktrees": []}, indent=2))
        # 检查是否在 Git 仓库中
        self.git_available = self._is_git_repo()

    def _is_git_repo(self) -> bool:
        """
        检查当前目录是否在 Git 仓库中

        为什么需要这个检查?
        - worktree 是 Git 功能,必须在 Git 仓库中才能使用
        - 提供友好的错误提示
        """
        try:
            r = subprocess.run(
                ["git", "rev-parse", "--is-inside-work-tree"],
                cwd=self.repo_root,
                capture_output=True,
                text=True,
                timeout=10,
            )
            return r.returncode == 0
        except Exception:
            return False

    def _run_git(self, args: list[str]) -> str:
        """
        执行 Git 命令

        参数:
            args: Git 命令参数列表,如 ["worktree", "add", "-b", "wt/auth", ".worktrees/auth", "HEAD"]

        为什么封装 Git 命令?
        1. 统一错误处理
        2. 统一超时设置
        3. 检查是否在 Git 仓库中
        """
        if not self.git_available:
            raise RuntimeError("Not in a git repository. worktree tools require git.")
        r = subprocess.run(
            ["git", *args],
            cwd=self.repo_root,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if r.returncode != 0:
            msg = (r.stdout + r.stderr).strip()
            raise RuntimeError(msg or f"git {' '.join(args)} failed")
        return (r.stdout + r.stderr).strip() or "(no output)"

    def _find(self, name: str) -> dict | None:
        """
        从索引中查找 worktree

        参数:
            name: worktree 名称

        返回:
            worktree 条目字典,如果不存在返回 None
        """
        idx = self._load_index()
        for wt in idx.get("worktrees", []):
            if wt.get("name") == name:
                return wt
        return None

    def _validate_name(self, name: str):
        """
        验证 worktree 名称格式

        为什么需要验证?
        - 防止恶意名称(如路径遍历攻击)
        - 确保名称可以作为目录名和 Git 分支名

        允许的字符:字母、数字、点、下划线、连字符
        长度限制:1-40 字符
        """
        if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""):
            raise ValueError(
                "Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -"
            )

    def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str:
        """
        创建 worktree

        参数:
            name: worktree 名称(同时用作目录名和分支名)
            task_id: 绑定的任务 ID(可选)
            base_ref: 基础引用(默认 HEAD,也可以是其他分支或提交)

        执行步骤:
        1. 验证名称
        2. 检查是否已存在
        3. 发射 before 事件
        4. 执行 git worktree add 命令
        5. 更新索引
        6. 绑定任务(如果有)
        7. 发射 after 事件

        Git 命令解释:
        git worktree add -b wt/{name} .worktrees/{name} HEAD
        - -b wt/{name}:创建新分支 wt/{name}
        - .worktrees/{name}:worktree 目录路径
        - HEAD:基于当前提交
        """
        self._validate_name(name)
        if self._find(name):
            raise ValueError(f"Worktree '{name}' already exists in index")
        if task_id is not None and not self.tasks.exists(task_id):
            raise ValueError(f"Task {task_id} not found")

        path = self.dir / name  # worktree 目录路径
        branch = f"wt/{name}"  # 分支名称(wt/ 前缀避免污染主分支)

        # 发射 before 事件
        self.events.emit(
            "worktree.create.before",
            task={
   "id": task_id} if task_id is not None else {
   },
            worktree={
   "name": name, "base_ref": base_ref},
        )
        try:
            # 执行 git worktree add 命令
            self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])

            # 创建索引条目
            entry = {
   
                "name": name,
                "path": str(path),
                "branch": branch,
                "task_id": task_id,
                "status": "active",  # 状态:活跃
                "created_at": time.time(),
            }

            # 更新索引文件
            idx = self._load_index()
            idx["worktrees"].append(entry)
            self._save_index(idx)

            # 如果有任务 ID,绑定到任务
            if task_id is not None:
                self.tasks.bind_worktree(task_id, name)

            # 发射 after 事件
            self.events.emit(
                "worktree.create.after",
                task={
   "id": task_id} if task_id is not None else {
   },
                worktree={
   
                    "name": name,
                    "path": str(path),
                    "branch": branch,
                    "status": "active",
                },
            )
            return json.dumps(entry, indent=2)
        except Exception as e:
            # 发射 failed 事件
            self.events.emit(
                "worktree.create.failed",
                task={
   "id": task_id} if task_id is not None else {
   },
                worktree={
   "name": name, "base_ref": base_ref},
                error=str(e),
            )
            raise

    def run(self, name: str, command: str) -> str:
        """
        在 worktree 中执行命令

        参数:
            name: worktree 名称
            command: 要执行的 shell 命令

        关键点:
        - cwd 参数指定为 worktree 目录,实现目录隔离
        - 命令在独立的目录中执行,不会影响其他 worktree

        安全检查:
        - 危险命令检查(rm -rf /, sudo, shutdown 等)
        - 超时限制(300 秒)
        """
        dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
        if any(d in command for d in dangerous):
            return "Error: Dangerous command blocked"

        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"
        path = Path(wt["path"])
        if not path.exists():
            return f"Error: Worktree path missing: {path}"

        try:
            r = subprocess.run(
                command,
                shell=True,
                cwd=path,  # 关键:在 worktree 目录中执行
                capture_output=True,
                text=True,
                timeout=300,
            )
            out = (r.stdout + r.stderr).strip()
            return out[:50000] if out else "(no output)"
        except subprocess.TimeoutExpired:
            return "Error: Timeout (300s)"

    def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str:
        """
        删除 worktree

        参数:
            name: worktree 名称
            force: 是否强制删除(即使有未提交的更改)
            complete_task: 是否同时完成绑定的任务

        执行步骤:
        1. 发射 before 事件
        2. 执行 git worktree remove 命令
        3. 如果 complete_task=True,更新任务状态为 completed
        4. 更新索引状态为 removed
        5. 发射 after 事件

        为什么 complete_task 参数很重要?
        - 一个调用同时完成拆除和任务完成
        - 减少调用次数,简化工作流
        """
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"

        self.events.emit(
            "worktree.remove.before",
            task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
            worktree={
   "name": name, "path": wt.get("path")},
        )
        try:
            # 构建 git worktree remove 命令
            args = ["worktree", "remove"]
            if force:
                args.append("--force")  # 强制删除
            args.append(wt["path"])
            self._run_git(args)

            # 如果 complete_task=True,同时完成任务
            if complete_task and wt.get("task_id") is not None:
                task_id = wt["task_id"]
                before = json.loads(self.tasks.get(task_id))
                self.tasks.update(task_id, status="completed")  # 更新任务状态
                self.tasks.unbind_worktree(task_id)  # 解除绑定
                self.events.emit(
                    "task.completed",
                    task={
   
                        "id": task_id,
                        "subject": before.get("subject", ""),
                        "status": "completed",
                    },
                    worktree={
   "name": name},
                )

            # 更新索引状态
            idx = self._load_index()
            for item in idx.get("worktrees", []):
                if item.get("name") == name:
                    item["status"] = "removed"
                    item["removed_at"] = time.time()
            self._save_index(idx)

            self.events.emit(
                "worktree.remove.after",
                task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
                worktree={
   "name": name, "path": wt.get("path"), "status": "removed"},
            )
            return f"Removed worktree '{name}'"
        except Exception as e:
            self.events.emit(
                "worktree.remove.failed",
                task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
                worktree={
   "name": name, "path": wt.get("path")},
                error=str(e),
            )
            raise

    def keep(self, name: str) -> str:
        """
        保留 worktree(不删除)

        什么时候使用 keep?
        - 需要保留修改供后续使用
        - 不想立即完成任务
        - 需要继续在 worktree 中工作

        与 remove 的区别:
        - keep:保留目录,状态改为 "kept"
        - remove:删除目录,状态改为 "removed"
        """
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"

        idx = self._load_index()
        kept = None
        for item in idx.get("worktrees", []):
            if item.get("name") == name:
                item["status"] = "kept"  # 更新状态为保留
                item["kept_at"] = time.time()
                kept = item
        self._save_index(idx)

        self.events.emit(
            "worktree.keep",
            task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
            worktree={
   
                "name": name,
                "path": wt.get("path"),
                "status": "kept",
            },
        )
        return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'"

关键设计点

  1. create 的 Git 命令git worktree add -b wt/{name} .worktrees/{name} HEAD

    • -b wt/{name}:创建新分支,wt/ 前缀避免污染主分支
    • .worktrees/{name}:worktree 目录路径
    • HEAD:基于当前提交
  2. runcwd 参数:指定为 worktree 目录,实现目录隔离

  3. removecomplete_task 参数:一个调用同时完成拆除和任务完成

  4. keep vs remove

    • keep:保留目录,状态改为 "kept"
    • remove:删除目录,状态改为 "removed"

3.5 任务和 Worktree 的绑定机制

def bind_worktree(self, task_id, worktree):
    """
    将任务绑定到 worktree

    这是连接控制平面和执行平面的关键操作!

    绑定时同时写入两侧状态:
    1. 任务文件:设置 worktree 字段
    2. 任务状态:从 pending 改为 in_progress
    3. 索引文件:设置 task_id 字段
    """
    task = self._load(task_id)
    task["worktree"] = worktree  # 设置 worktree 字段
    if task["status"] == "pending":
        task["status"] = "in_progress"  # 自动改为进行中
    self._save(task)

为什么绑定时自动改为 in_progress?

  • 绑定 worktree 意味着开始工作
  • 任务从"待处理"变为"进行中"
  • 符合任务状态机的逻辑

3.6 收尾机制:Keep vs Remove

def remove(self, name, force=False, complete_task=False):
    """
    删除 worktree 并可选完成任务

    参数:
        name: worktree 名称
        force: 强制删除(即使有未提交的更改)
        complete_task: 是否同时完成绑定的任务

    为什么 complete_task 参数很重要?
    - 一个调用同时完成拆除和任务完成
    - 减少调用次数,简化工作流
    - 原子操作,避免状态不一致
    """
    # ... 删除 worktree ...
    if complete_task and wt.get("task_id") is not None:
        task_id = wt["task_id"]
        self.tasks.update(task_id, status="completed")  # 完成任务
        self.tasks.unbind_worktree(task_id)  # 解除绑定
        self.events.emit("task.completed", ...)  # 发射事件

收尾的两种选择

  1. worktree_keep(name):保留目录供后续使用
  2. worktree_remove(name, complete_task=True):删除目录,完成绑定任务,发出事件

3.7 事件流:生命周期可观测性

{
   
  "event": "worktree.remove.after",
  "task": {
   "id": 1, "status": "completed"},
  "worktree": {
   "name": "auth-refactor", "status": "removed"},
  "ts": 1730000000
}

事件类型

  • worktree.create.before/after/failed:worktree 创建
  • worktree.remove.before/after/failed:worktree 删除
  • worktree.keep:worktree 保留
  • task.completed:任务完成

为什么需要事件流?

  1. 可观测性:所有操作都有记录,便于调试和审计
  2. 崩溃恢复:可以从事件流重建状态
  3. 审计追踪:谁在什么时候做了什么

3.8 崩溃恢复

会话记忆是易失的; 磁盘状态是持久的。

崩溃后从 .tasks/ + .worktrees/index.json 重建现场:

  • 任务状态存储在 .tasks/task_*.json 文件中
  • worktree 索引存储在 .worktrees/index.json 文件中
  • 事件日志存储在 .worktrees/events.jsonl 文件中

恢复步骤

  1. 读取 .tasks/ 目录下的所有任务文件
  2. 读取 .worktrees/index.json 文件
  3. 检查 worktree 目录是否存在
  4. 根据索引重建 worktree 状态

四、相对 s11 的变更

组件 之前 (s11) 之后 (s12)
协调 任务板 (owner/status) 任务板 + worktree 显式绑定
执行范围 共享目录 每个任务独立目录
可恢复性 仅任务状态 任务状态 + worktree 索引
收尾 任务完成 任务完成 + 显式 keep/remove
生命周期可见性 隐式日志 .worktrees/events.jsonl 显式事件流
并发安全 目录级隔离,避免文件冲突
Git 集成 原生 Git worktree 支持

核心改进总结

s12 的核心价值是引入了目录级隔离双平面架构

设计精髓

  1. 双平面分离:控制平面管目标,执行平面管环境
  2. 目录级隔离:每个任务独立目录,避免文件冲突
  3. Git 原生集成:利用 Git worktree 实现分支隔离
  4. 事件驱动:所有操作都有事件记录,便于调试和审计
  5. 原子收尾:一个调用同时完成拆除和任务完成
  6. 崩溃恢复:磁盘状态持久化,可以从崩溃中恢复

实际应用场景

  • 多人协作:每个人在独立的 worktree 中工作,互不干扰
  • 并行开发:同时开发多个功能,每个功能在独立的 worktree 中
  • 代码审查:在独立的 worktree 中审查代码,不影响主分支
  • 实验性修改:在 worktree 中尝试修改,可以随时丢弃或保留

五、试一试

cd learn-claude-code
python agents/s12_worktree_task_isolation.py

试试这些 prompt (英文 prompt 对 LLM 效果更好, 也可以用中文):

  1. Create tasks for backend auth and frontend login page, then list tasks.
  2. Create worktree "auth-refactor" for task 1, then bind task 2 to a new worktree "ui-login".
  3. Run "git status --short" in worktree "auth-refactor".
  4. Keep worktree "ui-login", then list worktrees and inspect events.
  5. Remove worktree "auth-refactor" with complete_task=true, then list tasks/worktrees/events.

观察重点

  • 创建 worktree 后,任务状态是否自动改为 in_progress?
  • 在 worktree 中运行命令,是否在独立的目录中执行?
  • 删除 worktree 时,complete_task 参数是否同时完成任务?
  • 事件流是否记录了所有操作?
  • 任务和 worktree 的绑定关系是否正确显示?、

六、完整代码

#!/usr/bin/env python3
import json
import os
import re
import subprocess
import time
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]


def detect_repo_root(cwd: Path) -> Path | None:
    """Return git repo root if cwd is inside a repo, else None."""
    try:
        r = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=10,
        )
        if r.returncode != 0:
            return None
        root = Path(r.stdout.strip())
        return root if root.exists() else None
    except Exception:
        return None


REPO_ROOT = detect_repo_root(WORKDIR) or WORKDIR

SYSTEM = (
    f"You are a coding agent at {WORKDIR}. "
    "Use task + worktree tools for multi-task work. "
    "For parallel or risky changes: create tasks, allocate worktree lanes, "
    "run commands in those lanes, then choose keep/remove for closeout. "
    "Use worktree_events when you need lifecycle visibility."
)


# -- EventBus: append-only lifecycle events for observability --
class EventBus:
    def __init__(self, event_log_path: Path):
        self.path = event_log_path
        self.path.parent.mkdir(parents=True, exist_ok=True)
        if not self.path.exists():
            self.path.write_text("")

    def emit(
        self,
        event: str,
        task: dict | None = None,
        worktree: dict | None = None,
        error: str | None = None,
    ):
        payload = {
   
            "event": event,
            "ts": time.time(),
            "task": task or {
   },
            "worktree": worktree or {
   },
        }
        if error:
            payload["error"] = error
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(payload) + "\n")

    def list_recent(self, limit: int = 20) -> str:
        n = max(1, min(int(limit or 20), 200))
        lines = self.path.read_text(encoding="utf-8").splitlines()
        recent = lines[-n:]
        items = []
        for line in recent:
            try:
                items.append(json.loads(line))
            except Exception:
                items.append({
   "event": "parse_error", "raw": line})
        return json.dumps(items, indent=2)


# -- TaskManager: persistent task board with optional worktree binding --
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(parents=True, exist_ok=True)
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        ids = []
        for f in self.dir.glob("task_*.json"):
            try:
                ids.append(int(f.stem.split("_")[1]))
            except Exception:
                pass
        return max(ids) if ids else 0

    def _path(self, task_id: int) -> Path:
        return self.dir / f"task_{task_id}.json"

    def _load(self, task_id: int) -> dict:
        path = self._path(task_id)
        if not path.exists():
            raise ValueError(f"Task {task_id} not found")
        return json.loads(path.read_text())

    def _save(self, task: dict):
        self._path(task["id"]).write_text(json.dumps(task, indent=2))

    def create(self, subject: str, description: str = "") -> str:
        task = {
   
            "id": self._next_id,
            "subject": subject,
            "description": description,
            "status": "pending",
            "owner": "",
            "worktree": "",
            "blockedBy": [],
            "created_at": time.time(),
            "updated_at": time.time(),
        }
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)

    def get(self, task_id: int) -> str:
        return json.dumps(self._load(task_id), indent=2)

    def exists(self, task_id: int) -> bool:
        return self._path(task_id).exists()

    def update(self, task_id: int, status: str = None, owner: str = None) -> str:
        task = self._load(task_id)
        if status:
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"Invalid status: {status}")
            task["status"] = status
        if owner is not None:
            task["owner"] = owner
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str:
        task = self._load(task_id)
        task["worktree"] = worktree
        if owner:
            task["owner"] = owner
        if task["status"] == "pending":
            task["status"] = "in_progress"
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def unbind_worktree(self, task_id: int) -> str:
        task = self._load(task_id)
        task["worktree"] = ""
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def list_all(self) -> str:
        tasks = []
        for f in sorted(self.dir.glob("task_*.json")):
            tasks.append(json.loads(f.read_text()))
        if not tasks:
            return "No tasks."
        lines = []
        for t in tasks:
            marker = {
   
                "pending": "[ ]",
                "in_progress": "[>]",
                "completed": "[x]",
            }.get(t["status"], "[?]")
            owner = f" owner={t['owner']}" if t.get("owner") else ""
            wt = f" wt={t['worktree']}" if t.get("worktree") else ""
            lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}")
        return "\n".join(lines)


TASKS = TaskManager(REPO_ROOT / ".tasks")
EVENTS = EventBus(REPO_ROOT / ".worktrees" / "events.jsonl")


# -- WorktreeManager: create/list/run/remove git worktrees + lifecycle index --
class WorktreeManager:
    def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus):
        self.repo_root = repo_root
        self.tasks = tasks
        self.events = events
        self.dir = repo_root / ".worktrees"
        self.dir.mkdir(parents=True, exist_ok=True)
        self.index_path = self.dir / "index.json"
        if not self.index_path.exists():
            self.index_path.write_text(json.dumps({
   "worktrees": []}, indent=2))
        self.git_available = self._is_git_repo()

    def _is_git_repo(self) -> bool:
        try:
            r = subprocess.run(
                ["git", "rev-parse", "--is-inside-work-tree"],
                cwd=self.repo_root,
                capture_output=True,
                text=True,
                timeout=10,
            )
            return r.returncode == 0
        except Exception:
            return False

    def _run_git(self, args: list[str]) -> str:
        if not self.git_available:
            raise RuntimeError("Not in a git repository. worktree tools require git.")
        r = subprocess.run(
            ["git", *args],
            cwd=self.repo_root,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if r.returncode != 0:
            msg = (r.stdout + r.stderr).strip()
            raise RuntimeError(msg or f"git {' '.join(args)} failed")
        return (r.stdout + r.stderr).strip() or "(no output)"

    def _load_index(self) -> dict:
        return json.loads(self.index_path.read_text())

    def _save_index(self, data: dict):
        self.index_path.write_text(json.dumps(data, indent=2))

    def _find(self, name: str) -> dict | None:
        idx = self._load_index()
        for wt in idx.get("worktrees", []):
            if wt.get("name") == name:
                return wt
        return None

    def _validate_name(self, name: str):
        if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""):
            raise ValueError(
                "Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -"
            )

    def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str:
        self._validate_name(name)
        if self._find(name):
            raise ValueError(f"Worktree '{name}' already exists in index")
        if task_id is not None and not self.tasks.exists(task_id):
            raise ValueError(f"Task {task_id} not found")

        path = self.dir / name
        branch = f"wt/{name}"
        self.events.emit(
            "worktree.create.before",
            task={
   "id": task_id} if task_id is not None else {
   },
            worktree={
   "name": name, "base_ref": base_ref},
        )
        try:
            self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])

            entry = {
   
                "name": name,
                "path": str(path),
                "branch": branch,
                "task_id": task_id,
                "status": "active",
                "created_at": time.time(),
            }

            idx = self._load_index()
            idx["worktrees"].append(entry)
            self._save_index(idx)

            if task_id is not None:
                self.tasks.bind_worktree(task_id, name)

            self.events.emit(
                "worktree.create.after",
                task={
   "id": task_id} if task_id is not None else {
   },
                worktree={
   
                    "name": name,
                    "path": str(path),
                    "branch": branch,
                    "status": "active",
                },
            )
            return json.dumps(entry, indent=2)
        except Exception as e:
            self.events.emit(
                "worktree.create.failed",
                task={
   "id": task_id} if task_id is not None else {
   },
                worktree={
   "name": name, "base_ref": base_ref},
                error=str(e),
            )
            raise

    def list_all(self) -> str:
        idx = self._load_index()
        wts = idx.get("worktrees", [])
        if not wts:
            return "No worktrees in index."
        lines = []
        for wt in wts:
            suffix = f" task={wt['task_id']}" if wt.get("task_id") else ""
            lines.append(
                f"[{wt.get('status', 'unknown')}] {wt['name']} -> "
                f"{wt['path']} ({wt.get('branch', '-')}){suffix}"
            )
        return "\n".join(lines)

    def status(self, name: str) -> str:
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"
        path = Path(wt["path"])
        if not path.exists():
            return f"Error: Worktree path missing: {path}"
        r = subprocess.run(
            ["git", "status", "--short", "--branch"],
            cwd=path,
            capture_output=True,
            text=True,
            timeout=60,
        )
        text = (r.stdout + r.stderr).strip()
        return text or "Clean worktree"

    def run(self, name: str, command: str) -> str:
        dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
        if any(d in command for d in dangerous):
            return "Error: Dangerous command blocked"

        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"
        path = Path(wt["path"])
        if not path.exists():
            return f"Error: Worktree path missing: {path}"

        try:
            r = subprocess.run(
                command,
                shell=True,
                cwd=path,
                capture_output=True,
                text=True,
                timeout=300,
            )
            out = (r.stdout + r.stderr).strip()
            return out[:50000] if out else "(no output)"
        except subprocess.TimeoutExpired:
            return "Error: Timeout (300s)"

    def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str:
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"

        self.events.emit(
            "worktree.remove.before",
            task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
            worktree={
   "name": name, "path": wt.get("path")},
        )
        try:
            args = ["worktree", "remove"]
            if force:
                args.append("--force")
            args.append(wt["path"])
            self._run_git(args)

            if complete_task and wt.get("task_id") is not None:
                task_id = wt["task_id"]
                before = json.loads(self.tasks.get(task_id))
                self.tasks.update(task_id, status="completed")
                self.tasks.unbind_worktree(task_id)
                self.events.emit(
                    "task.completed",
                    task={
   
                        "id": task_id,
                        "subject": before.get("subject", ""),
                        "status": "completed",
                    },
                    worktree={
   "name": name},
                )

            idx = self._load_index()
            for item in idx.get("worktrees", []):
                if item.get("name") == name:
                    item["status"] = "removed"
                    item["removed_at"] = time.time()
            self._save_index(idx)

            self.events.emit(
                "worktree.remove.after",
                task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
                worktree={
   "name": name, "path": wt.get("path"), "status": "removed"},
            )
            return f"Removed worktree '{name}'"
        except Exception as e:
            self.events.emit(
                "worktree.remove.failed",
                task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
                worktree={
   "name": name, "path": wt.get("path")},
                error=str(e),
            )
            raise

    def keep(self, name: str) -> str:
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"

        idx = self._load_index()
        kept = None
        for item in idx.get("worktrees", []):
            if item.get("name") == name:
                item["status"] = "kept"
                item["kept_at"] = time.time()
                kept = item
        self._save_index(idx)

        self.events.emit(
            "worktree.keep",
            task={
   "id": wt.get("task_id")} if wt.get("task_id") is not None else {
   },
            worktree={
   
                "name": name,
                "path": wt.get("path"),
                "status": "kept",
            },
        )
        return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'"


WORKTREES = WorktreeManager(REPO_ROOT, TASKS, EVENTS)


# -- Base tools (kept minimal, same style as previous sessions) --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command,
            shell=True,
            cwd=WORKDIR,
            capture_output=True,
            text=True,
            timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
   
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
    "task_list": lambda **kw: TASKS.list_all(),
    "task_get": lambda **kw: TASKS.get(kw["task_id"]),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("owner")),
    "task_bind_worktree": lambda **kw: TASKS.bind_worktree(kw["task_id"], kw["worktree"], kw.get("owner", "")),
    "worktree_create": lambda **kw: WORKTREES.create(kw["name"], kw.get("task_id"), kw.get("base_ref", "HEAD")),
    "worktree_list": lambda **kw: WORKTREES.list_all(),
    "worktree_status": lambda **kw: WORKTREES.status(kw["name"]),
    "worktree_run": lambda **kw: WORKTREES.run(kw["name"], kw["command"]),
    "worktree_keep": lambda **kw: WORKTREES.keep(kw["name"]),
    "worktree_remove": lambda **kw: WORKTREES.remove(kw["name"], kw.get("force", False), kw.get("complete_task", False)),
    "worktree_events": lambda **kw: EVENTS.list_recent(kw.get("limit", 20)),
}

TOOLS = [
    {
   
        "name": "bash",
        "description": "Run a shell command in the current workspace (blocking).",
        "input_schema": {
   
            "type": "object",
            "properties": {
   "command": {
   "type": "string"}},
            "required": ["command"],
        },
    },
    {
   
        "name": "read_file",
        "description": "Read file contents.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "path": {
   "type": "string"},
                "limit": {
   "type": "integer"},
            },
            "required": ["path"],
        },
    },
    {
   
        "name": "write_file",
        "description": "Write content to file.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "path": {
   "type": "string"},
                "content": {
   "type": "string"},
            },
            "required": ["path", "content"],
        },
    },
    {
   
        "name": "edit_file",
        "description": "Replace exact text in file.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "path": {
   "type": "string"},
                "old_text": {
   "type": "string"},
                "new_text": {
   "type": "string"},
            },
            "required": ["path", "old_text", "new_text"],
        },
    },
    {
   
        "name": "task_create",
        "description": "Create a new task on the shared task board.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "subject": {
   "type": "string"},
                "description": {
   "type": "string"},
            },
            "required": ["subject"],
        },
    },
    {
   
        "name": "task_list",
        "description": "List all tasks with status, owner, and worktree binding.",
        "input_schema": {
   "type": "object", "properties": {
   }},
    },
    {
   
        "name": "task_get",
        "description": "Get task details by ID.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   "task_id": {
   "type": "integer"}},
            "required": ["task_id"],
        },
    },
    {
   
        "name": "task_update",
        "description": "Update task status or owner.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "task_id": {
   "type": "integer"},
                "status": {
   
                    "type": "string",
                    "enum": ["pending", "in_progress", "completed"],
                },
                "owner": {
   "type": "string"},
            },
            "required": ["task_id"],
        },
    },
    {
   
        "name": "task_bind_worktree",
        "description": "Bind a task to a worktree name.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "task_id": {
   "type": "integer"},
                "worktree": {
   "type": "string"},
                "owner": {
   "type": "string"},
            },
            "required": ["task_id", "worktree"],
        },
    },
    {
   
        "name": "worktree_create",
        "description": "Create a git worktree and optionally bind it to a task.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "name": {
   "type": "string"},
                "task_id": {
   "type": "integer"},
                "base_ref": {
   "type": "string"},
            },
            "required": ["name"],
        },
    },
    {
   
        "name": "worktree_list",
        "description": "List worktrees tracked in .worktrees/index.json.",
        "input_schema": {
   "type": "object", "properties": {
   }},
    },
    {
   
        "name": "worktree_status",
        "description": "Show git status for one worktree.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   "name": {
   "type": "string"}},
            "required": ["name"],
        },
    },
    {
   
        "name": "worktree_run",
        "description": "Run a shell command in a named worktree directory.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "name": {
   "type": "string"},
                "command": {
   "type": "string"},
            },
            "required": ["name", "command"],
        },
    },
    {
   
        "name": "worktree_remove",
        "description": "Remove a worktree and optionally mark its bound task completed.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   
                "name": {
   "type": "string"},
                "force": {
   "type": "boolean"},
                "complete_task": {
   "type": "boolean"},
            },
            "required": ["name"],
        },
    },
    {
   
        "name": "worktree_keep",
        "description": "Mark a worktree as kept in lifecycle state without removing it.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   "name": {
   "type": "string"}},
            "required": ["name"],
        },
    },
    {
   
        "name": "worktree_events",
        "description": "List recent worktree/task lifecycle events from .worktrees/events.jsonl.",
        "input_schema": {
   
            "type": "object",
            "properties": {
   "limit": {
   "type": "integer"}},
        },
    },
]


def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({
   "role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append(
                    {
   
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                    }
                )
        messages.append({
   "role": "user", "content": results})


if __name__ == "__main__":
    print(f"Repo root for s12: {REPO_ROOT}")
    if not WORKTREES.git_available:
        print("Note: Not in a git repo. worktree_* tools will return errors.")

    history = []
    while True:
        try:
            query = input("\033[36ms12 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({
   "role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
目录
相关文章
|
3天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
10561 53
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
9天前
|
人工智能 JavaScript API
解放双手!OpenClaw Agent Browser全攻略(阿里云+本地部署+免费API+网页自动化场景落地)
“让AI聊聊天、写代码不难,难的是让它自己打开网页、填表单、查数据”——2026年,无数OpenClaw用户被这个痛点困扰。参考文章直击核心:当AI只能“纸上谈兵”,无法实际操控浏览器,就永远成不了真正的“数字员工”。而Agent Browser技能的出现,彻底打破了这一壁垒——它给OpenClaw装上“上网的手和眼睛”,让AI能像真人一样打开网页、点击按钮、填写表单、提取数据,24小时不间断完成网页自动化任务。
2384 5
|
23天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
23981 121
|
3天前
|
人工智能 IDE API
2026年国内 Codex 安装教程和使用教程:GPT-5.4 完整指南
Codex已进化为AI编程智能体,不仅能补全代码,更能理解项目、自动重构、执行任务。本文详解国内安装、GPT-5.4接入、cc-switch中转配置及实战开发流程,助你从零掌握“描述需求→AI实现”的新一代工程范式。(239字)
2214 126

热门文章

最新文章