如果你正在将 Claude API 用于生产级 Agent 工作流,或者正在向团队技术负责人汇报 LLM 预算,那么你一定遇到过这个问题:单次调用的成本很容易估算,但 Agent at Scale 的实际账单为何总比预期高出一截?
本文的结论先说:Agent 成本的主要增量来自五个隐性乘数——重试放大、上下文膨胀、并发峰值、路由不当与观测盲区。 管控好这五个变量,团队级预算可以做到可预测、可追溯、可优化。
本文适合:已在使用或评估 Claude API 的开发者、技术负责人,以及需要向管理层汇报 LLM ROI 的团队。
一、先把成本结构搞清楚
1.1 Claude API 的基础计费单元
在 claudeapi.com 平台,Claude API 按 输入 Token + 输出 Token 计费,写作和发布时一律以 ClaudeAPI 控制台 / 价格页实时展示为准。
| 模型 | 输入(每 M Token) | 输出(每 M Token) | 适合放在 Agent 的哪一层 |
|---|---|---|---|
| claude-opus-4-7 | $4.000 | $20.000 | 复杂推理、长上下文、高价值决策 |
| claude-sonnet-4-6 | $2.400 | $12.000 | 代码生成、多步规划、结构化输出 |
| claude-haiku-4-5-20251001 | $0.800 | $4.000 | 分类、摘要、意图识别、低风险批处理 |
以上价格为 ClaudeAPI 平台标准定价,具体以 console.claudeapi.com 账单页实时显示为准。
单次调用的成本估算很直观:(input_tokens × input_price + output_tokens × output_price) / 1_000_000。
问题是:Agent 不是单次调用。
1.2 Agent at Scale 的五个隐性成本乘数
| 成本乘数 | 典型场景 | 影响量级 |
|---|---|---|
| 重试放大 | 网络超时/限速后自动重试,未计入 token 的请求也消耗资源 | ×1.2 ~ ×3 |
| 上下文膨胀 | 多轮对话/长工具链将历史 message 全部传入,输入 token 随轮次线性增长 | ×2 ~ ×10 |
| 并发峰值 | 批量任务同时发起,触发限速后重试风暴 | 账单 spike |
| 路由不当 | 所有请求统一打到 Opus,Haiku 能做的任务也用了最贵的模型 | ×5 ~ ×20 |
| 观测盲区 | 没有 token 用量日志,无法发现失控的 Agent 循环 | 事后无法追溯 |
这五个乘数叠加后,账单可能比预期高出 10 倍以上。下面逐一给出控制方法。
1.3 团队月度预算怎么估
不要一开始就按"模型单价 × 预计请求数"粗算。Agent 工作流更适合用下面这个公式:
月度预算 =
日任务量
× 平均每个任务的调用次数
× 平均每次调用的输入 / 输出 Token
× 模型路由占比
× 30 天
× 安全系数
月度预算 =
日任务量
× 平均每个任务的调用次数
× 平均每次调用的输入 / 输出 Token
× 模型路由占比
× 30 天
× 安全系数
其中最容易被低估的是 平均每个任务的调用次数 和 安全系数。一个看似简单的"生成周报" Agent,实际可能包含意图识别、检索、工具调用、摘要、改写、校验 6 个步骤;如果中间还有失败重试,真实调用次数会继续增加。
建议上线前先跑 100~300 个代表性任务,统计:
| 指标 | 为什么要看 | 建议阈值 |
|---|---|---|
| 单任务平均调用次数 | 判断 Agent 是否被拆得过碎,或是否存在循环 | 超过 8 次要复查流程 |
| 单任务 P95 Token | 比平均值更能反映极端账单风险 | P95 超过平均值 3 倍要做裁剪 |
| 重试率 | 判断并发、限速、网络和错误处理是否健康 | 超过 5% 要复查重试逻辑 |
| Haiku / Sonnet / Opus 占比 | 判断模型路由是否合理 | Opus 占比超过 10% 要说明原因 |
| 缓存命中率 | 判断 Prompt Caching 是否真的生效 | 长前缀场景低于 50% 要复查前缀稳定性 |
安全系数建议从 1.3 起步。如果是新上线的 Agent、输入长度波动大、工具调用链路复杂,建议先用 1.5~2.0 做预算上限,稳定两周后再下调。
二、控制重试成本:指数退避 + 预算熔断
Agent 场景下,重试是必要的容错机制,但无节制的重试会造成成本 spike。
推荐策略
- 指数退避:首次失败等 1s,第二次 2s,第三次 4s,最多重试 4 次。
- 预算熔断:设定单任务最大 token 消耗阈值,超过后终止重试并记录日志。
- 区分错误类型:
529 Overloaded值得重试;400 Bad Request/401 Unauthorized不值得重试(参考 Claude API 常见错误码处理方法)。
import anthropic
import time
client = anthropic.Anthropic(
api_key="YOUR_API_KEY",
base_url="https://gw.claudeapi.com"
)
RETRYABLE_STATUS = {529, 503, 502, 500}
MAX_RETRIES = 4
TASK_TOKEN_BUDGET = 100_000 # 单任务 token 上限
def call_with_budget(messages, model="claude-sonnet-4-6", task_tokens_used=0):
for attempt in range(MAX_RETRIES):
try:
response = client.messages.create(
model=model,
max_tokens=4096,
messages=messages
)
used = response.usage.input_tokens + response.usage.output_tokens
task_tokens_used += used
if task_tokens_used > TASK_TOKEN_BUDGET:
raise RuntimeError(f"任务已消耗 {task_tokens_used} tokens,超出预算,强制终止")
return response, task_tokens_used
except anthropic.APIStatusError as e:
if e.status_code not in RETRYABLE_STATUS:
raise # 不可重试错误直接抛出
wait = 2 ** attempt
print(f"[Retry {attempt+1}] status={e.status_code},等待 {wait}s")
time.sleep(wait)
raise RuntimeError("超过最大重试次数")
import anthropic
import time
client = anthropic.Anthropic(
api_key="YOUR_API_KEY",
base_url="https://gw.claudeapi.com"
)
RETRYABLE_STATUS = {529, 503, 502, 500}
MAX_RETRIES = 4
TASK_TOKEN_BUDGET = 100_000 # 单任务 token 上限
def call_with_budget(messages, model="claude-sonnet-4-6", task_tokens_used=0):
for attempt in range(MAX_RETRIES):
try:
response = client.messages.create(
model=model,
max_tokens=4096,
messages=messages
)
used = response.usage.input_tokens + response.usage.output_tokens
task_tokens_used += used
if task_tokens_used > TASK_TOKEN_BUDGET:
raise RuntimeError(f"任务已消耗 {task_tokens_used} tokens,超出预算,强制终止")
return response, task_tokens_used
except anthropic.APIStatusError as e:
if e.status_code not in RETRYABLE_STATUS:
raise # 不可重试错误直接抛出
wait = 2 ** attempt
print(f"[Retry {attempt+1}] status={e.status_code},等待 {wait}s")
time.sleep(wait)
raise RuntimeError("超过最大重试次数")
三、控制上下文膨胀:Prompt Caching + 滑动窗口
多轮 Agent 最常见的成本失控原因是:每轮都把完整历史 messages 传进去,输入 token 随对话轮次线性增长。
3.1 Prompt Caching:固定前缀只算一次
对于 System Prompt、工具定义(tools schema)、长文档前缀等不变内容,可以通过 Prompt Caching 让其只在首次写入时计费,后续命中缓存时按更低费率计算(数据来源:Anthropic 官方文档 docs.anthropic.com/en/docs/build-with-claude/prompt-caching,截至 2026-06-30)。
详细配置方法参见 Claude API Prompt Caching 使用指南。
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=[
{
"type": "text",
"text": "你是一个专业的数据分析 Agent,以下是完整的工具说明和背景文档...\n" + long_system_doc,
"cache_control": {"type": "ephemeral"} # 标记为可缓存
}
],
messages=messages
)
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=[
{
"type": "text",
"text": "你是一个专业的数据分析 Agent,以下是完整的工具说明和背景文档...\n" + long_system_doc,
"cache_control": {"type": "ephemeral"} # 标记为可缓存
}
],
messages=messages
)
3.2 滑动窗口:只保留最近 N 轮
def trim_messages(messages, max_turns=10):
"""保留最后 max_turns 轮对话,避免上下文无限膨胀"""
if len(messages) > max_turns * 2:
# 保留第一条(通常是任务初始指令)+ 最近 N 轮
return [messages[0]] + messages[-(max_turns * 2 - 1):]
return messages
def trim_messages(messages, max_turns=10):
"""保留最后 max_turns 轮对话,避免上下文无限膨胀"""
if len(messages) > max_turns * 2:
# 保留第一条(通常是任务初始指令)+ 最近 N 轮
return [messages[0]] + messages[-(max_turns * 2 - 1):]
return messages
四、模型分层路由:把对的任务交给对的模型
并非所有 Agent 子任务都需要最强的模型。 分层路由是降低 agent at scale cost 最直接有效的手段。
典型路由策略
| 任务类型 | 推荐模型 | 单 M token 输入价格 |
|---|---|---|
| 意图识别、分类、摘要 | claude-haiku-4-5-20251001 | $0.800 |
| 代码生成、复杂推理、多步规划 | claude-sonnet-4-6 | $2.400 |
| 超长文档分析、高难度创作、关键决策 | claude-opus-4-7 | $4.000 |
def route_model(task_type: str) -> str:
routing = {
"classify": "claude-haiku-4-5-20251001",
"summarize": "claude-haiku-4-5-20251001",
"code_gen": "claude-sonnet-4-6",
"planning": "claude-sonnet-4-6",
"deep_analysis": "claude-opus-4-7",
}
return routing.get(task_type, "claude-sonnet-4-6") # 默认 Sonnet
def route_model(task_type: str) -> str:
routing = {
"classify": "claude-haiku-4-5-20251001",
"summarize": "claude-haiku-4-5-20251001",
"code_gen": "claude-sonnet-4-6",
"planning": "claude-sonnet-4-6",
"deep_analysis": "claude-opus-4-7",
}
return routing.get(task_type, "claude-sonnet-4-6") # 默认 Sonnet
如果你的 Agent 工作流结构复杂,可以参考 Claude API Tool Use 完整指南 了解如何用工具调用来拆分 Agent 子任务。
五、并发管理:令牌桶限流 + 队列
Agent at Scale 的另一个典型问题是批量任务同时发起,触发 API 限速(429),然后大量重试堆叠,形成"重试风暴"。
这里要区分两个概念:
- Rate limit:单位时间内请求数或 Token 量过高,通常表现为 429,需要降并发、排队、指数退避。
- Spend limit:账户或组织层面的消费上限。即使并发不高,也可能因为预算耗尽而无法继续调用。
因此,并发治理不只是"把请求发慢一点",还要把任务放进队列,给每个任务打上优先级和预算上限。生产环境里建议至少分三档:
| 优先级 | 任务例子 | 处理策略 |
|---|---|---|
| P0 | 用户正在等待的交互式请求 | 小并发、短超时、失败快速反馈 |
| P1 | 内部报表、批量摘要、客服质检 | 队列异步执行,可延迟几分钟 |
| P2 | 历史数据重跑、离线分析 | 夜间低峰执行,严格预算上限 |
推荐方案:令牌桶限流
import asyncio
from asyncio import Semaphore
async def run_agent_pool(tasks, max_concurrent=5):
"""控制并发数量,避免触发限速"""
sem = Semaphore(max_concurrent)
async def run_one(task):
async with sem:
return await process_task(task)
return await asyncio.gather(*[run_one(t) for t in tasks])
import asyncio
from asyncio import Semaphore
async def run_agent_pool(tasks, max_concurrent=5):
"""控制并发数量,避免触发限速"""
sem = Semaphore(max_concurrent)
async def run_one(task):
async with sem:
return await process_task(task)
return await asyncio.gather(*[run_one(t) for t in tasks])
注意:Anthropic 官方文档提供 Message Batches 用于异步批处理;但 ClaudeAPI 当前不支持 Message Batches API。批量场景请用上述并发控制和任务队列方案替代,结合 流式输出 SSE 接入指南 优化响应体验。
六、可观测性:让预算有据可查
没有日志,就没有优化的入口。 团队级 Agent 工作流至少需要以下四层观测:
import logging
from dataclasses import dataclass, field
from typing import List
@dataclass
class AgentCallLog:
task_id: str
model: str
input_tokens: int
output_tokens: int
cache_read_tokens: int = 0
cache_write_tokens: int = 0
retry_count: int = 0
error: str = ""
def cost_usd(self) -> float:
"""估算本次调用基础成本(美元),缓存读写以控制台账单为准"""
price_map = {
"claude-opus-4-7": (4.0, 20.0),
"claude-sonnet-4-6": (2.4, 12.0),
"claude-haiku-4-5-20251001": (0.8, 4.0),
}
inp, out = price_map.get(self.model, (2.4, 12.0))
return (self.input_tokens * inp + self.output_tokens * out) / 1_000_000
# 使用示例
def log_call(response, task_id, model, retry_count=0):
log = AgentCallLog(
task_id=task_id,
model=model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
cache_read_tokens=getattr(response.usage, "cache_read_input_tokens", 0),
cache_write_tokens=getattr(response.usage, "cache_creation_input_tokens", 0),
retry_count=retry_count,
)
logging.info(f"[AgentCall] {log}")
return log
import logging
from dataclasses import dataclass, field
from typing import List
@dataclass
class AgentCallLog:
task_id: str
model: str
input_tokens: int
output_tokens: int
cache_read_tokens: int = 0
cache_write_tokens: int = 0
retry_count: int = 0
error: str = ""
def cost_usd(self) -> float:
"""估算本次调用基础成本(美元),缓存读写以控制台账单为准"""
price_map = {
"claude-opus-4-7": (4.0, 20.0),
"claude-sonnet-4-6": (2.4, 12.0),
"claude-haiku-4-5-20251001": (0.8, 4.0),
}
inp, out = price_map.get(self.model, (2.4, 12.0))
return (self.input_tokens * inp + self.output_tokens * out) / 1_000_000
# 使用示例
def log_call(response, task_id, model, retry_count=0):
log = AgentCallLog(
task_id=task_id,
model=model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
cache_read_tokens=getattr(response.usage, "cache_read_input_tokens", 0),
cache_write_tokens=getattr(response.usage, "cache_creation_input_tokens", 0),
retry_count=retry_count,
)
logging.info(f"[AgentCall] {log}")
return log
团队级每日预算报表:把所有 AgentCallLog 写入数据库(SQLite / PostgreSQL 均可),按 task_id、model、team 分组汇总,就能生成可向管理层汇报的 LLM 成本报表。
建议日志至少包含这些字段:
| 字段 | 用途 |
|---|---|
request_id |
串联用户请求、工具调用和模型调用 |
task_id / workflow_id |
分析哪个 Agent 流程最耗钱 |
team_id / user_id |
做团队、项目、成员维度的预算归因 |
model |
检查是否有低复杂度任务误用高价模型 |
input_tokens / output_tokens |
计算直接成本 |
cache_read_input_tokens |
判断 Prompt Caching 命中效果 |
retry_count / error_code |
识别重试风暴和错误类型 |
latency_ms |
识别慢请求和超时重试来源 |
cost_usd_estimated |
报表和告警使用,最终以账单为准 |
有了这些字段后,每日看三张表就够:按模型汇总、按任务汇总、按团队汇总。真正有价值的不是"今天花了多少钱",而是"哪类任务、哪个团队、哪段链路让成本偏离预期"。
七、团队级预算管理检查清单
在正式上线 Agent at Scale 工作流前,逐项核验:
- [ ] 模型分层:是否已按任务类型区分 Haiku / Sonnet / Opus?
- [ ] Prompt Caching:固定 System Prompt 和 tools schema 是否已加
cache_control? - [ ] 上下文窗口:多轮对话是否有滑动窗口或摘要压缩机制?
- [ ] 并发限流:是否有 Semaphore 或令牌桶防止重试风暴?
- [ ] 错误分类:重试逻辑是否区分了可重试与不可重试错误?
- [ ] 用量日志:每次调用的 token 数和模型是否都有记录?
- [ ] 预算熔断:是否设定了单任务 / 每日 token 上限告警?
常见问题(FAQ)
Q1:Agent at Scale 的 LLM 成本主要由哪些部分构成?
除了直接的 Token 费用(输入 + 输出),实际成本还包括:重试产生的额外请求、上下文膨胀导致的输入 token 暴涨、路由不当将低复杂度任务打到高价模型,以及因缺乏观测导致的失控 Agent 循环。这些隐性成本加总后可能比基础 Token 费高出数倍。
Q2:Prompt Caching 能节省多少成本?
据 Anthropic 官方文档(docs.anthropic.com/en/docs/build-with-claude/prompt-caching,截至 2026-06-30),Prompt Caching 会把可复用前缀拆分为缓存写入与缓存读取;缓存读取按更低费率计费。对于 System Prompt 较长、多轮共享同一前缀的 Agent 场景,它通常能显著降低重复输入成本,但实际节省比例取决于前缀长度、调用频率、缓存命中率和缓存有效期。建议在可观测日志中记录 cache_read_input_tokens 与 cache_creation_input_tokens 字段来追踪真实效果。
Q3:ClaudeAPI 支持 Message Batches API 吗?
不支持。批量处理场景请使用并发控制(Semaphore)+ 队列管理替代,结合流式输出可优化整体响应体验。
Q4:团队多人共用一个 API Key 怎么做预算分配?
建议在调用时通过 metadata 或日志标注 user_id / team_id,在可观测层做分组汇总,而不是给每个人单独开 Key(会增加管理复杂度)。详细的 Key 管理方法参见 Claude API Key 申请与配置指南。
Q5:如何预估一个新 Agent 项目的月度预算?
推荐三步法:① 跑 100 次代表性任务,统计平均 input/output token 数和模型分布;② 按预期日调用量换算月总量;③ 乘以对应模型单价后再乘以 1.3 的安全系数(覆盖重试和上下文膨胀)。详细价格计算方法见 Claude API 价格指南(2026)。
Q6:流式输出(Streaming)能降低成本吗?
流式输出不改变 Token 计费量,但能改善用户感知延迟,减少因超时引发的不必要重试,从而间接降低重试成本。流式接入方法参见 Claude API 流式输出 SSE 接入指南。
下一步
- 创建 API Key:前往 ClaudeAPI 控制台 创建 Key,配置
base_url = https://gw.claudeapi.com。 - 接入基础调用:参考 Claude API Python 入门教程 完成首次调用。
- 开启成本可观测:在生产环境添加上文的
AgentCallLog,接入你的监控系统。 - 设置预算告警:在控制台设定每日用量上限,超额后触发通知。
数据与事实声明
| 信息 | 来源 | 核实状态 |
|---|---|---|
| Claude API 模型价格(Opus/Sonnet/Haiku) | ClaudeAPI 平台定价,console.claudeapi.com | ✅ 平台确认,以控制台实时显示为准 |
| Prompt Caching 计费与命中字段 | Anthropic 官方文档 docs.anthropic.com/en/docs/build-with-claude/prompt-caching | ⚠️ 实际节省取决于前缀稳定性、调用频率、缓存命中率和缓存有效期 |
| 成本乘数倍数(×1.2 ~ ×20) | 基于行业经验与推文选题建议的推断 | ⚠️ 定性估算,不同场景差异显著,建议用实际日志核实 |
| ClaudeAPI 不支持 Message Batches API | ClaudeAPI 平台约束,经产品确认 | ✅ |



