【从零手写 ClaudeCode:learn-claude-code 项目实战笔记】(7)Task System (任务系统)

简介: 本章详解任务系统(s07):将内存中扁平待办升级为磁盘持久化的有向无环任务图(DAG),支持依赖(blockedBy/blocks)、状态流转(pending→in_progress→completed)与自动解锁,成为多Agent协作的调度骨架。

第七章 Task System (任务系统)

s01 > s02 > s03 > s04 > s05 > s06 | [ s07 ] s08 > s09 > s10 > s11 > s12

“本专栏基于开源项目 learn-claude-code 的官方文档。原文档非常硬核,为了方便像我一样的新手小白理解,我对文档进行了逐行精读,并加入了很多中文注释、大白话解释和踩坑记录。希望这套‘咀嚼版’教程能帮你推开 AI Agent 开发的大门。”

项目地址:shareAI-lab/learn-claude-code: Bash is all you need - A nano Claude Code–like agent, built from 0 to 1

"大目标要拆成小任务, 排好序, 记在磁盘上" -- 文件持久化的任务图, 为多 agent 协作打基础。

一、问题-上下文耦合困境

s03 的 TodoManager 只是内存中的扁平清单: 没有顺序、没有依赖、状态只有做完没做完,进程结束就没了。真实目标是有结构的 -- 任务 B 依赖任务 A, 任务 C 和 D 可以并行, 任务 E 要等 C 和 D 都完成。

没有显式的关系, 智能体分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里, 上下文压缩 (s06) 一跑就没了。

二、解决方案

把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件, 有状态、前置依赖 (blockedBy) 和后置依赖 (blocks)。任务图随时回答三个问题:

  • 什么可以做? -- 状态为 pendingblockedBy 为空的任务。
  • 什么被卡住? -- 等待前置任务完成的任务。
  • 什么做完了? -- 状态为 completed 的任务, 完成时自动解锁后续任务。
.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

任务图 (DAG):
                 +----------+
            +--> | task 2   | --+
            |    | pending  |   |
+----------+     +----------+    +--> +----------+
| task 1   |                          | task 4   |
| completed| --> +----------+    +--> | blocked  |
+----------+     | task 3   | --+     +----------+
                 | pending  |
                 +----------+

顺序:   task 1 必须先完成, 才能开始 2 和 3
并行:   task 2 和 3 可以同时执行
依赖:   task 4 要等 2 和 3 都完成
状态:   pending -> in_progress -> completed

这个任务图是 s07 之后所有机制的协调骨架: 后台执行 (s08)、多 agent 团队 (s09+)、worktree 隔离 (s12) 都读写这同一个结构。

三、工作原理

  1. TaskManager: 每个任务一个 JSON 文件, CRUD + 依赖图。每个任务有 blockedBy(前置依赖)和 blocks(后置依赖)
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True) # 创建.tasks/ 目录
        self._next_id = self._max_id() + 1 # 找出当前已存在的最大任务ID然后加一

    def _save(self, task: dict):
        path = self.dir / f"task_{task['id']}.json"
        path.write_text(json.dumps(task, indent=2))

    def create(self, subject, description=""):
        task = {
   
        "id": self._next_id, 
        "subject": subject,
        "status": "pending",
        "blockedBy": [], # 前置依赖
        "blocks": [], # 后置依赖
        "owner": ""
        }
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)
  1. 依赖解除: 完成任务时, 自动将其 ID 从其他任务的 blockedBy 中移除, 解锁后续任务。
def _clear_dependency(self, completed_id):
    for f in self.dir.glob("task_*.json"):
        task = json.loads(f.read_text())
        if completed_id in task.get("blockedBy", []):
            task["blockedBy"].remove(completed_id)
            self._save(task)
  1. 状态变更 + 依赖关联: update 处理状态转换和依赖边。
def update(self, task_id, status=None,
           add_blocked_by=None, add_blocks=None):
     ......
     # s07: update 方法中维护双向依赖
    if add_blocks:
        task["blocks"] = list(set(task["blocks"] + add_blocks))
        # 同时更新被阻塞任务的 blockedBy 列表
        for blocked_id in add_blocks:
            try:
                blocked = self._load(blocked_id)
                if task_id not in blocked["blockedBy"]:
                    blocked["blockedBy"].append(task_id)
                    self._save(blocked)
            except ValueError:
                pass

三种状态的流转:pending(待处理) → in_progress(进行中) → completed(已完成)。当任务变为 completed 时,会自动触发 _clear_dependency() 方法,将该任务 ID 从其他所有任务的 blockedBy 列表中移除,从而解锁后续任务。

  1. 四个任务工具加入 dispatch map。
TOOL_HANDLERS = {
   
    # ...base tools...
    "task_create": lambda **kw: TASKS.create(kw["subject"]),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
    "task_list":   lambda **kw: TASKS.list_all(),
    "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
}

从 s07 起, 任务图是多步工作的默认选择。s03 的 Todo 仍可用于单次会话内的快速清单。

工具 作用
task_create 创建新任务,指定主题和描述,初始状态为 pending
task_update 更新任务状态(pending/in_progress/completed)或依赖关系
task_list 列出所有任务及其状态,显示哪些任务被阻塞
task_get 获取单个任务的完整详细信息

四、相对 s06 的变更

组件 之前 (s06) 之后 (s07)
Tools 5 8 (task_create/update/list/get)
规划模型 扁平清单 (仅内存) 带依赖关系的任务图 (磁盘)
关系 blockedBy + blocks
状态追踪 做完没做完 pending -> in_progress -> completed
持久化 压缩后丢失 压缩和重启后存活

五、试一试

cd learn-claude-code
python agents/s07_task_system.py

试试这些 prompt (英文 prompt 对 LLM 效果更好, 也可以用中文):

  1. Create 3 tasks: "Setup project", "Write code", "Write tests". Make them depend on each other in order.
  2. List all tasks and show the dependency graph
  3. Complete task 1 and then list tasks to see task 2 unblocked
  4. Create a task board for refactoring: parse -> transform -> emit -> test, where transform and emit can run in parallel after parse

六、完整代码

#!/usr/bin/env python3
import json
import os
import subprocess
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TASKS_DIR = WORKDIR / ".tasks"

SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."


# -- TaskManager: CRUD with dependency graph, persisted as JSON files --
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
        return max(ids) if ids else 0

    def _load(self, task_id: int) -> dict:
        path = self.dir / f"task_{task_id}.json"
        if not path.exists():
            raise ValueError(f"Task {task_id} not found")
        return json.loads(path.read_text())

    def _save(self, task: dict):
        path = self.dir / f"task_{task['id']}.json"
        path.write_text(json.dumps(task, indent=2))

    def create(self, subject: str, description: str = "") -> str:
        task = {
   
            "id": self._next_id, "subject": subject, "description": description,
            "status": "pending", "blockedBy": [], "blocks": [], "owner": "",
        }
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)

    def get(self, task_id: int) -> str:
        return json.dumps(self._load(task_id), indent=2)

    def update(self, task_id: int, status: str = None,
               add_blocked_by: list = None, add_blocks: list = None) -> str:
        task = self._load(task_id)
        if status:
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"Invalid status: {status}")
            task["status"] = status
            # When a task is completed, remove it from all other tasks' blockedBy
            if status == "completed":
                self._clear_dependency(task_id)
        if add_blocked_by:
            task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
        if add_blocks:
            task["blocks"] = list(set(task["blocks"] + add_blocks))
            # Bidirectional: also update the blocked tasks' blockedBy lists
            for blocked_id in add_blocks:
                try:
                    blocked = self._load(blocked_id)
                    if task_id not in blocked["blockedBy"]:
                        blocked["blockedBy"].append(task_id)
                        self._save(blocked)
                except ValueError:
                    pass
        self._save(task)
        return json.dumps(task, indent=2)

    def _clear_dependency(self, completed_id: int):
        """Remove completed_id from all other tasks' blockedBy lists."""
        for f in self.dir.glob("task_*.json"):
            task = json.loads(f.read_text())
            if completed_id in task.get("blockedBy", []):
                task["blockedBy"].remove(completed_id)
                self._save(task)

    def list_all(self) -> str:
        tasks = []
        for f in sorted(self.dir.glob("task_*.json")):
            tasks.append(json.loads(f.read_text()))
        if not tasks:
            return "No tasks."
        lines = []
        for t in tasks:
            marker = {
   "pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
            blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
            lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
        return "\n".join(lines)


TASKS = TaskManager(TASKS_DIR)


# -- Base tool implementations --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"

def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
   
    "bash":        lambda **kw: run_bash(kw["command"]),
    "read_file":   lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file":  lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":   lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("addBlocks")),
    "task_list":   lambda **kw: TASKS.list_all(),
    "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
}

TOOLS = [
    {
   "name": "bash", "description": "Run a shell command.",
     "input_schema": {
   "type": "object", "properties": {
   "command": {
   "type": "string"}}, "required": ["command"]}},
    {
   "name": "read_file", "description": "Read file contents.",
     "input_schema": {
   "type": "object", "properties": {
   "path": {
   "type": "string"}, "limit": {
   "type": "integer"}}, "required": ["path"]}},
    {
   "name": "write_file", "description": "Write content to file.",
     "input_schema": {
   "type": "object", "properties": {
   "path": {
   "type": "string"}, "content": {
   "type": "string"}}, "required": ["path", "content"]}},
    {
   "name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {
   "type": "object", "properties": {
   "path": {
   "type": "string"}, "old_text": {
   "type": "string"}, "new_text": {
   "type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {
   "name": "task_create", "description": "Create a new task.",
     "input_schema": {
   "type": "object", "properties": {
   "subject": {
   "type": "string"}, "description": {
   "type": "string"}}, "required": ["subject"]}},
    {
   "name": "task_update", "description": "Update a task's status or dependencies.",
     "input_schema": {
   "type": "object", "properties": {
   "task_id": {
   "type": "integer"}, "status": {
   "type": "string", "enum": ["pending", "in_progress", "completed"]}, "addBlockedBy": {
   "type": "array", "items": {
   "type": "integer"}}, "addBlocks": {
   "type": "array", "items": {
   "type": "integer"}}}, "required": ["task_id"]}},
    {
   "name": "task_list", "description": "List all tasks with status summary.",
     "input_schema": {
   "type": "object", "properties": {
   }}},
    {
   "name": "task_get", "description": "Get full details of a task by ID.",
     "input_schema": {
   "type": "object", "properties": {
   "task_id": {
   "type": "integer"}}, "required": ["task_id"]}},
]


def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({
   "role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append({
   "type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({
   "role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms07 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({
   "role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
目录
相关文章
|
8天前
|
人工智能 安全 Linux
【OpenClaw保姆级图文教程】阿里云/本地部署集成模型Ollama/Qwen3.5/百炼 API 步骤流程及避坑指南
2026年,AI代理工具的部署逻辑已从“单一云端依赖”转向“云端+本地双轨模式”。OpenClaw(曾用名Clawdbot)作为开源AI代理框架,既支持对接阿里云百炼等云端免费API,也能通过Ollama部署本地大模型,完美解决两类核心需求:一是担心云端API泄露核心数据的隐私安全诉求;二是频繁调用导致token消耗过高的成本控制需求。
5162 9
|
16天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
21101 114
|
7天前
|
JavaScript Linux API
保姆级教程,通过GACCode在国内使用Claudecode、Codex!
保姆级教程,通过GACCode在国内使用Claudecode、Codex!
4644 1
保姆级教程,通过GACCode在国内使用Claudecode、Codex!
|
12天前
|
人工智能 安全 前端开发
Team 版 OpenClaw:HiClaw 开源,5 分钟完成本地安装
HiClaw 基于 OpenClaw、Higress AI Gateway、Element IM 客户端+Tuwunel IM 服务器(均基于 Matrix 实时通信协议)、MinIO 共享文件系统打造。
8061 7
|
14天前
|
人工智能 JavaScript API
保姆级教程:OpenClaw阿里云/本地部署配置Tavily Search skill 实时联网,让OpenClaw“睁眼看世界”
默认状态下的OpenClaw如同“闭门造车”的隐士,仅能依赖模型训练数据回答问题,无法获取实时新闻、最新数据或训练截止日期后的新信息。2026年,激活其联网能力的最优方案是配置Tavily Search技能——无需科学上网、无需信用卡验证,每月1000次免费搜索额度完全满足个人需求,搭配ClawHub技能市场,还能一键拓展天气查询、邮件管理等实用功能。
8075 5

热门文章

最新文章