Python Requests 身份认证详解

身份认证 是保护API和数据资源的第一道防线。Requests库支持多种认证机制,从简单的Basic Auth到复杂的OAuth 2.0,满足不同安全级别的需求。

HTTP认证机制概述

为什么需要身份认证?

身份认证确保只有合法用户能够访问受保护的资源,主要目的包括:

  • 访问控制 - 限制资源访问权限
  • 数据保护 - 防止敏感数据泄露
  • 审计追踪 - 记录用户操作行为
  • 个性化服务 - 根据用户身份提供定制内容

HTTP认证流程

1
客户端发送请求

客户端向服务器发送HTTP请求(通常不带认证信息)

2
服务器要求认证

服务器返回401状态码和WWW-Authenticate头,指定认证方式

3
客户端提供凭证

客户端重新发送请求,包含Authorization头中的认证信息

4
服务器验证并响应

服务器验证凭证,成功返回200,失败返回401或403

认证方式比较

认证方式 安全性 适用场景 实现复杂度
Basic Auth 低(明文传输) 内部API、测试环境、配合HTTPS使用 简单
Digest Auth 中(哈希传输) 需要中等安全级别的Web应用 中等
Bearer Token 高(令牌机制) 现代RESTful API、移动应用、单页面应用 简单
OAuth 1.0a 高(签名认证) 第三方API授权(Twitter、Tumblr等) 复杂
OAuth 2.0 高(令牌机制) 现代Web应用、移动应用、微服务 中等
API Key 中(密钥验证) 服务间通信、数据API、统计服务 简单
安全警告

永远不要在没有HTTPS的情况下使用Basic Auth,因为凭证是Base64编码的(不是加密),容易被中间人窃取。所有认证机制都应配合TLS/SSL使用。

HTTP Basic认证

HTTP Basic认证是最简单的认证方式,它将用户名和密码用冒号连接,然后进行Base64编码,放在Authorization头中发送。

# Basic认证格式
Authorization: Basic base64(username:password)

# 示例
username = "admin", password = "secret123"
→ "admin:secret123" → Base64 → "YWRtaW46c2VjcmV0MTIz"
→ Authorization: Basic YWRtaW46c2VjcmV0MTIz

1. 使用Basic认证

import requests
import base64

# 方法1:在请求中直接提供auth参数(最简单)
response = requests.get(
    'https://api.example.com/protected',
    auth=('admin', 'secret123')
)

print(f"状态码: {response.status_code}")
print(f"响应内容: {response.text[:100]}...")

# 方法2:手动构建Authorization头(不推荐)
username = 'admin'
password = 'secret123'
credentials = f"{username}:{password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()

headers = {
    'Authorization': f'Basic {encoded_credentials}'
}

response = requests.get(
    'https://api.example.com/protected',
    headers=headers
)

print(f"手动构建头部的状态码: {response.status_code}")
import requests
from requests.auth import HTTPBasicAuth

# 使用HTTPBasicAuth类(更清晰)
auth = HTTPBasicAuth('admin', 'secret123')

response = requests.get(
    'https://api.example.com/protected',
    auth=auth
)

print(f"使用HTTPBasicAuth的状态码: {response.status_code}")

# 也可以创建可重用的认证对象
class BasicAuthClient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password)
        self.session = requests.Session()

    def get(self, endpoint):
        url = f"{self.base_url}/{endpoint}"
        return self.session.get(url, auth=self.auth)

    def post(self, endpoint, data=None):
        url = f"{self.base_url}/{endpoint}"
        return self.session.post(url, auth=self.auth, json=data)

# 使用示例
client = BasicAuthClient('https://api.example.com', 'admin', 'secret123')
response = client.get('users')
print(f"客户端请求状态码: {response.status_code}")
import requests
from requests.auth import HTTPBasicAuth

# 创建Session并设置Basic认证
session = requests.Session()

# 方法1:为Session设置auth
session.auth = HTTPBasicAuth('admin', 'secret123')

# 方法2:设置默认头部(不推荐,不够安全)
# session.headers.update({
#     'Authorization': 'Basic ' + base64.b64encode(b'admin:secret123').decode()
# })

# 现在所有通过该Session的请求都会自动包含认证信息
response1 = session.get('https://api.example.com/users')
response2 = session.get('https://api.example.com/posts')

print(f"请求1状态码: {response1.status_code}")
print(f"请求2状态码: {response2.status_code}")

# 临时使用不同的认证
temp_auth = HTTPBasicAuth('guest', 'readonly')
response3 = session.get(
    'https://api.example.com/public',
    auth=temp_auth  # 临时覆盖Session的认证
)

print(f"请求3状态码: {response3.status_code}")

# 清除Session的认证
session.auth = None

2. Basic认证的安全性考虑

Basic认证的安全风险
  • 明文传输:Base64只是编码,不是加密,可以被轻易解码
  • 无过期机制:凭证长期有效,泄露风险高
  • 重放攻击:攻击者可以截获请求并重复发送
  • 密码存储:客户端需要存储密码,安全性差
Basic认证最佳实践
  • 必须使用HTTPS:确保传输层加密
  • 短期有效:定期更换密码
  • 使用API Token替代:避免直接使用用户密码
  • 配合其他安全措施:如IP白名单、请求频率限制
  • 避免在前端使用:Basic认证不适合浏览器前端应用

3. Basic认证实战示例

import requests
from requests.auth import HTTPBasicAuth
import json
import time

class BasicAuthAPI:
    """使用Basic认证的API客户端"""

    def __init__(self, base_url, username, password):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.session.auth = HTTPBasicAuth(username, password)

        # 设置默认请求头
        self.session.headers.update({
            'User-Agent': 'BasicAuthAPI/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })

        # 配置重试
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[401, 429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "DELETE"]
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def _handle_response(self, response):
        """统一处理响应"""
        try:
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if response.status_code == 401:
                print("认证失败:用户名或密码错误")
            elif response.status_code == 403:
                print("权限不足:用户没有访问此资源的权限")
            raise e

    def get_users(self):
        """获取用户列表"""
        response = self.session.get(f'{self.base_url}/users')
        return self._handle_response(response)

    def create_user(self, user_data):
        """创建新用户"""
        response = self.session.post(
            f'{self.base_url}/users',
            json=user_data
        )
        return self._handle_response(response)

    def update_user(self, user_id, user_data):
        """更新用户信息"""
        response = self.session.put(
            f'{self.base_url}/users/{user_id}',
            json=user_data
        )
        return self._handle_response(response)

    def delete_user(self, user_id):
        """删除用户"""
        response = self.session.delete(f'{self.base_url}/users/{user_id}')
        return self._handle_response(response)

    def test_connection(self):
        """测试连接和认证"""
        try:
            response = self.session.get(f'{self.base_url}/health', timeout=5)
            response.raise_for_status()
            return True
        except Exception as e:
            print(f"连接测试失败: {e}")
            return False

# 使用示例
if __name__ == '__main__':
    # 初始化客户端
    api = BasicAuthAPI(
        base_url='https://api.example.com',
        username='admin',
        password='secret123'  # 实际应用中应从环境变量或配置文件中读取
    )

    # 测试连接
    if api.test_connection():
        print("✓ 连接测试成功")

        # 获取用户列表
        users = api.get_users()
        print(f"获取到 {len(users)} 个用户")

        # 创建新用户
        new_user = {
            'name': '张三',
            'email': 'zhangsan@example.com',
            'role': 'user'
        }

        created_user = api.create_user(new_user)
        print(f"创建用户: {created_user['id']}")

        # 更新用户
        updated_data = {'email': 'zhangsan.new@example.com'}
        updated_user = api.update_user(created_user['id'], updated_data)
        print(f"更新用户: {updated_user['email']}")

        # 删除用户
        result = api.delete_user(created_user['id'])
        print(f"删除用户: {result.get('message', '成功')}")
    else:
        print("✗ 连接测试失败,请检查网络和认证信息")

HTTP Digest认证

什么是Digest认证?

HTTP Digest认证 是比Basic认证更安全的认证机制。它不会传输明文密码,而是使用哈希算法(通常是MD5)对密码和服务器提供的随机数(nonce)进行加密。

  • 不传输密码:只传输密码的哈希值
  • 防止重放攻击:使用随机数和计数器
  • 支持多种哈希算法:MD5、SHA-256等
  • 支持QoP(质量保护):可选的完整性保护

Digest认证流程

1
客户端请求资源

GET /protected

2
服务器返回401

WWW-Authenticate: Digest realm="Protected Area", nonce="abc123"

3
客户端计算响应

HA1 = MD5(username:realm:password)
HA2 = MD5(method:uri)
response = MD5(HA1:nonce:HA2)

4
客户端重新请求

Authorization: Digest username="admin", realm="...", nonce="...", response="..."

使用Digest认证

import requests
from requests.auth import HTTPDigestAuth

# 使用HTTPDigestAuth类
auth = HTTPDigestAuth('admin', 'secret123')

# 发送请求(Requests会自动处理Digest认证流程)
response = requests.get(
    'https://httpbin.org/digest-auth/auth/admin/secret123',
    auth=auth
)

print(f"状态码: {response.status_code}")
print(f"响应内容: {response.json()}")

# 使用Session
session = requests.Session()
session.auth = HTTPDigestAuth('admin', 'secret123')

response2 = session.get('https://httpbin.org/digest-auth/auth/admin/secret123')
print(f"Session请求状态码: {response2.status_code}")

# 注意:Digest认证要求服务器支持
# 测试时可以使用 httpbin.org 的digest-auth端点
import requests
from requests.auth import HTTPDigestAuth

class DigestAuthClient:
    """Digest认证客户端"""

    def __init__(self, base_url, username, password):
        self.base_url = base_url.rstrip('/')
        self.auth = HTTPDigestAuth(username, password)
        self.session = requests.Session()
        self.session.auth = self.auth

        # Digest认证的特定配置
        self.nonce_count = 0  # 跟踪nonce使用次数

        # 设置默认超时
        self.session.timeout = 30

    def _increment_nonce_count(self):
        """增加nonce计数器(简化示例)"""
        self.nonce_count += 1
        return self.nonce_count

    def request(self, method, endpoint, **kwargs):
        """发送请求"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # 可以在这里添加自定义Digest逻辑
        response = self.session.request(method, url, **kwargs)

        # 检查是否需要重新认证
        if response.status_code == 401:
            print("认证失败,可能需要更新nonce")
            # 在实际应用中,这里可以处理nonce过期的情况

        return response

    def get(self, endpoint, **kwargs):
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        return self.request('POST', endpoint, **kwargs)

# 使用示例
client = DigestAuthClient(
    base_url='https://httpbin.org',
    username='admin',
    password='secret123'
)

# 测试Digest认证
response = client.get('/digest-auth/auth/admin/secret123')
if response.status_code == 200:
    print(f"Digest认证成功: {response.json()}")
else:
    print(f"Digest认证失败: {response.status_code}")

# 注意:Digest认证比Basic认证更安全,但实现也更复杂
# 大多数现代API使用Bearer Token而不是Digest认证
import requests
import hashlib
import time

class CustomDigestAuth(requests.auth.AuthBase):
    """自定义Digest认证实现"""

    def __init__(self, username, password):
        self.username = username
        self.password = password
        self.last_nonce = None
        self.nonce_count = 0

    def __call__(self, r):
        # 如果有缓存的nonce,使用它
        if self.last_nonce:
            r.headers['Authorization'] = self._build_auth_header(r)
        return r

    def handle_401(self, r, **kwargs):
        """处理401响应,提取nonce并重新发送请求"""
        if r.status_code == 401 and 'WWW-Authenticate' in r.headers:
            auth_header = r.headers['WWW-Authenticate']

            # 解析Digest挑战参数
            params = self._parse_auth_header(auth_header)

            # 构建认证响应
            auth_response = self._build_auth_response(r, params)

            # 修改原始请求的头部
            r.request.headers['Authorization'] = auth_response

            # 重新发送请求
            return r.connection.send(r.request, **kwargs)

        return r

    def _parse_auth_header(self, auth_header):
        """解析WWW-Authenticate头部"""
        params = {}
        if auth_header.startswith('Digest '):
            parts = auth_header[7:].split(', ')
            for part in parts:
                if '=' in part:
                    key, value = part.split('=', 1)
                    params[key.strip()] = value.strip('"')
        return params

    def _build_auth_response(self, request, params):
        """构建Digest认证响应"""
        method = request.method
        uri = request.path_url

        # 计算HA1
        ha1_data = f"{self.username}:{params.get('realm', '')}:{self.password}"
        ha1 = hashlib.md5(ha1_data.encode()).hexdigest()

        # 计算HA2
        ha2_data = f"{method}:{uri}"
        ha2 = hashlib.md5(ha2_data.encode()).hexdigest()

        # 计算response
        self.nonce_count += 1
        nc_value = f"{self.nonce_count:08x}"

        cnonce = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]

        response_data = f"{ha1}:{params.get('nonce', '')}:{nc_value}:{cnonce}:auth:{ha2}"
        response = hashlib.md5(response_data.encode()).hexdigest()

        # 构建Authorization头部
        auth_parts = [
            f'username="{self.username}"',
            f'realm="{params.get("realm", "")}"',
            f'nonce="{params.get("nonce", "")}"',
            f'uri="{uri}"',
            f'response="{response}"',
            f'algorithm=MD5',
            f'qop=auth',
            f'nc={nc_value}',
            f'cnonce="{cnonce}"'
        ]

        return 'Digest ' + ', '.join(auth_parts)

    def _build_auth_header(self, request):
        """构建Authorization头部(简化版)"""
        # 简化实现,实际使用时需要完整实现上述逻辑
        return f'Digest username="{self.username}"'

# 使用自定义Digest认证
session = requests.Session()
auth = CustomDigestAuth('admin', 'secret123')
session.auth = auth

# 添加钩子处理401响应
session.hooks['response'].append(auth.handle_401)

try:
    response = session.get('https://httpbin.org/digest-auth/auth/admin/secret123')
    print(f"自定义Digest认证状态码: {response.status_code}")
except Exception as e:
    print(f"自定义Digest认证错误: {e}")

Digest认证的优缺点

优点
  • 不传输明文密码,比Basic认证安全
  • 防止重放攻击(使用nonce和计数器)
  • 支持消息完整性验证(qop=auth-int)
  • 不需要客户端证书或SSL(但推荐配合使用)
缺点
  • 实现复杂,客户端和服务器都需要支持
  • 性能开销大(需要多次哈希计算)
  • 不支持注销机制
  • 现代应用中逐渐被Bearer Token替代

Bearer Token认证

Bearer Token(承载令牌)是现代API最常用的认证方式。它是一个加密字符串,通常通过OAuth 2.0流程获得,放在Authorization头中发送。

# Bearer Token格式
Authorization: Bearer <token>

# 示例
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

1. 基本Bearer Token使用

import requests

# 方法1:直接在headers中添加Bearer Token(最常用)
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

headers = {
    'Authorization': f'Bearer {token}',
    'Content-Type': 'application/json'
}

response = requests.get(
    'https://api.example.com/users',
    headers=headers
)

print(f"状态码: {response.status_code}")

# 方法2:使用字典管理headers
auth_headers = {
    'Authorization': f'Bearer {token}'
}

# 可以合并headers
all_headers = {**auth_headers, **{'Accept': 'application/json'}}
response = requests.get('https://api.example.com/users', headers=all_headers)

# 方法3:使用Session统一管理
session = requests.Session()
session.headers.update({
    'Authorization': f'Bearer {token}',
    'Content-Type': 'application/json',
    'Accept': 'application/json'
})

response = session.get('https://api.example.com/users')
print(f"Session请求状态码: {response.status_code}")
import requests
from requests.auth import AuthBase

# Requests没有内置的HTTPBearerAuth类,但我们可以自定义
# 或者使用requests-oauthlib库中的OAuth2Token

# 自定义Bearer认证类
class BearerAuth(AuthBase):
    """Bearer Token认证类"""

    def __init__(self, token):
        self.token = token

    def __call__(self, r):
        r.headers['Authorization'] = f'Bearer {self.token}'
        return r

# 使用自定义Bearer认证
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
auth = BearerAuth(token)

response = requests.get(
    'https://api.example.com/users',
    auth=auth
)

print(f"使用BearerAuth的状态码: {response.status_code}")

# 使用requests-oauthlib(如果需要OAuth 2.0功能)
try:
    from oauthlib.oauth2 import BackendApplicationClient
    from requests_oauthlib import OAuth2Session

    # 使用OAuth2Session(内置Bearer Token支持)
    client_id = 'your_client_id'
    client_secret = 'your_client_secret'
    token_url = 'https://api.example.com/oauth/token'

    client = BackendApplicationClient(client_id=client_id)
    oauth = OAuth2Session(client=client)

    # 获取token
    token = oauth.fetch_token(
        token_url=token_url,
        client_id=client_id,
        client_secret=client_secret
    )

    # 现在oauth会自动在请求中添加Bearer Token
    response = oauth.get('https://api.example.com/users')
    print(f"OAuth2Session状态码: {response.status_code}")

except ImportError:
    print("requests-oauthlib未安装,使用: pip install requests-oauthlib")
import requests
from requests.auth import AuthBase
import time
import json

class SmartBearerAuth(AuthBase):
    """智能Bearer认证,支持token自动刷新"""

    def __init__(self, token, refresh_token=None, token_url=None,
                 client_id=None, client_secret=None):
        self.token = token
        self.refresh_token = refresh_token
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.expires_at = None
        self.session = requests.Session()

    def __call__(self, r):
        # 检查token是否过期
        if self.expires_at and time.time() > self.expires_at:
            self._refresh_token()

        r.headers['Authorization'] = f'Bearer {self.token}'
        return r

    def _refresh_token(self):
        """刷新access token"""
        if not all([self.refresh_token, self.token_url,
                   self.client_id, self.client_secret]):
            raise ValueError("缺少刷新token所需的参数")

        data = {
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        response = self.session.post(self.token_url, data=data)
        response.raise_for_status()

        token_data = response.json()
        self.token = token_data['access_token']

        # 更新refresh_token(如果提供了新的)
        if 'refresh_token' in token_data:
            self.refresh_token = token_data['refresh_token']

        # 设置过期时间
        if 'expires_in' in token_data:
            self.expires_at = time.time() + token_data['expires_in']

        print("Token已刷新")

    def update_token(self, token_data):
        """更新token信息"""
        self.token = token_data.get('access_token', self.token)
        self.refresh_token = token_data.get('refresh_token', self.refresh_token)

        if 'expires_in' in token_data:
            self.expires_at = time.time() + token_data['expires_in']

# 使用智能Bearer认证
token_data = {
    'access_token': 'initial_token_here',
    'refresh_token': 'refresh_token_here',
    'expires_in': 3600  # 1小时过期
}

auth = SmartBearerAuth(
    token=token_data['access_token'],
    refresh_token=token_data['refresh_token'],
    token_url='https://api.example.com/oauth/token',
    client_id='your_client_id',
    client_secret='your_client_secret'
)

auth.update_token(token_data)

# 现在可以安全地使用,token会自动刷新
session = requests.Session()
session.auth = auth

try:
    response = session.get('https://api.example.com/protected')
    print(f"智能Bearer认证状态码: {response.status_code}")
except Exception as e:
    print(f"请求失败: {e}")

2. JWT (JSON Web Token) 处理

import requests
import jwt  # PyJWT库,需要安装: pip install PyJWT
import time
from requests.auth import AuthBase

class JWTAuth(AuthBase):
    """JWT认证处理"""

    def __init__(self, token=None, secret_key=None,
                 algorithm='HS256', payload=None):
        self.token = token
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.payload = payload or {}

        # 如果没有提供token,但提供了payload和secret_key,则生成token
        if not self.token and self.secret_key and self.payload:
            self.token = self._generate_token()

    def __call__(self, r):
        if self.token:
            r.headers['Authorization'] = f'Bearer {self.token}'
        return r

    def _generate_token(self):
        """生成JWT token"""
        # 添加标准声明
        standard_claims = {
            'iat': int(time.time()),  # 签发时间
            'exp': int(time.time()) + 3600,  # 过期时间(1小时)
            'iss': 'my_app',  # 签发者
        }

        # 合并payload
        full_payload = {**standard_claims, **self.payload}

        # 生成token
        token = jwt.encode(
            full_payload,
            self.secret_key,
            algorithm=self.algorithm
        )

        return token

    def decode_token(self, token=None):
        """解码JWT token(用于验证)"""
        token_to_decode = token or self.token
        if not token_to_decode:
            return None

        try:
            payload = jwt.decode(
                token_to_decode,
                self.secret_key,
                algorithms=[self.algorithm]
            )
            return payload
        except jwt.ExpiredSignatureError:
            print("Token已过期")
            return None
        except jwt.InvalidTokenError as e:
            print(f"无效Token: {e}")
            return None

    def is_token_valid(self):
        """检查token是否有效"""
        payload = self.decode_token()
        if payload and 'exp' in payload:
            return payload['exp'] > time.time()
        return False

# 使用JWT认证
# 示例1:使用现有token
existing_token = "your.jwt.token.here"
jwt_auth = JWTAuth(token=existing_token)

response = requests.get(
    'https://api.example.com/protected',
    auth=jwt_auth
)

# 示例2:生成新token
secret_key = "your-secret-key-here"
payload = {
    'user_id': 12345,
    'username': 'john_doe',
    'role': 'admin'
}

jwt_auth2 = JWTAuth(
    secret_key=secret_key,
    payload=payload,
    algorithm='HS256'
)

# 验证token
if jwt_auth2.is_token_valid():
    print("Token有效")

    # 解码token查看内容
    decoded = jwt_auth2.decode_token()
    print(f"Token内容: {decoded}")

    # 使用token发送请求
    session = requests.Session()
    session.auth = jwt_auth2

    response = session.get('https://api.example.com/protected')
    print(f"JWT认证状态码: {response.status_code}")
else:
    print("Token无效或已过期")

# 注意:实际应用中,secret_key应从安全的地方获取(如环境变量)

3. Bearer Token实战:GitHub API示例

import requests
import os
from datetime import datetime

class GitHubAPI:
    """GitHub API客户端(使用Bearer Token认证)"""

    def __init__(self, token=None):
        self.base_url = 'https://api.github.com'
        self.token = token or os.getenv('GITHUB_TOKEN')

        if not self.token:
            raise ValueError("需要GitHub Token,请提供或设置GITHUB_TOKEN环境变量")

        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'token {self.token}',  # GitHub使用'token'而不是'Bearer'
            'Accept': 'application/vnd.github.v3+json',
            'User-Agent': 'GitHubAPI-Client/1.0'
        })

        # GitHub API有速率限制,添加监控
        self.rate_limit_remaining = None
        self.rate_limit_reset = None

    def _update_rate_limit(self, response):
        """更新速率限制信息"""
        if 'X-RateLimit-Remaining' in response.headers:
            self.rate_limit_remaining = int(response.headers['X-RateLimit-Remaining'])

        if 'X-RateLimit-Reset' in response.headers:
            reset_timestamp = int(response.headers['X-RateLimit-Reset'])
            self.rate_limit_reset = datetime.fromtimestamp(reset_timestamp)

    def get_rate_limit_info(self):
        """获取速率限制信息"""
        response = self.session.get(f'{self.base_url}/rate_limit')
        data = response.json()

        self._update_rate_limit(response)

        return {
            'limit': data['resources']['core']['limit'],
            'remaining': data['resources']['core']['remaining'],
            'reset': datetime.fromtimestamp(data['resources']['core']['reset']),
            'used': data['resources']['core']['used']
        }

    def get_user(self, username=None):
        """获取用户信息"""
        endpoint = '/user' if not username else f'/users/{username}'
        response = self.session.get(f'{self.base_url}{endpoint}')

        self._update_rate_limit(response)
        response.raise_for_status()

        return response.json()

    def get_user_repos(self, username=None, page=1, per_page=30):
        """获取用户仓库"""
        if username:
            endpoint = f'/users/{username}/repos'
        else:
            endpoint = '/user/repos'

        params = {
            'page': page,
            'per_page': per_page,
            'sort': 'updated',
            'direction': 'desc'
        }

        response = self.session.get(f'{self.base_url}{endpoint}', params=params)

        self._update_rate_limit(response)
        response.raise_for_status()

        return response.json()

    def create_repo(self, name, description='', private=False):
        """创建仓库"""
        data = {
            'name': name,
            'description': description,
            'private': private,
            'auto_init': True  # 初始化README
        }

        response = self.session.post(f'{self.base_url}/user/repos', json=data)

        self._update_rate_limit(response)
        response.raise_for_status()

        return response.json()

    def search_code(self, query, page=1, per_page=30):
        """搜索代码"""
        params = {
            'q': query,
            'page': page,
            'per_page': per_page
        }

        response = self.session.get(f'{self.base_url}/search/code', params=params)

        self._update_rate_limit(response)
        response.raise_for_status()

        return response.json()

# 使用示例
if __name__ == '__main__':
    # 从环境变量获取GitHub Token
    # 在命令行设置: export GITHUB_TOKEN=your_token_here

    try:
        github = GitHubAPI()

        # 检查速率限制
        rate_limit = github.get_rate_limit_info()
        print(f"API速率限制: {rate_limit['remaining']}/{rate_limit['limit']} 剩余")
        print(f"重置时间: {rate_limit['reset']}")

        # 获取当前用户信息
        user = github.get_user()
        print(f"当前用户: {user['login']} ({user['name']})")
        print(f"仓库数: {user['public_repos']}")
        print(f"关注者: {user['followers']}")

        # 获取用户仓库
        repos = github.get_user_repos(per_page=5)
        print(f"\n最近的5个仓库:")
        for repo in repos:
            print(f"  - {repo['name']}: {repo['description'] or '无描述'}")

        # 搜索代码示例
        print(f"\n搜索包含'requests'的Python代码:")
        results = github.search_code('requests language:python', per_page=3)
        print(f"找到 {results['total_count']} 个结果")

        for item in results['items'][:3]:
            print(f"  - {item['repository']['full_name']}: {item['path']}")

        # 创建新仓库(需要足够的权限)
        # new_repo = github.create_repo('test-api-repo', '通过API创建的测试仓库')
        # print(f"创建仓库: {new_repo['html_url']}")

    except Exception as e:
        print(f"GitHub API错误: {e}")

OAuth认证

什么是OAuth?

OAuth(开放授权) 是一个开放的授权标准,允许用户授权第三方应用访问他们存储在另一个服务提供者上的信息,而不需要将用户名和密码提供给第三方应用。

  • OAuth 1.0a:使用签名认证,更安全但更复杂
  • OAuth 2.0:使用Bearer Token,更简单灵活,是现代标准
  • 主要流程:授权码、隐式、客户端凭证、密码凭证

OAuth 2.0授权流程

1
应用请求授权

用户被重定向到授权服务器,请求访问权限

2
用户授权

用户登录并授权应用访问特定资源

3
获取授权码

授权服务器返回授权码给应用

4
交换访问令牌

应用使用授权码换取访问令牌

5
访问资源

应用使用访问令牌访问受保护的资源

1. OAuth 1.0a 认证

import requests
from requests_oauthlib import OAuth1
import os

# OAuth 1.0a 认证(用于Twitter等API)
# 需要安装: pip install requests-oauthlib

# OAuth 1.0a 需要4个凭证
consumer_key = os.getenv('TWITTER_CONSUMER_KEY')
consumer_secret = os.getenv('TWITTER_CONSUMER_SECRET')
access_token = os.getenv('TWITTER_ACCESS_TOKEN')
access_token_secret = os.getenv('TWITTER_ACCESS_TOKEN_SECRET')

# 创建OAuth1认证对象
auth = OAuth1(
    consumer_key,
    consumer_secret,
    access_token,
    access_token_secret
)

# 使用OAuth1认证发送请求
try:
    # Twitter API v1.1 示例
    response = requests.get(
        'https://api.twitter.com/1.1/account/verify_credentials.json',
        auth=auth
    )

    if response.status_code == 200:
        user_data = response.json()
        print(f"Twitter用户: @{user_data['screen_name']}")
        print(f"名称: {user_data['name']}")
        print(f"描述: {user_data['description'][:50]}...")
    else:
        print(f"Twitter API错误: {response.status_code}")

except Exception as e:
    print(f"OAuth 1.0a认证失败: {e}")

# OAuth 1.0a签名过程(了解原理)
class OAuth1Manual:
    """手动实现OAuth 1.0a签名(简化版,了解原理)"""

    @staticmethod
    def explain_signature():
        """解释OAuth 1.0a签名过程"""
        steps = [
            "1. 收集请求参数(URL参数、POST数据、OAuth参数)",
            "2. 将参数按名称排序并编码",
            "3. 构建参数字符串(key=value&格式)",
            "4. 构建签名基础字符串(方法&URL&参数字符串)",
            "5. 生成签名密钥(consumer_secret&token_secret)",
            "6. 使用HMAC-SHA1算法计算签名",
            "7. 将签名添加到Authorization头部"
        ]

        print("OAuth 1.0a签名步骤:")
        for step in steps:
            print(f"  {step}")

        print("\n这就是为什么使用requests-oauthlib库更方便!")

# 查看签名过程
OAuth1Manual.explain_signature()

2. OAuth 2.0 客户端凭证流程

import requests
from requests.auth import HTTPBasicAuth
import base64
import time

class OAuth2ClientCredentials:
    """OAuth 2.0 客户端凭证流程(服务端到服务端)"""

    def __init__(self, token_url, client_id, client_secret, scope=None):
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope

        self.access_token = None
        self.token_type = None
        self.expires_in = None
        self.expires_at = None

        self.session = requests.Session()

    def get_token(self):
        """获取访问令牌"""
        auth = HTTPBasicAuth(self.client_id, self.client_secret)

        data = {
            'grant_type': 'client_credentials'
        }

        if self.scope:
            data['scope'] = self.scope

        response = self.session.post(self.token_url, data=data, auth=auth)
        response.raise_for_status()

        token_data = response.json()
        self._update_token_data(token_data)

        return token_data

    def _update_token_data(self, token_data):
        """更新令牌数据"""
        self.access_token = token_data['access_token']
        self.token_type = token_data.get('token_type', 'Bearer')
        self.expires_in = token_data.get('expires_in', 3600)
        self.expires_at = time.time() + self.expires_in

    def is_token_valid(self):
        """检查令牌是否有效"""
        if not self.access_token or not self.expires_at:
            return False
        return time.time() < self.expires_at - 60  # 提前1分钟认为过期

    def ensure_valid_token(self):
        """确保有有效的令牌"""
        if not self.is_token_valid():
            self.get_token()

    def request(self, method, url, **kwargs):
        """发送请求,自动添加Bearer Token"""
        self.ensure_valid_token()

        # 添加Authorization头部
        if 'headers' not in kwargs:
            kwargs['headers'] = {}

        kwargs['headers']['Authorization'] = f'{self.token_type} {self.access_token}'

        return self.session.request(method, url, **kwargs)

    def get(self, url, **kwargs):
        return self.request('GET', url, **kwargs)

    def post(self, url, **kwargs):
        return self.request('POST', url, **kwargs)

# 使用示例
if __name__ == '__main__':
    # 示例:使用GitHub的OAuth 2.0(客户端凭证流程)
    # 注意:GitHub的OAuth 2.0主要用于用户授权,这里仅作演示

    # 实际使用中,这些信息应从环境变量或配置文件中获取
    client_id = 'your_client_id'
    client_secret = 'your_client_secret'
    token_url = 'https://github.com/login/oauth/access_token'

    oauth_client = OAuth2ClientCredentials(
        token_url=token_url,
        client_id=client_id,
        client_secret=client_secret,
        scope='repo,user'
    )

    try:
        # 获取令牌
        token_data = oauth_client.get_token()
        print(f"获取到令牌: {token_data['access_token'][:20]}...")
        print(f"令牌类型: {token_data.get('token_type', 'Bearer')}")
        print(f"过期时间: {token_data.get('expires_in', '未知')}秒")

        # 使用令牌访问API
        # 注意:GitHub的OAuth 2.0需要用户授权,这里仅演示流程
        # 对于GitHub,通常使用授权码流程

    except Exception as e:
        print(f"OAuth 2.0错误: {e}")

    # 另一个示例:假设有一个支持客户端凭证的API
    print("\n--- 模拟支持客户端凭证的API ---")

    class MockOAuth2API:
        """模拟OAuth 2.0 API服务器"""

        @staticmethod
        def mock_token_endpoint(client_id, client_secret):
            """模拟令牌端点"""
            if client_id == 'test_client' and client_secret == 'test_secret':
                return {
                    'access_token': 'mock_access_token_12345',
                    'token_type': 'Bearer',
                    'expires_in': 3600,
                    'scope': 'read write'
                }
            else:
                raise ValueError("无效的客户端凭证")

        @staticmethod
        def mock_protected_endpoint(token):
            """模拟受保护端点"""
            if token == 'Bearer mock_access_token_12345':
                return {
                    'status': 'success',
                    'data': {'message': '访问受保护资源成功'}
                }
            else:
                return {
                    'status': 'error',
                    'error': '无效令牌'
                }

    # 测试模拟API
    mock_oauth = OAuth2ClientCredentials(
        token_url='mock://token',  # 模拟URL
        client_id='test_client',
        client_secret='test_secret'
    )

    # 覆盖get_token方法以使用模拟
    original_get_token = mock_oauth.get_token
    mock_oauth.get_token = lambda: MockOAuth2API.mock_token_endpoint(
        mock_oauth.client_id, mock_oauth.client_secret
    )

    # 获取令牌
    mock_token = mock_oauth.get_token()
    print(f"模拟令牌: {mock_token['access_token']}")

    # 验证令牌
    print(f"令牌有效: {mock_oauth.is_token_valid()}")

3. OAuth 2.0 授权码流程(完整示例)

import requests
from requests.auth import HTTPBasicAuth
import webbrowser
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json

class OAuth2AuthorizationCode:
    """OAuth 2.0 授权码流程(完整实现)"""

    def __init__(self, client_id, client_secret,
                 authorization_url, token_url,
                 redirect_uri, scope=None):
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorization_url = authorization_url
        self.token_url = token_url
        self.redirect_uri = redirect_uri
        self.scope = scope

        self.access_token = None
        self.refresh_token = None
        self.token_type = None
        self.expires_in = None

        # 用于接收授权码的服务器
        self.auth_code = None
        self.server = None

    def start_local_server(self, port=8000):
        """启动本地服务器接收授权码"""

        class AuthHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                # 解析查询参数
                query = urllib.parse.urlparse(self.path).query
                params = urllib.parse.parse_qs(query)

                # 获取授权码
                if 'code' in params:
                    self.server.auth_code = params['code'][0]

                    # 返回成功页面
                    self.send_response(200)
                    self.send_header('Content-type', 'text/html')
                    self.end_headers()

                    html = """
                    
                    
                    

授权成功!

您可以关闭此窗口并返回应用程序。

""" self.wfile.write(html.encode()) # 停止服务器 threading.Thread(target=self.server.shutdown).start() else: # 错误处理 self.send_response(400) self.end_headers() def log_message(self, format, *args): # 禁用日志输出 pass # 创建服务器 server = HTTPServer(('localhost', port), AuthHandler) server.auth_code = None # 存储授权码 # 在后台运行服务器 server_thread = threading.Thread(target=server.serve_forever) server_thread.daemon = True server_thread.start() self.server = server return f'http://localhost:{port}' def get_authorization_url(self, state=None): """构建授权URL""" params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': self.redirect_uri, 'scope': self.scope or '', 'state': state or 'random_state_string' } # 移除空值 params = {k: v for k, v in params.items() if v} query_string = urllib.parse.urlencode(params) return f"{self.authorization_url}?{query_string}" def authorize(self): """执行授权流程""" # 构建授权URL auth_url = self.get_authorization_url() print(f"请访问以下URL进行授权:") print(f"\n{auth_url}\n") # 自动打开浏览器 try: webbrowser.open(auth_url) print("已在浏览器中打开授权页面...") except: print("无法自动打开浏览器,请手动复制上面的URL访问") # 等待用户授权并获取授权码 print("等待授权...") # 这里需要等待服务器接收到授权码 import time max_wait = 300 # 最大等待5分钟 start_time = time.time() while not self.server.auth_code and (time.time() - start_time) < max_wait: time.sleep(1) if not self.server.auth_code: raise TimeoutError("授权超时") print(f"获取到授权码: {self.server.auth_code[:20]}...") return self.server.auth_code def exchange_token(self, authorization_code): """使用授权码交换访问令牌""" auth = HTTPBasicAuth(self.client_id, self.client_secret) data = { 'grant_type': 'authorization_code', 'code': authorization_code, 'redirect_uri': self.redirect_uri } response = requests.post(self.token_url, data=data, auth=auth) response.raise_for_status() token_data = response.json() self._update_token_data(token_data) return token_data def _update_token_data(self, token_data): """更新令牌数据""" self.access_token = token_data['access_token'] self.token_type = token_data.get('token_type', 'Bearer') self.expires_in = token_data.get('expires_in') self.refresh_token = token_data.get('refresh_token') def refresh_access_token(self): """使用刷新令牌获取新的访问令牌""" if not self.refresh_token: raise ValueError("没有刷新令牌") auth = HTTPBasicAuth(self.client_id, self.client_secret) data = { 'grant_type': 'refresh_token', 'refresh_token': self.refresh_token } response = requests.post(self.token_url, data=data, auth=auth) response.raise_for_status() token_data = response.json() self._update_token_data(token_data) return token_data def get_auth_header(self): """获取认证头部""" if not self.access_token: raise ValueError("没有访问令牌") return {'Authorization': f'{self.token_type} {self.access_token}'} # 完整使用示例(模拟) if __name__ == '__main__': print("=== OAuth 2.0 授权码流程演示 ===\n") # 模拟的OAuth 2.0配置 # 实际使用时,需要从OAuth提供商获取这些信息 config = { 'client_id': 'demo_client_id', 'client_secret': 'demo_client_secret', 'authorization_url': 'https://demo-oauth-server.com/authorize', 'token_url': 'https://demo-oauth-server.com/token', 'redirect_uri': 'http://localhost:8000/callback', 'scope': 'read write profile' } # 创建OAuth客户端 oauth_client = OAuth2AuthorizationCode(**config) try: # 步骤1: 启动本地服务器接收回调 print("1. 启动本地服务器接收授权码回调...") local_url = oauth_client.start_local_server() print(f" 回调地址: {local_url}") # 步骤2: 获取授权URL并打开浏览器 print("\n2. 生成授权URL...") auth_url = oauth_client.get_authorization_url() print(f" 授权URL: {auth_url[:80]}...") # 在实际应用中,这里会打开浏览器让用户授权 # 由于是演示,我们模拟一个授权码 print("\n3. 模拟用户授权过程...") # 模拟获取的授权码 mock_auth_code = 'mock_authorization_code_12345' print(f" 模拟授权码: {mock_auth_code}") # 步骤3: 使用授权码交换访问令牌 print("\n4. 使用授权码交换访问令牌...") # 模拟令牌端点响应 class MockTokenServer: @staticmethod def mock_token_exchange(auth_code): if auth_code == 'mock_authorization_code_12345': return { 'access_token': 'mock_access_token_67890', 'token_type': 'Bearer', 'expires_in': 3600, 'refresh_token': 'mock_refresh_token_abcde', 'scope': 'read write profile' } else: raise ValueError("无效的授权码") # 模拟令牌交换 mock_token_data = MockTokenServer.mock_token_exchange(mock_auth_code) print(f" 获取到访问令牌: {mock_token_data['access_token'][:20]}...") print(f" 令牌类型: {mock_token_data['token_type']}") print(f" 过期时间: {mock_token_data['expires_in']}秒") print(f" 刷新令牌: {mock_token_data['refresh_token'][:20]}...") # 更新客户端令牌数据 oauth_client._update_token_data(mock_token_data) # 步骤4: 使用访问令牌访问受保护资源 print("\n5. 使用访问令牌访问API...") # 创建带认证的请求 headers = oauth_client.get_auth_header() print(f" 请求头: {headers}") # 模拟API请求 mock_api_response = { 'status': 'success', 'user': { 'id': '12345', 'name': '演示用户', 'email': 'demo@example.com' } } print(f" 模拟API响应: {json.dumps(mock_api_response, indent=2)}") # 步骤5: 演示令牌刷新 print("\n6. 演示刷新令牌流程...") # 模拟刷新令牌响应 mock_refresh_data = { 'access_token': 'new_mock_access_token_11111', 'token_type': 'Bearer', 'expires_in': 3600 } print(f" 刷新前令牌: {oauth_client.access_token}") # 模拟刷新 oauth_client._update_token_data(mock_refresh_data) print(f" 刷新后令牌: {oauth_client.access_token}") print("\n✅ OAuth 2.0 授权码流程演示完成!") except Exception as e: print(f"\n❌ 错误: {e}") print("\n注意:这是一个模拟演示。在实际应用中,您需要:") print("1. 从OAuth提供商注册应用获取真实的client_id和client_secret") print("2. 使用真实的授权URL和令牌URL") print("3. 处理真实的浏览器重定向和用户交互") print("4. 安全地存储令牌和刷新令牌")

4. 使用requests-oauthlib简化OAuth

import os
from requests_oauthlib import OAuth2Session
from flask import Flask, request, redirect, session
import json

# requests-oauthlib是处理OAuth的官方推荐库
# 安装: pip install requests-oauthlib

# 以下是一个完整的Flask应用示例,演示OAuth 2.0授权码流程

def create_flask_oauth_app():
    """创建Flask OAuth 2.0示例应用"""

    app = Flask(__name__)
    app.secret_key = os.urandom(24)  # 用于会话加密

    # OAuth 2.0配置
    CLIENT_ID = os.getenv('OAUTH_CLIENT_ID', 'demo_client_id')
    CLIENT_SECRET = os.getenv('OAUTH_CLIENT_SECRET', 'demo_client_secret')
    AUTHORIZATION_BASE_URL = 'https://demo-oauth-server.com/authorize'
    TOKEN_URL = 'https://demo-oauth-server.com/token'
    REDIRECT_URI = 'http://localhost:5000/callback'
    SCOPE = ['read', 'write', 'profile']

    # 创建OAuth2Session
    def get_oauth_session(token=None, state=None):
        return OAuth2Session(
            client_id=CLIENT_ID,
            token=token,
            state=state,
            scope=SCOPE,
            redirect_uri=REDIRECT_URI,
            auto_refresh_kwargs={
                'client_id': CLIENT_ID,
                'client_secret': CLIENT_SECRET,
            },
            auto_refresh_url=TOKEN_URL,
            token_updater=lambda t: session.update({'oauth_token': t})
        )

    @app.route('/')
    def index():
        """首页"""
        html = """
        
        OAuth 2.0 演示
        
            

OAuth 2.0 授权码流程演示

点击这里登录

查看个人信息(需要登录)

""" return html @app.route('/login') def login(): """开始OAuth登录流程""" oauth = get_oauth_session() authorization_url, state = oauth.authorization_url(AUTHORIZATION_BASE_URL) # 保存state到session session['oauth_state'] = state return redirect(authorization_url) @app.route('/callback') def callback(): """OAuth回调处理""" # 验证state if 'oauth_state' not in session or session['oauth_state'] != request.args.get('state'): return 'State验证失败', 400 oauth = get_oauth_session(state=session['oauth_state']) try: # 获取令牌 token = oauth.fetch_token( TOKEN_URL, client_secret=CLIENT_SECRET, authorization_response=request.url ) # 保存令牌到session session['oauth_token'] = token return redirect('/profile') except Exception as e: return f'获取令牌失败: {str(e)}', 400 @app.route('/profile') def profile(): """获取用户信息(需要OAuth令牌)""" if 'oauth_token' not in session: return redirect('/login') token = session['oauth_token'] oauth = get_oauth_session(token=token) try: # 使用令牌访问API # 这里假设用户信息端点 response = oauth.get('https://demo-oauth-server.com/api/userinfo') user_info = response.json() html = f""" 用户信息

用户信息

{json.dumps(user_info, indent=2)}

退出登录

""" return html except Exception as e: return f'获取用户信息失败: {str(e)}', 400 @app.route('/logout') def logout(): """退出登录""" session.clear() return redirect('/') return app # 如果你想要运行这个Flask应用,取消下面的注释 # if __name__ == '__main__': # app = create_flask_oauth_app() # app.run(debug=True, port=5000) # 对于非Web应用,可以使用OAuth2Session直接进行客户端凭证流程 def oauth2_client_credentials_example(): """客户端凭证流程示例(非Web应用)""" from oauthlib.oauth2 import BackendApplicationClient from requests_oauthlib import OAuth2Session client_id = 'your_client_id' client_secret = 'your_client_secret' token_url = 'https://api.example.com/oauth/token' # 创建客户端凭证客户端 client = BackendApplicationClient(client_id=client_id) oauth = OAuth2Session(client=client) try: # 获取令牌 token = oauth.fetch_token( token_url=token_url, client_id=client_id, client_secret=client_secret ) print(f"获取到令牌: {token['access_token'][:20]}...") # 使用令牌访问API # oauth会自动在请求中添加Bearer Token response = oauth.get('https://api.example.com/api/protected') if response.status_code == 200: print(f"API访问成功: {response.json()}") else: print(f"API访问失败: {response.status_code}") except Exception as e: print(f"OAuth 2.0错误: {e}") print("requests-oauthlib 示例代码已准备好") print("要运行完整示例,需要:") print("1. 安装: pip install requests-oauthlib flask") print("2. 配置真实的OAuth 2.0提供商信息") print("3. 运行Flask应用或调用相应函数")

自定义认证机制

Requests允许你创建自定义认证类来处理非标准的认证机制。这通过继承requests.auth.AuthBase类实现。

1. 创建自定义认证类

简单自定义
import requests
from requests.auth import AuthBase

# 最简单自定义认证:API Key放在查询参数中
class APIKeyAuth(AuthBase):
    """API Key认证(查询参数方式)"""

    def __init__(self, api_key):
        self.api_key = api_key

    def __call__(self, r):
        # 添加API Key到URL查询参数
        from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

        parsed_url = urlparse(r.url)
        query_params = parse_qs(parsed_url.query)
        query_params['api_key'] = self.api_key

        # 重建URL
        new_query = urlencode(query_params, doseq=True)
        r.url = urlunparse(parsed_url._replace(query=new_query))

        return r

# 使用自定义API Key认证
api_key = 'your_api_key_12345'
auth = APIKeyAuth(api_key)

response = requests.get('https://api.example.com/data', auth=auth)
print(f"API Key认证状态码: {response.status_code}")

# 另一个示例:自定义头部认证
class CustomHeaderAuth(AuthBase):
    """自定义头部认证"""

    def __init__(self, header_name, header_value):
        self.header_name = header_name
        self.header_value = header_value

    def __call__(self, r):
        r.headers[self.header_name] = self.header_value
        return r

# 使用自定义头部认证
auth2 = CustomHeaderAuth('X-API-Key', 'your_api_key_12345')
response2 = requests.get('https://api.example.com/data', auth=auth2)
print(f"自定义头部认证状态码: {response2.status_code}")
import requests
from requests.auth import AuthBase
import hashlib
import time
import hmac

class HMACAuth(AuthBase):
    """HMAC认证(用于AWS Signature V4等)"""

    def __init__(self, access_key, secret_key, service='api', region='us-east-1'):
        self.access_key = access_key
        self.secret_key = secret_key
        self.service = service
        self.region = region

    def __call__(self, r):
        # 获取当前时间
        amz_date = time.strftime('%Y%m%dT%H%M%SZ', time.gmtime())
        date_stamp = time.strftime('%Y%m%d', time.gmtime())

        # 规范请求(简化版,实际AWS签名更复杂)
        canonical_request = self._create_canonical_request(r, amz_date)

        # 创建字符串签名
        string_to_sign = self._create_string_to_sign(canonical_request, amz_date, date_stamp)

        # 计算签名
        signature = self._calculate_signature(string_to_sign, date_stamp)

        # 添加Authorization头部
        credential_scope = f"{date_stamp}/{self.region}/{self.service}/aws4_request"
        auth_header = (
            f"AWS4-HMAC-SHA256 "
            f"Credential={self.access_key}/{credential_scope}, "
            f"SignedHeaders=host;x-amz-date, "
            f"Signature={signature}"
        )

        r.headers['Authorization'] = auth_header
        r.headers['X-Amz-Date'] = amz_date

        return r

    def _create_canonical_request(self, r, amz_date):
        """创建规范请求(简化版)"""
        # 实际AWS签名需要完整的规范请求
        return f"{r.method}\n/\n\nhost:{r.headers.get('host', '')}\nx-amz-date:{amz_date}\n\nhost;x-amz-date\nUNSIGNED-PAYLOAD"

    def _create_string_to_sign(self, canonical_request, amz_date, date_stamp):
        """创建待签名字符串"""
        credential_scope = f"{date_stamp}/{self.region}/{self.service}/aws4_request"
        return f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n" + \
               hashlib.sha256(canonical_request.encode()).hexdigest()

    def _calculate_signature(self, string_to_sign, date_stamp):
        """计算签名"""
        # 派生签名密钥
        k_date = self._sign(('AWS4' + self.secret_key).encode(), date_stamp)
        k_region = self._sign(k_date, self.region)
        k_service = self._sign(k_region, self.service)
        k_signing = self._sign(k_service, 'aws4_request')

        # 计算签名
        signature = hmac.new(
            k_signing,
            string_to_sign.encode(),
            hashlib.sha256
        ).hexdigest()

        return signature

    def _sign(self, key, msg):
        """HMAC-SHA256签名"""
        return hmac.new(key, msg.encode(), hashlib.sha256).digest()

# 使用HMAC认证(简化示例)
# 注意:这只是一个演示,实际AWS签名更复杂
hmac_auth = HMACAuth(
    access_key='AKIAIOSFODNN7EXAMPLE',
    secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)

# 实际使用可能需要更多配置
# response = requests.get('https://api.example.com/data', auth=hmac_auth)
import requests
from requests.auth import AuthBase
import base64

class MultiAuth(AuthBase):
    """多重认证(支持多种认证方式)"""

    def __init__(self, auth_methods=None):
        self.auth_methods = auth_methods or []

    def add_auth(self, auth_method):
        """添加认证方法"""
        self.auth_methods.append(auth_method)

    def __call__(self, r):
        # 应用所有认证方法
        for auth_method in self.auth_methods:
            r = auth_method(r)
        return r

# 创建组合认证
class APIKeyInHeader(AuthBase):
    """API Key在头部"""
    def __init__(self, api_key):
        self.api_key = api_key

    def __call__(self, r):
        r.headers['X-API-Key'] = self.api_key
        return r

class APIKeyInParam(AuthBase):
    """API Key在查询参数"""
    def __init__(self, api_key):
        self.api_key = api_key

    def __call__(self, r):
        from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

        parsed_url = urlparse(r.url)
        query_params = parse_qs(parsed_url.query)
        query_params['api_key'] = self.api_key

        new_query = urlencode(query_params, doseq=True)
        r.url = urlunparse(parsed_url._replace(query=new_query))

        return r

class TimestampAuth(AuthBase):
    """时间戳认证(防止重放攻击)"""
    def __init__(self):
        import time
        self.timestamp = str(int(time.time()))

    def __call__(self, r):
        r.headers['X-Timestamp'] = self.timestamp
        return r

class SignatureAuth(AuthBase):
    """签名认证"""
    def __init__(self, secret_key):
        self.secret_key = secret_key

    def __call__(self, r):
        import hashlib
        import hmac

        # 创建签名字符串
        string_to_sign = f"{r.method}\n{r.url}\n{r.headers.get('X-Timestamp', '')}"

        # 计算签名
        signature = hmac.new(
            self.secret_key.encode(),
            string_to_sign.encode(),
            hashlib.sha256
        ).hexdigest()

        r.headers['X-Signature'] = signature
        return r

# 使用多重认证
multi_auth = MultiAuth()
multi_auth.add_auth(APIKeyInHeader('header_api_key_123'))
multi_auth.add_auth(APIKeyInParam('param_api_key_456'))
multi_auth.add_auth(TimestampAuth())
multi_auth.add_auth(SignatureAuth('my_secret_key'))

# 创建请求
session = requests.Session()
session.auth = multi_auth

# 查看请求头部
from requests import Request
req = Request('GET', 'https://api.example.com/data', auth=multi_auth)
prepared = session.prepare_request(req)

print("多重认证请求头部:")
for key, value in prepared.headers.items():
    print(f"  {key}: {value}")

print(f"\n请求URL: {prepared.url}")

# 实际发送请求
# response = session.send(prepared)
# print(f"响应状态码: {response.status_code}")

2. 工厂模式创建认证

import requests
from requests.auth import AuthBase
from enum import Enum

class AuthType(Enum):
    """认证类型枚举"""
    BASIC = 'basic'
    BEARER = 'bearer'
    API_KEY_HEADER = 'api_key_header'
    API_KEY_PARAM = 'api_key_param'
    CUSTOM = 'custom'

class AuthFactory:
    """认证工厂"""

    @staticmethod
    def create_auth(auth_type, **kwargs):
        """创建认证对象"""
        if auth_type == AuthType.BASIC:
            from requests.auth import HTTPBasicAuth
            return HTTPBasicAuth(kwargs.get('username'), kwargs.get('password'))

        elif auth_type == AuthType.BEARER:
            class BearerAuth(AuthBase):
                def __init__(self, token):
                    self.token = token

                def __call__(self, r):
                    r.headers['Authorization'] = f'Bearer {self.token}'
                    return r

            return BearerAuth(kwargs.get('token'))

        elif auth_type == AuthType.API_KEY_HEADER:
            class APIKeyHeaderAuth(AuthBase):
                def __init__(self, api_key, header_name='X-API-Key'):
                    self.api_key = api_key
                    self.header_name = header_name

                def __call__(self, r):
                    r.headers[self.header_name] = self.api_key
                    return r

            return APIKeyHeaderAuth(
                kwargs.get('api_key'),
                kwargs.get('header_name', 'X-API-Key')
            )

        elif auth_type == AuthType.API_KEY_PARAM:
            class APIKeyParamAuth(AuthBase):
                def __init__(self, api_key, param_name='api_key'):
                    self.api_key = api_key
                    self.param_name = param_name

                def __call__(self, r):
                    from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

                    parsed_url = urlparse(r.url)
                    query_params = parse_qs(parsed_url.query)
                    query_params[self.param_name] = self.api_key

                    new_query = urlencode(query_params, doseq=True)
                    r.url = urlunparse(parsed_url._replace(query=new_query))

                    return r

            return APIKeyParamAuth(
                kwargs.get('api_key'),
                kwargs.get('param_name', 'api_key')
            )

        elif auth_type == AuthType.CUSTOM:
            # 使用自定义认证类
            custom_class = kwargs.get('custom_class')
            if custom_class and issubclass(custom_class, AuthBase):
                return custom_class(**kwargs.get('params', {}))
            else:
                raise ValueError("无效的自定义认证类")

        else:
            raise ValueError(f"不支持的认证类型: {auth_type}")

# 使用认证工厂
# 创建Basic认证
basic_auth = AuthFactory.create_auth(
    AuthType.BASIC,
    username='admin',
    password='secret123'
)

# 创建Bearer认证
bearer_auth = AuthFactory.create_auth(
    AuthType.BEARER,
    token='eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
)

# 创建API Key头部认证
api_key_auth = AuthFactory.create_auth(
    AuthType.API_KEY_HEADER,
    api_key='api_key_12345',
    header_name='X-API-Key'
)

# 创建API Key参数认证
api_key_param_auth = AuthFactory.create_auth(
    AuthType.API_KEY_PARAM,
    api_key='api_key_67890',
    param_name='api_key'
)

# 测试各种认证
test_url = 'https://httpbin.org/headers'

for auth_name, auth_obj in [
    ('Basic', basic_auth),
    ('Bearer', bearer_auth),
    ('API Key Header', api_key_auth)
]:
    try:
        response = requests.get(test_url, auth=auth_obj)
        print(f"{auth_name}认证测试: 状态码 {response.status_code}")
    except Exception as e:
        print(f"{auth_name}认证测试失败: {e}")

# 配置文件驱动的认证
class ConfigDrivenAuth:
    """配置文件驱动的认证"""

    def __init__(self, config_path='auth_config.json'):
        import json
        import os

        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self):
        """加载配置文件"""
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r') as f:
                return json.load(f)
        else:
            # 默认配置
            return {
                'default_auth': 'api_key_header',
                'auth_methods': {
                    'basic': {
                        'username': '${BASIC_USERNAME}',
                        'password': '${BASIC_PASSWORD}'
                    },
                    'bearer': {
                        'token': '${BEARER_TOKEN}'
                    },
                    'api_key_header': {
                        'api_key': '${API_KEY}',
                        'header_name': 'X-API-Key'
                    }
                }
            }

    def _resolve_env_vars(self, value):
        """解析环境变量"""
        if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
            env_var = value[2:-1]
            return os.getenv(env_var, '')
        return value

    def get_auth(self, auth_type=None):
        """根据配置获取认证对象"""
        auth_type = auth_type or self.config.get('default_auth', 'api_key_header')

        auth_config = self.config['auth_methods'].get(auth_type, {})

        # 解析环境变量
        resolved_config = {}
        for key, value in auth_config.items():
            resolved_config[key] = self._resolve_env_vars(value)

        # 映射配置到AuthType
        auth_type_map = {
            'basic': AuthType.BASIC,
            'bearer': AuthType.BEARER,
            'api_key_header': AuthType.API_KEY_HEADER,
            'api_key_param': AuthType.API_KEY_PARAM
        }

        if auth_type in auth_type_map:
            return AuthFactory.create_auth(auth_type_map[auth_type], **resolved_config)
        else:
            raise ValueError(f"不支持的认证类型: {auth_type}")

# 使用配置驱动的认证
config_auth = ConfigDrivenAuth()

# 从配置获取认证(会从环境变量读取值)
# 需要设置环境变量:BASIC_USERNAME, BASIC_PASSWORD等
try:
    auth_from_config = config_auth.get_auth('basic')
    print(f"\n从配置获取Basic认证: {type(auth_from_config).__name__}")
except Exception as e:
    print(f"\n配置认证失败(需要设置环境变量): {e}")

Session对象认证管理

使用Session对象可以统一管理认证信息,避免为每个请求重复配置。

1. Session认证配置

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth

# 创建Session并配置认证
session = requests.Session()

# 方法1:使用auth属性
session.auth = HTTPBasicAuth('username', 'password')

# 方法2:使用headers(不推荐用于Basic/Digest,适合Bearer Token)
# session.headers.update({'Authorization': 'Bearer token_here'})

# 方法3:使用自定义认证类
class CustomSessionAuth:
    """Session级别的自定义认证"""

    def __init__(self, token):
        self.token = token

    def __call__(self, r):
        r.headers['X-Session-Token'] = self.token
        return r

session.auth = CustomSessionAuth('session_token_123')

# 现在所有通过该Session的请求都会自动使用认证
response1 = session.get('https://api.example.com/resource1')
response2 = session.post('https://api.example.com/resource2', json={'data': 'test'})

print(f"请求1状态码: {response1.status_code}")
print(f"请求2状态码: {response2.status_code}")

# 临时覆盖Session的认证
response3 = session.get(
    'https://api.example.com/public',
    auth=None  # 这个请求不使用认证
)

print(f"请求3状态码: {response3.status_code}")

# 使用不同的认证
temp_auth = HTTPDigestAuth('guest', 'readonly')
response4 = session.get(
    'https://api.example.com/guest-area',
    auth=temp_auth  # 临时使用Digest认证
)

print(f"请求4状态码: {response4.status_code}")

# 清除Session认证
session.auth = None
print(f"Session认证已清除: {session.auth}")

2. 多Session管理不同认证

import requests
from requests.auth import HTTPBasicAuth

class MultiAuthManager:
    """多认证会话管理器"""

    def __init__(self):
        self.sessions = {}  # 存储不同认证的Session

    def get_session(self, auth_key, auth_config=None):
        """获取或创建指定认证的Session"""
        if auth_key not in self.sessions:
            session = requests.Session()

            # 配置认证
            if auth_config:
                self._configure_session_auth(session, auth_config)

            self.sessions[auth_key] = session

        return self.sessions[auth_key]

    def _configure_session_auth(self, session, auth_config):
        """配置Session认证"""
        auth_type = auth_config.get('type', 'none')

        if auth_type == 'basic':
            session.auth = HTTPBasicAuth(
                auth_config.get('username'),
                auth_config.get('password')
            )

        elif auth_type == 'bearer':
            session.headers.update({
                'Authorization': f'Bearer {auth_config.get("token")}'
            })

        elif auth_type == 'api_key':
            session.headers.update({
                auth_config.get('header_name', 'X-API-Key'): auth_config.get('api_key')
            })

        # 设置其他公共头部
        session.headers.update({
            'User-Agent': 'MultiAuthManager/1.0',
            'Accept': 'application/json'
        })

    def close_all(self):
        """关闭所有Session"""
        for session in self.sessions.values():
            session.close()
        self.sessions.clear()

# 使用多认证管理器
auth_manager = MultiAuthManager()

# 配置不同认证的Session
admin_config = {
    'type': 'basic',
    'username': 'admin',
    'password': 'admin123'
}

api_config = {
    'type': 'api_key',
    'api_key': 'api_key_12345',
    'header_name': 'X-API-Key'
}

user_config = {
    'type': 'bearer',
    'token': 'user_token_67890'
}

# 获取不同认证的Session
admin_session = auth_manager.get_session('admin', admin_config)
api_session = auth_manager.get_session('api', api_config)
user_session = auth_manager.get_session('user', user_config)

# 使用不同认证的Session访问相应API
try:
    # 使用管理员认证访问管理API
    admin_response = admin_session.get('https://api.example.com/admin/users')
    print(f"管理员请求状态码: {admin_response.status_code}")

    # 使用API Key访问数据API
    api_response = api_session.get('https://api.example.com/api/data')
    print(f"API Key请求状态码: {api_response.status_code}")

    # 使用用户Token访问用户API
    user_response = user_session.get('https://api.example.com/user/profile')
    print(f"用户Token请求状态码: {user_response.status_code}")

finally:
    # 清理资源
    auth_manager.close_all()
    print("所有Session已关闭")

# 上下文管理器版本
class AuthSessionManager:
    """认证会话管理器(上下文管理器)"""

    def __init__(self, auth_configs):
        self.auth_configs = auth_configs
        self.sessions = {}

    def __enter__(self):
        # 创建所有Session
        for key, config in self.auth_configs.items():
            session = requests.Session()
            self._configure_session(session, config)
            self.sessions[key] = session
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 关闭所有Session
        for session in self.sessions.values():
            session.close()
        self.sessions.clear()

    def _configure_session(self, session, config):
        """配置Session"""
        if 'headers' in config:
            session.headers.update(config['headers'])

        if 'auth' in config:
            session.auth = config['auth']

    def get(self, auth_key):
        """获取指定认证的Session"""
        return self.sessions.get(auth_key)

# 使用上下文管理器
auth_configs = {
    'github': {
        'headers': {
            'Authorization': 'token your_github_token',
            'Accept': 'application/vnd.github.v3+json'
        }
    },
    'slack': {
        'headers': {
            'Authorization': 'Bearer your_slack_token'
        }
    }
}

with AuthSessionManager(auth_configs) as manager:
    github_session = manager.get('github')
    slack_session = manager.get('slack')

    # 使用不同的Session访问不同的API
    # github_response = github_session.get('https://api.github.com/user')
    # slack_response = slack_session.get('https://slack.com/api/auth.test')

    print("在上下文管理器中使用多个认证Session")

print("上下文管理器已退出,所有资源已清理")

3. 认证状态管理

import requests
import time
from requests.auth import AuthBase

class StatefulAuthSession:
    """有状态认证Session,处理认证过期和刷新"""

    def __init__(self, base_url, auth_provider):
        self.base_url = base_url
        self.auth_provider = auth_provider  # 认证提供者
        self.session = requests.Session()
        self.is_authenticated = False
        self.auth_expiry = None

        # 设置默认头部
        self.session.headers.update({
            'User-Agent': 'StatefulAuthClient/1.0',
            'Accept': 'application/json'
        })

    def authenticate(self):
        """进行认证"""
        try:
            auth_data = self.auth_provider.authenticate()

            # 配置Session认证
            if 'token' in auth_data:
                self.session.headers['Authorization'] = f'Bearer {auth_data["token"]}'

            # 更新认证状态
            self.is_authenticated = True

            # 设置过期时间
            if 'expires_in' in auth_data:
                self.auth_expiry = time.time() + auth_data['expires_in']

            print("认证成功")
            return True

        except Exception as e:
            print(f"认证失败: {e}")
            self.is_authenticated = False
            return False

    def check_auth(self):
        """检查认证状态"""
        if not self.is_authenticated:
            return False

        # 检查是否过期
        if self.auth_expiry and time.time() > self.auth_expiry:
            print("认证已过期")
            self.is_authenticated = False
            return False

        return True

    def ensure_auth(self):
        """确保有有效的认证"""
        if not self.check_auth():
            return self.authenticate()
        return True

    def request(self, method, endpoint, **kwargs):
        """发送请求,自动处理认证"""
        # 确保有有效的认证
        if not self.ensure_auth():
            raise Exception("无法建立有效认证")

        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # 发送请求
        response = self.session.request(method, url, **kwargs)

        # 检查是否需要重新认证
        if response.status_code == 401:
            print("认证失效,尝试重新认证...")
            if self.authenticate():
                # 重试请求
                response = self.session.request(method, url, **kwargs)

        return response

    def get(self, endpoint, **kwargs):
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        return self.request('POST', endpoint, **kwargs)

    def close(self):
        """关闭Session"""
        self.session.close()
        self.is_authenticated = False

# 示例认证提供者
class MockAuthProvider:
    """模拟认证提供者"""

    def authenticate(self):
        # 模拟认证过程
        import random

        # 模拟网络延迟
        time.sleep(0.5)

        # 生成模拟token
        token = f"mock_token_{random.randint(10000, 99999)}"

        return {
            'token': token,
            'expires_in': 30,  # 30秒过期,方便测试
            'token_type': 'Bearer'
        }

# 使用有状态认证Session
if __name__ == '__main__':
    auth_provider = MockAuthProvider()
    client = StatefulAuthSession('https://api.example.com', auth_provider)

    try:
        # 首次认证
        print("进行首次认证...")
        if client.authenticate():
            print("认证状态:", client.is_authenticated)
            print("过期时间:", client.auth_expiry)

        # 发送请求
        print("\n发送请求1...")
        response1 = client.get('/resource1')
        print(f"请求1状态码: {response1.status_code}")

        # 等待过期
        print("\n等待认证过期...")
        time.sleep(35)  # 等待超过30秒

        # 再次发送请求(应该自动重新认证)
        print("\n发送请求2(认证已过期)...")
        response2 = client.get('/resource2')
        print(f"请求2状态码: {response2.status_code}")
        print("认证状态:", client.is_authenticated)

        # 模拟401错误
        print("\n模拟401错误处理...")
        # 这里可以创建一个模拟服务器返回401,然后测试重认证逻辑

    finally:
        client.close()
        print("\nSession已关闭")

安全最佳实践

重要安全警告

认证信息是系统的钥匙,一旦泄露可能导致严重的安全问题。以下最佳实践至关重要。

1. 凭证管理

不要这样做
  • 不要在代码中硬编码凭证
  • 不要将凭证提交到版本控制系统
  • 不要在前端暴露凭证
  • 不要在日志中记录完整凭证
  • 不要使用弱密码或默认凭证
应该这样做
  • 使用环境变量管理凭证
  • 使用密钥管理服务(如Vault)
  • 为不同环境使用不同凭证
  • 定期轮换凭证
  • 使用最小权限原则

2. 安全代码示例

import os
import requests
from dotenv import load_dotenv  # pip install python-dotenv
from requests.auth import HTTPBasicAuth
import keyring  # pip install keyring(系统密钥库)

# 方法1:使用python-dotenv加载环境变量
load_dotenv()  # 从.env文件加载环境变量

# 从环境变量获取凭证(最常用)
username = os.getenv('API_USERNAME')
password = os.getenv('API_PASSWORD')
api_key = os.getenv('API_KEY')

if not all([username, password]):
    raise ValueError("请设置API_USERNAME和API_PASSWORD环境变量")

# 使用环境变量的凭证
auth = HTTPBasicAuth(username, password)

# 方法2:使用系统密钥库(更安全)
def get_credential_from_keyring(service_name, username):
    """从系统密钥库获取凭证"""
    try:
        return keyring.get_password(service_name, username)
    except:
        return None

# 存储凭证到密钥库
# keyring.set_password('my_api_service', 'api_user', 'secret_password')

# 从密钥库读取
stored_password = get_credential_from_keyring('my_api_service', 'api_user')
if stored_password:
    auth = HTTPBasicAuth('api_user', stored_password)

# 方法3:使用配置文件(配合加密)
import json
import base64
from cryptography.fernet import Fernet  # pip install cryptography

class SecureConfig:
    """安全配置管理器"""

    def __init__(self, config_path='config.enc', key_path='key.key'):
        self.config_path = config_path
        self.key_path = key_path
        self.key = self._load_or_create_key()
        self.cipher = Fernet(self.key)

    def _load_or_create_key(self):
        """加载或创建加密密钥"""
        if os.path.exists(self.key_path):
            with open(self.key_path, 'rb') as f:
                return f.read()
        else:
            key = Fernet.generate_key()
            with open(self.key_path, 'wb') as f:
                f.write(key)
            # 设置文件权限(仅所有者可读)
            os.chmod(self.key_path, 0o600)
            return key

    def save_config(self, config_data):
        """保存加密配置"""
        json_str = json.dumps(config_data)
        encrypted = self.cipher.encrypt(json_str.encode())

        with open(self.config_path, 'wb') as f:
            f.write(encrypted)

        os.chmod(self.config_path, 0o600)

    def load_config(self):
        """加载解密配置"""
        if not os.path.exists(self.config_path):
            return {}

        with open(self.config_path, 'rb') as f:
            encrypted = f.read()

        decrypted = self.cipher.decrypt(encrypted)
        return json.loads(decrypted.decode())

# 使用安全配置
config = SecureConfig()

# 保存配置(加密)
config.save_config({
    'api_url': 'https://api.example.com',
    'username': 'api_user',
    'password': 'secret_password',  # 实际中不应该这样存储密码
    'api_key': 'key_12345'
})

# 加载配置
loaded_config = config.load_config()
print(f"加载的配置: {loaded_config.get('api_url')}")

# 方法4:使用AWS Secrets Manager或其他云服务
try:
    import boto3
    from botocore.exceptions import ClientError

    def get_secret_from_aws(secret_name, region_name='us-east-1'):
        """从AWS Secrets Manager获取密钥"""
        session = boto3.session.Session()
        client = session.client(
            service_name='secretsmanager',
            region_name=region_name
        )

        try:
            response = client.get_secret_value(SecretId=secret_name)
            if 'SecretString' in response:
                return json.loads(response['SecretString'])
            else:
                decoded_binary_secret = base64.b64decode(
                    response['SecretBinary']
                )
                return json.loads(decoded_binary_secret)
        except ClientError as e:
            print(f"AWS Secrets Manager错误: {e}")
            return None

    # 使用示例
    # aws_secret = get_secret_from_aws('my-api-credentials')
    # if aws_secret:
    #     username = aws_secret.get('username')
    #     password = aws_secret.get('password')

except ImportError:
    print("boto3未安装,跳过AWS Secrets Manager示例")

# 完整的认证客户端示例
class SecureAPIClient:
    """安全的API客户端"""

    def __init__(self, config_source='env'):
        self.config_source = config_source
        self.credentials = self._load_credentials()
        self.session = requests.Session()

        # 配置Session
        self._configure_session()

    def _load_credentials(self):
        """根据配置源加载凭证"""
        if self.config_source == 'env':
            return {
                'username': os.getenv('API_USERNAME'),
                'password': os.getenv('API_PASSWORD'),
                'api_key': os.getenv('API_KEY')
            }
        elif self.config_source == 'keyring':
            return {
                'username': 'api_user',
                'password': get_credential_from_keyring('my_api', 'api_user')
            }
        elif self.config_source == 'file':
            config = SecureConfig()
            return config.load_config()
        else:
            raise ValueError(f"不支持的配置源: {self.config_source}")

    def _configure_session(self):
        """配置Session"""
        # 添加认证头部
        if self.credentials.get('api_key'):
            self.session.headers['X-API-Key'] = self.credentials['api_key']

        # 或者使用Basic认证
        elif self.credentials.get('username') and self.credentials.get('password'):
            self.session.auth = HTTPBasicAuth(
                self.credentials['username'],
                self.credentials['password']
            )

        # 其他公共头部
        self.session.headers.update({
            'User-Agent': 'SecureAPIClient/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })

        # 配置安全选项
        self.session.verify = True  # 启用SSL验证
        self.session.timeout = 30   # 设置超时

    def request(self, method, url, **kwargs):
        """发送请求"""
        # 记录请求(不记录敏感信息)
        safe_url = url
        if 'api_key' in self.session.headers:
            safe_url = url.replace(self.session.headers['X-API-Key'], '***')

        print(f"发送请求: {method} {safe_url}")

        response = self.session.request(method, url, **kwargs)

        # 检查响应
        if response.status_code == 401:
            print("认证失败,可能需要更新凭证")
        elif response.status_code == 403:
            print("权限不足")

        return response

    def close(self):
        """关闭Session"""
        self.session.close()

# 使用安全客户端
with SecureAPIClient(config_source='env') as client:
    # 发送请求
    response = client.request('GET', 'https://api.example.com/data')
    print(f"响应状态码: {response.status_code}")

3. 认证审计与监控

import requests
import time
import logging
from datetime import datetime
from requests.auth import AuthBase

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('auth_audit.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('AuthAudit')

class AuditedAuth(AuthBase):
    """带审计的自定义认证"""

    def __init__(self, original_auth, user_id, client_ip='unknown'):
        self.original_auth = original_auth
        self.user_id = user_id
        self.client_ip = client_ip
        self.request_count = 0

    def __call__(self, r):
        self.request_count += 1

        # 记录审计信息(不记录敏感信息)
        audit_info = {
            'timestamp': datetime.now().isoformat(),
            'user_id': self.user_id,
            'client_ip': self.client_ip,
            'method': r.method,
            'url': r.url,
            'request_number': self.request_count
        }

        logger.info(f"认证请求: {audit_info}")

        # 应用原始认证
        return self.original_auth(r)

    def get_stats(self):
        """获取统计信息"""
        return {
            'user_id': self.user_id,
            'total_requests': self.request_count,
            'last_activity': datetime.now().isoformat()
        }

class MonitoredSession:
    """监控Session"""

    def __init__(self, session_name, alert_threshold=100):
        self.session_name = session_name
        self.session = requests.Session()
        self.request_count = 0
        self.alert_threshold = alert_threshold
        self.start_time = time.time()

        # 监控指标
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'auth_failures': 0,
            'last_request_time': None
        }

    def request(self, method, url, **kwargs):
        """发送请求并监控"""
        self.request_count += 1
        self.metrics['total_requests'] += 1

        # 检查请求频率
        if self.request_count > self.alert_threshold:
            logger.warning(
                f"高频率请求警告: {self.session_name} "
                f"已发送 {self.request_count} 个请求"
            )

        start_time = time.time()

        try:
            response = self.session.request(method, url, **kwargs)
            elapsed = time.time() - start_time

            # 更新指标
            if 200 <= response.status_code < 300:
                self.metrics['successful_requests'] += 1
            else:
                self.metrics['failed_requests'] += 1

            # 记录认证失败
            if response.status_code == 401:
                self.metrics['auth_failures'] += 1
                logger.error(f"认证失败: {method} {url}")

            # 记录慢请求
            if elapsed > 5.0:  # 超过5秒认为是慢请求
                logger.warning(
                    f"慢请求: {method} {url} 耗时 {elapsed:.2f}秒"
                )

            self.metrics['last_request_time'] = datetime.now().isoformat()

            return response

        except Exception as e:
            self.metrics['failed_requests'] += 1
            logger.error(f"请求异常: {method} {url} - {e}")
            raise

    def get_metrics(self):
        """获取监控指标"""
        uptime = time.time() - self.start_time
        requests_per_second = self.metrics['total_requests'] / max(uptime, 1)

        return {
            **self.metrics,
            'session_name': self.session_name,
            'uptime_seconds': uptime,
            'requests_per_second': requests_per_second,
            'success_rate': (
                self.metrics['successful_requests'] /
                max(self.metrics['total_requests'], 1) * 100
            )
        }

# 使用审计和监控
# 创建基础认证
basic_auth = HTTPBasicAuth('admin', 'secret123')

# 包装为带审计的认证
audited_auth = AuditedAuth(
    original_auth=basic_auth,
    user_id='user_12345',
    client_ip='192.168.1.100'
)

# 创建监控Session
monitored_session = MonitoredSession(
    session_name='AdminAPISession',
    alert_threshold=50
)

# 配置Session
monitored_session.session.auth = audited_auth

# 发送请求
try:
    response = monitored_session.request(
        'GET',
        'https://api.example.com/admin/users'
    )

    print(f"请求状态码: {response.status_code}")

    # 获取审计信息
    auth_stats = audited_auth.get_stats()
    print(f"用户 {auth_stats['user_id']} 发送了 {auth_stats['total_requests']} 个请求")

    # 获取监控指标
    metrics = monitored_session.get_metrics()
    print(f"Session指标: {metrics['success_rate']:.1f}% 成功率")

except Exception as e:
    print(f"请求失败: {e}")

# 定期报告
def generate_security_report(sessions):
    """生成安全报告"""
    report = {
        'generated_at': datetime.now().isoformat(),
        'total_sessions': len(sessions),
        'sessions': [],
        'summary': {
            'total_requests': 0,
            'auth_failures': 0,
            'suspicious_activity': []
        }
    }

    for session in sessions:
        metrics = session.get_metrics()
        report['sessions'].append(metrics)
        report['summary']['total_requests'] += metrics['total_requests']
        report['summary']['auth_failures'] += metrics['auth_failures']

        # 检测可疑活动
        if metrics['auth_failures'] > 10:
            report['summary']['suspicious_activity'].append({
                'session': metrics['session_name'],
                'reason': '过多的认证失败',
                'count': metrics['auth_failures']
            })

        if metrics['requests_per_second'] > 10:
            report['summary']['suspicious_activity'].append({
                'session': metrics['session_name'],
                'reason': '请求频率过高',
                'rate': metrics['requests_per_second']
            })

    return report

# 生成报告
report = generate_security_report([monitored_session])
print(f"\n安全报告: {json.dumps(report, indent=2)}")

4. 安全认证检查清单

认证安全检查清单
  • ✅ 始终使用HTTPS(TLS/SSL)
  • ✅ 凭证存储在安全的地方(环境变量、密钥库)
  • ✅ 使用适当的认证机制(Bearer Token > Basic Auth)
  • ✅ 实现令牌刷新机制
  • ✅ 设置合理的超时时间
  • ✅ 启用SSL证书验证
  • ✅ 记录审计日志(不含敏感信息)
  • ✅ 监控异常认证活动
  • ✅ 定期轮换凭证和令牌
  • ✅ 使用最小权限原则
  • ✅ 实现速率限制和防滥用机制
  • ✅ 进行安全代码审查和渗透测试
最后的安全建议

安全是一个持续的过程,而不是一次性的任务。 定期审查和更新你的认证机制,关注安全公告,及时修补漏洞。对于生产系统,考虑使用专业的安全审计工具和服务。

身份认证总结

Requests库提供了强大而灵活的身份认证支持,从简单的Basic Auth到复杂的OAuth 2.0。关键要点:

  • Basic认证:简单但不安全,仅适用于HTTPS环境或内部网络
  • Digest认证:比Basic安全但不常用,逐渐被淘汰
  • Bearer Token:现代API的标准,配合JWT使用效果更好
  • OAuth 1.0a:用于Twitter等传统API,基于签名认证
  • OAuth 2.0:现代标准,支持多种流程(授权码、客户端凭证等)
  • 自定义认证:通过继承AuthBase类处理特殊认证需求
  • Session管理:统一管理认证状态,提高性能和安全性

记住:无论使用哪种认证方式,安全都是首要考虑因素。始终遵循最小权限原则,安全地存储和管理凭证,使用HTTPS加密传输,并实现适当的监控和审计机制。