【Building Claude Code from Scratch: learn-claude-code Project Notes】(8) Background Tasks

Summary: This chapter walks through the s08 background-task mechanism: run slow commands such as `npm install` on background daemon threads, and use a thread-safe notification queue to inject their results into the context before each LLM call, so the agent can "think while it runs". Full code, pitfall comments, and hands-on examples are included to help newcomers grasp the concurrency design at the heart of AI agents.

Chapter 8: Background Tasks

s01 > s02 > s03 > s04 > s05 > s06 > s07 > [ s08 ] > s09 > s10 > s11 > s12

"This column is based on the official documentation of the open-source project learn-claude-code. The original docs are quite hardcore, so to make them easier for beginners like me, I read them line by line and added plenty of comments, plain-language explanations, and pitfall notes. I hope this 'pre-chewed' tutorial helps you open the door to AI agent development."

Project: shareAI-lab/learn-claude-code: Bash is all you need - A nano Claude Code–like agent, built from 0 to 1

"Hand slow operations off to the background; the agent keeps thinking about the next step" -- run commands on a background thread, inject a notification when they finish.

1. The problem: slow commands block the agent

Some commands take minutes to run: npm install, pytest, docker build. In a blocking loop the model can only sit and wait. The user says "install the dependencies and, while you're at it, create a config file", yet the agent has to do one thing at a time.

2. The solution

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

By moving slow commands onto background daemon threads and using a thread-safe notification queue to deliver completion results to the agent before each LLM call, s08 fixes the problem that blocking execution of slow commands (npm install, pytest, and so on) left the agent unable to do anything else in the meantime.

3. How it works

  1. BackgroundManager tracks tasks with a thread-safe notification queue. It exists because s08 introduces background threads to run slow commands, which means multiple threads now touch shared data at the same time.
class BackgroundManager:
    def __init__(self):
        self.tasks = {}                   # task dict
        self._notification_queue = []     # notification queue
        self._lock = threading.Lock()     # mutex ← the key part!
  2. run() starts a daemon thread and returns immediately.
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]  # generate a short ID
    self.tasks[task_id] = {"status": "running", "result": None, "command": command}
    thread = threading.Thread(
        target=self._execute,     # function the thread will run
        args=(task_id, command),  # arguments passed to it
        daemon=True               # ← daemon thread: ends automatically when the main program exits
    )
    thread.start()  # start the thread
    return f"Background task {task_id} started"
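What daemon=True buys you can be seen in a tiny standalone sketch (not part of s08): the caller gets control back immediately, and an unfinished daemon thread will not keep the process alive.

```python
import threading
import time

def slow_worker():
    # Stand-in for a slow command like `npm install`; nobody ever join()s it.
    time.sleep(60)

t = threading.Thread(target=slow_worker, daemon=True)
t.start()  # returns immediately, just like run()

# The worker is still busy, but we already have control back.
# If the main program exits now, the daemon thread is killed with it.
print(t.daemon, t.is_alive())  # → True True
```

With daemon=False the interpreter would wait for the full 60 seconds at exit; with daemon=True the REPL-style agent can quit instantly, which is exactly the trade-off s08 wants.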
  3. When the subprocess finishes, the result goes into the notification queue. _execute() is the code the thread actually runs.
def _execute(self, task_id: str, command: str):
    # actually run the shell command
    try:
        r = subprocess.run(command, shell=True, ..., timeout=300)
        output = (r.stdout + r.stderr).strip()
        status = "completed"
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
        status = "timeout"
    except Exception as e:
        output = f"Error: {e}"
        status = "error"

    # update task status and result
    self.tasks[task_id]["status"] = status
    self.tasks[task_id]["result"] = output

    # push the result into the notification queue (protected by the lock)
    with self._lock:
        self._notification_queue.append({...})

Conceptually this is a lot like a message queue in Java (I mostly write Java): both are buffers for communication between threads or processes, but the s08 version is a simplified in-memory queue.
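For comparison, Python's standard library already ships a thread-safe FIFO, queue.Queue, which handles the locking internally; s08 uses a plain list plus an explicit Lock instead, which keeps the mechanism visible. A sketch of the same notification flow on top of queue.Queue (the names here are illustrative, not from s08):

```python
import queue
import threading

# Thread-safe FIFO from the stdlib; no manual Lock needed.
notifications = queue.Queue()

def worker(task_id: str):
    # A background thread pushes its result; put() is thread-safe.
    notifications.put({"task_id": task_id, "status": "completed"})

threads = [threading.Thread(target=worker, args=(f"t{i}",), daemon=True)
           for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain without holding a lock yourself: get_nowait() until Empty.
drained = []
while True:
    try:
        drained.append(notifications.get_nowait())
    except queue.Empty:
        break

print(len(drained))  # → 3
```

Either approach works; the list-plus-lock version makes the critical section explicit, which is useful when the point of the lesson is the race condition itself.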

  4. The notification queue is drained before every LLM call. Why clear it? To keep the LLM from receiving the same task's completion notice twice. The queue is consume-on-read: once a result is taken out and injected for the agent, it is removed from the queue immediately, so each result is processed exactly once.
def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()  # drain the queue
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
            messages.append({"role": "assistant", "content": "Noted background results."})
        response = client.messages.create(...)

The loop itself stays single-threaded: agent_loop runs on the main thread, and only subprocess I/O is parallelized.
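The consume-on-read contract from step 4 can be demonstrated in isolation. This is a stripped-down sketch of the s08 queue, with push/drain as the only operations:

```python
import threading

class NotificationQueue:
    """Minimal consume-on-read queue, like s08's drain_notifications()."""

    def __init__(self):
        self._items = []
        self._lock = threading.Lock()

    def push(self, item):
        with self._lock:
            self._items.append(item)

    def drain(self):
        # Copy and clear atomically: each item is delivered exactly once.
        with self._lock:
            items = list(self._items)
            self._items.clear()
        return items

q = NotificationQueue()
q.push({"task_id": "abc", "result": "done"})

first = q.drain()   # returns the pending notification
second = q.drain()  # queue is now empty: nothing is delivered twice

print(len(first), len(second))  # → 1 0
```

The copy-then-clear inside one lock acquisition is what prevents a background thread from pushing between the read and the clear and losing a notification.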

4. Changes relative to s07

Component       Before (s07)     After (s08)
Tools           8                6 (base + background_run + check)
Execution       blocking only    blocking + background threads
Notifications   -                queue drained every turn
Concurrency     -                daemon threads

5. Try it

cd learn-claude-code
python agents/s08_background_tasks.py

Try these prompts (English prompts tend to work better with the LLM, but Chinese works too):

  1. Run "sleep 5 && echo done" in the background, then create a file while it runs
  2. Start 3 background tasks: "sleep 2", "sleep 4", "sleep 6". Check their status.
  3. Run pytest in the background and keep working on other things

6. Full code

#!/usr/bin/env python3
import os
import subprocess
import threading
import uuid
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"You are a coding agent at {WORKDIR}. Use background_run for long-running commands."


# -- BackgroundManager: threaded execution + notification queue --
class BackgroundManager:
    def __init__(self):
        self.tasks = {}  # task_id -> {status, result, command}
        self._notification_queue = []  # completed task results
        self._lock = threading.Lock()

    def run(self, command: str) -> str:
        """Start a background thread, return task_id immediately."""
        task_id = str(uuid.uuid4())[:8]
        self.tasks[task_id] = {"status": "running", "result": None, "command": command}
        thread = threading.Thread(
            target=self._execute, args=(task_id, command), daemon=True
        )
        thread.start()
        return f"Background task {task_id} started: {command[:80]}"

    def _execute(self, task_id: str, command: str):
        """Thread target: run subprocess, capture output, push to queue."""
        try:
            r = subprocess.run(
                command, shell=True, cwd=WORKDIR,
                capture_output=True, text=True, timeout=300
            )
            output = (r.stdout + r.stderr).strip()[:50000]
            status = "completed"
        except subprocess.TimeoutExpired:
            output = "Error: Timeout (300s)"
            status = "timeout"
        except Exception as e:
            output = f"Error: {e}"
            status = "error"
        self.tasks[task_id]["status"] = status
        self.tasks[task_id]["result"] = output or "(no output)"
        with self._lock:
            self._notification_queue.append({
                "task_id": task_id,
                "status": status,
                "command": command[:80],
                "result": (output or "(no output)")[:500],
            })

    def check(self, task_id: str = None) -> str:
        """Check status of one task or list all."""
        if task_id:
            t = self.tasks.get(task_id)
            if not t:
                return f"Error: Unknown task {task_id}"
            return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
        lines = []
        for tid, t in self.tasks.items():
            lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
        return "\n".join(lines) if lines else "No background tasks."

    def drain_notifications(self) -> list:
        """Return and clear all pending completion notifications."""
        with self._lock:
            notifs = list(self._notification_queue)
            self._notification_queue.clear()
        return notifs


BG = BackgroundManager()


# -- Tool implementations --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"

def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
    "bash":             lambda **kw: run_bash(kw["command"]),
    "read_file":        lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file":       lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":        lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "background_run":   lambda **kw: BG.run(kw["command"]),
    "check_background": lambda **kw: BG.check(kw.get("task_id")),
}

TOOLS = [
    {"name": "bash", "description": "Run a shell command (blocking).",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "check_background", "description": "Check background task status. Omit task_id to list all.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
]


def agent_loop(messages: list):
    while True:
        # Drain background notifications and inject as system message before LLM call
        notifs = BG.drain_notifications()
        if notifs and messages:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
            )
            messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
            messages.append({"role": "assistant", "content": "Noted background results."})
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms08 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()