第八章 Background Tasks (后台任务)
s01 > s02 > s03 > s04 > s05 > s06 | s07 > [ s08 ] s09 > s10 > s11 > s12
“本专栏基于开源项目 learn-claude-code 的官方文档。原文档非常硬核,为了方便像我一样的新手小白理解,我对文档进行了逐行精读,并加入了很多中文注释、大白话解释和踩坑记录。希望这套‘咀嚼版’教程能帮你推开 AI Agent 开发的大门。”
"慢操作丢后台, agent 继续想下一步" -- 后台线程跑命令, 完成后注入通知。
一、问题-慢命令阻塞agent
有些命令要跑好几分钟: npm install、pytest、docker build。阻塞式循环下模型只能干等。用户说 "装依赖, 顺便建个配置文件", 智能体却只能一个一个来。
二、解决方案
Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <----+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+
做法是:把耗时命令放到后台守护线程中执行,并用线程安全的通知队列在每次 LLM 调用前把完成结果传递给 agent。这样就解决了阻塞式执行慢命令(如 npm install、pytest 等)时 agent 无法同时做其他事的问题。
三、工作原理
- BackgroundManager 用线程安全的通知队列追踪任务。之所以需要线程安全,是因为 s08 引入了后台线程来执行耗时命令,这就产生了多线程同时访问共享数据的问题。
class BackgroundManager:
    # Tutorial excerpt: the three pieces of state the manager keeps.
    def __init__(self):
        self.tasks = {
        }  # task table
        self._notification_queue = []  # notification queue
        self._lock = threading.Lock()  # mutex <- the key piece!
run()启动守护线程, 立即返回。
def run(self, command: str) -> str:
    # Tutorial excerpt: register the task, start a daemon thread, return at once.
    task_id = str(uuid.uuid4())[:8]  # generate a short id
    self.tasks[task_id] = {
        "status": "running", "result": None, "command": command}
    thread = threading.Thread(
        target=self._execute,  # function the thread will run
        args=(task_id, command),  # arguments passed to that function
        daemon=True  # <- daemon thread: ends automatically when the main program exits
    )
    thread.start()  # start the thread
    return f"Background task {task_id} started"
- 子进程完成后, 结果进入通知队列。_execute() 方法 —— 线程实际执行的代码。
def _execute(self, task_id: str, command: str):
    # Tutorial excerpt (arguments elided with ...): the code the thread runs.
    # Actually execute the shell command.
    try:
        r = subprocess.run(command, shell=True, ..., timeout=300)
        output = (r.stdout + r.stderr).strip()
        status = "completed"
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
        status = "timeout"
    except Exception as e:
        output = f"Error: {e}"
        status = "error"
    # Update the task's status and result.
    self.tasks[task_id]["status"] = status
    self.tasks[task_id]["result"] = output
    # Put the result on the notification queue (protected by the lock).
    with self._lock:
        self._notification_queue.append({
            ...})
原理上很像Java消息队列(笔者本人常写Java),都是用于线程/进程间通信的缓冲区,但 s08 的实现是一个简化版的内存队列。
- 每次 LLM 调用前排空通知队列。为什么要清空? 防止 LLM 重复收到同一个任务的完成通知。通知队列采用"消费即删除"模式:取出结果注入给 agent 后,立即从队列中移除,确保每个结果只被处理一次。
def agent_loop(messages: list):
    # Tutorial excerpt: pending notifications are injected before each LLM call.
    while True:
        notifs = BG.drain_notifications()  # drain the queue
        if notifs:
            # One "[bg:<task_id>] <result>" line per finished task.
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({
                "role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
            messages.append({
                "role": "assistant", "content": "Noted background results."})
        response = client.messages.create(...)
循环保持单线程。主线程agent_loop是单线程,只有子进程 I/O 被并行化。
四、相对 s07 的变更
| 组件 | 之前 (s07) | 之后 (s08) |
|---|---|---|
| Tools | 8 | 6 (基础 + background_run + check) |
| 执行方式 | 仅阻塞 | 阻塞 + 后台线程 |
| 通知机制 | 无 | 每轮排空的队列 |
| 并发 | 无 | 守护线程 |
五、试一试
cd learn-claude-code
python agents/s08_background_tasks.py
试试这些 prompt (英文 prompt 对 LLM 效果更好, 也可以用中文):
1. Run "sleep 5 && echo done" in the background, then create a file while it runs
2. Start 3 background tasks: "sleep 2", "sleep 4", "sleep 6". Check their status.
3. Run pytest in the background and keep working on other things
六、完整代码
#!/usr/bin/env python3
import os
import subprocess
import threading
import uuid
from pathlib import Path
from anthropic import Anthropic
from dotenv import load_dotenv
# Load settings from a .env file. With override=True, values from .env take
# precedence over variables already present in the process environment
# (python-dotenv semantics).
load_dotenv(override=True)
# When a custom base URL is configured, remove ANTHROPIC_AUTH_TOKEN from the
# environment before the client is constructed.
# NOTE(review): presumably so the custom endpoint is authenticated with the
# API key only — confirm.
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
# Directory the script was started from; used below as the cwd for shell
# commands and as the root that file-tool paths must stay inside.
WORKDIR = Path.cwd()
# base_url is None when ANTHROPIC_BASE_URL is unset.
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
# Indexing os.environ raises KeyError at import time if MODEL_ID is missing.
MODEL = os.environ["MODEL_ID"]
# System prompt sent with every request in agent_loop.
SYSTEM = f"You are a coding agent at {WORKDIR}. Use background_run for long-running commands."
# -- BackgroundManager: threaded execution + notification queue --
class BackgroundManager:
    """Run shell commands on daemon threads and queue completion notices.

    ``tasks`` maps a short task id to a dict with the keys ``status``,
    ``result`` and ``command``. When a command finishes, a truncated summary
    is also appended to an internal notification queue, which callers empty
    with :meth:`drain_notifications`.

    The task table and the queue are both touched by the calling thread and
    by worker threads, so every read and write goes through ``self._lock``.
    In particular a task's ``status`` and ``result`` are updated together, so
    :meth:`check` never sees a finished status paired with a missing result.
    """

    def __init__(self):
        self.tasks = {}  # task_id -> {status, result, command}
        self._notification_queue = []  # completed task results
        self._lock = threading.Lock()

    def run(self, command: str) -> str:
        """Start ``command`` on a daemon thread and return immediately.

        Returns a confirmation string containing the new task id and the
        first 80 characters of the command.

        NOTE(review): unlike ``run_bash``, no dangerous-command filter is
        applied to ``command`` here — confirm this is intended.
        """
        task_id = str(uuid.uuid4())[:8]
        with self._lock:
            self.tasks[task_id] = {
                "status": "running", "result": None, "command": command,
            }
        worker = threading.Thread(
            target=self._execute, args=(task_id, command), daemon=True
        )
        worker.start()
        return f"Background task {task_id} started: {command[:80]}"

    def _execute(self, task_id: str, command: str):
        """Thread target: run the command, record the outcome, notify.

        The final status is ``completed``, ``timeout`` (after 300 seconds) or
        ``error`` (any other exception). Captured stdout+stderr is capped at
        50000 characters in the task table and at 500 in the notification.
        """
        try:
            r = subprocess.run(
                command, shell=True, cwd=WORKDIR,
                capture_output=True, text=True, timeout=300
            )
            output = (r.stdout + r.stderr).strip()[:50000]
            status = "completed"
        except subprocess.TimeoutExpired:
            output = "Error: Timeout (300s)"
            status = "timeout"
        except Exception as e:
            output = f"Error: {e}"
            status = "error"
        result = output or "(no output)"
        # Publish status, result and the notification in one critical section
        # so readers observe either the running state or the complete outcome.
        with self._lock:
            task = self.tasks[task_id]
            task["result"] = result
            task["status"] = status
            self._notification_queue.append({
                "task_id": task_id,
                "status": status,
                "command": command[:80],
                "result": result[:500],
            })

    def check(self, task_id: str = None) -> str:
        """Describe one task, or list every task when ``task_id`` is falsy.

        Returns an ``Error: Unknown task ...`` string for an unknown id and
        ``No background tasks.`` when listing an empty table.
        """
        with self._lock:
            if task_id:
                t = self.tasks.get(task_id)
                if not t:
                    return f"Error: Unknown task {task_id}"
                return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
            lines = [
                f"{tid}: [{t['status']}] {t['command'][:60]}"
                for tid, t in self.tasks.items()
            ]
        return "\n".join(lines) if lines else "No background tasks."

    def drain_notifications(self) -> list:
        """Return all pending completion notifications and clear the queue."""
        with self._lock:
            notifs = list(self._notification_queue)
            self._notification_queue.clear()
        return notifs


# Shared manager instance used by the tool handlers and the agent loop.
BG = BackgroundManager()
# -- Tool implementations --
def safe_path(p: str) -> Path:
    """Resolve ``p`` against WORKDIR and refuse anything outside it.

    Returns the resolved path; raises ValueError when the resolved path is
    not located under WORKDIR.
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
def run_bash(command: str) -> str:
    """Run ``command`` in a shell (blocking) and return its combined output.

    Commands containing any blocked substring are rejected without running.
    Output is stdout followed by stderr, stripped and capped at 50000
    characters; a 120 second timeout yields an error string instead.
    """
    for blocked in ("rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"):
        if blocked in command:
            return "Error: Dangerous command blocked"
    try:
        proc = subprocess.run(
            command,
            shell=True,
            cwd=WORKDIR,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    combined = (proc.stdout + proc.stderr).strip()
    if not combined:
        return "(no output)"
    return combined[:50000]
def run_read(path: str, limit: int = None) -> str:
    """Return the text of a workspace file, optionally limited to N lines.

    ``limit`` only takes effect when it is a positive number smaller than the
    file's line count; the remaining line count is then appended as a marker.
    A missing, zero or negative ``limit`` returns the whole file (a negative
    value previously sliced lines off the end and reported a wrong count).
    The result is capped at 50000 characters. Any failure, including a path
    outside the workspace, is returned as an ``Error: ...`` string.
    """
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit is not None and 0 < limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write ``content`` to a workspace file, creating parent directories.

    Returns ``Wrote N bytes`` where N is the size of the file on disk after
    the write. The character count of ``content`` is not used, because it
    differs from the byte count for non-ASCII text. Any failure, including a
    path outside the workspace, is returned as an ``Error: ...`` string.
    """
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {fp.stat().st_size} bytes"
    except Exception as e:
        return f"Error: {e}"
def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of ``old_text`` in a workspace file.

    Returns ``Edited <path>`` on success, an error string when ``old_text``
    is absent, and ``Error: ...`` for any exception raised along the way.
    """
    try:
        target = safe_path(path)
        original = target.read_text()
        if old_text in original:
            updated = original.replace(old_text, new_text, 1)
            target.write_text(updated)
            return f"Edited {path}"
        return f"Error: Text not found in {path}"
    except Exception as e:
        return f"Error: {e}"
# Maps each tool name to a callable that takes the tool's input fields as
# keyword arguments and returns the tool's text output.
TOOL_HANDLERS = {
    # Blocking tools.
    "bash": lambda **args: run_bash(args["command"]),
    "read_file": lambda **args: run_read(args["path"], args.get("limit")),
    "write_file": lambda **args: run_write(args["path"], args["content"]),
    "edit_file": lambda **args: run_edit(
        args["path"], args["old_text"], args["new_text"]
    ),
    # Background-task tools, served by the shared BG manager.
    "background_run": lambda **args: BG.run(args["command"]),
    "check_background": lambda **args: BG.check(args.get("task_id")),
}
# Tool definitions passed to the model: name, description and a JSON-schema
# style description of each tool's input object.
TOOLS = [
    {
        "name": "bash",
        "description": "Run a shell command (blocking).",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"],
        },
    },
    {
        "name": "read_file",
        "description": "Read file contents.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "limit": {"type": "integer"},
            },
            "required": ["path"],
        },
    },
    {
        "name": "write_file",
        "description": "Write content to file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"},
            },
            "required": ["path", "content"],
        },
    },
    {
        "name": "edit_file",
        "description": "Replace exact text in file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "old_text": {"type": "string"},
                "new_text": {"type": "string"},
            },
            "required": ["path", "old_text", "new_text"],
        },
    },
    {
        "name": "background_run",
        "description": "Run command in background thread. Returns task_id immediately.",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"],
        },
    },
    {
        # task_id is optional, so this schema has no "required" list.
        "name": "check_background",
        "description": "Check background task status. Omit task_id to list all.",
        "input_schema": {
            "type": "object",
            "properties": {"task_id": {"type": "string"}},
        },
    },
]
def agent_loop(messages: list):
    """Run the model/tool loop until the model stops requesting tools.

    ``messages`` is mutated in place. Each iteration appends the assistant
    response and, if tools were requested, a user message carrying the tool
    results. Returns None once a response's stop reason is not ``tool_use``.
    """
    while True:
        # Before each LLM call, inject finished background tasks as a user
        # message followed by a canned assistant acknowledgement. The queue is
        # only drained when there is a conversation to inject into; draining
        # first and checking afterwards would silently discard notifications.
        notifs = BG.drain_notifications() if messages else []
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
            )
            messages.append({
                "role": "user",
                "content": f"<background-results>\n{notif_text}\n</background-results>",
            })
            # NOTE(review): this leaves an assistant message as the last entry
            # sent to the API, which the Messages API documents as a turn to
            # be continued rather than answered — confirm this is intended.
            messages.append({
                "role": "assistant", "content": "Noted background results.",
            })
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                # A failing tool is reported back to the model as text rather
                # than aborting the loop.
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
        messages.append({"role": "user", "content": results})
if __name__ == "__main__":
    # Interactive prompt loop: read a query, run the agent on the shared
    # history, then print any text blocks from the final message.
    history = []
    while True:
        try:
            query = input("\033[36ms08 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        normalized = query.strip().lower()
        # An empty line, "q" or "exit" ends the session.
        if normalized == "" or normalized in ("q", "exit"):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if not hasattr(block, "text"):
                    continue
                print(block.text)
        print()