跳转到主内容

Claude Batch API 实战:把批量任务成本砍掉一半,叠加缓存再降 90%

用 Message Batches API 跑离线大批量任务,官方价格直接 5 折,再叠加 prompt caching 能把综合成本压到不开缓存价的十分之一。本文给出适用场景、提交-轮询完整代码、与 prompt caching 协同的工程方案,以及踩坑清单。

开发指南Claude API 国内Claude Opus 4.7预计阅读10分钟
2026.05.15 发表
Claude Batch API 实战:把批量任务成本砍掉一半,叠加缓存再降 90%

Claude Batch API 实战:把批量任务成本砍掉一半,叠加缓存再降 90%

如果你的 Claude 调用里有任何一类任务符合"不需要立刻返回结果、可以容忍几分钟到几小时延迟"——批量数据清洗、文档结构化、用户评论分类、历史日志总结、AB 测试结果生成——那你多花了一倍的钱

Message Batches API 是 Anthropic 给离线场景的官方 5 折通道,再叠加 prompt caching 能把综合成本压到不开缓存价的十分之一。本文给出完整工程方案。


一、Batch API 是什么

简单说:你把成百上千个 Messages 请求打包成一份 JSONL 文件提交,Anthropic 在 24 小时内异步处理完返回结果,所有 token 按 50% 价格计费。

维度 普通 Messages API Batch API
调用方式 同步,单次一个请求 异步,批量上千请求
响应时间 秒级 5 分钟 ~ 24 小时
计费 标准价 5 折
单批上限 - 10 万请求 / 256MB
适用场景 用户实时交互 离线批处理

5 折是无条件的,没有"达到 X 量级才折扣"的门槛。一个 batch 里哪怕只有 10 个请求也是半价。


二、什么场景该用 Batch API

直接用对照表判断:

场景 用 Batch 吗 为什么
用户对话机器人 实时性要求高
网站 IM 客服 实时性要求高
一次性导入 50 万条历史评论做情感分类 离线、量大、可等待
每天凌晨跑一次 SQL 转自然语言总结 周期性、可等待
用户上传 PDF 后异步生成摘要 可以告知用户"几分钟后通知"
实时翻译 实时性要求高
AB 测试的合成数据生成(10k 条 prompt) 离线
训练数据标注(百万级) 离线、巨量
内部 RAG 知识库初始化向量+摘要 一次性、可等待
Agent 后台决策日志的事后归因分析 离线

判断口诀:用户在屏幕前等着 → 不用 Batch;用户提交完去做别的事 → 用 Batch。


三、最简提交示例

Batch API 的请求体长这样:

import anthropic

client = anthropic.Anthropic(
    api_key="sk-你的ClaudeAPI密钥",
    base_url="https://gw.claudeapi.com"
)

batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "task-001",
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "把下面这段评论分类:「物流太慢了」"}
                ]
            }
        },
        {
            "custom_id": "task-002",
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "把下面这段评论分类:「质量超出预期」"}
                ]
            }
        },
    ]
)

print(f"Batch ID: {batch.id}")
print(f"状态: {batch.processing_status}")
import anthropic

client = anthropic.Anthropic(
    api_key="sk-你的ClaudeAPI密钥",
    base_url="https://gw.claudeapi.com"
)

batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "task-001",
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "把下面这段评论分类:「物流太慢了」"}
                ]
            }
        },
        {
            "custom_id": "task-002",
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "把下面这段评论分类:「质量超出预期」"}
                ]
            }
        },
    ]
)

print(f"Batch ID: {batch.id}")
print(f"状态: {batch.processing_status}")

几个关键字段:

字段 说明
custom_id 你自己的请求标识,结果里会带回,用来对应原始数据。必须唯一
params 一个完整的 Messages.create 参数对象,模型/max_tokens/messages 等照常写
processing_status in_progress / canceling / ended 三种

提交后 batch.id 立刻返回,但实际处理是异步的。


四、轮询与拉取结果

提交完不要傻等,写一个轮询:

import time

batch_id = batch.id

while True:
    status = client.messages.batches.retrieve(batch_id)
    counts = status.request_counts
    print(f"[{status.processing_status}] "
          f"完成 {counts.succeeded}/{counts.processing + counts.succeeded + counts.errored} "
          f"失败 {counts.errored}")

    if status.processing_status == "ended":
        break

    time.sleep(30)

results_url = status.results_url
print(f"结果地址:{results_url}")
import time

batch_id = batch.id

while True:
    status = client.messages.batches.retrieve(batch_id)
    counts = status.request_counts
    print(f"[{status.processing_status}] "
          f"完成 {counts.succeeded}/{counts.processing + counts.succeeded + counts.errored} "
          f"失败 {counts.errored}")

    if status.processing_status == "ended":
        break

    time.sleep(30)

results_url = status.results_url
print(f"结果地址:{results_url}")

request_counts 字段给出每个状态的请求数:

字段 含义
processing 处理中
succeeded 成功
errored 失败
canceled 被取消
expired 超过 24 小时未完成(极少发生)

拉结果是流式 JSONL:

for line in client.messages.batches.results(batch_id):
    if line.result.type == "succeeded":
        custom_id = line.custom_id
        text = line.result.message.content[0].text
        print(f"{custom_id}{text}")
    elif line.result.type == "errored":
        print(f"{line.custom_id} 失败:{line.result.error}")
for line in client.messages.batches.results(batch_id):
    if line.result.type == "succeeded":
        custom_id = line.custom_id
        text = line.result.message.content[0].text
        print(f"{custom_id}{text}")
    elif line.result.type == "errored":
        print(f"{line.custom_id} 失败:{line.result.error}")

custom_id 是你提交时给的,用它对应回原始数据行即可。


五、和 Prompt Caching 协同:把成本再砍 90%

Batch API 半价的基础上,所有支持 prompt caching 的请求在 batch 里同样有效。Caching 的命中读取价是原价的 10%,叠加 batch 半价后,命中部分实际是 原价的 5%

最适用的模式:几千条数据共享同一份长 system prompt

LONG_SYSTEM = """
你是一位资深的电商客服分析师,专门处理用户评论分类。
评论需要分到以下 12 个类别中的一个:
1. 物流速度(包含太慢、按时、超快等)
2. 商品质量(包含好评、差评、与描述不符等)
3. 客服态度
... (这里假设几千字的分类细则、示例、边界情况说明)
"""

requests = []
for i, comment in enumerate(comments):
    requests.append({
        "custom_id": f"review-{i:06d}",
        "params": {
            "model": "claude-sonnet-4-6",
            "max_tokens": 256,
            "system": [
                {
                    "type": "text",
                    "text": LONG_SYSTEM,
                    "cache_control": {"type": "ephemeral"}
                }
            ],
            "messages": [
                {"role": "user", "content": f"分类这条评论:{comment}"}
            ]
        }
    })

batch = client.messages.batches.create(requests=requests)
LONG_SYSTEM = """
你是一位资深的电商客服分析师,专门处理用户评论分类。
评论需要分到以下 12 个类别中的一个:
1. 物流速度(包含太慢、按时、超快等)
2. 商品质量(包含好评、差评、与描述不符等)
3. 客服态度
... (这里假设几千字的分类细则、示例、边界情况说明)
"""

requests = []
for i, comment in enumerate(comments):
    requests.append({
        "custom_id": f"review-{i:06d}",
        "params": {
            "model": "claude-sonnet-4-6",
            "max_tokens": 256,
            "system": [
                {
                    "type": "text",
                    "text": LONG_SYSTEM,
                    "cache_control": {"type": "ephemeral"}
                }
            ],
            "messages": [
                {"role": "user", "content": f"分类这条评论:{comment}"}
            ]
        }
    })

batch = client.messages.batches.create(requests=requests)

第一个请求会写入缓存(按原价计费),后续所有命中缓存的请求里 system 部分只按原价 10% 计费——再叠加 Batch 5 折,system 部分实际花原价的 5%

简化算账(按 Sonnet 4.6 输入价 $3/M 估):

方案 5000 条 × 平均 3k token system 实际成本
不开 Batch、不开 Caching $3 × 15 = $45 $45
只开 Batch $1.5 × 15 = $22.5 $22.5
只开 Caching $3 × 1.5 + $0.3 × 13.5 = $8.55 $8.55
Batch + Caching $1.5 × 1.5 + $0.15 × 13.5 = $4.28 $4.28

5000 条评论从 $45 降到 $4.28,91% 折扣。数据量越大,绝对节省越夸张。

注:缓存的 TTL 是 5 分钟。Batch 任务通常在几分钟到几十分钟内完成,绝大多数情况都能命中。如果不放心,把第一个请求当作"预热"用同步 Messages API 先打一次,确保缓存生效再提交 batch。


六、完整工程封装

把上面这些拼成一个能直接用的工具函数:

import anthropic
import json
import time
from typing import List, Dict, Callable

class BatchRunner:
    def __init__(self, api_key: str, base_url: str = "https://gw.claudeapi.com"):
        self.client = anthropic.Anthropic(api_key=api_key, base_url=base_url)

    def run(self,
            items: List[Dict],
            build_request: Callable[[Dict], Dict],
            poll_interval: int = 30) -> Dict[str, str]:
        """
        items: 原始数据列表
        build_request: 把每条 item 转成 batch request 的函数(要返回含 custom_id + params 的 dict)
        返回: {custom_id: 输出文本}
        """
        requests = [build_request(item) for item in items]
        batch = self.client.messages.batches.create(requests=requests)
        print(f"Submitted batch {batch.id} with {len(requests)} requests")

        while True:
            status = self.client.messages.batches.retrieve(batch.id)
            print(f"  status={status.processing_status} "
                  f"succeeded={status.request_counts.succeeded} "
                  f"errored={status.request_counts.errored}")
            if status.processing_status == "ended":
                break
            time.sleep(poll_interval)

        results = {}
        for line in self.client.messages.batches.results(batch.id):
            if line.result.type == "succeeded":
                results[line.custom_id] = line.result.message.content[0].text
            else:
                results[line.custom_id] = None
        return results


# 使用示例
runner = BatchRunner(api_key="sk-你的ClaudeAPI密钥")

comments = [
    {"id": 1, "text": "物流太慢了"},
    {"id": 2, "text": "质量超出预期"},
]

def build(item):
    return {
        "custom_id": f"c-{item['id']}",
        "params": {
            "model": "claude-sonnet-4-6",
            "max_tokens": 256,
            "system": [{
                "type": "text",
                "text": LONG_SYSTEM,
                "cache_control": {"type": "ephemeral"}
            }],
            "messages": [{"role": "user", "content": item["text"]}]
        }
    }

outputs = runner.run(comments, build)
import anthropic
import json
import time
from typing import List, Dict, Callable

class BatchRunner:
    def __init__(self, api_key: str, base_url: str = "https://gw.claudeapi.com"):
        self.client = anthropic.Anthropic(api_key=api_key, base_url=base_url)

    def run(self,
            items: List[Dict],
            build_request: Callable[[Dict], Dict],
            poll_interval: int = 30) -> Dict[str, str]:
        """
        items: 原始数据列表
        build_request: 把每条 item 转成 batch request 的函数(要返回含 custom_id + params 的 dict)
        返回: {custom_id: 输出文本}
        """
        requests = [build_request(item) for item in items]
        batch = self.client.messages.batches.create(requests=requests)
        print(f"Submitted batch {batch.id} with {len(requests)} requests")

        while True:
            status = self.client.messages.batches.retrieve(batch.id)
            print(f"  status={status.processing_status} "
                  f"succeeded={status.request_counts.succeeded} "
                  f"errored={status.request_counts.errored}")
            if status.processing_status == "ended":
                break
            time.sleep(poll_interval)

        results = {}
        for line in self.client.messages.batches.results(batch.id):
            if line.result.type == "succeeded":
                results[line.custom_id] = line.result.message.content[0].text
            else:
                results[line.custom_id] = None
        return results


# 使用示例
runner = BatchRunner(api_key="sk-你的ClaudeAPI密钥")

comments = [
    {"id": 1, "text": "物流太慢了"},
    {"id": 2, "text": "质量超出预期"},
]

def build(item):
    return {
        "custom_id": f"c-{item['id']}",
        "params": {
            "model": "claude-sonnet-4-6",
            "max_tokens": 256,
            "system": [{
                "type": "text",
                "text": LONG_SYSTEM,
                "cache_control": {"type": "ephemeral"}
            }],
            "messages": [{"role": "user", "content": item["text"]}]
        }
    }

outputs = runner.run(comments, build)

七、踩坑与注意事项

坑 1:custom_id 必须全 batch 内唯一。用 f"task-{i:08d}" 这种格式化数字最安全,不要用业务 ID 直接当 custom_id——业务 ID 容易有重复或非法字符。

坑 2:单 batch 不要超 10 万请求 / 256MB。超了会被直接拒绝。建议每批 5k-2w 条,更容易排查问题。

坑 3:Batch API 不能用于实时场景做"伪同步"。哪怕你提交 1 条请求,也可能等十几秒到几分钟才返回。Anthropic 没承诺"少量请求就一定快"。

坑 4:失败请求不会重试errored 状态的请求要你自己捞出来重新提交。建议在客户端记录 custom_id → 原始数据 的映射,方便重跑。

坑 5:Caching 在 batch 里命中的前提是 system / tools 块完全一致。哪怕只差一个空格,都会被认为是新的缓存键。把要缓存的部分抽成常量字符串严格固定。

坑 6:Batch 不支持 streaming。所有请求结果都是完整生成完返回。如果你的提示词容易诱导超长输出,记得设合理的 max_tokens 上限。

坑 7:模型在 batch 里和同步 API 行为不完全一致stop_sequencestemperature 都生效,但偶尔会观察到 batch 模型版本与同步 API 的细微差异(Anthropic 会同步部署,但批量任务可能落到不同副本)。重要场景做一遍 1k 样本的离线 A/B。


八、模型选型参考

场景 推荐模型 原因
大量评论 / 短文本分类 Haiku 4.5 5 折后单条几分钱,量大不心疼
长文档摘要 / RAG 初始化 Sonnet 4.6 质量 + 价格甜点,叠加 caching 性价比最高
复杂分析 / 推理类批量任务 Opus 4.7 必要时再用,注意预算
1M 长上下文批量处理 Opus 4.7 / Sonnet 4.6 配合 caching 是杀手锏

经验法则:能用 Haiku 跑过验证就不用 Sonnet;能用 Sonnet 就不用 Opus。Batch + Caching 的乘法效应让"用对的模型"价值放大一倍。


小结

Batch API 是 Anthropic 给离线场景的官方 5 折通道,叠加 prompt caching 之后能让长 system + 海量条目的处理成本降到原价 5-10%。判断标准就一条——用户是否在屏幕前等着拿结果。能等就上 Batch,能复用就上 Caching,两者叠加几乎没有副作用。

国内开发者通过 claudeapi.com 可直接调用 Batch API 与 Caching,base_url 替换为 https://gw.claudeapi.com 即可,SDK 代码与官方完全兼容,按官方汇率折人民币,支付宝、微信充值,国内直连低延迟。

相关文章