Requests API接口调用

API驱动世界 - 现代应用离不开API,Requests是Python中最优雅、最强大的HTTP客户端库,完美支持各种API调用需求。

API基础概念

客户端
发送HTTP请求
API服务器
处理请求
返回数据
JSON/XML格式
什么是API?
  • 应用程序编程接口: 不同软件系统之间的通信协议
  • 基于HTTP协议: 大多数Web API使用HTTP/HTTPS
  • 数据格式: 通常使用JSON或XML
  • 无状态: 每个请求都是独立的
  • 资源导向: 操作的是资源(用户、订单等)
为什么使用API?
  • 数据交换: 获取或提交数据
  • 功能集成: 使用第三方服务功能
  • 自动化: 自动化重复任务
  • 微服务: 服务间通信
  • 开放平台: 构建开发者生态
典型的API调用流程 以GitHub API为例
基本API调用示例
import requests
import json

# 1. 定义API端点
api_url = "https://api.github.com/users/octocat"

# 2. 发送HTTP请求
response = requests.get(api_url)

# 3. 检查响应状态
if response.status_code == 200:
    # 4. 解析响应数据(通常是JSON)
    user_data = response.json()

    # 5. 使用数据
    print(f"用户名: {user_data['login']}")
    print(f"姓名: {user_data.get('name', '未知')}")
    print(f"仓库数: {user_data['public_repos']}")
    print(f"关注者: {user_data['followers']}")

    # 6. 保存数据
    with open('user_data.json', 'w') as f:
        json.dump(user_data, f, indent=2)
else:
    print(f"请求失败,状态码: {response.status_code}")
    print(f"错误信息: {response.text}")

RESTful API

REST(Representational State Transfer)是一种API设计风格,它使用标准的HTTP方法对资源进行操作。

HTTP方法 操作 示例端点 描述
GET 读取/检索 /api/users 获取用户列表
POST 创建 /api/users 创建新用户
GET 读取/检索 /api/users/{id} 获取特定用户
PUT 更新/替换 /api/users/{id} 完全更新用户
PATCH 部分更新 /api/users/{id} 部分更新用户
DELETE 删除 /api/users/{id} 删除用户
RESTful API完整示例
import requests
import json

class UserAPI:
    """用户API客户端"""

    def __init__(self, base_url, api_key=None):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()

        if api_key:
            self.session.headers.update({
                'Authorization': f'Bearer {api_key}'
            })

        # 设置通用头
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

    # 1. GET - 获取用户列表
    def get_users(self, page=1, limit=10):
        """获取用户列表"""

        params = {
            'page': page,
            'limit': limit
        }

        response = self.session.get(
            f'{self.base_url}/users',
            params=params
        )
        response.raise_for_status()

        return response.json()

    # 2. GET - 获取特定用户
    def get_user(self, user_id):
        """获取特定用户"""

        response = self.session.get(
            f'{self.base_url}/users/{user_id}'
        )
        response.raise_for_status()

        return response.json()

    # 3. POST - 创建用户
    def create_user(self, user_data):
        """创建新用户"""

        response = self.session.post(
            f'{self.base_url}/users',
            json=user_data  # Requests会自动序列化为JSON
        )
        response.raise_for_status()

        return response.json()

    # 4. PUT - 更新用户(完全替换)
    def update_user(self, user_id, user_data):
        """完全更新用户"""

        response = self.session.put(
            f'{self.base_url}/users/{user_id}',
            json=user_data
        )
        response.raise_for_status()

        return response.json()

    # 5. PATCH - 部分更新用户
    def patch_user(self, user_id, partial_data):
        """部分更新用户"""

        response = self.session.patch(
            f'{self.base_url}/users/{user_id}',
            json=partial_data
        )
        response.raise_for_status()

        return response.json()

    # 6. DELETE - 删除用户
    def delete_user(self, user_id):
        """删除用户"""

        response = self.session.delete(
            f'{self.base_url}/users/{user_id}'
        )

        # DELETE通常返回204 No Content
        if response.status_code == 204:
            return True

        response.raise_for_status()
        return response.json()

# 使用示例
if __name__ == "__main__":
    api = UserAPI('https://api.example.com/v1', api_key='your_api_key')

    try:
        # 获取用户列表
        users = api.get_users(page=1, limit=5)
        print(f"获取到 {len(users.get('data', []))} 个用户")

        # 创建新用户
        new_user = {
            'name': '张三',
            'email': 'zhangsan@example.com',
            'age': 25
        }

        created_user = api.create_user(new_user)
        print(f"创建用户成功,ID: {created_user.get('id')}")

        # 更新用户
        update_data = {
            'name': '李四',
            'age': 26
        }

        updated_user = api.update_user(created_user['id'], update_data)
        print(f"更新用户成功: {updated_user.get('name')}")

        # 删除用户
        if api.delete_user(created_user['id']):
            print("删除用户成功")

    except requests.exceptions.HTTPError as e:
        print(f"HTTP错误: {e}")
        if e.response:
            print(f"响应状态码: {e.response.status_code}")
            print(f"错误详情: {e.response.text}")
    except Exception as e:
        print(f"其他错误: {e}")

HTTP方法详解

GET 读取数据

用途: 从服务器获取数据,不应该改变服务器状态。

特点:

  • 参数通过URL查询字符串传递
  • 可以被缓存
  • 可以被收藏为书签
  • 有长度限制(URL长度)
  • 幂等操作(多次执行结果相同)
# GET请求示例
params = {'page': 1, 'limit': 10}
response = requests.get(
    'https://api.example.com/users',
    params=params
)
POST 创建数据

用途: 向服务器提交数据,通常用于创建新资源。

特点:

  • 数据通过请求体传递
  • 不会被缓存
  • 不能被收藏为书签
  • 没有长度限制
  • 非幂等操作(每次执行可能不同)
# POST请求示例
data = {'name': '张三', 'email': 'test@example.com'}
response = requests.post(
    'https://api.example.com/users',
    json=data  # 或使用 data=json.dumps(data)
)
PUT 完全更新

用途: 更新整个资源,需要提供完整数据。

特点:

  • 替换整个资源
  • 需要提供所有字段
  • 幂等操作
  • 如果资源不存在,可能创建新资源
PATCH 部分更新

用途: 更新资源的部分字段。

特点:

  • 只更新提供的字段
  • 不需要提供所有字段
  • 幂等操作
  • 更节省带宽
DELETE 删除数据

用途: 删除服务器上的资源。

特点:

  • 删除指定资源
  • 幂等操作
  • 通常返回204 No Content
  • 资源被删除后再次删除通常也返回成功

HTTP方法对比表

方法 安全性 幂等性 是否缓存 请求体 典型状态码
GET 200 OK
POST 201 Created
PUT 200 OK, 204 No Content
PATCH 200 OK
DELETE 可有 204 No Content

请求数据格式

向API发送数据时,需要选择合适的数据格式。最常用的是JSON,但有时也需要使用表单数据或文件上传。

不同数据格式的发送方式
各种数据格式示例
import requests
import json
from requests_toolbelt.multipart.encoder import MultipartEncoder

# 1. JSON数据(最常用)
def send_json_data():
    """发送JSON格式数据"""

    data = {
        'name': '张三',
        'email': 'zhangsan@example.com',
        'age': 25,
        'hobbies': ['编程', '阅读', '运动']
    }

    # 方法1: 使用json参数(推荐)
    response = requests.post(
        'https://api.example.com/users',
        json=data,  # Requests会自动设置Content-Type为application/json
        headers={'Accept': 'application/json'}
    )

    # 方法2: 手动序列化JSON
    response = requests.post(
        'https://api.example.com/users',
        data=json.dumps(data),
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
    )

    return response

# 2. 表单数据(application/x-www-form-urlencoded)
def send_form_data():
    """发送表单数据"""

    form_data = {
        'username': 'zhangsan',
        'password': 'password123',
        'remember': 'true'
    }

    response = requests.post(
        'https://api.example.com/login',
        data=form_data,  # 使用data参数,Content-Type会自动设置为application/x-www-form-urlencoded
        headers={'Accept': 'application/json'}
    )

    return response

# 3. 多部分表单数据(multipart/form-data)
def send_multipart_form_data():
    """发送多部分表单数据,通常用于文件上传"""

    # 方法1: 使用files参数(简单文件上传)
    files = {'file': open('document.pdf', 'rb')}
    data = {'description': '重要文档'}

    response = requests.post(
        'https://api.example.com/upload',
        files=files,
        data=data
    )

    # 方法2: 使用MultartEncoder(复杂情况)
    multipart_data = MultipartEncoder(
        fields={
            'file': ('document.pdf', open('document.pdf', 'rb'), 'application/pdf'),
            'description': '重要文档',
            'tags': 'pdf,文档'
        }
    )

    response = requests.post(
        'https://api.example.com/upload',
        data=multipart_data,
        headers={'Content-Type': multipart_data.content_type}
    )

    return response

# 4. URL编码参数(查询字符串)
def send_query_params():
    """通过URL查询字符串发送参数"""

    params = {
        'q': 'python requests',
        'page': 1,
        'limit': 10,
        'sort': 'date',
        'order': 'desc'
    }

    response = requests.get(
        'https://api.example.com/search',
        params=params
    )

    return response

# 5. 自定义数据格式
def send_custom_data():
    """发送自定义格式数据"""

    # XML数据
    xml_data = '''<?xml version="1.0" encoding="UTF-8"?>

    张三
    zhangsan@example.com
    25
'''

    response = requests.post(
        'https://api.example.com/users',
        data=xml_data,
        headers={
            'Content-Type': 'application/xml',
            'Accept': 'application/xml'
        }
    )

    # YAML数据(需要PyYAML库)
    # import yaml
    # yaml_data = yaml.dump({'name': '张三', 'age': 25})

    return response

# 6. 流式数据(大文件或实时数据)
def send_stream_data():
    """发送流式数据"""

    # 生成器函数,用于生成数据流
    def generate_data():
        for i in range(10):
            yield json.dumps({'chunk': i, 'data': 'x' * 100}).encode('utf-8')
            yield b'\n'  # 分隔符

    response = requests.post(
        'https://api.example.com/stream',
        data=generate_data(),
        headers={'Content-Type': 'application/x-ndjson'}  # Newline Delimited JSON
    )

    return response

# 使用示例
if __name__ == "__main__":
    print("测试不同数据格式的发送...")

    # 测试JSON数据
    try:
        resp = send_json_data()
        print(f"JSON请求状态码: {resp.status_code}")
    except Exception as e:
        print(f"JSON请求失败: {e}")

    # 测试表单数据
    try:
        resp = send_form_data()
        print(f"表单数据请求状态码: {resp.status_code}")
    except Exception as e:
        print(f"表单数据请求失败: {e}")

Content-Type头部的重要性

正确的Content-Type头部对于API调用至关重要,它告诉服务器如何解析请求数据。

application/json
用于JSON格式数据,Requests的json参数会自动设置
application/x-www-form-urlencoded
用于表单数据,Requests的data参数会自动设置
multipart/form-data
用于文件上传,Requests的files参数会自动设置
text/xml 或 application/xml
用于XML数据,需要手动设置

响应处理

正确处理API响应是API调用的关键。这包括解析数据、处理状态码、处理分页等。

"status": "success",
"code": 200,
"message": "请求成功",
"data": {
"user": {
"id": 12345,
"name": "张三",
"email": "zhangsan@example.com",
"created_at": "2023-10-01T10:30:00Z"
},
"pagination": {
"page": 1,
"limit": 10,
"total": 100,
"pages": 10
}
}
响应处理完整示例
import requests
import json
from datetime import datetime
from typing import Dict, Any, List, Optional

class APIResponseHandler:
    """API响应处理器"""

    @staticmethod
    def handle_response(response: requests.Response) -> Dict[str, Any]:
        """处理API响应"""

        # 记录请求信息(用于调试)
        request_info = {
            'url': response.request.url,
            'method': response.request.method,
            'headers': dict(response.request.headers),
            'body': response.request.body
        }

        print(f"请求信息: {json.dumps(request_info, indent=2)}")

        # 检查HTTP状态码
        if not response.ok:
            return APIResponseHandler.handle_error(response)

        # 检查Content-Type
        content_type = response.headers.get('content-type', '')

        # 处理不同内容类型的响应
        if 'application/json' in content_type:
            return APIResponseHandler.handle_json_response(response)
        elif 'application/xml' in content_type or 'text/xml' in content_type:
            return APIResponseHandler.handle_xml_response(response)
        elif 'text/plain' in content_type:
            return APIResponseHandler.handle_text_response(response)
        elif 'application/octet-stream' in content_type:
            return APIResponseHandler.handle_binary_response(response)
        else:
            # 未知类型,尝试自动检测
            return APIResponseHandler.handle_unknown_response(response)

    @staticmethod
    def handle_json_response(response: requests.Response) -> Dict[str, Any]:
        """处理JSON响应"""

        try:
            data = response.json()

            # 标准化响应格式
            standardized = {
                'success': True,
                'status_code': response.status_code,
                'data': data,
                'headers': dict(response.headers),
                'timestamp': datetime.now().isoformat(),
                'response_time': response.elapsed.total_seconds()
            }

            # 检查是否有错误信息(即使状态码是200)
            if isinstance(data, dict) and 'error' in data:
                standardized['success'] = False
                standardized['error'] = data['error']

            # 提取分页信息(如果存在)
            if isinstance(data, dict) and 'pagination' in data:
                standardized['pagination'] = data['pagination']

            return standardized

        except json.JSONDecodeError as e:
            return {
                'success': False,
                'status_code': response.status_code,
                'error': f'JSON解析错误: {e}',
                'raw_response': response.text[:500],  # 只保留前500字符
                'headers': dict(response.headers)
            }

    @staticmethod
    def handle_xml_response(response: requests.Response) -> Dict[str, Any]:
        """处理XML响应"""

        try:
            # 需要安装xmltodict: pip install xmltodict
            import xmltodict

            data = xmltodict.parse(response.text)

            return {
                'success': True,
                'status_code': response.status_code,
                'data': data,
                'headers': dict(response.headers),
                'format': 'xml'
            }

        except ImportError:
            return {
                'success': False,
                'status_code': response.status_code,
                'error': '需要xmltodict库来处理XML响应',
                'raw_response': response.text[:500]
            }
        except Exception as e:
            return {
                'success': False,
                'status_code': response.status_code,
                'error': f'XML解析错误: {e}',
                'raw_response': response.text[:500]
            }

    @staticmethod
    def handle_text_response(response: requests.Response) -> Dict[str, Any]:
        """处理文本响应"""

        return {
            'success': True,
            'status_code': response.status_code,
            'data': response.text,
            'headers': dict(response.headers),
            'format': 'text'
        }

    @staticmethod
    def handle_binary_response(response: requests.Response) -> Dict[str, Any]:
        """处理二进制响应(如下载文件)"""

        # 获取文件名(从Content-Disposition头或URL)
        content_disposition = response.headers.get('content-disposition', '')
        filename = None

        if 'filename=' in content_disposition:
            filename = content_disposition.split('filename=')[1].strip('"\'')

        if not filename:
            # 从URL提取文件名
            import os
            from urllib.parse import urlparse

            url_path = urlparse(response.request.url).path
            filename = os.path.basename(url_path) or 'download.bin'

        # 保存文件
        with open(filename, 'wb') as f:
            f.write(response.content)

        return {
            'success': True,
            'status_code': response.status_code,
            'data': f'文件已保存为: {filename}',
            'filename': filename,
            'content_length': len(response.content),
            'headers': dict(response.headers)
        }

    @staticmethod
    def handle_unknown_response(response: requests.Response) -> Dict[str, Any]:
        """处理未知类型的响应"""

        # 尝试猜测类型
        content = response.content

        # 检查是否是JSON
        try:
            data = json.loads(content.decode('utf-8'))
            return APIResponseHandler.handle_json_response(response)
        except:
            pass

        # 检查是否是文本
        try:
            text = content.decode('utf-8')
            return {
                'success': True,
                'status_code': response.status_code,
                'data': text,
                'headers': dict(response.headers),
                'format': 'unknown (treated as text)'
            }
        except:
            # 可能是二进制数据
            return {
                'success': True,
                'status_code': response.status_code,
                'data': f'二进制数据,大小: {len(content)} 字节',
                'headers': dict(response.headers),
                'format': 'binary'
            }

    @staticmethod
    def handle_error(response: requests.Response) -> Dict[str, Any]:
        """处理错误响应"""

        error_info = {
            'success': False,
            'status_code': response.status_code,
            'reason': response.reason,
            'headers': dict(response.headers),
            'timestamp': datetime.now().isoformat()
        }

        # 尝试获取错误详情
        content_type = response.headers.get('content-type', '')

        if 'application/json' in content_type:
            try:
                error_data = response.json()
                error_info['error'] = error_data
            except:
                error_info['raw_error'] = response.text[:500]
        else:
            error_info['raw_error'] = response.text[:500]

        # 根据状态码添加建议
        if response.status_code == 400:
            error_info['suggestion'] = '检查请求参数是否正确'
        elif response.status_code == 401:
            error_info['suggestion'] = '检查认证信息是否正确'
        elif response.status_code == 403:
            error_info['suggestion'] = '没有访问权限'
        elif response.status_code == 404:
            error_info['suggestion'] = '请求的资源不存在'
        elif response.status_code == 429:
            error_info['suggestion'] = '请求过于频繁,请稍后重试'
        elif 500 <= response.status_code < 600:
            error_info['suggestion'] = '服务器内部错误,请稍后重试'

        return error_info

# 使用示例
if __name__ == "__main__":
    handler = APIResponseHandler()

    # 测试不同API响应
    test_urls = [
        'https://api.github.com/users/octocat',  # JSON响应
        'https://httpbin.org/xml',  # XML响应
        'https://httpbin.org/robots.txt',  # 文本响应
        'https://httpbin.org/image/png',  # 二进制响应
    ]

    for url in test_urls:
        print(f"\n测试URL: {url}")

        try:
            response = requests.get(url, timeout=10)
            result = handler.handle_response(response)

            print(f"处理结果:")
            for key, value in result.items():
                if key == 'data' and isinstance(value, (dict, list)):
                    print(f"  {key}: {type(value).__name__} (长度: {len(value) if isinstance(value, list) else 'N/A'})")
                elif key == 'headers':
                    print(f"  {key}: 共 {len(value)} 个头部")
                else:
                    print(f"  {key}: {value}")

        except Exception as e:
            print(f"请求失败: {e}")

响应数据验证

响应数据验证与转换
import requests
import json
from datetime import datetime
from typing import Dict, Any, Optional
from pydantic import BaseModel, ValidationError, validator
from dataclasses import dataclass, asdict

# 方法1: 使用Pydantic进行数据验证
class UserResponse(BaseModel):
    """用户响应数据模型"""

    id: int
    name: str
    email: str
    age: Optional[int] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    @validator('email')
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError('邮箱格式不正确')
        return v

    @validator('age')
    def validate_age(cls, v):
        if v is not None and (v < 0 or v > 150):
            raise ValueError('年龄必须在0-150之间')
        return v

class Pagination(BaseModel):
    """分页信息数据模型"""

    page: int
    limit: int
    total: int
    pages: int

class APIResponse(BaseModel):
    """API响应数据模型"""

    status: str
    code: int
    message: str
    data: Optional[Any] = None
    pagination: Optional[Pagination] = None

    @validator('status')
    def validate_status(cls, v):
        if v not in ['success', 'error']:
            raise ValueError('状态必须是success或error')
        return v

    @validator('code')
    def validate_code(cls, v):
        if v < 100 or v >= 600:
            raise ValueError('状态码必须在100-599之间')
        return v

# 方法2: 使用dataclass
@dataclass
class Product:
    """产品数据类"""

    id: int
    name: str
    price: float
    stock: int
    category: str = "未分类"

    def to_dict(self):
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]):
        return cls(**data)

class ResponseValidator:
    """响应验证器"""

    @staticmethod
    def validate_user_response(response_data: Dict[str, Any]) -> Optional[UserResponse]:
        """验证用户响应数据"""

        try:
            # 验证API响应结构
            api_response = APIResponse(**response_data)

            if api_response.status != 'success':
                print(f"API返回错误: {api_response.message}")
                return None

            # 验证用户数据
            if api_response.data and 'user' in api_response.data:
                user_data = api_response.data['user']
                user = UserResponse(**user_data)
                return user

        except ValidationError as e:
            print(f"数据验证失败: {e}")
            print(f"错误详情: {e.errors()}")
            return None
        except Exception as e:
            print(f"验证过程中发生错误: {e}")
            return None

        return None

    @staticmethod
    def validate_and_extract(response_data: Dict[str, Any], model_class):
        """通用验证和提取方法"""

        try:
            # 如果response_data是字符串,尝试解析为JSON
            if isinstance(response_data, str):
                response_data = json.loads(response_data)

            # 验证数据
            instance = model_class(**response_data)
            return instance

        except json.JSONDecodeError as e:
            print(f"JSON解析失败: {e}")
            return None
        except ValidationError as e:
            print(f"数据验证失败: {e}")
            return None
        except Exception as e:
            print(f"验证过程中发生错误: {e}")
            return None

# 使用示例
if __name__ == "__main__":
    validator = ResponseValidator()

    # 模拟API响应数据
    sample_response = {
        'status': 'success',
        'code': 200,
        'message': '获取用户成功',
        'data': {
            'user': {
                'id': 12345,
                'name': '张三',
                'email': 'zhangsan@example.com',
                'age': 25,
                'created_at': '2023-10-01T10:30:00Z',
                'updated_at': '2023-10-02T14:20:00Z'
            }
        },
        'pagination': {
            'page': 1,
            'limit': 10,
            'total': 100,
            'pages': 10
        }
    }

    # 验证数据
    user = validator.validate_user_response(sample_response)

    if user:
        print(f"验证成功!")
        print(f"用户ID: {user.id}")
        print(f"用户名: {user.name}")
        print(f"邮箱: {user.email}")
        print(f"年龄: {user.age}")
        print(f"创建时间: {user.created_at}")

    # 测试错误数据
    error_response = {
        'status': 'success',  # 状态是success但数据有问题
        'code': 200,
        'message': '获取用户成功',
        'data': {
            'user': {
                'id': 'not_a_number',  # 错误的类型
                'name': '张三',
                'email': 'invalid-email',  # 无效的邮箱
                'age': 200,  # 无效的年龄
            }
        }
    }

    print(f"\n测试错误数据验证:")
    user = validator.validate_user_response(error_response)
    if not user:
        print("验证正确检测到错误数据")

认证与授权

大多数API都需要某种形式的认证来识别调用者并控制访问权限。

常见API认证方式
1. API Key
最简单的认证方式,通过查询参数或请求头传递
headers = {'X-API-Key': 'your_api_key'}
# 或
params = {'api_key': 'your_api_key'}
2. Bearer Token (JWT)
通过Authorization头传递的令牌
headers = {
    'Authorization': 'Bearer your_jwt_token'
}
3. Basic Auth
通过Authorization头传递用户名和密码的Base64编码
# Requests内置支持
response = requests.get(url, auth=('username', 'password'))
4. OAuth 2.0
工业标准授权框架,支持多种授权流程
# 需要oauthlib库
# 通常涉及获取access_token和refresh_token
5. HMAC签名
通过签名验证请求的完整性和来源
# 需要对请求内容进行哈希计算
# 常用于高安全性要求的API
各种认证方式的完整实现
import requests
import base64
import hashlib
import hmac
import time
import json
from typing import Dict, Any, Optional

class APIAuthenticator:
    """API认证管理器"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    # 1. API Key认证
    def add_api_key_auth(self, params: Dict = None, headers: Dict = None) -> tuple:
        """添加API Key认证"""

        api_key = self.config.get('api_key')
        if not api_key:
            return params or {}, headers or {}

        api_key_location = self.config.get('api_key_location', 'header')
        api_key_name = self.config.get('api_key_name', 'X-API-Key')

        if api_key_location == 'header':
            headers = headers or {}
            headers[api_key_name] = api_key
        elif api_key_location == 'query':
            params = params or {}
            params[api_key_name.lower()] = api_key

        return params or {}, headers or {}

    # 2. Bearer Token认证
    def add_bearer_token_auth(self, headers: Dict = None) -> Dict:
        """添加Bearer Token认证"""

        token = self.config.get('bearer_token')
        if token:
            headers = headers or {}
            headers['Authorization'] = f'Bearer {token}'

        return headers or {}

    # 3. Basic认证
    def add_basic_auth(self, auth_tuple: tuple = None) -> tuple:
        """添加Basic认证"""

        username = self.config.get('username')
        password = self.config.get('password')

        if username and password:
            return (username, password)

        return auth_tuple or ()

    # 4. OAuth 2.0客户端凭证认证
    def get_oauth_token(self, token_url: str, client_id: str = None,
                       client_secret: str = None, scope: str = None) -> Optional[str]:
        """获取OAuth 2.0访问令牌(客户端凭证流程)"""

        client_id = client_id or self.config.get('client_id')
        client_secret = client_secret or self.config.get('client_secret')
        scope = scope or self.config.get('scope')

        if not client_id or not client_secret:
            return None

        data = {
            'grant_type': 'client_credentials',
            'client_id': client_id,
            'client_secret': client_secret,
        }

        if scope:
            data['scope'] = scope

        try:
            response = requests.post(token_url, data=data, timeout=10)
            response.raise_for_status()

            token_data = response.json()
            return token_data.get('access_token')

        except Exception as e:
            print(f"获取OAuth令牌失败: {e}")
            return None

    # 5. HMAC签名认证
    def generate_hmac_signature(self, method: str, path: str,
                                body: str = '', timestamp: str = None) -> Dict[str, str]:
        """生成HMAC签名"""

        api_key = self.config.get('api_key')
        api_secret = self.config.get('api_secret')

        if not api_key or not api_secret:
            return {}

        timestamp = timestamp or str(int(time.time() * 1000))

        # 构建签名字符串
        message = f"{method.upper()}\n{path}\n{timestamp}\n{body}"

        # 计算HMAC-SHA256签名
        signature = hmac.new(
            api_secret.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        return {
            'X-API-Key': api_key,
            'X-Timestamp': timestamp,
            'X-Signature': signature
        }

    # 6. AWS Signature v4 (用于AWS服务)
    def generate_aws_signature(self, method: str, url: str,
                               region: str, service: str,
                               payload: str = '', headers: Dict = None) -> Dict[str, str]:
        """生成AWS Signature v4签名(简化版)"""

        # 注意:这是简化版本,完整实现较复杂
        # 实际项目中建议使用boto3库

        access_key = self.config.get('aws_access_key')
        secret_key = self.config.get('aws_secret_key')

        if not access_key or not secret_key:
            return {}

        # 这里只是示例,实际AWS签名需要复杂的计算
        # 包括规范请求、签名字符串、签名密钥等

        return {
            'Authorization': f'AWS4-HMAC-SHA256 Credential={access_key}/...',
            'X-Amz-Date': time.strftime('%Y%m%dT%H%M%SZ', time.gmtime())
        }

class AuthenticatedAPIClient:
    """带认证的API客户端"""

    def __init__(self, base_url: str, authenticator: APIAuthenticator):
        self.base_url = base_url.rstrip('/')
        self.authenticator = authenticator
        self.session = requests.Session()

        # 配置会话
        self.session.headers.update({
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'AuthenticatedAPIClient/1.0'
        })

    def request(self, method: str, endpoint: str,
                params: Dict = None, data: Any = None,
                json_data: Dict = None, auth_type: str = 'bearer') -> requests.Response:
        """发送带认证的请求"""

        url = f"{self.base_url}{endpoint}"

        # 准备参数和头部
        request_params = params or {}
        request_headers = {}

        # 根据认证类型添加认证信息
        if auth_type == 'api_key':
            request_params, request_headers = self.authenticator.add_api_key_auth(
                request_params, request_headers
            )
        elif auth_type == 'bearer':
            request_headers = self.authenticator.add_bearer_token_auth(request_headers)
        elif auth_type == 'basic':
            # Basic认证通过auth参数传递
            auth_tuple = self.authenticator.add_basic_auth()
            if auth_tuple:
                # 需要在请求时传递auth参数
                pass
        elif auth_type == 'hmac':
            # 生成HMAC签名
            body = json.dumps(json_data) if json_data else ''
            hmac_headers = self.authenticator.generate_hmac_signature(
                method, endpoint, body
            )
            request_headers.update(hmac_headers)

        # 发送请求
        try:
            # 处理Basic认证
            auth_tuple = None
            if auth_type == 'basic':
                auth_tuple = self.authenticator.add_basic_auth()

            response = self.session.request(
                method=method,
                url=url,
                params=request_params,
                data=data,
                json=json_data,
                headers=request_headers,
                auth=auth_tuple,
                timeout=30
            )

            return response

        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            raise

    def get(self, endpoint: str, params: Dict = None, auth_type: str = 'bearer') -> requests.Response:
        """发送GET请求"""
        return self.request('GET', endpoint, params=params, auth_type=auth_type)

    def post(self, endpoint: str, json_data: Dict = None, auth_type: str = 'bearer') -> requests.Response:
        """发送POST请求"""
        return self.request('POST', endpoint, json_data=json_data, auth_type=auth_type)

    def put(self, endpoint: str, json_data: Dict = None, auth_type: str = 'bearer') -> requests.Response:
        """发送PUT请求"""
        return self.request('PUT', endpoint, json_data=json_data, auth_type=auth_type)

    def delete(self, endpoint: str, auth_type: str = 'bearer') -> requests.Response:
        """发送DELETE请求"""
        return self.request('DELETE', endpoint, auth_type=auth_type)

# 使用示例
if __name__ == "__main__":
    # 配置认证信息
    config = {
        'api_key': 'your_api_key_here',
        'bearer_token': 'your_jwt_token_here',
        'username': 'your_username',
        'password': 'your_password',
        'api_secret': 'your_api_secret_for_hmac'
    }

    # 创建认证器
    authenticator = APIAuthenticator(config)

    # 创建API客户端
    client = AuthenticatedAPIClient('https://api.example.com/v1', authenticator)

    # 使用不同认证方式发送请求
    try:
        # 1. 使用Bearer Token认证
        print("使用Bearer Token认证:")
        response = client.get('/users/me', auth_type='bearer')
        if response.status_code == 200:
            print(f"获取用户信息成功: {response.json().get('name')}")

        # 2. 使用API Key认证
        print("\n使用API Key认证:")
        response = client.get('/products', auth_type='api_key')
        if response.status_code == 200:
            data = response.json()
            print(f"获取到 {len(data.get('products', []))} 个产品")

        # 3. 使用Basic认证
        print("\n使用Basic认证:")
        response = client.get('/protected', auth_type='basic')
        if response.status_code == 200:
            print("Basic认证成功")

        # 4. 使用HMAC认证
        print("\n使用HMAC认证:")
        response = client.post('/orders',
                              json_data={'product_id': 123, 'quantity': 2},
                              auth_type='hmac')
        if response.status_code == 201:
            print("创建订单成功")

    except Exception as e:
        print(f"API调用失败: {e}")

速率限制处理

速率限制

大多数API都有速率限制,以防止滥用和保护服务器资源。常见的速率限制方式:

  • 请求数限制: 每分钟/小时/天最多N个请求
  • 并发连接限制: 同时最多N个连接
  • 数据量限制: 每分钟/小时最多N MB数据
  • 滑动窗口: 基于时间的动态限制
速率限制处理实现
import requests
import time
import threading
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, Any, Optional

class RateLimiter:
    """速率限制器"""

    def __init__(self, requests_per_second: float = 1.0):
        """
        初始化速率限制器

        参数:
            requests_per_second: 每秒最大请求数
        """
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second

        # 记录请求时间
        self.request_times = deque()
        self.lock = threading.Lock()

        # 统计信息
        self.stats = {
            'total_requests': 0,
            'throttled_requests': 0,
            'last_reset': datetime.now()
        }

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """
        获取请求许可

        参数:
            blocking: 是否阻塞等待
            timeout: 超时时间(秒)

        返回:
            是否获得许可
        """

        start_time = time.time()

        with self.lock:
            current_time = time.time()

            # 清理过期的请求记录
            while self.request_times and \
                  current_time - self.request_times[0] > 1.0:
                self.request_times.popleft()

            # 检查是否超过限制
            if len(self.request_times) < self.requests_per_second:
                # 还有配额,允许请求
                self.request_times.append(current_time)
                self.stats['total_requests'] += 1
                return True

            # 计算需要等待的时间
            oldest_request = self.request_times[0]
            wait_time = 1.0 - (current_time - oldest_request)

            if wait_time <= 0:
                # 实际上不应该发生,但为了安全
                self.request_times.append(current_time)
                self.stats['total_requests'] += 1
                return True

        # 需要等待
        if not blocking:
            self.stats['throttled_requests'] += 1
            return False

        if timeout is not None and wait_time > timeout:
            self.stats['throttled_requests'] += 1
            return False

        # 等待
        time.sleep(wait_time)

        with self.lock:
            current_time = time.time()

            # 再次检查(可能在等待期间有变化)
            while self.request_times and \
                  current_time - self.request_times[0] > 1.0:
                self.request_times.popleft()

            if len(self.request_times) < self.requests_per_second:
                self.request_times.append(current_time)
                self.stats['total_requests'] += 1
                return True

        self.stats['throttled_requests'] += 1
        return False

    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""

        with self.lock:
            stats = self.stats.copy()
            stats['current_window_requests'] = len(self.request_times)
            stats['available_requests'] = max(
                0, self.requests_per_second - len(self.request_times)
            )

        return stats

class RateLimitAwareSession(requests.Session):
    """带速率限制的Session"""

    def __init__(self, requests_per_second: float = 1.0):
        super().__init__()
        self.rate_limiter = RateLimiter(requests_per_second)

        # 配置重试
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount('http://', adapter)
        self.mount('https://', adapter)

    def request(self, method, url, **kwargs):
        """重写request方法以添加速率限制"""

        # 应用速率限制
        if not self.rate_limiter.acquire(blocking=True, timeout=30):
            raise Exception("无法获取速率限制许可")

        # 发送请求
        response = super().request(method, url, **kwargs)

        # 检查响应中的速率限制头
        self._check_rate_limit_headers(response)

        return response

    def _check_rate_limit_headers(self, response: requests.Response):
        """检查响应头中的速率限制信息"""

        rate_limit_headers = {
            'X-RateLimit-Limit': '限制',
            'X-RateLimit-Remaining': '剩余',
            'X-RateLimit-Reset': '重置时间',
            'Retry-After': '重试等待'
        }

        found_headers = {}
        for header, description in rate_limit_headers.items():
            if header in response.headers:
                found_headers[header] = response.headers[header]

        if found_headers:
            print("发现速率限制头:")
            for header, value in found_headers.items():
                print(f"  {header}: {value}")

            # 如果收到429状态码,根据Retry-After头调整
            if response.status_code == 429:
                retry_after = response.headers.get('Retry-After')
                if retry_after:
                    try:
                        wait_seconds = int(retry_after)
                        print(f"速率限制,等待 {wait_seconds} 秒")
                        time.sleep(wait_seconds)
                    except ValueError:
                        pass

class APIThrottler:
    """API请求节流器"""

    def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0):
        """
        初始化节流器

        参数:
            base_delay: 基础延迟(秒)
            max_delay: 最大延迟(秒)
        """
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.current_delay = base_delay
        self.consecutive_errors = 0

    def before_request(self):
        """请求前调用,可能延迟"""

        if self.current_delay > 0:
            time.sleep(self.current_delay)

    def after_success(self):
        """成功请求后调用"""

        # 重置延迟
        self.current_delay = self.base_delay
        self.consecutive_errors = 0

    def after_error(self, status_code: int):
        """错误请求后调用"""

        self.consecutive_errors += 1

        # 根据错误类型调整延迟
        if status_code == 429:  # 太多请求
            # 指数退避
            self.current_delay = min(
                self.base_delay * (2 ** self.consecutive_errors),
                self.max_delay
            )
            print(f"收到429错误,下次延迟: {self.current_delay:.2f}秒")
        elif 500 <= status_code < 600:  # 服务器错误
            # 线性增加
            self.current_delay = min(
                self.base_delay * (1 + self.consecutive_errors * 0.5),
                self.max_delay
            )
        else:
            # 其他错误,稍微增加延迟
            self.current_delay = min(self.current_delay * 1.1, self.max_delay)

    def get_status(self) -> Dict[str, Any]:
        """获取当前状态"""

        return {
            'current_delay': self.current_delay,
            'consecutive_errors': self.consecutive_errors,
            'base_delay': self.base_delay,
            'max_delay': self.max_delay
        }

# 使用示例
if __name__ == "__main__":
    # 创建带速率限制的Session
    session = RateLimitAwareSession(requests_per_second=2.0)  # 每秒2个请求

    print("测试速率限制:")

    # 创建节流器
    throttler = APIThrottler(base_delay=0.5, max_delay=5.0)

    # 模拟多个请求
    urls = [
        'https://httpbin.org/get',
        'https://httpbin.org/post',
        'https://httpbin.org/put',
        'https://httpbin.org/delete',
        'https://httpbin.org/status/429',  # 模拟速率限制错误
    ]

    for i, url in enumerate(urls, 1):
        print(f"\n请求 {i}/{len(urls)}: {url}")

        # 应用节流
        throttler.before_request()

        try:
            # 发送请求
            if 'post' in url:
                response = session.post(url, json={'test': 'data'})
            elif 'put' in url:
                response = session.put(url, json={'test': 'data'})
            elif 'delete' in url:
                response = session.delete(url)
            else:
                response = session.get(url)

            # 处理响应
            if response.status_code == 200:
                print(f"请求成功,状态码: {response.status_code}")
                throttler.after_success()
            else:
                print(f"请求失败,状态码: {response.status_code}")
                throttler.after_error(response.status_code)

        except Exception as e:
            print(f"请求异常: {e}")
            throttler.after_error(0)  # 0表示网络异常

        # 显示节流器状态
        status = throttler.get_status()
        print(f"节流器状态: 延迟={status['current_delay']:.2f}s, 连续错误={status['consecutive_errors']}")

    # 显示速率限制器统计
    stats = session.rate_limiter.get_stats()
    print(f"\n速率限制器统计:")
    for key, value in stats.items():
        print(f"  {key}: {value}")

分页处理

常见的分页模式
1. 基于页码的分页

使用page和limit参数控制分页。

{
  "data": [...],
  "pagination": {
    "page": 1,
    "limit": 10,
    "total": 100,
    "pages": 10
  }
}
2. 基于游标的分页

使用cursor和limit参数,适合大数据集。

{
  "data": [...],
  "pagination": {
    "next_cursor": "abc123",
    "has_more": true,
    "limit": 10
  }
}
3. 基于偏移量的分页

使用offset和limit参数。

{
  "data": [...],
  "pagination": {
    "offset": 0,
    "limit": 10,
    "total": 100
  }
}
4. 基于时间戳的分页

使用since和until参数按时间分页。

{
  "data": [...],
  "pagination": {
    "since": "2023-01-01T00:00:00Z",
    "until": "2023-12-31T23:59:59Z"
  }
}
分页处理实现
import requests
import time
from typing import Dict, Any, List, Optional, Generator
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

class PaginationHandler:
    """分页处理器"""

    def __init__(self, session: requests.Session):
        self.session = session

    # 1. 基于页码的分页
    def fetch_all_with_page(self, base_url: str,
                           page_param: str = 'page',
                           limit_param: str = 'limit',
                           limit: int = 100,
                           max_pages: int = None) -> List[Any]:
        """
        获取所有数据(基于页码)

        参数:
            base_url: 基础URL
            page_param: 页码参数名
            limit_param: 每页数量参数名
            limit: 每页数量
            max_pages: 最大页数(None表示无限制)
        """

        all_data = []
        page = 1

        while True:
            # 构建URL
            parsed_url = urlparse(base_url)
            query_params = parse_qs(parsed_url.query)

            # 添加分页参数
            query_params[page_param] = [str(page)]
            query_params[limit_param] = [str(limit)]

            # 重建URL
            new_query = urlencode(query_params, doseq=True)
            url = urlunparse((
                parsed_url.scheme,
                parsed_url.netloc,
                parsed_url.path,
                parsed_url.params,
                new_query,
                parsed_url.fragment
            ))

            print(f"获取第 {page} 页: {url}")

            try:
                response = self.session.get(url, timeout=30)
                response.raise_for_status()

                data = response.json()

                # 提取数据(根据API响应结构调整)
                items = data.get('data', [])
                if not items and 'items' in data:
                    items = data['items']

                if not items:
                    print(f"第 {page} 页没有数据,停止")
                    break

                all_data.extend(items)

                # 检查是否还有更多页
                pagination = data.get('pagination', {})
                total_pages = pagination.get('pages')

                if total_pages and page >= total_pages:
                    print(f"已达到总页数 {total_pages},停止")
                    break

                # 检查max_pages限制
                if max_pages and page >= max_pages:
                    print(f"已达到最大页数限制 {max_pages},停止")
                    break

                page += 1

                # 避免请求过快
                time.sleep(0.1)

            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404 and page > 1:
                    # 页码超出范围,停止
                    print(f"页码 {page} 超出范围,停止")
                    break
                else:
                    print(f"获取第 {page} 页失败: {e}")
                    break
            except Exception as e:
                print(f"获取第 {page} 页时发生错误: {e}")
                break

        return all_data

    # 2. 基于游标的分页
    def fetch_all_with_cursor(self, base_url: str,
                             cursor_param: str = 'cursor',
                             limit_param: str = 'limit',
                             limit: int = 100,
                             max_requests: int = 100) -> List[Any]:
        """
        获取所有数据(基于游标)

        参数:
            base_url: 基础URL
            cursor_param: 游标参数名
            limit_param: 每页数量参数名
            limit: 每页数量
            max_requests: 最大请求次数
        """

        all_data = []
        next_cursor = None
        request_count = 0

        while True:
            # 检查请求次数限制
            request_count += 1
            if request_count > max_requests:
                print(f"达到最大请求次数限制 {max_requests},停止")
                break

            # 构建URL
            parsed_url = urlparse(base_url)
            query_params = parse_qs(parsed_url.query)

            # 添加分页参数
            query_params[limit_param] = [str(limit)]
            if next_cursor:
                query_params[cursor_param] = [next_cursor]

            # 重建URL
            new_query = urlencode(query_params, doseq=True)
            url = urlunparse((
                parsed_url.scheme,
                parsed_url.netloc,
                parsed_url.path,
                parsed_url.params,
                new_query,
                parsed_url.fragment
            ))

            print(f"获取数据 (请求 #{request_count}): {url}")

            try:
                response = self.session.get(url, timeout=30)
                response.raise_for_status()

                data = response.json()

                # 提取数据
                items = data.get('data', [])
                if not items and 'items' in data:
                    items = data['items']

                if not items:
                    print(f"没有数据,停止")
                    break

                all_data.extend(items)

                # 获取下一个游标
                pagination = data.get('pagination', {})
                next_cursor = pagination.get('next_cursor')

                # 检查是否还有更多数据
                if not next_cursor or not pagination.get('has_more', True):
                    print(f"没有更多数据,停止")
                    break

                # 避免请求过快
                time.sleep(0.1)

            except Exception as e:
                print(f"获取数据失败: {e}")
                break

        return all_data

    # 3. 基于偏移量的分页
    def fetch_all_with_offset(self, base_url: str,
                             offset_param: str = 'offset',
                             limit_param: str = 'limit',
                             limit: int = 100) -> List[Any]:
        """
        获取所有数据(基于偏移量)

        参数:
            base_url: 基础URL
            offset_param: 偏移量参数名
            limit_param: 每页数量参数名
            limit: 每页数量
        """

        all_data = []
        offset = 0

        while True:
            # 构建URL
            parsed_url = urlparse(base_url)
            query_params = parse_qs(parsed_url.query)

            # 添加分页参数
            query_params[offset_param] = [str(offset)]
            query_params[limit_param] = [str(limit)]

            # 重建URL
            new_query = urlencode(query_params, doseq=True)
            url = urlunparse((
                parsed_url.scheme,
                parsed_url.netloc,
                parsed_url.path,
                parsed_url.params,
                new_query,
                parsed_url.fragment
            ))

            print(f"获取数据 (偏移量: {offset}): {url}")

            try:
                response = self.session.get(url, timeout=30)
                response.raise_for_status()

                data = response.json()

                # 提取数据
                items = data.get('data', [])
                if not items and 'items' in data:
                    items = data['items']

                if not items:
                    print(f"没有数据,停止")
                    break

                all_data.extend(items)

                # 检查是否还有更多数据
                total_items = data.get('pagination', {}).get('total')
                if total_items and offset + len(items) >= total_items:
                    print(f"已获取所有 {total_items} 条数据,停止")
                    break

                # 更新偏移量
                offset += len(items)

                # 避免请求过快
                time.sleep(0.1)

            except Exception as e:
                print(f"获取数据失败: {e}")
                break

        return all_data

    # 4. 生成器版本(内存友好)
    def iterate_with_page(self, base_url: str,
                         page_param: str = 'page',
                         limit_param: str = 'limit',
                         limit: int = 100,
                         max_pages: int = None) -> Generator[Any, None, None]:
        """
        分页迭代器(基于页码)

        参数:
            base_url: 基础URL
            page_param: 页码参数名
            limit_param: 每页数量参数名
            limit: 每页数量
            max_pages: 最大页数
        """

        page = 1

        while True:
            # 构建URL
            parsed_url = urlparse(base_url)
            query_params = parse_qs(parsed_url.query)

            # 添加分页参数
            query_params[page_param] = [str(page)]
            query_params[limit_param] = [str(limit)]

            # 重建URL
            new_query = urlencode(query_params, doseq=True)
            url = urlunparse((
                parsed_url.scheme,
                parsed_url.netloc,
                parsed_url.path,
                parsed_url.params,
                new_query,
                parsed_url.fragment
            ))

            try:
                response = self.session.get(url, timeout=30)
                response.raise_for_status()

                data = response.json()

                # 提取数据
                items = data.get('data', [])
                if not items and 'items' in data:
                    items = data['items']

                if not items:
                    break

                # 返回数据
                for item in items:
                    yield item

                # 检查是否还有更多页
                pagination = data.get('pagination', {})
                total_pages = pagination.get('pages')

                if total_pages and page >= total_pages:
                    break

                # 检查max_pages限制
                if max_pages and page >= max_pages:
                    break

                page += 1

                # 避免请求过快
                time.sleep(0.1)

            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404 and page > 1:
                    # 页码超出范围,停止
                    break
                else:
                    print(f"获取第 {page} 页失败: {e}")
                    break
            except Exception as e:
                print(f"获取第 {page} 页时发生错误: {e}")
                break

# 使用示例
if __name__ == "__main__":
    session = requests.Session()
    handler = PaginationHandler(session)

    # 示例API(使用httpbin模拟)
    base_url = "https://httpbin.org/anything"

    print("测试基于页码的分页:")

    # 获取所有数据
    all_data = handler.fetch_all_with_page(
        base_url,
        page_param='page',
        limit_param='limit',
        limit=2,
        max_pages=3  # 只获取前3页
    )

    print(f"总共获取到 {len(all_data)} 条数据")

    print("\n测试分页迭代器(内存友好):")

    # 使用生成器迭代
    item_count = 0
    for item in handler.iterate_with_page(
        base_url,
        page_param='page',
        limit_param='limit',
        limit=2,
        max_pages=2  # 只迭代前2页
    ):
        item_count += 1
        print(f"处理第 {item_count} 条数据")

    print(f"总共处理了 {item_count} 条数据")

API错误处理

[INFO] 开始API调用: GET https://api.example.com/users
[WARNING] 收到429状态码,速率限制触发
[INFO] 等待2秒后重试...
[ERROR] 服务器错误: 500 Internal Server Error
[INFO] 启用指数退避重试策略
[SUCCESS] 第3次重试成功!
完整的API错误处理策略
import requests
import time
import json
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class APIErrorType(Enum):
    """API错误类型枚举"""

    NETWORK_ERROR = "network_error"
    TIMEOUT_ERROR = "timeout_error"
    RATE_LIMIT_ERROR = "rate_limit_error"
    AUTH_ERROR = "authentication_error"
    VALIDATION_ERROR = "validation_error"
    SERVER_ERROR = "server_error"
    CLIENT_ERROR = "client_error"
    UNKNOWN_ERROR = "unknown_error"

@dataclass
class APIError:
    """API错误信息"""

    error_type: APIErrorType
    message: str
    status_code: Optional[int] = None
    response_text: Optional[str] = None
    request_info: Optional[Dict[str, Any]] = None
    retry_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""

        return {
            'error_type': self.error_type.value,
            'message': self.message,
            'status_code': self.status_code,
            'response_text': self.response_text,
            'request_info': self.request_info,
            'retry_count': self.retry_count,
            'timestamp': time.time()
        }

class APIRetryStrategy:
    """API重试策略"""

    def __init__(self, max_retries: int = 3, backoff_factor: float = 1.0):
        """
        初始化重试策略

        参数:
            max_retries: 最大重试次数
            backoff_factor: 退避因子
        """

        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

        # 可重试的错误状态码
        self.retryable_status_codes = {408, 429, 500, 502, 503, 504}

        # 可重试的异常类型
        self.retryable_exceptions = (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        )

    def should_retry(self, error: APIError) -> bool:
        """判断是否应该重试"""

        # 超过最大重试次数
        if error.retry_count >= self.max_retries:
            return False

        # 网络错误通常应该重试
        if error.error_type in [APIErrorType.NETWORK_ERROR, APIErrorType.TIMEOUT_ERROR]:
            return True

        # 速率限制错误应该重试
        if error.error_type == APIErrorType.RATE_LIMIT_ERROR:
            return True

        # 服务器错误应该重试
        if error.error_type == APIErrorType.SERVER_ERROR:
            return True

        # 根据状态码判断
        if error.status_code in self.retryable_status_codes:
            return True

        return False

    def get_wait_time(self, retry_count: int) -> float:
        """计算等待时间(指数退避)"""

        return self.backoff_factor * (2 ** retry_count)

    def create_retry_adapter(self) -> HTTPAdapter:
        """创建重试适配器(用于requests.Session)"""

        retry_strategy = Retry(
            total=self.max_retries,
            backoff_factor=self.backoff_factor,
            status_forcelist=list(self.retryable_status_codes),
            allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"]
        )

        return HTTPAdapter(max_retries=retry_strategy)

class APIErrorHandler:
    """API错误处理器"""

    def __init__(self, retry_strategy: Optional[APIRetryStrategy] = None):
        self.retry_strategy = retry_strategy or APIRetryStrategy()
        self.error_callbacks = {}

    def register_error_callback(self, error_type: APIErrorType, callback: Callable):
        """注册错误回调函数"""

        self.error_callbacks[error_type] = callback

    def handle_error(self, error: APIError) -> Optional[Any]:
        """处理错误"""

        # 调用特定错误的回调函数
        if error.error_type in self.error_callbacks:
            return self.error_callbacks[error.error_type](error)

        # 默认错误处理
        return self._default_error_handler(error)

    def _default_error_handler(self, error: APIError) -> None:
        """默认错误处理"""

        print(f"API错误 [{error.error_type.value}]: {error.message}")

        if error.status_code:
            print(f"状态码: {error.status_code}")

        if error.response_text:
            print(f"响应内容: {error.response_text[:200]}...")

        if error.request_info:
            print(f"请求信息: {json.dumps(error.request_info, indent=2)}")

    def execute_with_retry(self, request_func: Callable, *args, **kwargs) -> Any:
        """带重试的执行"""

        retry_count = 0

        while retry_count <= self.retry_strategy.max_retries:
            try:
                return request_func(*args, **kwargs)

            except requests.exceptions.RequestException as e:
                # 创建错误对象
                error = self._create_error_from_exception(e, retry_count)

                # 判断是否应该重试
                if not self.retry_strategy.should_retry(error):
                    self.handle_error(error)
                    raise

                # 计算等待时间
                wait_time = self.retry_strategy.get_wait_time(retry_count)
                print(f"重试 {retry_count + 1}/{self.retry_strategy.max_retries},等待 {wait_time:.2f} 秒...")

                # 等待
                time.sleep(wait_time)

                retry_count += 1
                error.retry_count = retry_count

        # 所有重试都失败
        error = APIError(
            error_type=APIErrorType.UNKNOWN_ERROR,
            message=f"所有 {self.retry_strategy.max_retries} 次重试都失败",
            retry_count=retry_count
        )

        self.handle_error(error)
        raise Exception(f"API调用失败,已重试 {retry_count} 次")

    def _create_error_from_exception(self, exception: Exception, retry_count: int) -> APIError:
        """从异常创建错误对象"""

        # 网络连接错误
        if isinstance(exception, requests.exceptions.ConnectionError):
            return APIError(
                error_type=APIErrorType.NETWORK_ERROR,
                message=f"网络连接错误: {exception}",
                retry_count=retry_count
            )

        # 超时错误
        elif isinstance(exception, requests.exceptions.Timeout):
            return APIError(
                error_type=APIErrorType.TIMEOUT_ERROR,
                message=f"请求超时: {exception}",
                retry_count=retry_count
            )

        # HTTP错误
        elif isinstance(exception, requests.exceptions.HTTPError):
            response = exception.response

            # 根据状态码确定错误类型
            if response.status_code == 401 or response.status_code == 403:
                error_type = APIErrorType.AUTH_ERROR
                message = "认证失败"
            elif response.status_code == 400:
                error_type = APIErrorType.VALIDATION_ERROR
                message = "请求参数验证失败"
            elif response.status_code == 429:
                error_type = APIErrorType.RATE_LIMIT_ERROR
                message = "请求过于频繁,触发速率限制"
            elif 500 <= response.status_code < 600:
                error_type = APIErrorType.SERVER_ERROR
                message = f"服务器错误: {response.status_code}"
            else:
                error_type = APIErrorType.CLIENT_ERROR
                message = f"客户端错误: {response.status_code}"

            return APIError(
                error_type=error_type,
                message=message,
                status_code=response.status_code,
                response_text=response.text,
                retry_count=retry_count
            )

        # 其他异常
        else:
            return APIError(
                error_type=APIErrorType.UNKNOWN_ERROR,
                message=f"未知错误: {exception}",
                retry_count=retry_count
            )

class RobustAPIClient:
    """健壮的API客户端"""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.error_handler = APIErrorHandler()
        self.session = self._create_session()

        # 注册错误回调
        self._register_error_callbacks()

    def _create_session(self) -> requests.Session:
        """创建配置好的Session"""

        session = requests.Session()

        # 配置重试策略
        retry_strategy = APIRetryStrategy(max_retries=3, backoff_factor=1.0)
        adapter = retry_strategy.create_retry_adapter()

        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # 配置通用头部
        session.headers.update({
            'User-Agent': 'RobustAPIClient/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })

        return session

    def _register_error_callbacks(self):
        """注册错误回调函数"""

        # 速率限制错误回调
        def handle_rate_limit_error(error: APIError) -> None:
            print(f"处理速率限制错误...")

            # 从响应头获取重试等待时间
            if error.response_text:
                try:
                    error_data = json.loads(error.response_text)
                    retry_after = error_data.get('retry_after', 60)
                    print(f"根据API建议,等待 {retry_after} 秒")
                    time.sleep(retry_after)
                except:
                    pass

        self.error_handler.register_error_callback(
            APIErrorType.RATE_LIMIT_ERROR,
            handle_rate_limit_error
        )

        # 认证错误回调
        def handle_auth_error(error: APIError) -> None:
            print(f"处理认证错误...")
            print("建议: 1. 检查API密钥 2. 刷新访问令牌 3. 重新登录")

        self.error_handler.register_error_callback(
            APIErrorType.AUTH_ERROR,
            handle_auth_error
        )

    def request(self, method: str, endpoint: str, **kwargs) -> Any:
        """发送请求(带错误处理)"""

        url = f"{self.base_url}{endpoint}"

        def request_func():
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response

        try:
            response = self.error_handler.execute_with_retry(request_func)

            # 成功响应
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 204:  # No Content
                return None
            else:
                return response.text

        except Exception as e:
            # 错误处理器已经处理了错误,这里只是重新抛出
            raise

    def get(self, endpoint: str, **kwargs) -> Any:
        """发送GET请求"""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs) -> Any:
        """发送POST请求"""
        return self.request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs) -> Any:
        """发送PUT请求"""
        return self.request('PUT', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> Any:
        """发送DELETE请求"""
        return self.request('DELETE', endpoint, **kwargs)

# 使用示例
if __name__ == "__main__":
    # 创建健壮的API客户端
    client = RobustAPIClient('https://httpbin.org')

    print("测试健壮API客户端...")

    # 测试正常请求
    try:
        print("\n1. 测试正常请求:")
        data = client.get('/get')
        print(f"请求成功: {data.get('url')}")
    except Exception as e:
        print(f"请求失败: {e}")

    # 测试服务器错误(应该重试)
    try:
        print("\n2. 测试服务器错误(500):")
        data = client.get('/status/500')
        print(f"请求结果: {data}")
    except Exception as e:
        print(f"请求失败(预期中): {e}")

    # 测试速率限制(429)
    try:
        print("\n3. 测试速率限制(429):")
        data = client.get('/status/429')
        print(f"请求结果: {data}")
    except Exception as e:
        print(f"请求失败(预期中): {e}")

    # 测试客户端错误(400,不应该重试)
    try:
        print("\n4. 测试客户端错误(400):")
        data = client.get('/status/400')
        print(f"请求结果: {data}")
    except Exception as e:
        print(f"请求失败(预期中): {e}")

实际API示例

实战项目:天气预报API客户端

项目目标

创建一个完整的天气预报API客户端,支持多种天气服务提供商。

完整代码实现
天气预报API客户端完整代码
import requests
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass, asdict
from enum import Enum
import csv

class WeatherProvider(Enum):
    """天气服务提供商"""

    OPENWEATHER = "openweather"
    WEATHERAPI = "weatherapi"
    VISUALCROSSING = "visualcrossing"

@dataclass
class WeatherData:
    """天气数据类"""

    temperature: float  # 温度(摄氏度)
    feels_like: float   # 体感温度
    humidity: int       # 湿度(百分比)
    pressure: int       # 气压(hPa)
    wind_speed: float   # 风速(米/秒)
    wind_direction: str # 风向
    description: str    # 天气描述
    icon: str          # 天气图标代码
    timestamp: str     # 时间戳
    location: str      # 位置

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return asdict(self)

    def to_csv_row(self) -> List[str]:
        """转换为CSV行"""
        return [
            self.timestamp,
            self.location,
            str(self.temperature),
            str(self.feels_like),
            str(self.humidity),
            str(self.pressure),
            str(self.wind_speed),
            self.wind_direction,
            self.description,
            self.icon
        ]

class BaseWeatherClient:
    """天气客户端基类"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.session = requests.Session()
        self.base_url = ""

        # 配置Session
        self.session.headers.update({
            'User-Agent': 'WeatherClient/1.0',
            'Accept': 'application/json'
        })

    def get_current_weather(self, location: str) -> Optional[WeatherData]:
        """获取当前天气(子类必须实现)"""
        raise NotImplementedError

    def get_forecast(self, location: str, days: int = 5) -> List[WeatherData]:
        """获取天气预报(子类必须实现)"""
        raise NotImplementedError

    def _make_request(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """发送API请求"""

        try:
            response = self.session.get(
                f"{self.base_url}{endpoint}",
                params=params,
                timeout=10
            )
            response.raise_for_status()

            return response.json()

        except requests.exceptions.HTTPError as e:
            print(f"HTTP错误: {e}")
            if e.response.status_code == 401:
                print("API密钥无效")
            elif e.response.status_code == 404:
                print("位置未找到")
            elif e.response.status_code == 429:
                print("请求过于频繁")
        except requests.exceptions.ConnectionError:
            print("网络连接错误")
        except requests.exceptions.Timeout:
            print("请求超时")
        except Exception as e:
            print(f"未知错误: {e}")

        return None

class OpenWeatherClient(BaseWeatherClient):
    """OpenWeatherMap客户端"""

    def __init__(self, api_key: str):
        super().__init__(api_key)
        self.base_url = "https://api.openweathermap.org/data/2.5"

    def get_current_weather(self, location: str) -> Optional[WeatherData]:
        """获取当前天气"""

        params = {
            'q': location,
            'appid': self.api_key,
            'units': 'metric',  # 使用摄氏度
            'lang': 'zh_cn'     # 中文描述
        }

        data = self._make_request('/weather', params)
        if not data:
            return None

        return self._parse_current_weather(data, location)

    def get_forecast(self, location: str, days: int = 5) -> List[WeatherData]:
        """获取天气预报"""

        params = {
            'q': location,
            'appid': self.api_key,
            'units': 'metric',
            'lang': 'zh_cn',
            'cnt': days * 8  # 每3小时一个数据点
        }

        data = self._make_request('/forecast', params)
        if not data:
            return []

        return self._parse_forecast(data, location)

    def _parse_current_weather(self, data: Dict[str, Any], location: str) -> WeatherData:
        """解析当前天气数据"""

        main = data['main']
        weather = data['weather'][0]
        wind = data['wind']

        # 转换风向角度为方向
        wind_deg = wind.get('deg', 0)
        wind_direction = self._degrees_to_direction(wind_deg)

        return WeatherData(
            temperature=main['temp'],
            feels_like=main['feels_like'],
            humidity=main['humidity'],
            pressure=main['pressure'],
            wind_speed=wind['speed'],
            wind_direction=wind_direction,
            description=weather['description'],
            icon=weather['icon'],
            timestamp=datetime.fromtimestamp(data['dt']).isoformat(),
            location=location
        )

    def _parse_forecast(self, data: Dict[str, Any], location: str) -> List[WeatherData]:
        """解析天气预报数据"""

        forecast_list = []

        for item in data['list']:
            main = item['main']
            weather = item['weather'][0]
            wind = item['wind']

            wind_deg = wind.get('deg', 0)
            wind_direction = self._degrees_to_direction(wind_deg)

            weather_data = WeatherData(
                temperature=main['temp'],
                feels_like=main['feels_like'],
                humidity=main['humidity'],
                pressure=main['pressure'],
                wind_speed=wind['speed'],
                wind_direction=wind_direction,
                description=weather['description'],
                icon=weather['icon'],
                timestamp=datetime.fromtimestamp(item['dt']).isoformat(),
                location=location
            )

            forecast_list.append(weather_data)

        return forecast_list

    @staticmethod
    def _degrees_to_direction(degrees: float) -> str:
        """将角度转换为方向"""

        directions = ['北', '东北', '东', '东南', '南', '西南', '西', '西北']
        index = round(degrees / 45) % 8
        return directions[index]

class WeatherAPIClient(BaseWeatherClient):
    """WeatherAPI.com客户端"""

    def __init__(self, api_key: str):
        super().__init__(api_key)
        self.base_url = "http://api.weatherapi.com/v1"

    def get_current_weather(self, location: str) -> Optional[WeatherData]:
        """获取当前天气"""

        params = {
            'key': self.api_key,
            'q': location,
            'lang': 'zh'
        }

        data = self._make_request('/current.json', params)
        if not data:
            return None

        return self._parse_current_weather(data, location)

    def get_forecast(self, location: str, days: int = 5) -> List[WeatherData]:
        """获取天气预报"""

        params = {
            'key': self.api_key,
            'q': location,
            'days': days,
            'lang': 'zh'
        }

        data = self._make_request('/forecast.json', params)
        if not data:
            return []

        return self._parse_forecast(data, location)

    def _parse_current_weather(self, data: Dict[str, Any], location: str) -> WeatherData:
        """解析当前天气数据"""

        current = data['current']

        return WeatherData(
            temperature=current['temp_c'],
            feels_like=current['feelslike_c'],
            humidity=current['humidity'],
            pressure=current['pressure_mb'],
            wind_speed=current['wind_kph'] / 3.6,  # 转换为米/秒
            wind_direction=current['wind_dir'],
            description=current['condition']['text'],
            icon=self._extract_icon_code(current['condition']['icon']),
            timestamp=current['last_updated'],
            location=location
        )

    def _parse_forecast(self, data: Dict[str, Any], location: str) -> List[WeatherData]:
        """解析天气预报数据"""

        forecast_list = []

        for day in data['forecast']['forecastday']:
            date = day['date']

            # 每天的概况
            day_data = day['day']

            weather_data = WeatherData(
                temperature=day_data['avgtemp_c'],
                feels_like=day_data['avgtemp_c'],  # WeatherAPI没有体感温度
                humidity=day_data['avghumidity'],
                pressure=day_data.get('pressure_mb', 1013),
                wind_speed=day_data['maxwind_kph'] / 3.6,
                wind_direction=day_data.get('wind_dir', 'N/A'),
                description=day_data['condition']['text'],
                icon=self._extract_icon_code(day_data['condition']['icon']),
                timestamp=f"{date} 12:00",
                location=location
            )

            forecast_list.append(weather_data)

        return forecast_list

    @staticmethod
    def _extract_icon_code(icon_url: str) -> str:
        """从图标URL中提取图标代码"""

        # 示例: "//cdn.weatherapi.com/weather/64x64/day/113.png"
        if '/' in icon_url:
            parts = icon_url.split('/')
            if len(parts) >= 2:
                return parts[-1].replace('.png', '')
        return ""

class WeatherService:
    """天气服务管理器"""

    def __init__(self):
        self.clients = {}
        self.weather_history = []

    def register_client(self, provider: WeatherProvider, api_key: str):
        """注册天气客户端"""

        if provider == WeatherProvider.OPENWEATHER:
            self.clients[provider] = OpenWeatherClient(api_key)
        elif provider == WeatherProvider.WEATHERAPI:
            self.clients[provider] = WeatherAPIClient(api_key)
        else:
            raise ValueError(f"不支持的天气服务提供商: {provider}")

    def get_weather(self, location: str, provider: WeatherProvider = None) -> Optional[WeatherData]:
        """获取天气数据"""

        if provider:
            # 使用指定的提供商
            if provider not in self.clients:
                print(f"未注册的天气服务提供商: {provider}")
                return None

            return self.clients[provider].get_current_weather(location)

        else:
            # 尝试所有已注册的客户端
            for client in self.clients.values():
                weather_data = client.get_current_weather(location)
                if weather_data:
                    return weather_data

        return None

    def compare_providers(self, location: str) -> Dict[str, Any]:
        """比较不同提供商的天气数据"""

        results = {}

        for provider, client in self.clients.items():
            try:
                weather_data = client.get_current_weather(location)
                if weather_data:
                    results[provider.value] = weather_data.to_dict()
            except Exception as e:
                print(f"{provider.value} 获取失败: {e}")

        return results

    def save_to_csv(self, weather_data: WeatherData, filename: str = "weather_history.csv"):
        """保存天气数据到CSV"""

        # 检查文件是否存在,决定是否写入表头
        try:
            with open(filename, 'r') as f:
                # 文件已存在,不需要表头
                pass
        except FileNotFoundError:
            # 文件不存在,写入表头
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow([
                    '时间戳', '位置', '温度(℃)', '体感温度(℃)',
                    '湿度(%)', '气压(hPa)', '风速(m/s)', '风向',
                    '描述', '图标'
                ])

        # 追加数据
        with open(filename, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(weather_data.to_csv_row())

        print(f"天气数据已保存到 {filename}")

    def get_weather_alerts(self, location: str, threshold_temp: float = 35.0) -> List[str]:
        """获取天气警报"""

        alerts = []

        weather_data = self.get_weather(location)
        if not weather_data:
            return alerts

        # 高温警报
        if weather_data.temperature > threshold_temp:
            alerts.append(f"高温警报: {weather_data.temperature}℃")

        # 大风警报
        if weather_data.wind_speed > 10.0:
            alerts.append(f"大风警报: {weather_data.wind_speed}m/s")

        # 高湿度警报
        if weather_data.humidity > 80:
            alerts.append(f"高湿度警报: {weather_data.humidity}%")

        return alerts

# 使用示例
if __name__ == "__main__":
    # 创建天气服务
    weather_service = WeatherService()

    # 注册天气服务提供商(需要实际的API密钥)
    # 注意:以下API密钥是示例,实际使用时需要替换为真实的API密钥

    # 注册OpenWeatherMap
    # weather_service.register_client(
    #     WeatherProvider.OPENWEATHER,
    #     api_key="your_openweather_api_key"
    # )

    # 注册WeatherAPI
    # weather_service.register_client(
    #     WeatherProvider.WEATHERAPI,
    #     api_key="your_weatherapi_key"
    # )

    print("天气预报API客户端")
    print("=" * 50)

    # 由于我们没有真实的API密钥,这里模拟数据
    print("\n模拟获取北京天气:")

    # 模拟天气数据
    mock_weather = WeatherData(
        temperature=22.5,
        feels_like=21.0,
        humidity=65,
        pressure=1013,
        wind_speed=3.2,
        wind_direction="东北",
        description="晴间多云",
        icon="01d",
        timestamp=datetime.now().isoformat(),
        location="北京"
    )

    print(f"位置: {mock_weather.location}")
    print(f"温度: {mock_weather.temperature}℃")
    print(f"体感温度: {mock_weather.feels_like}℃")
    print(f"湿度: {mock_weather.humidity}%")
    print(f"气压: {mock_weather.pressure}hPa")
    print(f"风速: {mock_weather.wind_speed}m/s")
    print(f"风向: {mock_weather.wind_direction}")
    print(f"天气: {mock_weather.description}")

    # 保存到CSV
    weather_service.save_to_csv(mock_weather, "weather_data.csv")

    # 获取天气警报
    alerts = weather_service.get_weather_alerts("北京")
    if alerts:
        print("\n天气警报:")
        for alert in alerts:
            print(f"  ⚠ {alert}")

    print("\n项目功能总结:")
    print("1. 支持多个天气服务提供商")
    print("2. 统一的数据接口")
    print("3. 错误处理和重试机制")
    print("4. 数据持久化(CSV)")
    print("5. 天气警报系统")
    print("6. 提供商数据对比")
API客户端功能测试

测试不同的API调用场景:

更多实际API示例

GitHub API客户端
  • 获取用户信息和仓库列表
  • 创建、更新、删除仓库
  • 管理issues和pull requests
  • 处理GitHub Webhooks
  • OAuth认证流程
电商平台API
  • 商品搜索和详情获取
  • 订单创建和状态查询
  • 支付接口集成
  • 物流信息跟踪
  • 库存管理
社交媒体API
  • Twitter/X API集成
  • Facebook Graph API
  • Instagram内容获取
  • 社交媒体分析数据
  • 内容发布和调度
数据服务API
  • 金融数据API(股票、汇率)
  • 地图和位置服务API
  • 机器学习和AI API
  • 图像识别和处理API
  • 实时数据流API

总结

Requests库是Python中API调用的首选工具,它提供了强大而灵活的功能来与各种Web API交互。关键要点:

  • 简洁的API: 直观的方法调用,易于学习和使用
  • 全面的功能: 支持各种HTTP方法、认证方式、数据格式
  • 强大的错误处理: 内置异常处理和重试机制
  • 高性能: 连接池、会话复用、流式处理
  • 灵活性: 可以轻松扩展和定制以满足特定需求

通过本教程,你应该已经掌握了使用Requests进行API调用的所有核心概念和最佳实践。无论是构建简单的API客户端还是复杂的企业级集成,Requests都能提供强大的支持。