速率限制
了解 API 速率限制及其管理方式。合理的速率限制有助于保障平台稳定性和所有用户的公平使用体验。
工作原理
速率限制通过 滑动窗口算法 控制单位时间内的请求数量。当请求超出限制时,API 将返回 429 Too Many Requests 错误。
速率限制从两个维度进行控制:
- RPM(Requests Per Minute):每分钟允许的最大请求数
- 并发数:同时允许进行中的最大请求数
两个维度中任意一个达到上限,后续请求即被拒绝。
各模型限制
各模型有独立的速率限制:
文本生成模型
| 模型 | RPM | 并发数 | 说明 |
|---|---|---|---|
| deepseek-v4-flash | 100 | 20 | 高速模型 |
| deepseek-v4-pro | 30 | 5 | 高性能模型 |
| qwen3.7-max | 60 | 10 | 通用模型 |
| glm-5.7 | 60 | 10 | 通用模型 |
| kimi-k2.6 | 30 | 5 | 长上下文模型 |
| minimax-m3 | 30 | 5 | 通用模型 |
图片生成模型
| 模型 | RPM | 并发数 | 说明 |
|---|---|---|---|
| 可图 (Kolors) | 30 | 5 | 快速生图 |
| gpt-image-2 | 20 | 3 | 高质量生图 |
| nano-banana-2 | 10 | 2 | 高质量生图 |
| doubao-seedream-4.0 | 10 | 2 | 豆包生图 |
| doubao-seedream-5.0-lite | 10 | 2 | 豆包生图 |
| doubao-seedream-4.5 | 10 | 2 | 豆包生图 |
视频生成模型
| 模型 | RPM | 每日上限 | 并发数 | 说明 |
|---|---|---|---|---|
| veo-3 | 5 | 100 | 2 | Google Veo |
| wanx2.1-t2v-turbo | 10 | 200 | 3 | 万相 2.7 |
| doubao-seedance-2-0 | 5 | 100 | 2 | Doubao Seedance 2.0 |
联网搜索
| 模型 | RPM | 并发数 | 说明 |
|---|---|---|---|
| web-search | 30 | 5 | 按次计费 |
速率限制响应头
当请求触发速率限制时,响应中会包含以下 HTTP 头:
| 响应头 | 说明 | 示例 |
|---|---|---|
X-RateLimit-Limit | 当前窗口允许的最大请求数 | 30 |
X-RateLimit-Remaining | 当前窗口剩余的请求数 | 0 |
X-RateLimit-Reset | 窗口重置的 Unix 时间戳 | 1705312260 |
Retry-After | 建议等待的秒数(仅 429 响应) | 30 |
你可以利用这些响应头在客户端主动进行流量控制。
处理速率限制
指数退避重试
当收到 429 错误时,推荐使用带抖动的指数退避策略:
python
import time
import random
import requests
def call_with_retry(url, headers, payload, max_retries=5):
for attempt in range(max_retries):
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 429:
# 优先使用服务器建议的等待时间
retry_after = response.headers.get("Retry-After")
if retry_after:
wait = int(retry_after)
else:
# 指数退避 + 随机抖动
wait = (2 ** attempt) + random.uniform(0, 1)
print(f"触发速率限制,等待 {wait:.1f} 秒后重试...")
time.sleep(wait)
continue
return response
raise Exception("达到最大重试次数")javascript
async function callWithRetry(url, headers, payload, maxRetries = 5) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(payload),
})
if (response.status === 429) {
const retryAfter = response.headers.get('Retry-After')
const wait = retryAfter ? parseInt(retryAfter) : Math.pow(2, attempt) + Math.random()
console.log(`触发速率限制,等待 ${wait.toFixed(1)} 秒后重试...`)
await new Promise((r) => setTimeout(r, wait * 1000))
continue
}
return response
}
throw new Error('达到最大重试次数')
}客户端限流
在应用层主动控制请求速率,避免触发服务端限制:
python
import time
from threading import Lock
class RateLimiter:
def __init__(self, max_requests, window_seconds=60):
self.max_requests = max_requests
self.window = window_seconds
self.requests = []
self.lock = Lock()
def acquire(self):
with self.lock:
now = time.time()
# 清除窗口外的记录
self.requests = [t for t in self.requests if now - t < self.window]
if len(self.requests) >= self.max_requests:
wait = self.window - (now - self.requests[0])
if wait > 0:
time.sleep(wait)
self.requests.append(time.time())最佳实践
- 主动监控响应头:读取
X-RateLimit-Remaining,在额度耗尽前主动降速 - 实现请求队列:对高并发场景使用队列控制请求流速
- 缓存结果:对相同查询缓存结果,减少重复请求
- 使用
Retry-After:收到 429 响应时,优先使用服务器返回的Retry-After值 - 区分错误类型:只对 429 和 5xx 错误重试,400 错误应修复请求
- 异步任务不受 RPM 限制:视频生成等异步任务受并发数限制,轮询状态不计入 RPM