import requests
import json
# 1. 定义API端点
api_url = "https://api.github.com/users/octocat"
# 2. 发送HTTP请求
response = requests.get(api_url)
# 3. 检查响应状态
if response.status_code == 200:
# 4. 解析响应数据(通常是JSON)
user_data = response.json()
# 5. 使用数据
print(f"用户名: {user_data['login']}")
print(f"姓名: {user_data.get('name', '未知')}")
print(f"仓库数: {user_data['public_repos']}")
print(f"关注者: {user_data['followers']}")
# 6. 保存数据
with open('user_data.json', 'w') as f:
json.dump(user_data, f, indent=2)
else:
print(f"请求失败,状态码: {response.status_code}")
print(f"错误信息: {response.text}")
REST(Representational State Transfer)是一种API设计风格,它使用标准的HTTP方法对资源进行操作。
| HTTP方法 | 操作 | 示例端点 | 描述 |
|---|---|---|---|
| GET | 读取/检索 | /api/users |
获取用户列表 |
| POST | 创建 | /api/users |
创建新用户 |
| GET | 读取/检索 | /api/users/{id} |
获取特定用户 |
| PUT | 更新/替换 | /api/users/{id} |
完全更新用户 |
| PATCH | 部分更新 | /api/users/{id} |
部分更新用户 |
| DELETE | 删除 | /api/users/{id} |
删除用户 |
import requests
import json
class UserAPI:
"""用户API客户端"""
def __init__(self, base_url, api_key=None):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
if api_key:
self.session.headers.update({
'Authorization': f'Bearer {api_key}'
})
# 设置通用头
self.session.headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json'
})
# 1. GET - 获取用户列表
def get_users(self, page=1, limit=10):
"""获取用户列表"""
params = {
'page': page,
'limit': limit
}
response = self.session.get(
f'{self.base_url}/users',
params=params
)
response.raise_for_status()
return response.json()
# 2. GET - 获取特定用户
def get_user(self, user_id):
"""获取特定用户"""
response = self.session.get(
f'{self.base_url}/users/{user_id}'
)
response.raise_for_status()
return response.json()
# 3. POST - 创建用户
def create_user(self, user_data):
"""创建新用户"""
response = self.session.post(
f'{self.base_url}/users',
json=user_data # Requests会自动序列化为JSON
)
response.raise_for_status()
return response.json()
# 4. PUT - 更新用户(完全替换)
def update_user(self, user_id, user_data):
"""完全更新用户"""
response = self.session.put(
f'{self.base_url}/users/{user_id}',
json=user_data
)
response.raise_for_status()
return response.json()
# 5. PATCH - 部分更新用户
def patch_user(self, user_id, partial_data):
"""部分更新用户"""
response = self.session.patch(
f'{self.base_url}/users/{user_id}',
json=partial_data
)
response.raise_for_status()
return response.json()
# 6. DELETE - 删除用户
def delete_user(self, user_id):
"""删除用户"""
response = self.session.delete(
f'{self.base_url}/users/{user_id}'
)
# DELETE通常返回204 No Content
if response.status_code == 204:
return True
response.raise_for_status()
return response.json()
# 使用示例
if __name__ == "__main__":
api = UserAPI('https://api.example.com/v1', api_key='your_api_key')
try:
# 获取用户列表
users = api.get_users(page=1, limit=5)
print(f"获取到 {len(users.get('data', []))} 个用户")
# 创建新用户
new_user = {
'name': '张三',
'email': 'zhangsan@example.com',
'age': 25
}
created_user = api.create_user(new_user)
print(f"创建用户成功,ID: {created_user.get('id')}")
# 更新用户
update_data = {
'name': '李四',
'age': 26
}
updated_user = api.update_user(created_user['id'], update_data)
print(f"更新用户成功: {updated_user.get('name')}")
# 删除用户
if api.delete_user(created_user['id']):
print("删除用户成功")
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
if e.response:
print(f"响应状态码: {e.response.status_code}")
print(f"错误详情: {e.response.text}")
except Exception as e:
print(f"其他错误: {e}")
用途: 从服务器获取数据,不应该改变服务器状态。
特点:
# GET请求示例
params = {'page': 1, 'limit': 10}
response = requests.get(
'https://api.example.com/users',
params=params
)
用途: 向服务器提交数据,通常用于创建新资源。
特点:
# POST请求示例
data = {'name': '张三', 'email': 'test@example.com'}
response = requests.post(
'https://api.example.com/users',
json=data # 或使用 data=json.dumps(data)
)
用途: 更新整个资源,需要提供完整数据。
特点:
用途: 更新资源的部分字段。
特点:
用途: 删除服务器上的资源。
特点:
| 方法 | 安全性 | 幂等性 | 是否缓存 | 请求体 | 典型状态码 |
|---|---|---|---|---|---|
| GET | 是 | 是 | 是 | 无 | 200 OK |
| POST | 否 | 否 | 否 | 有 | 201 Created |
| PUT | 否 | 是 | 否 | 有 | 200 OK, 204 No Content |
| PATCH | 否 | 是 | 否 | 有 | 200 OK |
| DELETE | 否 | 是 | 否 | 可有 | 204 No Content |
向API发送数据时,需要选择合适的数据格式。最常用的是JSON,但有时也需要使用表单数据或文件上传。
import requests
import json
from requests_toolbelt.multipart.encoder import MultipartEncoder
# 1. JSON数据(最常用)
def send_json_data():
"""发送JSON格式数据"""
data = {
'name': '张三',
'email': 'zhangsan@example.com',
'age': 25,
'hobbies': ['编程', '阅读', '运动']
}
# 方法1: 使用json参数(推荐)
response = requests.post(
'https://api.example.com/users',
json=data, # Requests会自动设置Content-Type为application/json
headers={'Accept': 'application/json'}
)
# 方法2: 手动序列化JSON
response = requests.post(
'https://api.example.com/users',
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Accept': 'application/json'
}
)
return response
# 2. 表单数据(application/x-www-form-urlencoded)
def send_form_data():
"""发送表单数据"""
form_data = {
'username': 'zhangsan',
'password': 'password123',
'remember': 'true'
}
response = requests.post(
'https://api.example.com/login',
data=form_data, # 使用data参数,Content-Type会自动设置为application/x-www-form-urlencoded
headers={'Accept': 'application/json'}
)
return response
# 3. 多部分表单数据(multipart/form-data)
def send_multipart_form_data():
"""发送多部分表单数据,通常用于文件上传"""
# 方法1: 使用files参数(简单文件上传)
files = {'file': open('document.pdf', 'rb')}
data = {'description': '重要文档'}
response = requests.post(
'https://api.example.com/upload',
files=files,
data=data
)
# 方法2: 使用MultartEncoder(复杂情况)
multipart_data = MultipartEncoder(
fields={
'file': ('document.pdf', open('document.pdf', 'rb'), 'application/pdf'),
'description': '重要文档',
'tags': 'pdf,文档'
}
)
response = requests.post(
'https://api.example.com/upload',
data=multipart_data,
headers={'Content-Type': multipart_data.content_type}
)
return response
# 4. URL编码参数(查询字符串)
def send_query_params():
"""通过URL查询字符串发送参数"""
params = {
'q': 'python requests',
'page': 1,
'limit': 10,
'sort': 'date',
'order': 'desc'
}
response = requests.get(
'https://api.example.com/search',
params=params
)
return response
# 5. 自定义数据格式
def send_custom_data():
"""发送自定义格式数据"""
# XML数据
xml_data = '''<?xml version="1.0" encoding="UTF-8"?>
张三
zhangsan@example.com
25
'''
response = requests.post(
'https://api.example.com/users',
data=xml_data,
headers={
'Content-Type': 'application/xml',
'Accept': 'application/xml'
}
)
# YAML数据(需要PyYAML库)
# import yaml
# yaml_data = yaml.dump({'name': '张三', 'age': 25})
return response
# 6. 流式数据(大文件或实时数据)
def send_stream_data():
"""发送流式数据"""
# 生成器函数,用于生成数据流
def generate_data():
for i in range(10):
yield json.dumps({'chunk': i, 'data': 'x' * 100}).encode('utf-8')
yield b'\n' # 分隔符
response = requests.post(
'https://api.example.com/stream',
data=generate_data(),
headers={'Content-Type': 'application/x-ndjson'} # Newline Delimited JSON
)
return response
# 使用示例
if __name__ == "__main__":
print("测试不同数据格式的发送...")
# 测试JSON数据
try:
resp = send_json_data()
print(f"JSON请求状态码: {resp.status_code}")
except Exception as e:
print(f"JSON请求失败: {e}")
# 测试表单数据
try:
resp = send_form_data()
print(f"表单数据请求状态码: {resp.status_code}")
except Exception as e:
print(f"表单数据请求失败: {e}")
正确的Content-Type头部对于API调用至关重要,它告诉服务器如何解析请求数据。
json参数会自动设置data参数会自动设置files参数会自动设置正确处理API响应是API调用的关键。这包括解析数据、处理状态码、处理分页等。
import requests
import json
from datetime import datetime
from typing import Dict, Any, List, Optional
class APIResponseHandler:
"""API响应处理器"""
@staticmethod
def handle_response(response: requests.Response) -> Dict[str, Any]:
"""处理API响应"""
# 记录请求信息(用于调试)
request_info = {
'url': response.request.url,
'method': response.request.method,
'headers': dict(response.request.headers),
'body': response.request.body
}
print(f"请求信息: {json.dumps(request_info, indent=2)}")
# 检查HTTP状态码
if not response.ok:
return APIResponseHandler.handle_error(response)
# 检查Content-Type
content_type = response.headers.get('content-type', '')
# 处理不同内容类型的响应
if 'application/json' in content_type:
return APIResponseHandler.handle_json_response(response)
elif 'application/xml' in content_type or 'text/xml' in content_type:
return APIResponseHandler.handle_xml_response(response)
elif 'text/plain' in content_type:
return APIResponseHandler.handle_text_response(response)
elif 'application/octet-stream' in content_type:
return APIResponseHandler.handle_binary_response(response)
else:
# 未知类型,尝试自动检测
return APIResponseHandler.handle_unknown_response(response)
@staticmethod
def handle_json_response(response: requests.Response) -> Dict[str, Any]:
"""处理JSON响应"""
try:
data = response.json()
# 标准化响应格式
standardized = {
'success': True,
'status_code': response.status_code,
'data': data,
'headers': dict(response.headers),
'timestamp': datetime.now().isoformat(),
'response_time': response.elapsed.total_seconds()
}
# 检查是否有错误信息(即使状态码是200)
if isinstance(data, dict) and 'error' in data:
standardized['success'] = False
standardized['error'] = data['error']
# 提取分页信息(如果存在)
if isinstance(data, dict) and 'pagination' in data:
standardized['pagination'] = data['pagination']
return standardized
except json.JSONDecodeError as e:
return {
'success': False,
'status_code': response.status_code,
'error': f'JSON解析错误: {e}',
'raw_response': response.text[:500], # 只保留前500字符
'headers': dict(response.headers)
}
@staticmethod
def handle_xml_response(response: requests.Response) -> Dict[str, Any]:
"""处理XML响应"""
try:
# 需要安装xmltodict: pip install xmltodict
import xmltodict
data = xmltodict.parse(response.text)
return {
'success': True,
'status_code': response.status_code,
'data': data,
'headers': dict(response.headers),
'format': 'xml'
}
except ImportError:
return {
'success': False,
'status_code': response.status_code,
'error': '需要xmltodict库来处理XML响应',
'raw_response': response.text[:500]
}
except Exception as e:
return {
'success': False,
'status_code': response.status_code,
'error': f'XML解析错误: {e}',
'raw_response': response.text[:500]
}
@staticmethod
def handle_text_response(response: requests.Response) -> Dict[str, Any]:
"""处理文本响应"""
return {
'success': True,
'status_code': response.status_code,
'data': response.text,
'headers': dict(response.headers),
'format': 'text'
}
@staticmethod
def handle_binary_response(response: requests.Response) -> Dict[str, Any]:
"""处理二进制响应(如下载文件)"""
# 获取文件名(从Content-Disposition头或URL)
content_disposition = response.headers.get('content-disposition', '')
filename = None
if 'filename=' in content_disposition:
filename = content_disposition.split('filename=')[1].strip('"\'')
if not filename:
# 从URL提取文件名
import os
from urllib.parse import urlparse
url_path = urlparse(response.request.url).path
filename = os.path.basename(url_path) or 'download.bin'
# 保存文件
with open(filename, 'wb') as f:
f.write(response.content)
return {
'success': True,
'status_code': response.status_code,
'data': f'文件已保存为: {filename}',
'filename': filename,
'content_length': len(response.content),
'headers': dict(response.headers)
}
@staticmethod
def handle_unknown_response(response: requests.Response) -> Dict[str, Any]:
"""处理未知类型的响应"""
# 尝试猜测类型
content = response.content
# 检查是否是JSON
try:
data = json.loads(content.decode('utf-8'))
return APIResponseHandler.handle_json_response(response)
except:
pass
# 检查是否是文本
try:
text = content.decode('utf-8')
return {
'success': True,
'status_code': response.status_code,
'data': text,
'headers': dict(response.headers),
'format': 'unknown (treated as text)'
}
except:
# 可能是二进制数据
return {
'success': True,
'status_code': response.status_code,
'data': f'二进制数据,大小: {len(content)} 字节',
'headers': dict(response.headers),
'format': 'binary'
}
@staticmethod
def handle_error(response: requests.Response) -> Dict[str, Any]:
"""处理错误响应"""
error_info = {
'success': False,
'status_code': response.status_code,
'reason': response.reason,
'headers': dict(response.headers),
'timestamp': datetime.now().isoformat()
}
# 尝试获取错误详情
content_type = response.headers.get('content-type', '')
if 'application/json' in content_type:
try:
error_data = response.json()
error_info['error'] = error_data
except:
error_info['raw_error'] = response.text[:500]
else:
error_info['raw_error'] = response.text[:500]
# 根据状态码添加建议
if response.status_code == 400:
error_info['suggestion'] = '检查请求参数是否正确'
elif response.status_code == 401:
error_info['suggestion'] = '检查认证信息是否正确'
elif response.status_code == 403:
error_info['suggestion'] = '没有访问权限'
elif response.status_code == 404:
error_info['suggestion'] = '请求的资源不存在'
elif response.status_code == 429:
error_info['suggestion'] = '请求过于频繁,请稍后重试'
elif 500 <= response.status_code < 600:
error_info['suggestion'] = '服务器内部错误,请稍后重试'
return error_info
# 使用示例
if __name__ == "__main__":
handler = APIResponseHandler()
# 测试不同API响应
test_urls = [
'https://api.github.com/users/octocat', # JSON响应
'https://httpbin.org/xml', # XML响应
'https://httpbin.org/robots.txt', # 文本响应
'https://httpbin.org/image/png', # 二进制响应
]
for url in test_urls:
print(f"\n测试URL: {url}")
try:
response = requests.get(url, timeout=10)
result = handler.handle_response(response)
print(f"处理结果:")
for key, value in result.items():
if key == 'data' and isinstance(value, (dict, list)):
print(f" {key}: {type(value).__name__} (长度: {len(value) if isinstance(value, list) else 'N/A'})")
elif key == 'headers':
print(f" {key}: 共 {len(value)} 个头部")
else:
print(f" {key}: {value}")
except Exception as e:
print(f"请求失败: {e}")
import requests
import json
from datetime import datetime
from typing import Dict, Any, Optional
from pydantic import BaseModel, ValidationError, validator
from dataclasses import dataclass, asdict
# 方法1: 使用Pydantic进行数据验证
class UserResponse(BaseModel):
"""用户响应数据模型"""
id: int
name: str
email: str
age: Optional[int] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@validator('email')
def validate_email(cls, v):
if '@' not in v:
raise ValueError('邮箱格式不正确')
return v
@validator('age')
def validate_age(cls, v):
if v is not None and (v < 0 or v > 150):
raise ValueError('年龄必须在0-150之间')
return v
class Pagination(BaseModel):
"""分页信息数据模型"""
page: int
limit: int
total: int
pages: int
class APIResponse(BaseModel):
"""API响应数据模型"""
status: str
code: int
message: str
data: Optional[Any] = None
pagination: Optional[Pagination] = None
@validator('status')
def validate_status(cls, v):
if v not in ['success', 'error']:
raise ValueError('状态必须是success或error')
return v
@validator('code')
def validate_code(cls, v):
if v < 100 or v >= 600:
raise ValueError('状态码必须在100-599之间')
return v
# 方法2: 使用dataclass
@dataclass
class Product:
"""产品数据类"""
id: int
name: str
price: float
stock: int
category: str = "未分类"
def to_dict(self):
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]):
return cls(**data)
class ResponseValidator:
"""响应验证器"""
@staticmethod
def validate_user_response(response_data: Dict[str, Any]) -> Optional[UserResponse]:
"""验证用户响应数据"""
try:
# 验证API响应结构
api_response = APIResponse(**response_data)
if api_response.status != 'success':
print(f"API返回错误: {api_response.message}")
return None
# 验证用户数据
if api_response.data and 'user' in api_response.data:
user_data = api_response.data['user']
user = UserResponse(**user_data)
return user
except ValidationError as e:
print(f"数据验证失败: {e}")
print(f"错误详情: {e.errors()}")
return None
except Exception as e:
print(f"验证过程中发生错误: {e}")
return None
return None
@staticmethod
def validate_and_extract(response_data: Dict[str, Any], model_class):
"""通用验证和提取方法"""
try:
# 如果response_data是字符串,尝试解析为JSON
if isinstance(response_data, str):
response_data = json.loads(response_data)
# 验证数据
instance = model_class(**response_data)
return instance
except json.JSONDecodeError as e:
print(f"JSON解析失败: {e}")
return None
except ValidationError as e:
print(f"数据验证失败: {e}")
return None
except Exception as e:
print(f"验证过程中发生错误: {e}")
return None
# 使用示例
if __name__ == "__main__":
validator = ResponseValidator()
# 模拟API响应数据
sample_response = {
'status': 'success',
'code': 200,
'message': '获取用户成功',
'data': {
'user': {
'id': 12345,
'name': '张三',
'email': 'zhangsan@example.com',
'age': 25,
'created_at': '2023-10-01T10:30:00Z',
'updated_at': '2023-10-02T14:20:00Z'
}
},
'pagination': {
'page': 1,
'limit': 10,
'total': 100,
'pages': 10
}
}
# 验证数据
user = validator.validate_user_response(sample_response)
if user:
print(f"验证成功!")
print(f"用户ID: {user.id}")
print(f"用户名: {user.name}")
print(f"邮箱: {user.email}")
print(f"年龄: {user.age}")
print(f"创建时间: {user.created_at}")
# 测试错误数据
error_response = {
'status': 'success', # 状态是success但数据有问题
'code': 200,
'message': '获取用户成功',
'data': {
'user': {
'id': 'not_a_number', # 错误的类型
'name': '张三',
'email': 'invalid-email', # 无效的邮箱
'age': 200, # 无效的年龄
}
}
}
print(f"\n测试错误数据验证:")
user = validator.validate_user_response(error_response)
if not user:
print("验证正确检测到错误数据")
大多数API都需要某种形式的认证来识别调用者并控制访问权限。
headers = {'X-API-Key': 'your_api_key'}
# 或
params = {'api_key': 'your_api_key'}
headers = {
'Authorization': 'Bearer your_jwt_token'
}
# Requests内置支持
response = requests.get(url, auth=('username', 'password'))
# 需要oauthlib库
# 通常涉及获取access_token和refresh_token
# 需要对请求内容进行哈希计算
# 常用于高安全性要求的API
import requests
import base64
import hashlib
import hmac
import time
import json
from typing import Dict, Any, Optional
class APIAuthenticator:
"""API认证管理器"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# 1. API Key认证
def add_api_key_auth(self, params: Dict = None, headers: Dict = None) -> tuple:
"""添加API Key认证"""
api_key = self.config.get('api_key')
if not api_key:
return params or {}, headers or {}
api_key_location = self.config.get('api_key_location', 'header')
api_key_name = self.config.get('api_key_name', 'X-API-Key')
if api_key_location == 'header':
headers = headers or {}
headers[api_key_name] = api_key
elif api_key_location == 'query':
params = params or {}
params[api_key_name.lower()] = api_key
return params or {}, headers or {}
# 2. Bearer Token认证
def add_bearer_token_auth(self, headers: Dict = None) -> Dict:
"""添加Bearer Token认证"""
token = self.config.get('bearer_token')
if token:
headers = headers or {}
headers['Authorization'] = f'Bearer {token}'
return headers or {}
# 3. Basic认证
def add_basic_auth(self, auth_tuple: tuple = None) -> tuple:
"""添加Basic认证"""
username = self.config.get('username')
password = self.config.get('password')
if username and password:
return (username, password)
return auth_tuple or ()
# 4. OAuth 2.0客户端凭证认证
def get_oauth_token(self, token_url: str, client_id: str = None,
client_secret: str = None, scope: str = None) -> Optional[str]:
"""获取OAuth 2.0访问令牌(客户端凭证流程)"""
client_id = client_id or self.config.get('client_id')
client_secret = client_secret or self.config.get('client_secret')
scope = scope or self.config.get('scope')
if not client_id or not client_secret:
return None
data = {
'grant_type': 'client_credentials',
'client_id': client_id,
'client_secret': client_secret,
}
if scope:
data['scope'] = scope
try:
response = requests.post(token_url, data=data, timeout=10)
response.raise_for_status()
token_data = response.json()
return token_data.get('access_token')
except Exception as e:
print(f"获取OAuth令牌失败: {e}")
return None
# 5. HMAC签名认证
def generate_hmac_signature(self, method: str, path: str,
body: str = '', timestamp: str = None) -> Dict[str, str]:
"""生成HMAC签名"""
api_key = self.config.get('api_key')
api_secret = self.config.get('api_secret')
if not api_key or not api_secret:
return {}
timestamp = timestamp or str(int(time.time() * 1000))
# 构建签名字符串
message = f"{method.upper()}\n{path}\n{timestamp}\n{body}"
# 计算HMAC-SHA256签名
signature = hmac.new(
api_secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
return {
'X-API-Key': api_key,
'X-Timestamp': timestamp,
'X-Signature': signature
}
# 6. AWS Signature v4 (用于AWS服务)
def generate_aws_signature(self, method: str, url: str,
region: str, service: str,
payload: str = '', headers: Dict = None) -> Dict[str, str]:
"""生成AWS Signature v4签名(简化版)"""
# 注意:这是简化版本,完整实现较复杂
# 实际项目中建议使用boto3库
access_key = self.config.get('aws_access_key')
secret_key = self.config.get('aws_secret_key')
if not access_key or not secret_key:
return {}
# 这里只是示例,实际AWS签名需要复杂的计算
# 包括规范请求、签名字符串、签名密钥等
return {
'Authorization': f'AWS4-HMAC-SHA256 Credential={access_key}/...',
'X-Amz-Date': time.strftime('%Y%m%dT%H%M%SZ', time.gmtime())
}
class AuthenticatedAPIClient:
"""带认证的API客户端"""
def __init__(self, base_url: str, authenticator: APIAuthenticator):
self.base_url = base_url.rstrip('/')
self.authenticator = authenticator
self.session = requests.Session()
# 配置会话
self.session.headers.update({
'Accept': 'application/json',
'Content-Type': 'application/json',
'User-Agent': 'AuthenticatedAPIClient/1.0'
})
def request(self, method: str, endpoint: str,
params: Dict = None, data: Any = None,
json_data: Dict = None, auth_type: str = 'bearer') -> requests.Response:
"""发送带认证的请求"""
url = f"{self.base_url}{endpoint}"
# 准备参数和头部
request_params = params or {}
request_headers = {}
# 根据认证类型添加认证信息
if auth_type == 'api_key':
request_params, request_headers = self.authenticator.add_api_key_auth(
request_params, request_headers
)
elif auth_type == 'bearer':
request_headers = self.authenticator.add_bearer_token_auth(request_headers)
elif auth_type == 'basic':
# Basic认证通过auth参数传递
auth_tuple = self.authenticator.add_basic_auth()
if auth_tuple:
# 需要在请求时传递auth参数
pass
elif auth_type == 'hmac':
# 生成HMAC签名
body = json.dumps(json_data) if json_data else ''
hmac_headers = self.authenticator.generate_hmac_signature(
method, endpoint, body
)
request_headers.update(hmac_headers)
# 发送请求
try:
# 处理Basic认证
auth_tuple = None
if auth_type == 'basic':
auth_tuple = self.authenticator.add_basic_auth()
response = self.session.request(
method=method,
url=url,
params=request_params,
data=data,
json=json_data,
headers=request_headers,
auth=auth_tuple,
timeout=30
)
return response
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
raise
def get(self, endpoint: str, params: Dict = None, auth_type: str = 'bearer') -> requests.Response:
"""发送GET请求"""
return self.request('GET', endpoint, params=params, auth_type=auth_type)
def post(self, endpoint: str, json_data: Dict = None, auth_type: str = 'bearer') -> requests.Response:
"""发送POST请求"""
return self.request('POST', endpoint, json_data=json_data, auth_type=auth_type)
def put(self, endpoint: str, json_data: Dict = None, auth_type: str = 'bearer') -> requests.Response:
"""发送PUT请求"""
return self.request('PUT', endpoint, json_data=json_data, auth_type=auth_type)
def delete(self, endpoint: str, auth_type: str = 'bearer') -> requests.Response:
"""发送DELETE请求"""
return self.request('DELETE', endpoint, auth_type=auth_type)
# 使用示例
if __name__ == "__main__":
# 配置认证信息
config = {
'api_key': 'your_api_key_here',
'bearer_token': 'your_jwt_token_here',
'username': 'your_username',
'password': 'your_password',
'api_secret': 'your_api_secret_for_hmac'
}
# 创建认证器
authenticator = APIAuthenticator(config)
# 创建API客户端
client = AuthenticatedAPIClient('https://api.example.com/v1', authenticator)
# 使用不同认证方式发送请求
try:
# 1. 使用Bearer Token认证
print("使用Bearer Token认证:")
response = client.get('/users/me', auth_type='bearer')
if response.status_code == 200:
print(f"获取用户信息成功: {response.json().get('name')}")
# 2. 使用API Key认证
print("\n使用API Key认证:")
response = client.get('/products', auth_type='api_key')
if response.status_code == 200:
data = response.json()
print(f"获取到 {len(data.get('products', []))} 个产品")
# 3. 使用Basic认证
print("\n使用Basic认证:")
response = client.get('/protected', auth_type='basic')
if response.status_code == 200:
print("Basic认证成功")
# 4. 使用HMAC认证
print("\n使用HMAC认证:")
response = client.post('/orders',
json_data={'product_id': 123, 'quantity': 2},
auth_type='hmac')
if response.status_code == 201:
print("创建订单成功")
except Exception as e:
print(f"API调用失败: {e}")
大多数API都有速率限制,以防止滥用和保护服务器资源。常见的速率限制方式:
import requests
import time
import threading
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
class RateLimiter:
"""速率限制器"""
def __init__(self, requests_per_second: float = 1.0):
"""
初始化速率限制器
参数:
requests_per_second: 每秒最大请求数
"""
self.requests_per_second = requests_per_second
self.min_interval = 1.0 / requests_per_second
# 记录请求时间
self.request_times = deque()
self.lock = threading.Lock()
# 统计信息
self.stats = {
'total_requests': 0,
'throttled_requests': 0,
'last_reset': datetime.now()
}
def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
"""
获取请求许可
参数:
blocking: 是否阻塞等待
timeout: 超时时间(秒)
返回:
是否获得许可
"""
start_time = time.time()
with self.lock:
current_time = time.time()
# 清理过期的请求记录
while self.request_times and \
current_time - self.request_times[0] > 1.0:
self.request_times.popleft()
# 检查是否超过限制
if len(self.request_times) < self.requests_per_second:
# 还有配额,允许请求
self.request_times.append(current_time)
self.stats['total_requests'] += 1
return True
# 计算需要等待的时间
oldest_request = self.request_times[0]
wait_time = 1.0 - (current_time - oldest_request)
if wait_time <= 0:
# 实际上不应该发生,但为了安全
self.request_times.append(current_time)
self.stats['total_requests'] += 1
return True
# 需要等待
if not blocking:
self.stats['throttled_requests'] += 1
return False
if timeout is not None and wait_time > timeout:
self.stats['throttled_requests'] += 1
return False
# 等待
time.sleep(wait_time)
with self.lock:
current_time = time.time()
# 再次检查(可能在等待期间有变化)
while self.request_times and \
current_time - self.request_times[0] > 1.0:
self.request_times.popleft()
if len(self.request_times) < self.requests_per_second:
self.request_times.append(current_time)
self.stats['total_requests'] += 1
return True
self.stats['throttled_requests'] += 1
return False
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
with self.lock:
stats = self.stats.copy()
stats['current_window_requests'] = len(self.request_times)
stats['available_requests'] = max(
0, self.requests_per_second - len(self.request_times)
)
return stats
class RateLimitAwareSession(requests.Session):
"""带速率限制的Session"""
def __init__(self, requests_per_second: float = 1.0):
super().__init__()
self.rate_limiter = RateLimiter(requests_per_second)
# 配置重试
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.mount('http://', adapter)
self.mount('https://', adapter)
def request(self, method, url, **kwargs):
"""重写request方法以添加速率限制"""
# 应用速率限制
if not self.rate_limiter.acquire(blocking=True, timeout=30):
raise Exception("无法获取速率限制许可")
# 发送请求
response = super().request(method, url, **kwargs)
# 检查响应中的速率限制头
self._check_rate_limit_headers(response)
return response
def _check_rate_limit_headers(self, response: requests.Response):
"""检查响应头中的速率限制信息"""
rate_limit_headers = {
'X-RateLimit-Limit': '限制',
'X-RateLimit-Remaining': '剩余',
'X-RateLimit-Reset': '重置时间',
'Retry-After': '重试等待'
}
found_headers = {}
for header, description in rate_limit_headers.items():
if header in response.headers:
found_headers[header] = response.headers[header]
if found_headers:
print("发现速率限制头:")
for header, value in found_headers.items():
print(f" {header}: {value}")
# 如果收到429状态码,根据Retry-After头调整
if response.status_code == 429:
retry_after = response.headers.get('Retry-After')
if retry_after:
try:
wait_seconds = int(retry_after)
print(f"速率限制,等待 {wait_seconds} 秒")
time.sleep(wait_seconds)
except ValueError:
pass
class APIThrottler:
"""API请求节流器"""
def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0):
"""
初始化节流器
参数:
base_delay: 基础延迟(秒)
max_delay: 最大延迟(秒)
"""
self.base_delay = base_delay
self.max_delay = max_delay
self.current_delay = base_delay
self.consecutive_errors = 0
def before_request(self):
"""请求前调用,可能延迟"""
if self.current_delay > 0:
time.sleep(self.current_delay)
def after_success(self):
"""成功请求后调用"""
# 重置延迟
self.current_delay = self.base_delay
self.consecutive_errors = 0
def after_error(self, status_code: int):
"""错误请求后调用"""
self.consecutive_errors += 1
# 根据错误类型调整延迟
if status_code == 429: # 太多请求
# 指数退避
self.current_delay = min(
self.base_delay * (2 ** self.consecutive_errors),
self.max_delay
)
print(f"收到429错误,下次延迟: {self.current_delay:.2f}秒")
elif 500 <= status_code < 600: # 服务器错误
# 线性增加
self.current_delay = min(
self.base_delay * (1 + self.consecutive_errors * 0.5),
self.max_delay
)
else:
# 其他错误,稍微增加延迟
self.current_delay = min(self.current_delay * 1.1, self.max_delay)
def get_status(self) -> Dict[str, Any]:
"""获取当前状态"""
return {
'current_delay': self.current_delay,
'consecutive_errors': self.consecutive_errors,
'base_delay': self.base_delay,
'max_delay': self.max_delay
}
# 使用示例
if __name__ == "__main__":
# 创建带速率限制的Session
session = RateLimitAwareSession(requests_per_second=2.0) # 每秒2个请求
print("测试速率限制:")
# 创建节流器
throttler = APIThrottler(base_delay=0.5, max_delay=5.0)
# 模拟多个请求
urls = [
'https://httpbin.org/get',
'https://httpbin.org/post',
'https://httpbin.org/put',
'https://httpbin.org/delete',
'https://httpbin.org/status/429', # 模拟速率限制错误
]
for i, url in enumerate(urls, 1):
print(f"\n请求 {i}/{len(urls)}: {url}")
# 应用节流
throttler.before_request()
try:
# 发送请求
if 'post' in url:
response = session.post(url, json={'test': 'data'})
elif 'put' in url:
response = session.put(url, json={'test': 'data'})
elif 'delete' in url:
response = session.delete(url)
else:
response = session.get(url)
# 处理响应
if response.status_code == 200:
print(f"请求成功,状态码: {response.status_code}")
throttler.after_success()
else:
print(f"请求失败,状态码: {response.status_code}")
throttler.after_error(response.status_code)
except Exception as e:
print(f"请求异常: {e}")
throttler.after_error(0) # 0表示网络异常
# 显示节流器状态
status = throttler.get_status()
print(f"节流器状态: 延迟={status['current_delay']:.2f}s, 连续错误={status['consecutive_errors']}")
# 显示速率限制器统计
stats = session.rate_limiter.get_stats()
print(f"\n速率限制器统计:")
for key, value in stats.items():
print(f" {key}: {value}")
使用page和limit参数控制分页。
{
"data": [...],
"pagination": {
"page": 1,
"limit": 10,
"total": 100,
"pages": 10
}
}
使用cursor和limit参数,适合大数据集。
{
"data": [...],
"pagination": {
"next_cursor": "abc123",
"has_more": true,
"limit": 10
}
}
使用offset和limit参数。
{
"data": [...],
"pagination": {
"offset": 0,
"limit": 10,
"total": 100
}
}
使用since和until参数按时间分页。
{
"data": [...],
"pagination": {
"since": "2023-01-01T00:00:00Z",
"until": "2023-12-31T23:59:59Z"
}
}
import requests
import time
from typing import Dict, Any, List, Optional, Generator
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
class PaginationHandler:
"""分页处理器"""
def __init__(self, session: requests.Session):
self.session = session
# 1. 基于页码的分页
def fetch_all_with_page(self, base_url: str,
page_param: str = 'page',
limit_param: str = 'limit',
limit: int = 100,
max_pages: int = None) -> List[Any]:
"""
获取所有数据(基于页码)
参数:
base_url: 基础URL
page_param: 页码参数名
limit_param: 每页数量参数名
limit: 每页数量
max_pages: 最大页数(None表示无限制)
"""
all_data = []
page = 1
while True:
# 构建URL
parsed_url = urlparse(base_url)
query_params = parse_qs(parsed_url.query)
# 添加分页参数
query_params[page_param] = [str(page)]
query_params[limit_param] = [str(limit)]
# 重建URL
new_query = urlencode(query_params, doseq=True)
url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
print(f"获取第 {page} 页: {url}")
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
data = response.json()
# 提取数据(根据API响应结构调整)
items = data.get('data', [])
if not items and 'items' in data:
items = data['items']
if not items:
print(f"第 {page} 页没有数据,停止")
break
all_data.extend(items)
# 检查是否还有更多页
pagination = data.get('pagination', {})
total_pages = pagination.get('pages')
if total_pages and page >= total_pages:
print(f"已达到总页数 {total_pages},停止")
break
# 检查max_pages限制
if max_pages and page >= max_pages:
print(f"已达到最大页数限制 {max_pages},停止")
break
page += 1
# 避免请求过快
time.sleep(0.1)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404 and page > 1:
# 页码超出范围,停止
print(f"页码 {page} 超出范围,停止")
break
else:
print(f"获取第 {page} 页失败: {e}")
break
except Exception as e:
print(f"获取第 {page} 页时发生错误: {e}")
break
return all_data
# 2. 基于游标的分页
def fetch_all_with_cursor(self, base_url: str,
cursor_param: str = 'cursor',
limit_param: str = 'limit',
limit: int = 100,
max_requests: int = 100) -> List[Any]:
"""
获取所有数据(基于游标)
参数:
base_url: 基础URL
cursor_param: 游标参数名
limit_param: 每页数量参数名
limit: 每页数量
max_requests: 最大请求次数
"""
all_data = []
next_cursor = None
request_count = 0
while True:
# 检查请求次数限制
request_count += 1
if request_count > max_requests:
print(f"达到最大请求次数限制 {max_requests},停止")
break
# 构建URL
parsed_url = urlparse(base_url)
query_params = parse_qs(parsed_url.query)
# 添加分页参数
query_params[limit_param] = [str(limit)]
if next_cursor:
query_params[cursor_param] = [next_cursor]
# 重建URL
new_query = urlencode(query_params, doseq=True)
url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
print(f"获取数据 (请求 #{request_count}): {url}")
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
data = response.json()
# 提取数据
items = data.get('data', [])
if not items and 'items' in data:
items = data['items']
if not items:
print(f"没有数据,停止")
break
all_data.extend(items)
# 获取下一个游标
pagination = data.get('pagination', {})
next_cursor = pagination.get('next_cursor')
# 检查是否还有更多数据
if not next_cursor or not pagination.get('has_more', True):
print(f"没有更多数据,停止")
break
# 避免请求过快
time.sleep(0.1)
except Exception as e:
print(f"获取数据失败: {e}")
break
return all_data
# 3. 基于偏移量的分页
def fetch_all_with_offset(self, base_url: str,
offset_param: str = 'offset',
limit_param: str = 'limit',
limit: int = 100) -> List[Any]:
"""
获取所有数据(基于偏移量)
参数:
base_url: 基础URL
offset_param: 偏移量参数名
limit_param: 每页数量参数名
limit: 每页数量
"""
all_data = []
offset = 0
while True:
# 构建URL
parsed_url = urlparse(base_url)
query_params = parse_qs(parsed_url.query)
# 添加分页参数
query_params[offset_param] = [str(offset)]
query_params[limit_param] = [str(limit)]
# 重建URL
new_query = urlencode(query_params, doseq=True)
url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
print(f"获取数据 (偏移量: {offset}): {url}")
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
data = response.json()
# 提取数据
items = data.get('data', [])
if not items and 'items' in data:
items = data['items']
if not items:
print(f"没有数据,停止")
break
all_data.extend(items)
# 检查是否还有更多数据
total_items = data.get('pagination', {}).get('total')
if total_items and offset + len(items) >= total_items:
print(f"已获取所有 {total_items} 条数据,停止")
break
# 更新偏移量
offset += len(items)
# 避免请求过快
time.sleep(0.1)
except Exception as e:
print(f"获取数据失败: {e}")
break
return all_data
# 4. 生成器版本(内存友好)
def iterate_with_page(self, base_url: str,
page_param: str = 'page',
limit_param: str = 'limit',
limit: int = 100,
max_pages: int = None) -> Generator[Any, None, None]:
"""
分页迭代器(基于页码)
参数:
base_url: 基础URL
page_param: 页码参数名
limit_param: 每页数量参数名
limit: 每页数量
max_pages: 最大页数
"""
page = 1
while True:
# 构建URL
parsed_url = urlparse(base_url)
query_params = parse_qs(parsed_url.query)
# 添加分页参数
query_params[page_param] = [str(page)]
query_params[limit_param] = [str(limit)]
# 重建URL
new_query = urlencode(query_params, doseq=True)
url = urlunparse((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
new_query,
parsed_url.fragment
))
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
data = response.json()
# 提取数据
items = data.get('data', [])
if not items and 'items' in data:
items = data['items']
if not items:
break
# 返回数据
for item in items:
yield item
# 检查是否还有更多页
pagination = data.get('pagination', {})
total_pages = pagination.get('pages')
if total_pages and page >= total_pages:
break
# 检查max_pages限制
if max_pages and page >= max_pages:
break
page += 1
# 避免请求过快
time.sleep(0.1)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404 and page > 1:
# 页码超出范围,停止
break
else:
print(f"获取第 {page} 页失败: {e}")
break
except Exception as e:
print(f"获取第 {page} 页时发生错误: {e}")
break
# 使用示例
if __name__ == "__main__":
session = requests.Session()
handler = PaginationHandler(session)
# 示例API(使用httpbin模拟)
base_url = "https://httpbin.org/anything"
print("测试基于页码的分页:")
# 获取所有数据
all_data = handler.fetch_all_with_page(
base_url,
page_param='page',
limit_param='limit',
limit=2,
max_pages=3 # 只获取前3页
)
print(f"总共获取到 {len(all_data)} 条数据")
print("\n测试分页迭代器(内存友好):")
# 使用生成器迭代
item_count = 0
for item in handler.iterate_with_page(
base_url,
page_param='page',
limit_param='limit',
limit=2,
max_pages=2 # 只迭代前2页
):
item_count += 1
print(f"处理第 {item_count} 条数据")
print(f"总共处理了 {item_count} 条数据")
import requests
import time
import json
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class APIErrorType(Enum):
"""API错误类型枚举"""
NETWORK_ERROR = "network_error"
TIMEOUT_ERROR = "timeout_error"
RATE_LIMIT_ERROR = "rate_limit_error"
AUTH_ERROR = "authentication_error"
VALIDATION_ERROR = "validation_error"
SERVER_ERROR = "server_error"
CLIENT_ERROR = "client_error"
UNKNOWN_ERROR = "unknown_error"
@dataclass
class APIError:
"""API错误信息"""
error_type: APIErrorType
message: str
status_code: Optional[int] = None
response_text: Optional[str] = None
request_info: Optional[Dict[str, Any]] = None
retry_count: int = 0
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'error_type': self.error_type.value,
'message': self.message,
'status_code': self.status_code,
'response_text': self.response_text,
'request_info': self.request_info,
'retry_count': self.retry_count,
'timestamp': time.time()
}
class APIRetryStrategy:
"""API重试策略"""
def __init__(self, max_retries: int = 3, backoff_factor: float = 1.0):
"""
初始化重试策略
参数:
max_retries: 最大重试次数
backoff_factor: 退避因子
"""
self.max_retries = max_retries
self.backoff_factor = backoff_factor
# 可重试的错误状态码
self.retryable_status_codes = {408, 429, 500, 502, 503, 504}
# 可重试的异常类型
self.retryable_exceptions = (
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
)
def should_retry(self, error: APIError) -> bool:
"""判断是否应该重试"""
# 超过最大重试次数
if error.retry_count >= self.max_retries:
return False
# 网络错误通常应该重试
if error.error_type in [APIErrorType.NETWORK_ERROR, APIErrorType.TIMEOUT_ERROR]:
return True
# 速率限制错误应该重试
if error.error_type == APIErrorType.RATE_LIMIT_ERROR:
return True
# 服务器错误应该重试
if error.error_type == APIErrorType.SERVER_ERROR:
return True
# 根据状态码判断
if error.status_code in self.retryable_status_codes:
return True
return False
def get_wait_time(self, retry_count: int) -> float:
"""计算等待时间(指数退避)"""
return self.backoff_factor * (2 ** retry_count)
def create_retry_adapter(self) -> HTTPAdapter:
"""创建重试适配器(用于requests.Session)"""
retry_strategy = Retry(
total=self.max_retries,
backoff_factor=self.backoff_factor,
status_forcelist=list(self.retryable_status_codes),
allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"]
)
return HTTPAdapter(max_retries=retry_strategy)
class APIErrorHandler:
"""API错误处理器"""
def __init__(self, retry_strategy: Optional[APIRetryStrategy] = None):
self.retry_strategy = retry_strategy or APIRetryStrategy()
self.error_callbacks = {}
def register_error_callback(self, error_type: APIErrorType, callback: Callable):
"""注册错误回调函数"""
self.error_callbacks[error_type] = callback
def handle_error(self, error: APIError) -> Optional[Any]:
"""处理错误"""
# 调用特定错误的回调函数
if error.error_type in self.error_callbacks:
return self.error_callbacks[error.error_type](error)
# 默认错误处理
return self._default_error_handler(error)
def _default_error_handler(self, error: APIError) -> None:
"""默认错误处理"""
print(f"API错误 [{error.error_type.value}]: {error.message}")
if error.status_code:
print(f"状态码: {error.status_code}")
if error.response_text:
print(f"响应内容: {error.response_text[:200]}...")
if error.request_info:
print(f"请求信息: {json.dumps(error.request_info, indent=2)}")
def execute_with_retry(self, request_func: Callable, *args, **kwargs) -> Any:
"""带重试的执行"""
retry_count = 0
while retry_count <= self.retry_strategy.max_retries:
try:
return request_func(*args, **kwargs)
except requests.exceptions.RequestException as e:
# 创建错误对象
error = self._create_error_from_exception(e, retry_count)
# 判断是否应该重试
if not self.retry_strategy.should_retry(error):
self.handle_error(error)
raise
# 计算等待时间
wait_time = self.retry_strategy.get_wait_time(retry_count)
print(f"重试 {retry_count + 1}/{self.retry_strategy.max_retries},等待 {wait_time:.2f} 秒...")
# 等待
time.sleep(wait_time)
retry_count += 1
error.retry_count = retry_count
# 所有重试都失败
error = APIError(
error_type=APIErrorType.UNKNOWN_ERROR,
message=f"所有 {self.retry_strategy.max_retries} 次重试都失败",
retry_count=retry_count
)
self.handle_error(error)
raise Exception(f"API调用失败,已重试 {retry_count} 次")
def _create_error_from_exception(self, exception: Exception, retry_count: int) -> APIError:
"""从异常创建错误对象"""
# 网络连接错误
if isinstance(exception, requests.exceptions.ConnectionError):
return APIError(
error_type=APIErrorType.NETWORK_ERROR,
message=f"网络连接错误: {exception}",
retry_count=retry_count
)
# 超时错误
elif isinstance(exception, requests.exceptions.Timeout):
return APIError(
error_type=APIErrorType.TIMEOUT_ERROR,
message=f"请求超时: {exception}",
retry_count=retry_count
)
# HTTP错误
elif isinstance(exception, requests.exceptions.HTTPError):
response = exception.response
# 根据状态码确定错误类型
if response.status_code == 401 or response.status_code == 403:
error_type = APIErrorType.AUTH_ERROR
message = "认证失败"
elif response.status_code == 400:
error_type = APIErrorType.VALIDATION_ERROR
message = "请求参数验证失败"
elif response.status_code == 429:
error_type = APIErrorType.RATE_LIMIT_ERROR
message = "请求过于频繁,触发速率限制"
elif 500 <= response.status_code < 600:
error_type = APIErrorType.SERVER_ERROR
message = f"服务器错误: {response.status_code}"
else:
error_type = APIErrorType.CLIENT_ERROR
message = f"客户端错误: {response.status_code}"
return APIError(
error_type=error_type,
message=message,
status_code=response.status_code,
response_text=response.text,
retry_count=retry_count
)
# 其他异常
else:
return APIError(
error_type=APIErrorType.UNKNOWN_ERROR,
message=f"未知错误: {exception}",
retry_count=retry_count
)
class RobustAPIClient:
"""健壮的API客户端"""
def __init__(self, base_url: str):
self.base_url = base_url
self.error_handler = APIErrorHandler()
self.session = self._create_session()
# 注册错误回调
self._register_error_callbacks()
def _create_session(self) -> requests.Session:
"""创建配置好的Session"""
session = requests.Session()
# 配置重试策略
retry_strategy = APIRetryStrategy(max_retries=3, backoff_factor=1.0)
adapter = retry_strategy.create_retry_adapter()
session.mount('http://', adapter)
session.mount('https://', adapter)
# 配置通用头部
session.headers.update({
'User-Agent': 'RobustAPIClient/1.0',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
return session
def _register_error_callbacks(self):
"""注册错误回调函数"""
# 速率限制错误回调
def handle_rate_limit_error(error: APIError) -> None:
print(f"处理速率限制错误...")
# 从响应头获取重试等待时间
if error.response_text:
try:
error_data = json.loads(error.response_text)
retry_after = error_data.get('retry_after', 60)
print(f"根据API建议,等待 {retry_after} 秒")
time.sleep(retry_after)
except:
pass
self.error_handler.register_error_callback(
APIErrorType.RATE_LIMIT_ERROR,
handle_rate_limit_error
)
# 认证错误回调
def handle_auth_error(error: APIError) -> None:
print(f"处理认证错误...")
print("建议: 1. 检查API密钥 2. 刷新访问令牌 3. 重新登录")
self.error_handler.register_error_callback(
APIErrorType.AUTH_ERROR,
handle_auth_error
)
def request(self, method: str, endpoint: str, **kwargs) -> Any:
"""发送请求(带错误处理)"""
url = f"{self.base_url}{endpoint}"
def request_func():
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response
try:
response = self.error_handler.execute_with_retry(request_func)
# 成功响应
if response.status_code == 200:
return response.json()
elif response.status_code == 204: # No Content
return None
else:
return response.text
except Exception as e:
# 错误处理器已经处理了错误,这里只是重新抛出
raise
def get(self, endpoint: str, **kwargs) -> Any:
"""发送GET请求"""
return self.request('GET', endpoint, **kwargs)
def post(self, endpoint: str, **kwargs) -> Any:
"""发送POST请求"""
return self.request('POST', endpoint, **kwargs)
def put(self, endpoint: str, **kwargs) -> Any:
"""发送PUT请求"""
return self.request('PUT', endpoint, **kwargs)
def delete(self, endpoint: str, **kwargs) -> Any:
"""发送DELETE请求"""
return self.request('DELETE', endpoint, **kwargs)
# 使用示例
if __name__ == "__main__":
# 创建健壮的API客户端
client = RobustAPIClient('https://httpbin.org')
print("测试健壮API客户端...")
# 测试正常请求
try:
print("\n1. 测试正常请求:")
data = client.get('/get')
print(f"请求成功: {data.get('url')}")
except Exception as e:
print(f"请求失败: {e}")
# 测试服务器错误(应该重试)
try:
print("\n2. 测试服务器错误(500):")
data = client.get('/status/500')
print(f"请求结果: {data}")
except Exception as e:
print(f"请求失败(预期中): {e}")
# 测试速率限制(429)
try:
print("\n3. 测试速率限制(429):")
data = client.get('/status/429')
print(f"请求结果: {data}")
except Exception as e:
print(f"请求失败(预期中): {e}")
# 测试客户端错误(400,不应该重试)
try:
print("\n4. 测试客户端错误(400):")
data = client.get('/status/400')
print(f"请求结果: {data}")
except Exception as e:
print(f"请求失败(预期中): {e}")
创建一个完整的天气预报API客户端,支持多种天气服务提供商。
import requests
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass, asdict
from enum import Enum
import csv
class WeatherProvider(Enum):
"""天气服务提供商"""
OPENWEATHER = "openweather"
WEATHERAPI = "weatherapi"
VISUALCROSSING = "visualcrossing"
@dataclass
class WeatherData:
"""天气数据类"""
temperature: float # 温度(摄氏度)
feels_like: float # 体感温度
humidity: int # 湿度(百分比)
pressure: int # 气压(hPa)
wind_speed: float # 风速(米/秒)
wind_direction: str # 风向
description: str # 天气描述
icon: str # 天气图标代码
timestamp: str # 时间戳
location: str # 位置
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return asdict(self)
def to_csv_row(self) -> List[str]:
"""转换为CSV行"""
return [
self.timestamp,
self.location,
str(self.temperature),
str(self.feels_like),
str(self.humidity),
str(self.pressure),
str(self.wind_speed),
self.wind_direction,
self.description,
self.icon
]
class BaseWeatherClient:
"""天气客户端基类"""
def __init__(self, api_key: str):
self.api_key = api_key
self.session = requests.Session()
self.base_url = ""
# 配置Session
self.session.headers.update({
'User-Agent': 'WeatherClient/1.0',
'Accept': 'application/json'
})
def get_current_weather(self, location: str) -> Optional[WeatherData]:
"""获取当前天气(子类必须实现)"""
raise NotImplementedError
def get_forecast(self, location: str, days: int = 5) -> List[WeatherData]:
"""获取天气预报(子类必须实现)"""
raise NotImplementedError
def _make_request(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""发送API请求"""
try:
response = self.session.get(
f"{self.base_url}{endpoint}",
params=params,
timeout=10
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
if e.response.status_code == 401:
print("API密钥无效")
elif e.response.status_code == 404:
print("位置未找到")
elif e.response.status_code == 429:
print("请求过于频繁")
except requests.exceptions.ConnectionError:
print("网络连接错误")
except requests.exceptions.Timeout:
print("请求超时")
except Exception as e:
print(f"未知错误: {e}")
return None
class OpenWeatherClient(BaseWeatherClient):
"""OpenWeatherMap客户端"""
def __init__(self, api_key: str):
super().__init__(api_key)
self.base_url = "https://api.openweathermap.org/data/2.5"
def get_current_weather(self, location: str) -> Optional[WeatherData]:
"""获取当前天气"""
params = {
'q': location,
'appid': self.api_key,
'units': 'metric', # 使用摄氏度
'lang': 'zh_cn' # 中文描述
}
data = self._make_request('/weather', params)
if not data:
return None
return self._parse_current_weather(data, location)
def get_forecast(self, location: str, days: int = 5) -> List[WeatherData]:
"""获取天气预报"""
params = {
'q': location,
'appid': self.api_key,
'units': 'metric',
'lang': 'zh_cn',
'cnt': days * 8 # 每3小时一个数据点
}
data = self._make_request('/forecast', params)
if not data:
return []
return self._parse_forecast(data, location)
def _parse_current_weather(self, data: Dict[str, Any], location: str) -> WeatherData:
"""解析当前天气数据"""
main = data['main']
weather = data['weather'][0]
wind = data['wind']
# 转换风向角度为方向
wind_deg = wind.get('deg', 0)
wind_direction = self._degrees_to_direction(wind_deg)
return WeatherData(
temperature=main['temp'],
feels_like=main['feels_like'],
humidity=main['humidity'],
pressure=main['pressure'],
wind_speed=wind['speed'],
wind_direction=wind_direction,
description=weather['description'],
icon=weather['icon'],
timestamp=datetime.fromtimestamp(data['dt']).isoformat(),
location=location
)
def _parse_forecast(self, data: Dict[str, Any], location: str) -> List[WeatherData]:
"""解析天气预报数据"""
forecast_list = []
for item in data['list']:
main = item['main']
weather = item['weather'][0]
wind = item['wind']
wind_deg = wind.get('deg', 0)
wind_direction = self._degrees_to_direction(wind_deg)
weather_data = WeatherData(
temperature=main['temp'],
feels_like=main['feels_like'],
humidity=main['humidity'],
pressure=main['pressure'],
wind_speed=wind['speed'],
wind_direction=wind_direction,
description=weather['description'],
icon=weather['icon'],
timestamp=datetime.fromtimestamp(item['dt']).isoformat(),
location=location
)
forecast_list.append(weather_data)
return forecast_list
@staticmethod
def _degrees_to_direction(degrees: float) -> str:
"""将角度转换为方向"""
directions = ['北', '东北', '东', '东南', '南', '西南', '西', '西北']
index = round(degrees / 45) % 8
return directions[index]
class WeatherAPIClient(BaseWeatherClient):
"""WeatherAPI.com客户端"""
def __init__(self, api_key: str):
super().__init__(api_key)
self.base_url = "http://api.weatherapi.com/v1"
def get_current_weather(self, location: str) -> Optional[WeatherData]:
"""获取当前天气"""
params = {
'key': self.api_key,
'q': location,
'lang': 'zh'
}
data = self._make_request('/current.json', params)
if not data:
return None
return self._parse_current_weather(data, location)
def get_forecast(self, location: str, days: int = 5) -> List[WeatherData]:
"""获取天气预报"""
params = {
'key': self.api_key,
'q': location,
'days': days,
'lang': 'zh'
}
data = self._make_request('/forecast.json', params)
if not data:
return []
return self._parse_forecast(data, location)
def _parse_current_weather(self, data: Dict[str, Any], location: str) -> WeatherData:
"""解析当前天气数据"""
current = data['current']
return WeatherData(
temperature=current['temp_c'],
feels_like=current['feelslike_c'],
humidity=current['humidity'],
pressure=current['pressure_mb'],
wind_speed=current['wind_kph'] / 3.6, # 转换为米/秒
wind_direction=current['wind_dir'],
description=current['condition']['text'],
icon=self._extract_icon_code(current['condition']['icon']),
timestamp=current['last_updated'],
location=location
)
def _parse_forecast(self, data: Dict[str, Any], location: str) -> List[WeatherData]:
"""解析天气预报数据"""
forecast_list = []
for day in data['forecast']['forecastday']:
date = day['date']
# 每天的概况
day_data = day['day']
weather_data = WeatherData(
temperature=day_data['avgtemp_c'],
feels_like=day_data['avgtemp_c'], # WeatherAPI没有体感温度
humidity=day_data['avghumidity'],
pressure=day_data.get('pressure_mb', 1013),
wind_speed=day_data['maxwind_kph'] / 3.6,
wind_direction=day_data.get('wind_dir', 'N/A'),
description=day_data['condition']['text'],
icon=self._extract_icon_code(day_data['condition']['icon']),
timestamp=f"{date} 12:00",
location=location
)
forecast_list.append(weather_data)
return forecast_list
@staticmethod
def _extract_icon_code(icon_url: str) -> str:
"""从图标URL中提取图标代码"""
# 示例: "//cdn.weatherapi.com/weather/64x64/day/113.png"
if '/' in icon_url:
parts = icon_url.split('/')
if len(parts) >= 2:
return parts[-1].replace('.png', '')
return ""
class WeatherService:
"""天气服务管理器"""
def __init__(self):
self.clients = {}
self.weather_history = []
def register_client(self, provider: WeatherProvider, api_key: str):
"""注册天气客户端"""
if provider == WeatherProvider.OPENWEATHER:
self.clients[provider] = OpenWeatherClient(api_key)
elif provider == WeatherProvider.WEATHERAPI:
self.clients[provider] = WeatherAPIClient(api_key)
else:
raise ValueError(f"不支持的天气服务提供商: {provider}")
def get_weather(self, location: str, provider: WeatherProvider = None) -> Optional[WeatherData]:
"""获取天气数据"""
if provider:
# 使用指定的提供商
if provider not in self.clients:
print(f"未注册的天气服务提供商: {provider}")
return None
return self.clients[provider].get_current_weather(location)
else:
# 尝试所有已注册的客户端
for client in self.clients.values():
weather_data = client.get_current_weather(location)
if weather_data:
return weather_data
return None
def compare_providers(self, location: str) -> Dict[str, Any]:
"""比较不同提供商的天气数据"""
results = {}
for provider, client in self.clients.items():
try:
weather_data = client.get_current_weather(location)
if weather_data:
results[provider.value] = weather_data.to_dict()
except Exception as e:
print(f"{provider.value} 获取失败: {e}")
return results
def save_to_csv(self, weather_data: WeatherData, filename: str = "weather_history.csv"):
"""保存天气数据到CSV"""
# 检查文件是否存在,决定是否写入表头
try:
with open(filename, 'r') as f:
# 文件已存在,不需要表头
pass
except FileNotFoundError:
# 文件不存在,写入表头
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
'时间戳', '位置', '温度(℃)', '体感温度(℃)',
'湿度(%)', '气压(hPa)', '风速(m/s)', '风向',
'描述', '图标'
])
# 追加数据
with open(filename, 'a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(weather_data.to_csv_row())
print(f"天气数据已保存到 {filename}")
def get_weather_alerts(self, location: str, threshold_temp: float = 35.0) -> List[str]:
"""获取天气警报"""
alerts = []
weather_data = self.get_weather(location)
if not weather_data:
return alerts
# 高温警报
if weather_data.temperature > threshold_temp:
alerts.append(f"高温警报: {weather_data.temperature}℃")
# 大风警报
if weather_data.wind_speed > 10.0:
alerts.append(f"大风警报: {weather_data.wind_speed}m/s")
# 高湿度警报
if weather_data.humidity > 80:
alerts.append(f"高湿度警报: {weather_data.humidity}%")
return alerts
# 使用示例
if __name__ == "__main__":
# 创建天气服务
weather_service = WeatherService()
# 注册天气服务提供商(需要实际的API密钥)
# 注意:以下API密钥是示例,实际使用时需要替换为真实的API密钥
# 注册OpenWeatherMap
# weather_service.register_client(
# WeatherProvider.OPENWEATHER,
# api_key="your_openweather_api_key"
# )
# 注册WeatherAPI
# weather_service.register_client(
# WeatherProvider.WEATHERAPI,
# api_key="your_weatherapi_key"
# )
print("天气预报API客户端")
print("=" * 50)
# 由于我们没有真实的API密钥,这里模拟数据
print("\n模拟获取北京天气:")
# 模拟天气数据
mock_weather = WeatherData(
temperature=22.5,
feels_like=21.0,
humidity=65,
pressure=1013,
wind_speed=3.2,
wind_direction="东北",
description="晴间多云",
icon="01d",
timestamp=datetime.now().isoformat(),
location="北京"
)
print(f"位置: {mock_weather.location}")
print(f"温度: {mock_weather.temperature}℃")
print(f"体感温度: {mock_weather.feels_like}℃")
print(f"湿度: {mock_weather.humidity}%")
print(f"气压: {mock_weather.pressure}hPa")
print(f"风速: {mock_weather.wind_speed}m/s")
print(f"风向: {mock_weather.wind_direction}")
print(f"天气: {mock_weather.description}")
# 保存到CSV
weather_service.save_to_csv(mock_weather, "weather_data.csv")
# 获取天气警报
alerts = weather_service.get_weather_alerts("北京")
if alerts:
print("\n天气警报:")
for alert in alerts:
print(f" ⚠ {alert}")
print("\n项目功能总结:")
print("1. 支持多个天气服务提供商")
print("2. 统一的数据接口")
print("3. 错误处理和重试机制")
print("4. 数据持久化(CSV)")
print("5. 天气警报系统")
print("6. 提供商数据对比")
测试不同的API调用场景:
Requests库是Python中API调用的首选工具,它提供了强大而灵活的功能来与各种Web API交互。关键要点:
通过本教程,你应该已经掌握了使用Requests进行API调用的所有核心概念和最佳实践。无论是构建简单的API客户端还是复杂的企业级集成,Requests都能提供强大的支持。