Claude Batch API 实战:把批量任务成本砍掉一半,叠加缓存再降 90%
如果你的 Claude 调用里有任何一类任务符合"不需要立刻返回结果、可以容忍几分钟到几小时延迟"——批量数据清洗、文档结构化、用户评论分类、历史日志总结、AB 测试结果生成——那你多花了一倍的钱。
Message Batches API 是 Anthropic 给离线场景的官方 5 折通道,再叠加 prompt caching 能把综合成本压到不开缓存价的十分之一。本文给出完整工程方案。
一、Batch API 是什么

简单说:你把成百上千个 Messages 请求打包成一份 JSONL 文件提交,Anthropic 在 24 小时内异步处理完返回结果,所有 token 按 50% 价格计费。
| 维度 | 普通 Messages API | Batch API |
|---|---|---|
| 调用方式 | 同步,单次一个请求 | 异步,批量上千请求 |
| 响应时间 | 秒级 | 5 分钟 ~ 24 小时 |
| 计费 | 标准价 | 5 折 |
| 单批上限 | - | 10 万请求 / 256MB |
| 适用场景 | 用户实时交互 | 离线批处理 |
5 折是无条件的,没有"达到 X 量级才折扣"的门槛。一个 batch 里哪怕只有 10 个请求也是半价。
二、什么场景该用 Batch API

直接用对照表判断:
| 场景 | 用 Batch 吗 | 为什么 |
|---|---|---|
| 用户对话机器人 | ❌ | 实时性要求高 |
| 网站 IM 客服 | ❌ | 实时性要求高 |
| 一次性导入 50 万条历史评论做情感分类 | ✅ | 离线、量大、可等待 |
| 每天凌晨跑一次 SQL 转自然语言总结 | ✅ | 周期性、可等待 |
| 用户上传 PDF 后异步生成摘要 | ✅ | 可以告知用户"几分钟后通知" |
| 实时翻译 | ❌ | 实时性要求高 |
| AB 测试的合成数据生成(10k 条 prompt) | ✅ | 离线 |
| 训练数据标注(百万级) | ✅ | 离线、巨量 |
| 内部 RAG 知识库初始化向量+摘要 | ✅ | 一次性、可等待 |
| Agent 后台决策日志的事后归因分析 | ✅ | 离线 |
判断口诀:用户在屏幕前等着 → 不用 Batch;用户提交完去做别的事 → 用 Batch。
三、最简提交示例
Batch API 的请求体长这样:
import anthropic
client = anthropic.Anthropic(
api_key="sk-你的ClaudeAPI密钥",
base_url="https://gw.claudeapi.com"
)
batch = client.messages.batches.create(
requests=[
{
"custom_id": "task-001",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "把下面这段评论分类:「物流太慢了」"}
]
}
},
{
"custom_id": "task-002",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "把下面这段评论分类:「质量超出预期」"}
]
}
},
]
)
print(f"Batch ID: {batch.id}")
print(f"状态: {batch.processing_status}")
import anthropic
client = anthropic.Anthropic(
api_key="sk-你的ClaudeAPI密钥",
base_url="https://gw.claudeapi.com"
)
batch = client.messages.batches.create(
requests=[
{
"custom_id": "task-001",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "把下面这段评论分类:「物流太慢了」"}
]
}
},
{
"custom_id": "task-002",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "把下面这段评论分类:「质量超出预期」"}
]
}
},
]
)
print(f"Batch ID: {batch.id}")
print(f"状态: {batch.processing_status}")
几个关键字段:
| 字段 | 说明 |
|---|---|
custom_id |
你自己的请求标识,结果里会带回,用来对应原始数据。必须唯一 |
params |
一个完整的 Messages.create 参数对象,模型/max_tokens/messages 等照常写 |
processing_status |
in_progress / canceling / ended 三种 |
提交后 batch.id 立刻返回,但实际处理是异步的。
四、轮询与拉取结果
提交完不要傻等,写一个轮询:
import time
batch_id = batch.id
while True:
status = client.messages.batches.retrieve(batch_id)
counts = status.request_counts
print(f"[{status.processing_status}] "
f"完成 {counts.succeeded}/{counts.processing + counts.succeeded + counts.errored} "
f"失败 {counts.errored}")
if status.processing_status == "ended":
break
time.sleep(30)
results_url = status.results_url
print(f"结果地址:{results_url}")
import time
batch_id = batch.id
while True:
status = client.messages.batches.retrieve(batch_id)
counts = status.request_counts
print(f"[{status.processing_status}] "
f"完成 {counts.succeeded}/{counts.processing + counts.succeeded + counts.errored} "
f"失败 {counts.errored}")
if status.processing_status == "ended":
break
time.sleep(30)
results_url = status.results_url
print(f"结果地址:{results_url}")
request_counts 字段给出每个状态的请求数:
| 字段 | 含义 |
|---|---|
processing |
处理中 |
succeeded |
成功 |
errored |
失败 |
canceled |
被取消 |
expired |
超过 24 小时未完成(极少发生) |
拉结果是流式 JSONL:
for line in client.messages.batches.results(batch_id):
if line.result.type == "succeeded":
custom_id = line.custom_id
text = line.result.message.content[0].text
print(f"{custom_id} → {text}")
elif line.result.type == "errored":
print(f"{line.custom_id} 失败:{line.result.error}")
for line in client.messages.batches.results(batch_id):
if line.result.type == "succeeded":
custom_id = line.custom_id
text = line.result.message.content[0].text
print(f"{custom_id} → {text}")
elif line.result.type == "errored":
print(f"{line.custom_id} 失败:{line.result.error}")
custom_id 是你提交时给的,用它对应回原始数据行即可。
五、和 Prompt Caching 协同:把成本再砍 90%

Batch API 半价的基础上,所有支持 prompt caching 的请求在 batch 里同样有效。Caching 的命中读取价是原价的 10%,叠加 batch 半价后,命中部分实际是 原价的 5%。
最适用的模式:几千条数据共享同一份长 system prompt。
LONG_SYSTEM = """
你是一位资深的电商客服分析师,专门处理用户评论分类。
评论需要分到以下 12 个类别中的一个:
1. 物流速度(包含太慢、按时、超快等)
2. 商品质量(包含好评、差评、与描述不符等)
3. 客服态度
... (这里假设几千字的分类细则、示例、边界情况说明)
"""
requests = []
for i, comment in enumerate(comments):
requests.append({
"custom_id": f"review-{i:06d}",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 256,
"system": [
{
"type": "text",
"text": LONG_SYSTEM,
"cache_control": {"type": "ephemeral"}
}
],
"messages": [
{"role": "user", "content": f"分类这条评论:{comment}"}
]
}
})
batch = client.messages.batches.create(requests=requests)
LONG_SYSTEM = """
你是一位资深的电商客服分析师,专门处理用户评论分类。
评论需要分到以下 12 个类别中的一个:
1. 物流速度(包含太慢、按时、超快等)
2. 商品质量(包含好评、差评、与描述不符等)
3. 客服态度
... (这里假设几千字的分类细则、示例、边界情况说明)
"""
requests = []
for i, comment in enumerate(comments):
requests.append({
"custom_id": f"review-{i:06d}",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 256,
"system": [
{
"type": "text",
"text": LONG_SYSTEM,
"cache_control": {"type": "ephemeral"}
}
],
"messages": [
{"role": "user", "content": f"分类这条评论:{comment}"}
]
}
})
batch = client.messages.batches.create(requests=requests)
第一个请求会写入缓存(按原价计费),后续所有命中缓存的请求里 system 部分只按原价 10% 计费——再叠加 Batch 5 折,system 部分实际花原价的 5%。
简化算账(按 Sonnet 4.6 输入价 $3/M 估):
| 方案 | 5000 条 × 平均 3k token system | 实际成本 |
|---|---|---|
| 不开 Batch、不开 Caching | $3 × 15 = $45 | $45 |
| 只开 Batch | $1.5 × 15 = $22.5 | $22.5 |
| 只开 Caching | $3 × 1.5 + $0.3 × 13.5 = $8.55 | $8.55 |
| Batch + Caching | $1.5 × 1.5 + $0.15 × 13.5 = $4.28 | $4.28 |
5000 条评论从 $45 降到 $4.28,91% 折扣。数据量越大,绝对节省越夸张。
注:缓存的 TTL 是 5 分钟。Batch 任务通常在几分钟到几十分钟内完成,绝大多数情况都能命中。如果不放心,把第一个请求当作"预热"用同步 Messages API 先打一次,确保缓存生效再提交 batch。
六、完整工程封装
把上面这些拼成一个能直接用的工具函数:
import anthropic
import json
import time
from typing import List, Dict, Callable
class BatchRunner:
def __init__(self, api_key: str, base_url: str = "https://gw.claudeapi.com"):
self.client = anthropic.Anthropic(api_key=api_key, base_url=base_url)
def run(self,
items: List[Dict],
build_request: Callable[[Dict], Dict],
poll_interval: int = 30) -> Dict[str, str]:
"""
items: 原始数据列表
build_request: 把每条 item 转成 batch request 的函数(要返回含 custom_id + params 的 dict)
返回: {custom_id: 输出文本}
"""
requests = [build_request(item) for item in items]
batch = self.client.messages.batches.create(requests=requests)
print(f"Submitted batch {batch.id} with {len(requests)} requests")
while True:
status = self.client.messages.batches.retrieve(batch.id)
print(f" status={status.processing_status} "
f"succeeded={status.request_counts.succeeded} "
f"errored={status.request_counts.errored}")
if status.processing_status == "ended":
break
time.sleep(poll_interval)
results = {}
for line in self.client.messages.batches.results(batch.id):
if line.result.type == "succeeded":
results[line.custom_id] = line.result.message.content[0].text
else:
results[line.custom_id] = None
return results
# 使用示例
runner = BatchRunner(api_key="sk-你的ClaudeAPI密钥")
comments = [
{"id": 1, "text": "物流太慢了"},
{"id": 2, "text": "质量超出预期"},
]
def build(item):
return {
"custom_id": f"c-{item['id']}",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 256,
"system": [{
"type": "text",
"text": LONG_SYSTEM,
"cache_control": {"type": "ephemeral"}
}],
"messages": [{"role": "user", "content": item["text"]}]
}
}
outputs = runner.run(comments, build)
import anthropic
import json
import time
from typing import List, Dict, Callable
class BatchRunner:
def __init__(self, api_key: str, base_url: str = "https://gw.claudeapi.com"):
self.client = anthropic.Anthropic(api_key=api_key, base_url=base_url)
def run(self,
items: List[Dict],
build_request: Callable[[Dict], Dict],
poll_interval: int = 30) -> Dict[str, str]:
"""
items: 原始数据列表
build_request: 把每条 item 转成 batch request 的函数(要返回含 custom_id + params 的 dict)
返回: {custom_id: 输出文本}
"""
requests = [build_request(item) for item in items]
batch = self.client.messages.batches.create(requests=requests)
print(f"Submitted batch {batch.id} with {len(requests)} requests")
while True:
status = self.client.messages.batches.retrieve(batch.id)
print(f" status={status.processing_status} "
f"succeeded={status.request_counts.succeeded} "
f"errored={status.request_counts.errored}")
if status.processing_status == "ended":
break
time.sleep(poll_interval)
results = {}
for line in self.client.messages.batches.results(batch.id):
if line.result.type == "succeeded":
results[line.custom_id] = line.result.message.content[0].text
else:
results[line.custom_id] = None
return results
# 使用示例
runner = BatchRunner(api_key="sk-你的ClaudeAPI密钥")
comments = [
{"id": 1, "text": "物流太慢了"},
{"id": 2, "text": "质量超出预期"},
]
def build(item):
return {
"custom_id": f"c-{item['id']}",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 256,
"system": [{
"type": "text",
"text": LONG_SYSTEM,
"cache_control": {"type": "ephemeral"}
}],
"messages": [{"role": "user", "content": item["text"]}]
}
}
outputs = runner.run(comments, build)
七、踩坑与注意事项
坑 1:custom_id 必须全 batch 内唯一。用 f"task-{i:08d}" 这种格式化数字最安全,不要用业务 ID 直接当 custom_id——业务 ID 容易有重复或非法字符。
坑 2:单 batch 不要超 10 万请求 / 256MB。超了会被直接拒绝。建议每批 5k-2w 条,更容易排查问题。
坑 3:Batch API 不能用于实时场景做"伪同步"。哪怕你提交 1 条请求,也可能等十几秒到几分钟才返回。Anthropic 没承诺"少量请求就一定快"。
坑 4:失败请求不会重试。errored 状态的请求要你自己捞出来重新提交。建议在客户端记录 custom_id → 原始数据 的映射,方便重跑。
坑 5:Caching 在 batch 里命中的前提是 system / tools 块完全一致。哪怕只差一个空格,都会被认为是新的缓存键。把要缓存的部分抽成常量字符串严格固定。
坑 6:Batch 不支持 streaming。所有请求结果都是完整生成完返回。如果你的提示词容易诱导超长输出,记得设合理的 max_tokens 上限。
坑 7:模型在 batch 里和同步 API 行为不完全一致。stop_sequences、temperature 都生效,但偶尔会观察到 batch 模型版本与同步 API 的细微差异(Anthropic 会同步部署,但批量任务可能落到不同副本)。重要场景做一遍 1k 样本的离线 A/B。
八、模型选型参考
| 场景 | 推荐模型 | 原因 |
|---|---|---|
| 大量评论 / 短文本分类 | Haiku 4.5 | 5 折后单条几分钱,量大不心疼 |
| 长文档摘要 / RAG 初始化 | Sonnet 4.6 | 质量 + 价格甜点,叠加 caching 性价比最高 |
| 复杂分析 / 推理类批量任务 | Opus 4.7 | 必要时再用,注意预算 |
| 1M 长上下文批量处理 | Opus 4.7 / Sonnet 4.6 | 配合 caching 是杀手锏 |
经验法则:能用 Haiku 跑过验证就不用 Sonnet;能用 Sonnet 就不用 Opus。Batch + Caching 的乘法效应让"用对的模型"价值放大一倍。
小结
Batch API 是 Anthropic 给离线场景的官方 5 折通道,叠加 prompt caching 之后能让长 system + 海量条目的处理成本降到原价 5-10%。判断标准就一条——用户是否在屏幕前等着拿结果。能等就上 Batch,能复用就上 Caching,两者叠加几乎没有副作用。
国内开发者通过 claudeapi.com 可直接调用 Batch API 与 Caching,base_url 替换为 https://gw.claudeapi.com 即可,SDK 代码与官方完全兼容,按官方汇率折人民币,支付宝、微信充值,国内直连低延迟。



