第十二章 Worktree + Task Isolation (Worktree 任务隔离)
s01 > s02 > s03 > s04 > s05 > s06 | s07 > s08 > s09 > s10 > s11 > [ s12 ]
“本专栏基于开源项目 learn-claude-code 的官方文档。原文档非常硬核,为了方便像我一样的新手小白理解,我对文档进行了逐行精读,并加入了很多中文注释、大白话解释和踩坑记录。希望这套‘咀嚼版’教程能帮你推开 AI Agent 开发的大门。”
"各干各的目录, 互不干扰" -- 任务管目标, worktree 管目录, 按 ID 绑定。
一、问题
到 s11, 智能体已经能自主认领和完成任务。但所有任务共享一个目录。两个智能体同时重构不同模块 -- A 改 config.py, B 也改 config.py, 未提交的改动互相污染, 谁也没法干净回滚。
任务板管 "做什么" 但不管 "在哪做"。解法: 给每个任务一个独立的 git worktree 目录, 用任务 ID 把两边关联起来。
二、解决方案
双平面架构 (Dual-Plane Architecture):
任务管目标,worktree 管目录,按 ID 绑定
控制平面 (.tasks/) 执行平面 (.worktrees/)
(管理目标: 做什么) (管理环境: 在哪做)
+--------------------+ +-------------------------+
| task_1.json | | auth-refactor/ |
| id: 1 | | branch: wt/auth-refactor
| status: in_progress | | task_id: 1 |
| worktree: "auth-refactor" | | path: .worktrees/auth-refactor
+--------------------+ +-------------------------+
^ | ^
| | |
| +------- 任务 ID ---------+
| | (绑定桥梁)
| v
+--------------------+ +-------------------------+
| task_2.json | | ui-login/ |
| id: 2 | | branch: wt/ui-login |
| status: pending | | task_id: 2 |
| worktree: "" | | path: .worktrees/ui-login
+--------------------+ +-------------------------+
| |
v v
+--------------------+ +-------------------------+
| index.json | | events.jsonl |
| (worktree 注册表) | | (生命周期日志) |
| - name: "auth-refactor" | | - worktree.create.after |
| - status: "active" | | - task.completed |
+--------------------+ +-------------------------+
状态机 (State Machines):
任务状态机 (Task):
pending (待处理)
|
| 绑定 worktree
v
in_progress (进行中)
|
| 删除 worktree (complete_task=True)
v
completed (已完成)
Worktree 状态机:
absent (不存在)
  |
  | 创建 worktree
  v
active (活跃) <-----------+
  |     |     |           |
  |     |     +-----------+   运行命令 (继续工作, 状态不变)
  |     |
  |     +---- 保留 worktree ----> kept (已保留)
  |
  | 删除 worktree
  v
removed (已删除)
三、工作原理
3.1 整体设计哲学:双平面架构
s12 引入了双平面架构,将任务管理和执行环境分离:
控制平面(Control Plane):.tasks/ 目录
- 管理任务的目标(做什么)
- 存储任务状态、所有者、worktree 绑定关系
- 使用 task_*.json 文件存储
执行平面(Execution Plane):.worktrees/ 目录
- 提供任务的执行环境(在哪做)
- 每个 worktree 是一个独立的 Git 分支和目录
- 使用 index.json 管理 worktree 索引
核心思想:用任务 ID 作为桥梁,连接控制平面和执行平面。
3.2 事件系统:生命周期可观测性
class EventBus:
"""
事件总线:记录所有生命周期事件
设计原因:
1. 可观测性:所有操作都有记录,便于调试和审计
2. 崩溃恢复:可以从事件流重建状态
3. 审计追踪:谁在什么时候做了什么
"""
def __init__(self, event_log_path: Path):
self.path = event_log_path
# 确保日志目录存在
self.path.parent.mkdir(parents=True, exist_ok=True)
# 如果日志文件不存在,创建空文件
if not self.path.exists():
self.path.write_text("")
def emit(
self,
event: str, # 事件名称(如 "worktree.create.after")
task: dict | None = None, # 相关任务信息
worktree: dict | None = None, # 相关 worktree 信息
error: str | None = None, # 错误信息(如果有)
):
"""
发射一个事件,追加到日志文件
使用 JSONL 格式(每行一个 JSON),便于流式处理
"""
payload = {
"event": event,
"ts": time.time(), # 时间戳
"task": task or {
},
"worktree": worktree or {
},
}
if error:
payload["error"] = error
# 追加到日志文件(append-only)
with self.path.open("a", encoding="utf-8") as f:
f.write(json.dumps(payload) + "\n")
def list_recent(self, limit: int = 20) -> str:
"""
获取最近的事件
参数:
limit: 返回的事件数量,默认 20 条
"""
n = max(1, min(int(limit or 20), 200)) # 限制在 1-200 之间
lines = self.path.read_text(encoding="utf-8").splitlines()
recent = lines[-n:] # 取最后 n 行
items = []
for line in recent:
try:
items.append(json.loads(line))
except Exception:
items.append({
"event": "parse_error", "raw": line})
return json.dumps(items, indent=2)
为什么使用 JSONL 格式?
- 流式处理:每行一个 JSON,可以逐行读取,不需要加载整个文件
- 追加友好:新事件直接追加到文件末尾,不需要修改已有内容
- 崩溃安全:即使程序崩溃,已写入的事件不会丢失
3.3 任务管理器:控制平面的核心
class TaskManager:
"""
任务管理器:管理任务的创建、更新、绑定 worktree
设计特点:
1. 文件存储:每个任务一个 JSON 文件
2. 自增 ID:自动分配任务 ID
3. 双向绑定:任务和 worktree 互相引用
"""
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(parents=True, exist_ok=True)
# 计算下一个可用的 ID
self._next_id = self._max_id() + 1
def _max_id(self) -> int:
"""
找出当前最大的任务 ID
为什么需要这个函数?
- 程序重启后,需要知道从哪个 ID 开始分配
- 避免 ID 冲突
"""
ids = []
for f in self.dir.glob("task_*.json"):
try:
# 从文件名 "task_123.json" 中提取 ID
ids.append(int(f.stem.split("_")[1]))
except Exception:
pass
return max(ids) if ids else 0
def create(self, subject: str, description: str = "") -> str:
"""
创建新任务
参数:
subject: 任务标题
description: 任务描述
返回:
JSON 格式的任务详情
"""
task = {
"id": self._next_id,
"subject": subject,
"description": description,
"status": "pending", # 初始状态:待处理
"owner": "", # 任务所有者(队友名称)
"worktree": "", # 绑定的 worktree 名称
"blockedBy": [], # 依赖的任务 ID 列表
"created_at": time.time(),
"updated_at": time.time(),
}
self._save(task)
self._next_id += 1 # 递增 ID
return json.dumps(task, indent=2)
def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str:
"""
将任务绑定到 worktree
这是连接控制平面和执行平面的关键操作!
参数:
task_id: 任务 ID
worktree: worktree 名称
owner: 任务所有者
副作用:
1. 任务状态从 "pending" 改为 "in_progress"
2. 任务的 worktree 字段被设置
"""
task = self._load(task_id)
task["worktree"] = worktree # 绑定 worktree
if owner:
task["owner"] = owner
# 关键:绑定 worktree 时自动将状态改为 in_progress
if task["status"] == "pending":
task["status"] = "in_progress"
task["updated_at"] = time.time()
self._save(task)
return json.dumps(task, indent=2)
def unbind_worktree(self, task_id: int) -> str:
"""
解除任务与 worktree 的绑定
当 worktree 被删除时调用
"""
task = self._load(task_id)
task["worktree"] = "" # 清空 worktree 字段
task["updated_at"] = time.time()
self._save(task)
return json.dumps(task, indent=2)
def list_all(self) -> str:
"""
列出所有任务,显示状态、所有者、worktree 绑定
输出示例:
[ ] #1: Implement auth owner=alice wt=auth-refactor
[>] #2: Fix login bug owner=bob
[x] #3: Update docs
"""
tasks = []
for f in sorted(self.dir.glob("task_*.json")):
tasks.append(json.loads(f.read_text()))
if not tasks:
return "No tasks."
lines = []
for t in tasks:
# 状态标记:待处理 [ ],进行中 [>],已完成 [x]
marker = {
"pending": "[ ]",
"in_progress": "[>]",
"completed": "[x]",
}.get(t["status"], "[?]")
owner = f" owner={t['owner']}" if t.get("owner") else ""
wt = f" wt={t['worktree']}" if t.get("worktree") else ""
lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}")
return "\n".join(lines)
关键设计点:
- bind_worktree 的副作用:绑定 worktree 时自动将任务状态改为 in_progress,因为绑定意味着开始工作
- unbind_worktree:解除绑定时只清空 worktree 字段,不改变任务状态
- list_all 的输出格式:使用 [ ]、[>]、[x] 标记状态,一目了然
3.4 Worktree 管理器:执行平面的核心
class WorktreeManager:
"""
Worktree 管理器:管理 git worktree 的创建、运行命令、删除
核心功能:
1. 创建 worktree(自动创建 Git 分支)
2. 在 worktree 中运行命令
3. 删除或保留 worktree
4. 维护 worktree 索引
"""
def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus):
self.repo_root = repo_root # Git 仓库根目录
self.tasks = tasks # 任务管理器引用
self.events = events # 事件总线引用
self.dir = repo_root / ".worktrees" # worktree 目录
self.dir.mkdir(parents=True, exist_ok=True)
self.index_path = self.dir / "index.json" # 索引文件
# 如果索引文件不存在,创建空索引
if not self.index_path.exists():
self.index_path.write_text(json.dumps({
"worktrees": []}, indent=2))
# 检查是否在 Git 仓库中
self.git_available = self._is_git_repo()
def _is_git_repo(self) -> bool:
"""
检查当前目录是否在 Git 仓库中
为什么需要这个检查?
- worktree 是 Git 功能,必须在 Git 仓库中才能使用
- 提供友好的错误提示
"""
try:
r = subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
cwd=self.repo_root,
capture_output=True,
text=True,
timeout=10,
)
return r.returncode == 0
except Exception:
return False
def _run_git(self, args: list[str]) -> str:
"""
执行 Git 命令
参数:
args: Git 命令参数列表,如 ["worktree", "add", "-b", "wt/auth", ".worktrees/auth", "HEAD"]
为什么封装 Git 命令?
1. 统一错误处理
2. 统一超时设置
3. 检查是否在 Git 仓库中
"""
if not self.git_available:
raise RuntimeError("Not in a git repository. worktree tools require git.")
r = subprocess.run(
["git", *args],
cwd=self.repo_root,
capture_output=True,
text=True,
timeout=120,
)
if r.returncode != 0:
msg = (r.stdout + r.stderr).strip()
raise RuntimeError(msg or f"git {' '.join(args)} failed")
return (r.stdout + r.stderr).strip() or "(no output)"
def _find(self, name: str) -> dict | None:
"""
从索引中查找 worktree
参数:
name: worktree 名称
返回:
worktree 条目字典,如果不存在返回 None
"""
idx = self._load_index()
for wt in idx.get("worktrees", []):
if wt.get("name") == name:
return wt
return None
def _validate_name(self, name: str):
"""
验证 worktree 名称格式
为什么需要验证?
- 防止恶意名称(如路径遍历攻击)
- 确保名称可以作为目录名和 Git 分支名
允许的字符:字母、数字、点、下划线、连字符
长度限制:1-40 字符
"""
if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""):
raise ValueError(
"Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -"
)
def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str:
"""
创建 worktree
参数:
name: worktree 名称(同时用作目录名和分支名)
task_id: 绑定的任务 ID(可选)
base_ref: 基础引用(默认 HEAD,也可以是其他分支或提交)
执行步骤:
1. 验证名称
2. 检查是否已存在
3. 发射 before 事件
4. 执行 git worktree add 命令
5. 更新索引
6. 绑定任务(如果有)
7. 发射 after 事件
Git 命令解释:
git worktree add -b wt/{name} .worktrees/{name} HEAD
- -b wt/{name}:创建新分支 wt/{name}
- .worktrees/{name}:worktree 目录路径
- HEAD:基于当前提交
"""
self._validate_name(name)
if self._find(name):
raise ValueError(f"Worktree '{name}' already exists in index")
if task_id is not None and not self.tasks.exists(task_id):
raise ValueError(f"Task {task_id} not found")
path = self.dir / name # worktree 目录路径
branch = f"wt/{name}" # 分支名称(wt/ 前缀避免污染主分支)
# 发射 before 事件
self.events.emit(
"worktree.create.before",
task={
"id": task_id} if task_id is not None else {
},
worktree={
"name": name, "base_ref": base_ref},
)
try:
# 执行 git worktree add 命令
self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])
# 创建索引条目
entry = {
"name": name,
"path": str(path),
"branch": branch,
"task_id": task_id,
"status": "active", # 状态:活跃
"created_at": time.time(),
}
# 更新索引文件
idx = self._load_index()
idx["worktrees"].append(entry)
self._save_index(idx)
# 如果有任务 ID,绑定到任务
if task_id is not None:
self.tasks.bind_worktree(task_id, name)
# 发射 after 事件
self.events.emit(
"worktree.create.after",
task={
"id": task_id} if task_id is not None else {
},
worktree={
"name": name,
"path": str(path),
"branch": branch,
"status": "active",
},
)
return json.dumps(entry, indent=2)
except Exception as e:
# 发射 failed 事件
self.events.emit(
"worktree.create.failed",
task={
"id": task_id} if task_id is not None else {
},
worktree={
"name": name, "base_ref": base_ref},
error=str(e),
)
raise
def run(self, name: str, command: str) -> str:
"""
在 worktree 中执行命令
参数:
name: worktree 名称
command: 要执行的 shell 命令
关键点:
- cwd 参数指定为 worktree 目录,实现目录隔离
- 命令在独立的目录中执行,不会影响其他 worktree
安全检查:
- 危险命令检查(rm -rf /, sudo, shutdown 等)
- 超时限制(300 秒)
"""
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
wt = self._find(name)
if not wt:
return f"Error: Unknown worktree '{name}'"
path = Path(wt["path"])
if not path.exists():
return f"Error: Worktree path missing: {path}"
try:
r = subprocess.run(
command,
shell=True,
cwd=path, # 关键:在 worktree 目录中执行
capture_output=True,
text=True,
timeout=300,
)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (300s)"
def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str:
"""
删除 worktree
参数:
name: worktree 名称
force: 是否强制删除(即使有未提交的更改)
complete_task: 是否同时完成绑定的任务
执行步骤:
1. 发射 before 事件
2. 执行 git worktree remove 命令
3. 如果 complete_task=True,更新任务状态为 completed
4. 更新索引状态为 removed
5. 发射 after 事件
为什么 complete_task 参数很重要?
- 一个调用同时完成拆除和任务完成
- 减少调用次数,简化工作流
"""
wt = self._find(name)
if not wt:
return f"Error: Unknown worktree '{name}'"
self.events.emit(
"worktree.remove.before",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name, "path": wt.get("path")},
)
try:
# 构建 git worktree remove 命令
args = ["worktree", "remove"]
if force:
args.append("--force") # 强制删除
args.append(wt["path"])
self._run_git(args)
# 如果 complete_task=True,同时完成任务
if complete_task and wt.get("task_id") is not None:
task_id = wt["task_id"]
before = json.loads(self.tasks.get(task_id))
self.tasks.update(task_id, status="completed") # 更新任务状态
self.tasks.unbind_worktree(task_id) # 解除绑定
self.events.emit(
"task.completed",
task={
"id": task_id,
"subject": before.get("subject", ""),
"status": "completed",
},
worktree={
"name": name},
)
# 更新索引状态
idx = self._load_index()
for item in idx.get("worktrees", []):
if item.get("name") == name:
item["status"] = "removed"
item["removed_at"] = time.time()
self._save_index(idx)
self.events.emit(
"worktree.remove.after",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name, "path": wt.get("path"), "status": "removed"},
)
return f"Removed worktree '{name}'"
except Exception as e:
self.events.emit(
"worktree.remove.failed",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name, "path": wt.get("path")},
error=str(e),
)
raise
def keep(self, name: str) -> str:
"""
保留 worktree(不删除)
什么时候使用 keep?
- 需要保留修改供后续使用
- 不想立即完成任务
- 需要继续在 worktree 中工作
与 remove 的区别:
- keep:保留目录,状态改为 "kept"
- remove:删除目录,状态改为 "removed"
"""
wt = self._find(name)
if not wt:
return f"Error: Unknown worktree '{name}'"
idx = self._load_index()
kept = None
for item in idx.get("worktrees", []):
if item.get("name") == name:
item["status"] = "kept" # 更新状态为保留
item["kept_at"] = time.time()
kept = item
self._save_index(idx)
self.events.emit(
"worktree.keep",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name,
"path": wt.get("path"),
"status": "kept",
},
)
return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'"
关键设计点:
- create 的 Git 命令:git worktree add -b wt/{name} .worktrees/{name} HEAD
  - -b wt/{name}:创建新分支,wt/ 前缀避免污染主分支
  - .worktrees/{name}:worktree 目录路径
  - HEAD:基于当前提交
- run 的 cwd 参数:指定为 worktree 目录,实现目录隔离
- remove 的 complete_task 参数:一个调用同时完成拆除和任务完成
- keep vs remove:
  - keep:保留目录,状态改为 "kept"
  - remove:删除目录,状态改为 "removed"
3.5 任务和 Worktree 的绑定机制
def bind_worktree(self, task_id, worktree):
    """
    Bind a task to a worktree (simplified excerpt of TaskManager.bind_worktree).

    What this snippet does:
    1. Loads the task record and sets its "worktree" field to the given name.
    2. Moves the task from "pending" to "in_progress"; any other status is
       left unchanged.
    3. Persists the task record via self._save.

    Note: this method only writes the task side of the binding. It does not
    touch .worktrees/index.json. In the full listing the index entry's
    "task_id" is written by WorktreeManager.create, which then calls this
    method to update the task record.
    """
    task = self._load(task_id)
    task["worktree"] = worktree  # record which worktree the task runs in
    if task["status"] == "pending":
        task["status"] = "in_progress"  # binding a worktree means work has started
    self._save(task)
为什么绑定时自动改为 in_progress?
- 绑定 worktree 意味着开始工作
- 任务从"待处理"变为"进行中"
- 符合任务状态机的逻辑
3.6 收尾机制:Keep vs Remove
def remove(self, name, force=False, complete_task=False):
    """
    Remove a worktree and optionally complete its bound task (abridged excerpt).

    Parameters:
        name: worktree name.
        force: force removal even when the worktree has uncommitted changes.
        complete_task: also mark the bound task as completed.

    Why complete_task is useful:
    - One call performs both the teardown and the task completion.
    - Fewer tool calls, simpler workflow.

    NOTE(review): the steps below are separate writes (task status, task
    binding, event log) executed one after another; nothing in this snippet
    makes them transactional, so a failure part-way can leave partial state.
    """
    # ... worktree removal is elided in this excerpt; `wt` is the index entry looked up there ...
    if complete_task and wt.get("task_id") is not None:
        task_id = wt["task_id"]
        self.tasks.update(task_id, status="completed")  # mark the task completed
        self.tasks.unbind_worktree(task_id)  # clear the task's worktree field
        self.events.emit("task.completed", ...)  # record the completion event
收尾的两种选择:
- worktree_keep(name):保留目录供后续使用
- worktree_remove(name, complete_task=True):删除目录,完成绑定任务,发出事件
3.7 事件流:生命周期可观测性
{
"event": "worktree.remove.after",
"task": {
"id": 1, "status": "completed"},
"worktree": {
"name": "auth-refactor", "status": "removed"},
"ts": 1730000000
}
事件类型:
- worktree.create.before/after/failed:worktree 创建
- worktree.remove.before/after/failed:worktree 删除
- worktree.keep:worktree 保留
- task.completed:任务完成
为什么需要事件流?
- 可观测性:所有操作都有记录,便于调试和审计
- 崩溃恢复:可以从事件流重建状态
- 审计追踪:谁在什么时候做了什么
3.8 崩溃恢复
会话记忆是易失的; 磁盘状态是持久的。
崩溃后从 .tasks/ + .worktrees/index.json 重建现场:
- 任务状态存储在 .tasks/task_*.json 文件中
- worktree 索引存储在 .worktrees/index.json 文件中
- 事件日志存储在 .worktrees/events.jsonl 文件中
恢复步骤:
- 读取 .tasks/ 目录下的所有任务文件
- 读取 .worktrees/index.json 文件
- 检查 worktree 目录是否存在
- 根据索引重建 worktree 状态
四、相对 s11 的变更
| 组件 | 之前 (s11) | 之后 (s12) |
|---|---|---|
| 协调 | 任务板 (owner/status) | 任务板 + worktree 显式绑定 |
| 执行范围 | 共享目录 | 每个任务独立目录 |
| 可恢复性 | 仅任务状态 | 任务状态 + worktree 索引 |
| 收尾 | 任务完成 | 任务完成 + 显式 keep/remove |
| 生命周期可见性 | 隐式日志 | .worktrees/events.jsonl 显式事件流 |
| 并发安全 | 无 | 目录级隔离,避免文件冲突 |
| Git 集成 | 无 | 原生 Git worktree 支持 |
核心改进总结
s12 的核心价值是引入了目录级隔离和双平面架构。
设计精髓:
- 双平面分离:控制平面管目标,执行平面管环境
- 目录级隔离:每个任务独立目录,避免文件冲突
- Git 原生集成:利用 Git worktree 实现分支隔离
- 事件驱动:所有操作都有事件记录,便于调试和审计
- 一步收尾:一个调用同时完成拆除和任务完成(注意:git 删除、任务更新、索引更新是依次执行的多次写入,并非事务意义上的原子操作)
- 崩溃恢复:磁盘状态持久化,可以从崩溃中恢复
实际应用场景:
- 多人协作:每个人在独立的 worktree 中工作,互不干扰
- 并行开发:同时开发多个功能,每个功能在独立的 worktree 中
- 代码审查:在独立的 worktree 中审查代码,不影响主分支
- 实验性修改:在 worktree 中尝试修改,可以随时丢弃或保留
五、试一试
cd learn-claude-code
python agents/s12_worktree_task_isolation.py
试试这些 prompt (英文 prompt 对 LLM 效果更好, 也可以用中文):
1. Create tasks for backend auth and frontend login page, then list tasks.
2. Create worktree "auth-refactor" for task 1, then bind task 2 to a new worktree "ui-login".
3. Run "git status --short" in worktree "auth-refactor".
4. Keep worktree "ui-login", then list worktrees and inspect events.
5. Remove worktree "auth-refactor" with complete_task=true, then list tasks/worktrees/events.
观察重点:
- 创建 worktree 后,任务状态是否自动改为 in_progress?
- 在 worktree 中运行命令,是否在独立的目录中执行?
- 删除 worktree 时,complete_task 参数是否同时完成任务?
- 事件流是否记录了所有操作?
- 任务和 worktree 的绑定关系是否正确显示?
六、完整代码
#!/usr/bin/env python3
import json
import os
import re
import subprocess
import time
from pathlib import Path
from anthropic import Anthropic
from dotenv import load_dotenv
# Load variables from .env; override=True lets .env values replace ones
# already present in the process environment.
load_dotenv(override=True)
# When a custom base URL is configured, drop ANTHROPIC_AUTH_TOKEN from the
# environment before the client is constructed.
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
# Directory the process was started in; used as the workspace for the base tools.
WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
# MODEL_ID is required: a missing variable raises KeyError at import time.
MODEL = os.environ["MODEL_ID"]
def detect_repo_root(cwd: Path) -> Path | None:
"""Return git repo root if cwd is inside a repo, else None."""
try:
r = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
cwd=cwd,
capture_output=True,
text=True,
timeout=10,
)
if r.returncode != 0:
return None
root = Path(r.stdout.strip())
return root if root.exists() else None
except Exception:
return None
# Root used for .tasks/ and .worktrees/: the enclosing git repository when one
# is detected, otherwise the working directory itself.
REPO_ROOT = detect_repo_root(WORKDIR) or WORKDIR
# System prompt sent with every model request; it steers the agent toward the
# task + worktree tools and the keep/remove closeout step.
SYSTEM = (
    f"You are a coding agent at {WORKDIR}. "
    "Use task + worktree tools for multi-task work. "
    "For parallel or risky changes: create tasks, allocate worktree lanes, "
    "run commands in those lanes, then choose keep/remove for closeout. "
    "Use worktree_events when you need lifecycle visibility."
)
# -- EventBus: append-only lifecycle events for observability --
class EventBus:
def __init__(self, event_log_path: Path):
self.path = event_log_path
self.path.parent.mkdir(parents=True, exist_ok=True)
if not self.path.exists():
self.path.write_text("")
def emit(
self,
event: str,
task: dict | None = None,
worktree: dict | None = None,
error: str | None = None,
):
payload = {
"event": event,
"ts": time.time(),
"task": task or {
},
"worktree": worktree or {
},
}
if error:
payload["error"] = error
with self.path.open("a", encoding="utf-8") as f:
f.write(json.dumps(payload) + "\n")
def list_recent(self, limit: int = 20) -> str:
n = max(1, min(int(limit or 20), 200))
lines = self.path.read_text(encoding="utf-8").splitlines()
recent = lines[-n:]
items = []
for line in recent:
try:
items.append(json.loads(line))
except Exception:
items.append({
"event": "parse_error", "raw": line})
return json.dumps(items, indent=2)
# -- TaskManager: persistent task board with optional worktree binding --
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(parents=True, exist_ok=True)
self._next_id = self._max_id() + 1
def _max_id(self) -> int:
ids = []
for f in self.dir.glob("task_*.json"):
try:
ids.append(int(f.stem.split("_")[1]))
except Exception:
pass
return max(ids) if ids else 0
def _path(self, task_id: int) -> Path:
return self.dir / f"task_{task_id}.json"
def _load(self, task_id: int) -> dict:
path = self._path(task_id)
if not path.exists():
raise ValueError(f"Task {task_id} not found")
return json.loads(path.read_text())
def _save(self, task: dict):
self._path(task["id"]).write_text(json.dumps(task, indent=2))
def create(self, subject: str, description: str = "") -> str:
task = {
"id": self._next_id,
"subject": subject,
"description": description,
"status": "pending",
"owner": "",
"worktree": "",
"blockedBy": [],
"created_at": time.time(),
"updated_at": time.time(),
}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)
def get(self, task_id: int) -> str:
return json.dumps(self._load(task_id), indent=2)
def exists(self, task_id: int) -> bool:
return self._path(task_id).exists()
def update(self, task_id: int, status: str = None, owner: str = None) -> str:
task = self._load(task_id)
if status:
if status not in ("pending", "in_progress", "completed"):
raise ValueError(f"Invalid status: {status}")
task["status"] = status
if owner is not None:
task["owner"] = owner
task["updated_at"] = time.time()
self._save(task)
return json.dumps(task, indent=2)
def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str:
task = self._load(task_id)
task["worktree"] = worktree
if owner:
task["owner"] = owner
if task["status"] == "pending":
task["status"] = "in_progress"
task["updated_at"] = time.time()
self._save(task)
return json.dumps(task, indent=2)
def unbind_worktree(self, task_id: int) -> str:
task = self._load(task_id)
task["worktree"] = ""
task["updated_at"] = time.time()
self._save(task)
return json.dumps(task, indent=2)
def list_all(self) -> str:
tasks = []
for f in sorted(self.dir.glob("task_*.json")):
tasks.append(json.loads(f.read_text()))
if not tasks:
return "No tasks."
lines = []
for t in tasks:
marker = {
"pending": "[ ]",
"in_progress": "[>]",
"completed": "[x]",
}.get(t["status"], "[?]")
owner = f" owner={t['owner']}" if t.get("owner") else ""
wt = f" wt={t['worktree']}" if t.get("worktree") else ""
lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}")
return "\n".join(lines)
# Module-wide singletons shared by the tool handlers: the task board lives in
# <REPO_ROOT>/.tasks and lifecycle events go to <REPO_ROOT>/.worktrees/events.jsonl.
TASKS = TaskManager(REPO_ROOT / ".tasks")
EVENTS = EventBus(REPO_ROOT / ".worktrees" / "events.jsonl")
# -- WorktreeManager: create/list/run/remove git worktrees + lifecycle index --
class WorktreeManager:
def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus):
self.repo_root = repo_root
self.tasks = tasks
self.events = events
self.dir = repo_root / ".worktrees"
self.dir.mkdir(parents=True, exist_ok=True)
self.index_path = self.dir / "index.json"
if not self.index_path.exists():
self.index_path.write_text(json.dumps({
"worktrees": []}, indent=2))
self.git_available = self._is_git_repo()
def _is_git_repo(self) -> bool:
try:
r = subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
cwd=self.repo_root,
capture_output=True,
text=True,
timeout=10,
)
return r.returncode == 0
except Exception:
return False
def _run_git(self, args: list[str]) -> str:
if not self.git_available:
raise RuntimeError("Not in a git repository. worktree tools require git.")
r = subprocess.run(
["git", *args],
cwd=self.repo_root,
capture_output=True,
text=True,
timeout=120,
)
if r.returncode != 0:
msg = (r.stdout + r.stderr).strip()
raise RuntimeError(msg or f"git {' '.join(args)} failed")
return (r.stdout + r.stderr).strip() or "(no output)"
def _load_index(self) -> dict:
return json.loads(self.index_path.read_text())
def _save_index(self, data: dict):
self.index_path.write_text(json.dumps(data, indent=2))
def _find(self, name: str) -> dict | None:
idx = self._load_index()
for wt in idx.get("worktrees", []):
if wt.get("name") == name:
return wt
return None
def _validate_name(self, name: str):
if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""):
raise ValueError(
"Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -"
)
def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str:
self._validate_name(name)
if self._find(name):
raise ValueError(f"Worktree '{name}' already exists in index")
if task_id is not None and not self.tasks.exists(task_id):
raise ValueError(f"Task {task_id} not found")
path = self.dir / name
branch = f"wt/{name}"
self.events.emit(
"worktree.create.before",
task={
"id": task_id} if task_id is not None else {
},
worktree={
"name": name, "base_ref": base_ref},
)
try:
self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])
entry = {
"name": name,
"path": str(path),
"branch": branch,
"task_id": task_id,
"status": "active",
"created_at": time.time(),
}
idx = self._load_index()
idx["worktrees"].append(entry)
self._save_index(idx)
if task_id is not None:
self.tasks.bind_worktree(task_id, name)
self.events.emit(
"worktree.create.after",
task={
"id": task_id} if task_id is not None else {
},
worktree={
"name": name,
"path": str(path),
"branch": branch,
"status": "active",
},
)
return json.dumps(entry, indent=2)
except Exception as e:
self.events.emit(
"worktree.create.failed",
task={
"id": task_id} if task_id is not None else {
},
worktree={
"name": name, "base_ref": base_ref},
error=str(e),
)
raise
def list_all(self) -> str:
idx = self._load_index()
wts = idx.get("worktrees", [])
if not wts:
return "No worktrees in index."
lines = []
for wt in wts:
suffix = f" task={wt['task_id']}" if wt.get("task_id") else ""
lines.append(
f"[{wt.get('status', 'unknown')}] {wt['name']} -> "
f"{wt['path']} ({wt.get('branch', '-')}){suffix}"
)
return "\n".join(lines)
def status(self, name: str) -> str:
wt = self._find(name)
if not wt:
return f"Error: Unknown worktree '{name}'"
path = Path(wt["path"])
if not path.exists():
return f"Error: Worktree path missing: {path}"
r = subprocess.run(
["git", "status", "--short", "--branch"],
cwd=path,
capture_output=True,
text=True,
timeout=60,
)
text = (r.stdout + r.stderr).strip()
return text or "Clean worktree"
def run(self, name: str, command: str) -> str:
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
wt = self._find(name)
if not wt:
return f"Error: Unknown worktree '{name}'"
path = Path(wt["path"])
if not path.exists():
return f"Error: Worktree path missing: {path}"
try:
r = subprocess.run(
command,
shell=True,
cwd=path,
capture_output=True,
text=True,
timeout=300,
)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (300s)"
def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str:
wt = self._find(name)
if not wt:
return f"Error: Unknown worktree '{name}'"
self.events.emit(
"worktree.remove.before",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name, "path": wt.get("path")},
)
try:
args = ["worktree", "remove"]
if force:
args.append("--force")
args.append(wt["path"])
self._run_git(args)
if complete_task and wt.get("task_id") is not None:
task_id = wt["task_id"]
before = json.loads(self.tasks.get(task_id))
self.tasks.update(task_id, status="completed")
self.tasks.unbind_worktree(task_id)
self.events.emit(
"task.completed",
task={
"id": task_id,
"subject": before.get("subject", ""),
"status": "completed",
},
worktree={
"name": name},
)
idx = self._load_index()
for item in idx.get("worktrees", []):
if item.get("name") == name:
item["status"] = "removed"
item["removed_at"] = time.time()
self._save_index(idx)
self.events.emit(
"worktree.remove.after",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name, "path": wt.get("path"), "status": "removed"},
)
return f"Removed worktree '{name}'"
except Exception as e:
self.events.emit(
"worktree.remove.failed",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name, "path": wt.get("path")},
error=str(e),
)
raise
def keep(self, name: str) -> str:
wt = self._find(name)
if not wt:
return f"Error: Unknown worktree '{name}'"
idx = self._load_index()
kept = None
for item in idx.get("worktrees", []):
if item.get("name") == name:
item["status"] = "kept"
item["kept_at"] = time.time()
kept = item
self._save_index(idx)
self.events.emit(
"worktree.keep",
task={
"id": wt.get("task_id")} if wt.get("task_id") is not None else {
},
worktree={
"name": name,
"path": wt.get("path"),
"status": "kept",
},
)
return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'"
# Module-wide worktree manager wired to the shared task board and event log.
WORKTREES = WorktreeManager(REPO_ROOT, TASKS, EVENTS)
# -- Base tools (kept minimal, same style as previous sessions) --
def safe_path(p: str) -> Path:
    """Resolve *p* against WORKDIR and refuse anything that lands outside it.

    Raises:
        ValueError: the resolved path is not located under WORKDIR.
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
def run_bash(command: str) -> str:
    """Run a shell command in WORKDIR and return its combined output.

    Commands containing one of a few blocked substrings are refused. Output
    is stdout followed by stderr, stripped and cut to 50000 characters;
    ``"(no output)"`` when empty. A run longer than 120 s yields a timeout
    error string.
    """
    for pattern in ("rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"):
        if pattern in command:
            return "Error: Dangerous command blocked"
    try:
        proc = subprocess.run(
            command,
            shell=True,
            cwd=WORKDIR,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    combined = (proc.stdout + proc.stderr).strip()
    if not combined:
        return "(no output)"
    return combined[:50000]
def run_read(path: str, limit: int = None) -> str:
    """Return the text of a workspace file, optionally cut to its first *limit* lines.

    When lines are dropped, a trailing ``... (<n> more)`` marker is appended.
    The result is capped at 50000 characters. Any failure is returned as an
    ``Error: ...`` string rather than raised.
    """
    try:
        rows = safe_path(path).read_text().splitlines()
        total = len(rows)
        if limit and limit < total:
            rows = rows[:limit]
            rows.append(f"... ({total - limit} more)")
        text = "\n".join(rows)
        return text[:50000]
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write *content* to a workspace file, creating parent directories as needed.

    Returns a confirmation carrying ``len(content)``, or an ``Error: ...``
    string when anything fails.
    """
    try:
        target = safe_path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
    except Exception as e:
        return f"Error: {e}"
    return f"Wrote {len(content)} bytes"
def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of *old_text* with *new_text* in a workspace file.

    Returns ``"Edited <path>"`` on success, an error string when the text is
    not present, or ``Error: ...`` for any other failure.
    """
    try:
        target = safe_path(path)
        current = target.read_text()
        if old_text in current:
            target.write_text(current.replace(old_text, new_text, 1))
            return f"Edited {path}"
        return f"Error: Text not found in {path}"
    except Exception as e:
        return f"Error: {e}"
# Dispatch table: tool name -> callable taking the model-supplied arguments as
# keyword arguments and returning the tool's text result.
TOOL_HANDLERS = {
    # Workspace shell and file tools.
    "bash": lambda **args: run_bash(args["command"]),
    "read_file": lambda **args: run_read(args["path"], args.get("limit")),
    "write_file": lambda **args: run_write(args["path"], args["content"]),
    "edit_file": lambda **args: run_edit(
        args["path"], args["old_text"], args["new_text"]
    ),
    # Task board.
    "task_create": lambda **args: TASKS.create(
        args["subject"], args.get("description", "")
    ),
    "task_list": lambda **args: TASKS.list_all(),
    "task_get": lambda **args: TASKS.get(args["task_id"]),
    "task_update": lambda **args: TASKS.update(
        args["task_id"], args.get("status"), args.get("owner")
    ),
    "task_bind_worktree": lambda **args: TASKS.bind_worktree(
        args["task_id"], args["worktree"], args.get("owner", "")
    ),
    # Worktree lifecycle.
    "worktree_create": lambda **args: WORKTREES.create(
        args["name"], args.get("task_id"), args.get("base_ref", "HEAD")
    ),
    "worktree_list": lambda **args: WORKTREES.list_all(),
    "worktree_status": lambda **args: WORKTREES.status(args["name"]),
    "worktree_run": lambda **args: WORKTREES.run(args["name"], args["command"]),
    "worktree_keep": lambda **args: WORKTREES.keep(args["name"]),
    "worktree_remove": lambda **args: WORKTREES.remove(
        args["name"], args.get("force", False), args.get("complete_task", False)
    ),
    "worktree_events": lambda **args: EVENTS.list_recent(args.get("limit", 20)),
}
def _tool(tool_name, summary, fields=None, required=None):
    """Build one tool definition dict (name, description, input_schema).

    *fields* maps a property name to either a JSON-schema type name or a
    complete property schema dict. The ``"required"`` key is only emitted
    when *required* is given.
    """
    properties = {}
    for key, spec in (fields or {}).items():
        properties[key] = {"type": spec} if isinstance(spec, str) else dict(spec)
    schema = {"type": "object", "properties": properties}
    if required is not None:
        schema["required"] = list(required)
    return {"name": tool_name, "description": summary, "input_schema": schema}


# Tool definitions advertised to the model; names match the keys of TOOL_HANDLERS.
TOOLS = [
    _tool(
        "bash",
        "Run a shell command in the current workspace (blocking).",
        {"command": "string"},
        ["command"],
    ),
    _tool(
        "read_file",
        "Read file contents.",
        {"path": "string", "limit": "integer"},
        ["path"],
    ),
    _tool(
        "write_file",
        "Write content to file.",
        {"path": "string", "content": "string"},
        ["path", "content"],
    ),
    _tool(
        "edit_file",
        "Replace exact text in file.",
        {"path": "string", "old_text": "string", "new_text": "string"},
        ["path", "old_text", "new_text"],
    ),
    _tool(
        "task_create",
        "Create a new task on the shared task board.",
        {"subject": "string", "description": "string"},
        ["subject"],
    ),
    _tool(
        "task_list",
        "List all tasks with status, owner, and worktree binding.",
    ),
    _tool(
        "task_get",
        "Get task details by ID.",
        {"task_id": "integer"},
        ["task_id"],
    ),
    _tool(
        "task_update",
        "Update task status or owner.",
        {
            "task_id": "integer",
            "status": {
                "type": "string",
                "enum": ["pending", "in_progress", "completed"],
            },
            "owner": "string",
        },
        ["task_id"],
    ),
    _tool(
        "task_bind_worktree",
        "Bind a task to a worktree name.",
        {"task_id": "integer", "worktree": "string", "owner": "string"},
        ["task_id", "worktree"],
    ),
    _tool(
        "worktree_create",
        "Create a git worktree and optionally bind it to a task.",
        {"name": "string", "task_id": "integer", "base_ref": "string"},
        ["name"],
    ),
    _tool(
        "worktree_list",
        "List worktrees tracked in .worktrees/index.json.",
    ),
    _tool(
        "worktree_status",
        "Show git status for one worktree.",
        {"name": "string"},
        ["name"],
    ),
    _tool(
        "worktree_run",
        "Run a shell command in a named worktree directory.",
        {"name": "string", "command": "string"},
        ["name", "command"],
    ),
    _tool(
        "worktree_remove",
        "Remove a worktree and optionally mark its bound task completed.",
        {"name": "string", "force": "boolean", "complete_task": "boolean"},
        ["name"],
    ),
    _tool(
        "worktree_keep",
        "Mark a worktree as kept in lifecycle state without removing it.",
        {"name": "string"},
        ["name"],
    ),
    _tool(
        "worktree_events",
        "List recent worktree/task lifecycle events from .worktrees/events.jsonl.",
        {"limit": "integer"},
    ),
]
def agent_loop(messages: list):
    """Drive the model until it stops asking for tools.

    Each round sends *messages* to the model and appends the assistant reply.
    If the reply requested tools, every ``tool_use`` block is dispatched
    through TOOL_HANDLERS and the results are appended as a single user
    message. *messages* is mutated in place; the function returns None as
    soon as a reply's stop reason is anything other than ``"tool_use"``.
    """
    while True:
        reply = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": reply.content})
        if reply.stop_reason != "tool_use":
            return
        tool_results = []
        for block in reply.content:
            if block.type != "tool_use":
                continue
            handler = TOOL_HANDLERS.get(block.name)
            try:
                if handler:
                    output = handler(**block.input)
                else:
                    output = f"Unknown tool: {block.name}"
            except Exception as e:
                # Tool failures are handed back to the model as text, not raised.
                output = f"Error: {e}"
            print(f"> {block.name}: {str(output)[:200]}")
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                }
            )
        messages.append({"role": "user", "content": tool_results})
if __name__ == "__main__":
    # Interactive REPL: read a prompt, run the agent loop, print the text
    # blocks of the final message. "q", "exit", an empty line, Ctrl-D or
    # Ctrl-C ends the session.
    print(f"Repo root for s12: {REPO_ROOT}")
    if not WORKTREES.git_available:
        print("Note: Not in a git repo. worktree_* tools will return errors.")
    history = []
    while True:
        try:
            query = input("\033[36ms12 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        normalized = query.strip().lower()
        if normalized in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        last_content = history[-1]["content"]
        if isinstance(last_content, list):
            for part in last_content:
                if hasattr(part, "text"):
                    print(part.text)
        print()