身份认证确保只有合法用户能够访问受保护的资源,主要目的包括:
客户端向服务器发送HTTP请求(通常不带认证信息)
服务器返回401状态码和WWW-Authenticate头,指定认证方式
客户端重新发送请求,包含Authorization头中的认证信息
服务器验证凭证,成功返回200,失败返回401或403
| 认证方式 | 安全性 | 适用场景 | 实现复杂度 |
|---|---|---|---|
| Basic Auth | 低(明文传输) | 内部API、测试环境、配合HTTPS使用 | 简单 |
| Digest Auth | 中(哈希传输) | 需要中等安全级别的Web应用 | 中等 |
| Bearer Token | 高(令牌机制) | 现代RESTful API、移动应用、单页面应用 | 简单 |
| OAuth 1.0a | 高(签名认证) | 第三方API授权(Twitter、Tumblr等) | 复杂 |
| OAuth 2.0 | 高(令牌机制) | 现代Web应用、移动应用、微服务 | 中等 |
| API Key | 中(密钥验证) | 服务间通信、数据API、统计服务 | 简单 |
永远不要在没有HTTPS的情况下使用Basic Auth,因为凭证是Base64编码的(不是加密),容易被中间人窃取。所有认证机制都应配合TLS/SSL使用。
HTTP Basic认证是最简单的认证方式,它将用户名和密码用冒号连接,然后进行Base64编码,放在Authorization头中发送。
import requests
import base64
# 方法1:在请求中直接提供auth参数(最简单)
response = requests.get(
'https://api.example.com/protected',
auth=('admin', 'secret123')
)
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.text[:100]}...")
# 方法2:手动构建Authorization头(不推荐)
username = 'admin'
password = 'secret123'
credentials = f"{username}:{password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
headers = {
'Authorization': f'Basic {encoded_credentials}'
}
response = requests.get(
'https://api.example.com/protected',
headers=headers
)
print(f"手动构建头部的状态码: {response.status_code}")
import requests
from requests.auth import HTTPBasicAuth
# 使用HTTPBasicAuth类(更清晰)
auth = HTTPBasicAuth('admin', 'secret123')
response = requests.get(
'https://api.example.com/protected',
auth=auth
)
print(f"使用HTTPBasicAuth的状态码: {response.status_code}")
# 也可以创建可重用的认证对象
class BasicAuthClient:
def __init__(self, base_url, username, password):
self.base_url = base_url
self.auth = HTTPBasicAuth(username, password)
self.session = requests.Session()
def get(self, endpoint):
url = f"{self.base_url}/{endpoint}"
return self.session.get(url, auth=self.auth)
def post(self, endpoint, data=None):
url = f"{self.base_url}/{endpoint}"
return self.session.post(url, auth=self.auth, json=data)
# 使用示例
client = BasicAuthClient('https://api.example.com', 'admin', 'secret123')
response = client.get('users')
print(f"客户端请求状态码: {response.status_code}")
import requests
from requests.auth import HTTPBasicAuth
# 创建Session并设置Basic认证
session = requests.Session()
# 方法1:为Session设置auth
session.auth = HTTPBasicAuth('admin', 'secret123')
# 方法2:设置默认头部(不推荐,不够安全)
# session.headers.update({
# 'Authorization': 'Basic ' + base64.b64encode(b'admin:secret123').decode()
# })
# 现在所有通过该Session的请求都会自动包含认证信息
response1 = session.get('https://api.example.com/users')
response2 = session.get('https://api.example.com/posts')
print(f"请求1状态码: {response1.status_code}")
print(f"请求2状态码: {response2.status_code}")
# 临时使用不同的认证
temp_auth = HTTPBasicAuth('guest', 'readonly')
response3 = session.get(
'https://api.example.com/public',
auth=temp_auth # 临时覆盖Session的认证
)
print(f"请求3状态码: {response3.status_code}")
# 清除Session的认证
session.auth = None
import requests
from requests.auth import HTTPBasicAuth
import json
import time
class BasicAuthAPI:
"""使用Basic认证的API客户端"""
def __init__(self, base_url, username, password):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
self.session.auth = HTTPBasicAuth(username, password)
# 设置默认请求头
self.session.headers.update({
'User-Agent': 'BasicAuthAPI/1.0',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
# 配置重试
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[401, 429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT", "DELETE"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
def _handle_response(self, response):
"""统一处理响应"""
try:
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
print("认证失败:用户名或密码错误")
elif response.status_code == 403:
print("权限不足:用户没有访问此资源的权限")
raise e
def get_users(self):
"""获取用户列表"""
response = self.session.get(f'{self.base_url}/users')
return self._handle_response(response)
def create_user(self, user_data):
"""创建新用户"""
response = self.session.post(
f'{self.base_url}/users',
json=user_data
)
return self._handle_response(response)
def update_user(self, user_id, user_data):
"""更新用户信息"""
response = self.session.put(
f'{self.base_url}/users/{user_id}',
json=user_data
)
return self._handle_response(response)
def delete_user(self, user_id):
"""删除用户"""
response = self.session.delete(f'{self.base_url}/users/{user_id}')
return self._handle_response(response)
def test_connection(self):
"""测试连接和认证"""
try:
response = self.session.get(f'{self.base_url}/health', timeout=5)
response.raise_for_status()
return True
except Exception as e:
print(f"连接测试失败: {e}")
return False
# 使用示例
if __name__ == '__main__':
# 初始化客户端
api = BasicAuthAPI(
base_url='https://api.example.com',
username='admin',
password='secret123' # 实际应用中应从环境变量或配置文件中读取
)
# 测试连接
if api.test_connection():
print("✓ 连接测试成功")
# 获取用户列表
users = api.get_users()
print(f"获取到 {len(users)} 个用户")
# 创建新用户
new_user = {
'name': '张三',
'email': 'zhangsan@example.com',
'role': 'user'
}
created_user = api.create_user(new_user)
print(f"创建用户: {created_user['id']}")
# 更新用户
updated_data = {'email': 'zhangsan.new@example.com'}
updated_user = api.update_user(created_user['id'], updated_data)
print(f"更新用户: {updated_user['email']}")
# 删除用户
result = api.delete_user(created_user['id'])
print(f"删除用户: {result.get('message', '成功')}")
else:
print("✗ 连接测试失败,请检查网络和认证信息")
HTTP Digest认证 是比Basic认证更安全的认证机制。它不会传输明文密码,而是使用哈希算法(通常是MD5)对密码和服务器提供的随机数(nonce)进行加密。
GET /protected
WWW-Authenticate: Digest realm="Protected Area", nonce="abc123"
HA1 = MD5(username:realm:password)
HA2 = MD5(method:uri)
response = MD5(HA1:nonce:HA2)
Authorization: Digest username="admin", realm="...", nonce="...", response="..."
import requests
from requests.auth import HTTPDigestAuth
# 使用HTTPDigestAuth类
auth = HTTPDigestAuth('admin', 'secret123')
# 发送请求(Requests会自动处理Digest认证流程)
response = requests.get(
'https://httpbin.org/digest-auth/auth/admin/secret123',
auth=auth
)
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.json()}")
# 使用Session
session = requests.Session()
session.auth = HTTPDigestAuth('admin', 'secret123')
response2 = session.get('https://httpbin.org/digest-auth/auth/admin/secret123')
print(f"Session请求状态码: {response2.status_code}")
# 注意:Digest认证要求服务器支持
# 测试时可以使用 httpbin.org 的digest-auth端点
import requests
from requests.auth import HTTPDigestAuth
class DigestAuthClient:
"""Digest认证客户端"""
def __init__(self, base_url, username, password):
self.base_url = base_url.rstrip('/')
self.auth = HTTPDigestAuth(username, password)
self.session = requests.Session()
self.session.auth = self.auth
# Digest认证的特定配置
self.nonce_count = 0 # 跟踪nonce使用次数
# 设置默认超时
self.session.timeout = 30
def _increment_nonce_count(self):
"""增加nonce计数器(简化示例)"""
self.nonce_count += 1
return self.nonce_count
def request(self, method, endpoint, **kwargs):
"""发送请求"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
# 可以在这里添加自定义Digest逻辑
response = self.session.request(method, url, **kwargs)
# 检查是否需要重新认证
if response.status_code == 401:
print("认证失败,可能需要更新nonce")
# 在实际应用中,这里可以处理nonce过期的情况
return response
def get(self, endpoint, **kwargs):
return self.request('GET', endpoint, **kwargs)
def post(self, endpoint, **kwargs):
return self.request('POST', endpoint, **kwargs)
# 使用示例
client = DigestAuthClient(
base_url='https://httpbin.org',
username='admin',
password='secret123'
)
# 测试Digest认证
response = client.get('/digest-auth/auth/admin/secret123')
if response.status_code == 200:
print(f"Digest认证成功: {response.json()}")
else:
print(f"Digest认证失败: {response.status_code}")
# 注意:Digest认证比Basic认证更安全,但实现也更复杂
# 大多数现代API使用Bearer Token而不是Digest认证
import requests
import hashlib
import time
class CustomDigestAuth(requests.auth.AuthBase):
"""自定义Digest认证实现"""
def __init__(self, username, password):
self.username = username
self.password = password
self.last_nonce = None
self.nonce_count = 0
def __call__(self, r):
# 如果有缓存的nonce,使用它
if self.last_nonce:
r.headers['Authorization'] = self._build_auth_header(r)
return r
def handle_401(self, r, **kwargs):
"""处理401响应,提取nonce并重新发送请求"""
if r.status_code == 401 and 'WWW-Authenticate' in r.headers:
auth_header = r.headers['WWW-Authenticate']
# 解析Digest挑战参数
params = self._parse_auth_header(auth_header)
# 构建认证响应
auth_response = self._build_auth_response(r, params)
# 修改原始请求的头部
r.request.headers['Authorization'] = auth_response
# 重新发送请求
return r.connection.send(r.request, **kwargs)
return r
def _parse_auth_header(self, auth_header):
"""解析WWW-Authenticate头部"""
params = {}
if auth_header.startswith('Digest '):
parts = auth_header[7:].split(', ')
for part in parts:
if '=' in part:
key, value = part.split('=', 1)
params[key.strip()] = value.strip('"')
return params
def _build_auth_response(self, request, params):
"""构建Digest认证响应"""
method = request.method
uri = request.path_url
# 计算HA1
ha1_data = f"{self.username}:{params.get('realm', '')}:{self.password}"
ha1 = hashlib.md5(ha1_data.encode()).hexdigest()
# 计算HA2
ha2_data = f"{method}:{uri}"
ha2 = hashlib.md5(ha2_data.encode()).hexdigest()
# 计算response
self.nonce_count += 1
nc_value = f"{self.nonce_count:08x}"
cnonce = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
response_data = f"{ha1}:{params.get('nonce', '')}:{nc_value}:{cnonce}:auth:{ha2}"
response = hashlib.md5(response_data.encode()).hexdigest()
# 构建Authorization头部
auth_parts = [
f'username="{self.username}"',
f'realm="{params.get("realm", "")}"',
f'nonce="{params.get("nonce", "")}"',
f'uri="{uri}"',
f'response="{response}"',
f'algorithm=MD5',
f'qop=auth',
f'nc={nc_value}',
f'cnonce="{cnonce}"'
]
return 'Digest ' + ', '.join(auth_parts)
def _build_auth_header(self, request):
"""构建Authorization头部(简化版)"""
# 简化实现,实际使用时需要完整实现上述逻辑
return f'Digest username="{self.username}"'
# 使用自定义Digest认证
session = requests.Session()
auth = CustomDigestAuth('admin', 'secret123')
session.auth = auth
# 添加钩子处理401响应
session.hooks['response'].append(auth.handle_401)
try:
response = session.get('https://httpbin.org/digest-auth/auth/admin/secret123')
print(f"自定义Digest认证状态码: {response.status_code}")
except Exception as e:
print(f"自定义Digest认证错误: {e}")
Bearer Token(承载令牌)是现代API最常用的认证方式。它是一个加密字符串,通常通过OAuth 2.0流程获得,放在Authorization头中发送。
import requests
# 方法1:直接在headers中添加Bearer Token(最常用)
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
response = requests.get(
'https://api.example.com/users',
headers=headers
)
print(f"状态码: {response.status_code}")
# 方法2:使用字典管理headers
auth_headers = {
'Authorization': f'Bearer {token}'
}
# 可以合并headers
all_headers = {**auth_headers, **{'Accept': 'application/json'}}
response = requests.get('https://api.example.com/users', headers=all_headers)
# 方法3:使用Session统一管理
session = requests.Session()
session.headers.update({
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
})
response = session.get('https://api.example.com/users')
print(f"Session请求状态码: {response.status_code}")
import requests
from requests.auth import AuthBase
# Requests没有内置的HTTPBearerAuth类,但我们可以自定义
# 或者使用requests-oauthlib库中的OAuth2Token
# 自定义Bearer认证类
class BearerAuth(AuthBase):
"""Bearer Token认证类"""
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers['Authorization'] = f'Bearer {self.token}'
return r
# 使用自定义Bearer认证
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
auth = BearerAuth(token)
response = requests.get(
'https://api.example.com/users',
auth=auth
)
print(f"使用BearerAuth的状态码: {response.status_code}")
# 使用requests-oauthlib(如果需要OAuth 2.0功能)
try:
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
# 使用OAuth2Session(内置Bearer Token支持)
client_id = 'your_client_id'
client_secret = 'your_client_secret'
token_url = 'https://api.example.com/oauth/token'
client = BackendApplicationClient(client_id=client_id)
oauth = OAuth2Session(client=client)
# 获取token
token = oauth.fetch_token(
token_url=token_url,
client_id=client_id,
client_secret=client_secret
)
# 现在oauth会自动在请求中添加Bearer Token
response = oauth.get('https://api.example.com/users')
print(f"OAuth2Session状态码: {response.status_code}")
except ImportError:
print("requests-oauthlib未安装,使用: pip install requests-oauthlib")
import requests
from requests.auth import AuthBase
import time
import json
class SmartBearerAuth(AuthBase):
"""智能Bearer认证,支持token自动刷新"""
def __init__(self, token, refresh_token=None, token_url=None,
client_id=None, client_secret=None):
self.token = token
self.refresh_token = refresh_token
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self.expires_at = None
self.session = requests.Session()
def __call__(self, r):
# 检查token是否过期
if self.expires_at and time.time() > self.expires_at:
self._refresh_token()
r.headers['Authorization'] = f'Bearer {self.token}'
return r
def _refresh_token(self):
"""刷新access token"""
if not all([self.refresh_token, self.token_url,
self.client_id, self.client_secret]):
raise ValueError("缺少刷新token所需的参数")
data = {
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
'client_id': self.client_id,
'client_secret': self.client_secret
}
response = self.session.post(self.token_url, data=data)
response.raise_for_status()
token_data = response.json()
self.token = token_data['access_token']
# 更新refresh_token(如果提供了新的)
if 'refresh_token' in token_data:
self.refresh_token = token_data['refresh_token']
# 设置过期时间
if 'expires_in' in token_data:
self.expires_at = time.time() + token_data['expires_in']
print("Token已刷新")
def update_token(self, token_data):
"""更新token信息"""
self.token = token_data.get('access_token', self.token)
self.refresh_token = token_data.get('refresh_token', self.refresh_token)
if 'expires_in' in token_data:
self.expires_at = time.time() + token_data['expires_in']
# 使用智能Bearer认证
token_data = {
'access_token': 'initial_token_here',
'refresh_token': 'refresh_token_here',
'expires_in': 3600 # 1小时过期
}
auth = SmartBearerAuth(
token=token_data['access_token'],
refresh_token=token_data['refresh_token'],
token_url='https://api.example.com/oauth/token',
client_id='your_client_id',
client_secret='your_client_secret'
)
auth.update_token(token_data)
# 现在可以安全地使用,token会自动刷新
session = requests.Session()
session.auth = auth
try:
response = session.get('https://api.example.com/protected')
print(f"智能Bearer认证状态码: {response.status_code}")
except Exception as e:
print(f"请求失败: {e}")
import requests
import jwt # PyJWT库,需要安装: pip install PyJWT
import time
from requests.auth import AuthBase
class JWTAuth(AuthBase):
"""JWT认证处理"""
def __init__(self, token=None, secret_key=None,
algorithm='HS256', payload=None):
self.token = token
self.secret_key = secret_key
self.algorithm = algorithm
self.payload = payload or {}
# 如果没有提供token,但提供了payload和secret_key,则生成token
if not self.token and self.secret_key and self.payload:
self.token = self._generate_token()
def __call__(self, r):
if self.token:
r.headers['Authorization'] = f'Bearer {self.token}'
return r
def _generate_token(self):
"""生成JWT token"""
# 添加标准声明
standard_claims = {
'iat': int(time.time()), # 签发时间
'exp': int(time.time()) + 3600, # 过期时间(1小时)
'iss': 'my_app', # 签发者
}
# 合并payload
full_payload = {**standard_claims, **self.payload}
# 生成token
token = jwt.encode(
full_payload,
self.secret_key,
algorithm=self.algorithm
)
return token
def decode_token(self, token=None):
"""解码JWT token(用于验证)"""
token_to_decode = token or self.token
if not token_to_decode:
return None
try:
payload = jwt.decode(
token_to_decode,
self.secret_key,
algorithms=[self.algorithm]
)
return payload
except jwt.ExpiredSignatureError:
print("Token已过期")
return None
except jwt.InvalidTokenError as e:
print(f"无效Token: {e}")
return None
def is_token_valid(self):
"""检查token是否有效"""
payload = self.decode_token()
if payload and 'exp' in payload:
return payload['exp'] > time.time()
return False
# 使用JWT认证
# 示例1:使用现有token
existing_token = "your.jwt.token.here"
jwt_auth = JWTAuth(token=existing_token)
response = requests.get(
'https://api.example.com/protected',
auth=jwt_auth
)
# 示例2:生成新token
secret_key = "your-secret-key-here"
payload = {
'user_id': 12345,
'username': 'john_doe',
'role': 'admin'
}
jwt_auth2 = JWTAuth(
secret_key=secret_key,
payload=payload,
algorithm='HS256'
)
# 验证token
if jwt_auth2.is_token_valid():
print("Token有效")
# 解码token查看内容
decoded = jwt_auth2.decode_token()
print(f"Token内容: {decoded}")
# 使用token发送请求
session = requests.Session()
session.auth = jwt_auth2
response = session.get('https://api.example.com/protected')
print(f"JWT认证状态码: {response.status_code}")
else:
print("Token无效或已过期")
# 注意:实际应用中,secret_key应从安全的地方获取(如环境变量)
import requests
import os
from datetime import datetime
class GitHubAPI:
"""GitHub API客户端(使用Bearer Token认证)"""
def __init__(self, token=None):
self.base_url = 'https://api.github.com'
self.token = token or os.getenv('GITHUB_TOKEN')
if not self.token:
raise ValueError("需要GitHub Token,请提供或设置GITHUB_TOKEN环境变量")
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'token {self.token}', # GitHub使用'token'而不是'Bearer'
'Accept': 'application/vnd.github.v3+json',
'User-Agent': 'GitHubAPI-Client/1.0'
})
# GitHub API有速率限制,添加监控
self.rate_limit_remaining = None
self.rate_limit_reset = None
def _update_rate_limit(self, response):
"""更新速率限制信息"""
if 'X-RateLimit-Remaining' in response.headers:
self.rate_limit_remaining = int(response.headers['X-RateLimit-Remaining'])
if 'X-RateLimit-Reset' in response.headers:
reset_timestamp = int(response.headers['X-RateLimit-Reset'])
self.rate_limit_reset = datetime.fromtimestamp(reset_timestamp)
def get_rate_limit_info(self):
"""获取速率限制信息"""
response = self.session.get(f'{self.base_url}/rate_limit')
data = response.json()
self._update_rate_limit(response)
return {
'limit': data['resources']['core']['limit'],
'remaining': data['resources']['core']['remaining'],
'reset': datetime.fromtimestamp(data['resources']['core']['reset']),
'used': data['resources']['core']['used']
}
def get_user(self, username=None):
"""获取用户信息"""
endpoint = '/user' if not username else f'/users/{username}'
response = self.session.get(f'{self.base_url}{endpoint}')
self._update_rate_limit(response)
response.raise_for_status()
return response.json()
def get_user_repos(self, username=None, page=1, per_page=30):
"""获取用户仓库"""
if username:
endpoint = f'/users/{username}/repos'
else:
endpoint = '/user/repos'
params = {
'page': page,
'per_page': per_page,
'sort': 'updated',
'direction': 'desc'
}
response = self.session.get(f'{self.base_url}{endpoint}', params=params)
self._update_rate_limit(response)
response.raise_for_status()
return response.json()
def create_repo(self, name, description='', private=False):
"""创建仓库"""
data = {
'name': name,
'description': description,
'private': private,
'auto_init': True # 初始化README
}
response = self.session.post(f'{self.base_url}/user/repos', json=data)
self._update_rate_limit(response)
response.raise_for_status()
return response.json()
def search_code(self, query, page=1, per_page=30):
"""搜索代码"""
params = {
'q': query,
'page': page,
'per_page': per_page
}
response = self.session.get(f'{self.base_url}/search/code', params=params)
self._update_rate_limit(response)
response.raise_for_status()
return response.json()
# 使用示例
if __name__ == '__main__':
# 从环境变量获取GitHub Token
# 在命令行设置: export GITHUB_TOKEN=your_token_here
try:
github = GitHubAPI()
# 检查速率限制
rate_limit = github.get_rate_limit_info()
print(f"API速率限制: {rate_limit['remaining']}/{rate_limit['limit']} 剩余")
print(f"重置时间: {rate_limit['reset']}")
# 获取当前用户信息
user = github.get_user()
print(f"当前用户: {user['login']} ({user['name']})")
print(f"仓库数: {user['public_repos']}")
print(f"关注者: {user['followers']}")
# 获取用户仓库
repos = github.get_user_repos(per_page=5)
print(f"\n最近的5个仓库:")
for repo in repos:
print(f" - {repo['name']}: {repo['description'] or '无描述'}")
# 搜索代码示例
print(f"\n搜索包含'requests'的Python代码:")
results = github.search_code('requests language:python', per_page=3)
print(f"找到 {results['total_count']} 个结果")
for item in results['items'][:3]:
print(f" - {item['repository']['full_name']}: {item['path']}")
# 创建新仓库(需要足够的权限)
# new_repo = github.create_repo('test-api-repo', '通过API创建的测试仓库')
# print(f"创建仓库: {new_repo['html_url']}")
except Exception as e:
print(f"GitHub API错误: {e}")
OAuth(开放授权) 是一个开放的授权标准,允许用户授权第三方应用访问他们存储在另一个服务提供者上的信息,而不需要将用户名和密码提供给第三方应用。
用户被重定向到授权服务器,请求访问权限
用户登录并授权应用访问特定资源
授权服务器返回授权码给应用
应用使用授权码换取访问令牌
应用使用访问令牌访问受保护的资源
import requests
from requests_oauthlib import OAuth1
import os
# OAuth 1.0a 认证(用于Twitter等API)
# 需要安装: pip install requests-oauthlib
# OAuth 1.0a 需要4个凭证
consumer_key = os.getenv('TWITTER_CONSUMER_KEY')
consumer_secret = os.getenv('TWITTER_CONSUMER_SECRET')
access_token = os.getenv('TWITTER_ACCESS_TOKEN')
access_token_secret = os.getenv('TWITTER_ACCESS_TOKEN_SECRET')
# 创建OAuth1认证对象
auth = OAuth1(
consumer_key,
consumer_secret,
access_token,
access_token_secret
)
# 使用OAuth1认证发送请求
try:
# Twitter API v1.1 示例
response = requests.get(
'https://api.twitter.com/1.1/account/verify_credentials.json',
auth=auth
)
if response.status_code == 200:
user_data = response.json()
print(f"Twitter用户: @{user_data['screen_name']}")
print(f"名称: {user_data['name']}")
print(f"描述: {user_data['description'][:50]}...")
else:
print(f"Twitter API错误: {response.status_code}")
except Exception as e:
print(f"OAuth 1.0a认证失败: {e}")
# OAuth 1.0a签名过程(了解原理)
class OAuth1Manual:
"""手动实现OAuth 1.0a签名(简化版,了解原理)"""
@staticmethod
def explain_signature():
"""解释OAuth 1.0a签名过程"""
steps = [
"1. 收集请求参数(URL参数、POST数据、OAuth参数)",
"2. 将参数按名称排序并编码",
"3. 构建参数字符串(key=value&格式)",
"4. 构建签名基础字符串(方法&URL&参数字符串)",
"5. 生成签名密钥(consumer_secret&token_secret)",
"6. 使用HMAC-SHA1算法计算签名",
"7. 将签名添加到Authorization头部"
]
print("OAuth 1.0a签名步骤:")
for step in steps:
print(f" {step}")
print("\n这就是为什么使用requests-oauthlib库更方便!")
# 查看签名过程
OAuth1Manual.explain_signature()
import requests
from requests.auth import HTTPBasicAuth
import base64
import time
class OAuth2ClientCredentials:
"""OAuth 2.0 客户端凭证流程(服务端到服务端)"""
def __init__(self, token_url, client_id, client_secret, scope=None):
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope
self.access_token = None
self.token_type = None
self.expires_in = None
self.expires_at = None
self.session = requests.Session()
def get_token(self):
"""获取访问令牌"""
auth = HTTPBasicAuth(self.client_id, self.client_secret)
data = {
'grant_type': 'client_credentials'
}
if self.scope:
data['scope'] = self.scope
response = self.session.post(self.token_url, data=data, auth=auth)
response.raise_for_status()
token_data = response.json()
self._update_token_data(token_data)
return token_data
def _update_token_data(self, token_data):
"""更新令牌数据"""
self.access_token = token_data['access_token']
self.token_type = token_data.get('token_type', 'Bearer')
self.expires_in = token_data.get('expires_in', 3600)
self.expires_at = time.time() + self.expires_in
def is_token_valid(self):
"""检查令牌是否有效"""
if not self.access_token or not self.expires_at:
return False
return time.time() < self.expires_at - 60 # 提前1分钟认为过期
def ensure_valid_token(self):
"""确保有有效的令牌"""
if not self.is_token_valid():
self.get_token()
def request(self, method, url, **kwargs):
"""发送请求,自动添加Bearer Token"""
self.ensure_valid_token()
# 添加Authorization头部
if 'headers' not in kwargs:
kwargs['headers'] = {}
kwargs['headers']['Authorization'] = f'{self.token_type} {self.access_token}'
return self.session.request(method, url, **kwargs)
def get(self, url, **kwargs):
return self.request('GET', url, **kwargs)
def post(self, url, **kwargs):
return self.request('POST', url, **kwargs)
# 使用示例
if __name__ == '__main__':
# 示例:使用GitHub的OAuth 2.0(客户端凭证流程)
# 注意:GitHub的OAuth 2.0主要用于用户授权,这里仅作演示
# 实际使用中,这些信息应从环境变量或配置文件中获取
client_id = 'your_client_id'
client_secret = 'your_client_secret'
token_url = 'https://github.com/login/oauth/access_token'
oauth_client = OAuth2ClientCredentials(
token_url=token_url,
client_id=client_id,
client_secret=client_secret,
scope='repo,user'
)
try:
# 获取令牌
token_data = oauth_client.get_token()
print(f"获取到令牌: {token_data['access_token'][:20]}...")
print(f"令牌类型: {token_data.get('token_type', 'Bearer')}")
print(f"过期时间: {token_data.get('expires_in', '未知')}秒")
# 使用令牌访问API
# 注意:GitHub的OAuth 2.0需要用户授权,这里仅演示流程
# 对于GitHub,通常使用授权码流程
except Exception as e:
print(f"OAuth 2.0错误: {e}")
# 另一个示例:假设有一个支持客户端凭证的API
print("\n--- 模拟支持客户端凭证的API ---")
class MockOAuth2API:
"""模拟OAuth 2.0 API服务器"""
@staticmethod
def mock_token_endpoint(client_id, client_secret):
"""模拟令牌端点"""
if client_id == 'test_client' and client_secret == 'test_secret':
return {
'access_token': 'mock_access_token_12345',
'token_type': 'Bearer',
'expires_in': 3600,
'scope': 'read write'
}
else:
raise ValueError("无效的客户端凭证")
@staticmethod
def mock_protected_endpoint(token):
"""模拟受保护端点"""
if token == 'Bearer mock_access_token_12345':
return {
'status': 'success',
'data': {'message': '访问受保护资源成功'}
}
else:
return {
'status': 'error',
'error': '无效令牌'
}
# 测试模拟API
mock_oauth = OAuth2ClientCredentials(
token_url='mock://token', # 模拟URL
client_id='test_client',
client_secret='test_secret'
)
# 覆盖get_token方法以使用模拟
original_get_token = mock_oauth.get_token
mock_oauth.get_token = lambda: MockOAuth2API.mock_token_endpoint(
mock_oauth.client_id, mock_oauth.client_secret
)
# 获取令牌
mock_token = mock_oauth.get_token()
print(f"模拟令牌: {mock_token['access_token']}")
# 验证令牌
print(f"令牌有效: {mock_oauth.is_token_valid()}")
import requests
from requests.auth import HTTPBasicAuth
import webbrowser
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json
class OAuth2AuthorizationCode:
"""OAuth 2.0 授权码流程(完整实现)"""
def __init__(self, client_id, client_secret,
authorization_url, token_url,
redirect_uri, scope=None):
self.client_id = client_id
self.client_secret = client_secret
self.authorization_url = authorization_url
self.token_url = token_url
self.redirect_uri = redirect_uri
self.scope = scope
self.access_token = None
self.refresh_token = None
self.token_type = None
self.expires_in = None
# 用于接收授权码的服务器
self.auth_code = None
self.server = None
def start_local_server(self, port=8000):
"""启动本地服务器接收授权码"""
class AuthHandler(BaseHTTPRequestHandler):
def do_GET(self):
# 解析查询参数
query = urllib.parse.urlparse(self.path).query
params = urllib.parse.parse_qs(query)
# 获取授权码
if 'code' in params:
self.server.auth_code = params['code'][0]
# 返回成功页面
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
html = """
授权成功!
您可以关闭此窗口并返回应用程序。
"""
self.wfile.write(html.encode())
# 停止服务器
threading.Thread(target=self.server.shutdown).start()
else:
# 错误处理
self.send_response(400)
self.end_headers()
def log_message(self, format, *args):
# 禁用日志输出
pass
# 创建服务器
server = HTTPServer(('localhost', port), AuthHandler)
server.auth_code = None # 存储授权码
# 在后台运行服务器
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
self.server = server
return f'http://localhost:{port}'
def get_authorization_url(self, state=None):
"""构建授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.redirect_uri,
'scope': self.scope or '',
'state': state or 'random_state_string'
}
# 移除空值
params = {k: v for k, v in params.items() if v}
query_string = urllib.parse.urlencode(params)
return f"{self.authorization_url}?{query_string}"
def authorize(self):
"""执行授权流程"""
# 构建授权URL
auth_url = self.get_authorization_url()
print(f"请访问以下URL进行授权:")
print(f"\n{auth_url}\n")
# 自动打开浏览器
try:
webbrowser.open(auth_url)
print("已在浏览器中打开授权页面...")
except:
print("无法自动打开浏览器,请手动复制上面的URL访问")
# 等待用户授权并获取授权码
print("等待授权...")
# 这里需要等待服务器接收到授权码
import time
max_wait = 300 # 最大等待5分钟
start_time = time.time()
while not self.server.auth_code and (time.time() - start_time) < max_wait:
time.sleep(1)
if not self.server.auth_code:
raise TimeoutError("授权超时")
print(f"获取到授权码: {self.server.auth_code[:20]}...")
return self.server.auth_code
def exchange_token(self, authorization_code):
"""使用授权码交换访问令牌"""
auth = HTTPBasicAuth(self.client_id, self.client_secret)
data = {
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': self.redirect_uri
}
response = requests.post(self.token_url, data=data, auth=auth)
response.raise_for_status()
token_data = response.json()
self._update_token_data(token_data)
return token_data
def _update_token_data(self, token_data):
"""更新令牌数据"""
self.access_token = token_data['access_token']
self.token_type = token_data.get('token_type', 'Bearer')
self.expires_in = token_data.get('expires_in')
self.refresh_token = token_data.get('refresh_token')
def refresh_access_token(self):
"""使用刷新令牌获取新的访问令牌"""
if not self.refresh_token:
raise ValueError("没有刷新令牌")
auth = HTTPBasicAuth(self.client_id, self.client_secret)
data = {
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token
}
response = requests.post(self.token_url, data=data, auth=auth)
response.raise_for_status()
token_data = response.json()
self._update_token_data(token_data)
return token_data
def get_auth_header(self):
"""获取认证头部"""
if not self.access_token:
raise ValueError("没有访问令牌")
return {'Authorization': f'{self.token_type} {self.access_token}'}
# 完整使用示例(模拟)
if __name__ == '__main__':
print("=== OAuth 2.0 授权码流程演示 ===\n")
# 模拟的OAuth 2.0配置
# 实际使用时,需要从OAuth提供商获取这些信息
config = {
'client_id': 'demo_client_id',
'client_secret': 'demo_client_secret',
'authorization_url': 'https://demo-oauth-server.com/authorize',
'token_url': 'https://demo-oauth-server.com/token',
'redirect_uri': 'http://localhost:8000/callback',
'scope': 'read write profile'
}
# 创建OAuth客户端
oauth_client = OAuth2AuthorizationCode(**config)
try:
# 步骤1: 启动本地服务器接收回调
print("1. 启动本地服务器接收授权码回调...")
local_url = oauth_client.start_local_server()
print(f" 回调地址: {local_url}")
# 步骤2: 获取授权URL并打开浏览器
print("\n2. 生成授权URL...")
auth_url = oauth_client.get_authorization_url()
print(f" 授权URL: {auth_url[:80]}...")
# 在实际应用中,这里会打开浏览器让用户授权
# 由于是演示,我们模拟一个授权码
print("\n3. 模拟用户授权过程...")
# 模拟获取的授权码
mock_auth_code = 'mock_authorization_code_12345'
print(f" 模拟授权码: {mock_auth_code}")
# 步骤3: 使用授权码交换访问令牌
print("\n4. 使用授权码交换访问令牌...")
# 模拟令牌端点响应
class MockTokenServer:
@staticmethod
def mock_token_exchange(auth_code):
if auth_code == 'mock_authorization_code_12345':
return {
'access_token': 'mock_access_token_67890',
'token_type': 'Bearer',
'expires_in': 3600,
'refresh_token': 'mock_refresh_token_abcde',
'scope': 'read write profile'
}
else:
raise ValueError("无效的授权码")
# 模拟令牌交换
mock_token_data = MockTokenServer.mock_token_exchange(mock_auth_code)
print(f" 获取到访问令牌: {mock_token_data['access_token'][:20]}...")
print(f" 令牌类型: {mock_token_data['token_type']}")
print(f" 过期时间: {mock_token_data['expires_in']}秒")
print(f" 刷新令牌: {mock_token_data['refresh_token'][:20]}...")
# 更新客户端令牌数据
oauth_client._update_token_data(mock_token_data)
# 步骤4: 使用访问令牌访问受保护资源
print("\n5. 使用访问令牌访问API...")
# 创建带认证的请求
headers = oauth_client.get_auth_header()
print(f" 请求头: {headers}")
# 模拟API请求
mock_api_response = {
'status': 'success',
'user': {
'id': '12345',
'name': '演示用户',
'email': 'demo@example.com'
}
}
print(f" 模拟API响应: {json.dumps(mock_api_response, indent=2)}")
# 步骤5: 演示令牌刷新
print("\n6. 演示刷新令牌流程...")
# 模拟刷新令牌响应
mock_refresh_data = {
'access_token': 'new_mock_access_token_11111',
'token_type': 'Bearer',
'expires_in': 3600
}
print(f" 刷新前令牌: {oauth_client.access_token}")
# 模拟刷新
oauth_client._update_token_data(mock_refresh_data)
print(f" 刷新后令牌: {oauth_client.access_token}")
print("\n✅ OAuth 2.0 授权码流程演示完成!")
except Exception as e:
print(f"\n❌ 错误: {e}")
print("\n注意:这是一个模拟演示。在实际应用中,您需要:")
print("1. 从OAuth提供商注册应用获取真实的client_id和client_secret")
print("2. 使用真实的授权URL和令牌URL")
print("3. 处理真实的浏览器重定向和用户交互")
print("4. 安全地存储令牌和刷新令牌")
import os
from requests_oauthlib import OAuth2Session
from flask import Flask, request, redirect, session
import json
# requests-oauthlib是处理OAuth的官方推荐库
# 安装: pip install requests-oauthlib
# 以下是一个完整的Flask应用示例,演示OAuth 2.0授权码流程
def create_flask_oauth_app():
"""创建Flask OAuth 2.0示例应用"""
app = Flask(__name__)
app.secret_key = os.urandom(24) # 用于会话加密
# OAuth 2.0配置
CLIENT_ID = os.getenv('OAUTH_CLIENT_ID', 'demo_client_id')
CLIENT_SECRET = os.getenv('OAUTH_CLIENT_SECRET', 'demo_client_secret')
AUTHORIZATION_BASE_URL = 'https://demo-oauth-server.com/authorize'
TOKEN_URL = 'https://demo-oauth-server.com/token'
REDIRECT_URI = 'http://localhost:5000/callback'
SCOPE = ['read', 'write', 'profile']
# 创建OAuth2Session
def get_oauth_session(token=None, state=None):
return OAuth2Session(
client_id=CLIENT_ID,
token=token,
state=state,
scope=SCOPE,
redirect_uri=REDIRECT_URI,
auto_refresh_kwargs={
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
},
auto_refresh_url=TOKEN_URL,
token_updater=lambda t: session.update({'oauth_token': t})
)
@app.route('/')
def index():
"""首页"""
html = """
OAuth 2.0 演示
OAuth 2.0 授权码流程演示
"""
return html
@app.route('/login')
def login():
"""开始OAuth登录流程"""
oauth = get_oauth_session()
authorization_url, state = oauth.authorization_url(AUTHORIZATION_BASE_URL)
# 保存state到session
session['oauth_state'] = state
return redirect(authorization_url)
@app.route('/callback')
def callback():
"""OAuth回调处理"""
# 验证state
if 'oauth_state' not in session or session['oauth_state'] != request.args.get('state'):
return 'State验证失败', 400
oauth = get_oauth_session(state=session['oauth_state'])
try:
# 获取令牌
token = oauth.fetch_token(
TOKEN_URL,
client_secret=CLIENT_SECRET,
authorization_response=request.url
)
# 保存令牌到session
session['oauth_token'] = token
return redirect('/profile')
except Exception as e:
return f'获取令牌失败: {str(e)}', 400
@app.route('/profile')
def profile():
"""获取用户信息(需要OAuth令牌)"""
if 'oauth_token' not in session:
return redirect('/login')
token = session['oauth_token']
oauth = get_oauth_session(token=token)
try:
# 使用令牌访问API
# 这里假设用户信息端点
response = oauth.get('https://demo-oauth-server.com/api/userinfo')
user_info = response.json()
html = f"""
用户信息
用户信息
{json.dumps(user_info, indent=2)}
"""
return html
except Exception as e:
return f'获取用户信息失败: {str(e)}', 400
@app.route('/logout')
def logout():
"""退出登录"""
session.clear()
return redirect('/')
return app
# 如果你想要运行这个Flask应用,取消下面的注释
# if __name__ == '__main__':
# app = create_flask_oauth_app()
# app.run(debug=True, port=5000)
# 对于非Web应用,可以使用OAuth2Session直接进行客户端凭证流程
def oauth2_client_credentials_example():
"""客户端凭证流程示例(非Web应用)"""
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
client_id = 'your_client_id'
client_secret = 'your_client_secret'
token_url = 'https://api.example.com/oauth/token'
# 创建客户端凭证客户端
client = BackendApplicationClient(client_id=client_id)
oauth = OAuth2Session(client=client)
try:
# 获取令牌
token = oauth.fetch_token(
token_url=token_url,
client_id=client_id,
client_secret=client_secret
)
print(f"获取到令牌: {token['access_token'][:20]}...")
# 使用令牌访问API
# oauth会自动在请求中添加Bearer Token
response = oauth.get('https://api.example.com/api/protected')
if response.status_code == 200:
print(f"API访问成功: {response.json()}")
else:
print(f"API访问失败: {response.status_code}")
except Exception as e:
print(f"OAuth 2.0错误: {e}")
print("requests-oauthlib 示例代码已准备好")
print("要运行完整示例,需要:")
print("1. 安装: pip install requests-oauthlib flask")
print("2. 配置真实的OAuth 2.0提供商信息")
print("3. 运行Flask应用或调用相应函数")
Requests允许你创建自定义认证类来处理非标准的认证机制。这通过继承requests.auth.AuthBase类实现。
import requests
from requests.auth import AuthBase
# 最简单自定义认证:API Key放在查询参数中
class APIKeyAuth(AuthBase):
"""API Key认证(查询参数方式)"""
def __init__(self, api_key):
self.api_key = api_key
def __call__(self, r):
# 添加API Key到URL查询参数
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
parsed_url = urlparse(r.url)
query_params = parse_qs(parsed_url.query)
query_params['api_key'] = self.api_key
# 重建URL
new_query = urlencode(query_params, doseq=True)
r.url = urlunparse(parsed_url._replace(query=new_query))
return r
# 使用自定义API Key认证
api_key = 'your_api_key_12345'
auth = APIKeyAuth(api_key)
response = requests.get('https://api.example.com/data', auth=auth)
print(f"API Key认证状态码: {response.status_code}")
# 另一个示例:自定义头部认证
class CustomHeaderAuth(AuthBase):
"""自定义头部认证"""
def __init__(self, header_name, header_value):
self.header_name = header_name
self.header_value = header_value
def __call__(self, r):
r.headers[self.header_name] = self.header_value
return r
# 使用自定义头部认证
auth2 = CustomHeaderAuth('X-API-Key', 'your_api_key_12345')
response2 = requests.get('https://api.example.com/data', auth=auth2)
print(f"自定义头部认证状态码: {response2.status_code}")
import requests
from requests.auth import AuthBase
import hashlib
import time
import hmac
class HMACAuth(AuthBase):
"""HMAC认证(用于AWS Signature V4等)"""
def __init__(self, access_key, secret_key, service='api', region='us-east-1'):
self.access_key = access_key
self.secret_key = secret_key
self.service = service
self.region = region
def __call__(self, r):
# 获取当前时间
amz_date = time.strftime('%Y%m%dT%H%M%SZ', time.gmtime())
date_stamp = time.strftime('%Y%m%d', time.gmtime())
# 规范请求(简化版,实际AWS签名更复杂)
canonical_request = self._create_canonical_request(r, amz_date)
# 创建字符串签名
string_to_sign = self._create_string_to_sign(canonical_request, amz_date, date_stamp)
# 计算签名
signature = self._calculate_signature(string_to_sign, date_stamp)
# 添加Authorization头部
credential_scope = f"{date_stamp}/{self.region}/{self.service}/aws4_request"
auth_header = (
f"AWS4-HMAC-SHA256 "
f"Credential={self.access_key}/{credential_scope}, "
f"SignedHeaders=host;x-amz-date, "
f"Signature={signature}"
)
r.headers['Authorization'] = auth_header
r.headers['X-Amz-Date'] = amz_date
return r
def _create_canonical_request(self, r, amz_date):
"""创建规范请求(简化版)"""
# 实际AWS签名需要完整的规范请求
return f"{r.method}\n/\n\nhost:{r.headers.get('host', '')}\nx-amz-date:{amz_date}\n\nhost;x-amz-date\nUNSIGNED-PAYLOAD"
def _create_string_to_sign(self, canonical_request, amz_date, date_stamp):
"""创建待签名字符串"""
credential_scope = f"{date_stamp}/{self.region}/{self.service}/aws4_request"
return f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n" + \
hashlib.sha256(canonical_request.encode()).hexdigest()
def _calculate_signature(self, string_to_sign, date_stamp):
"""计算签名"""
# 派生签名密钥
k_date = self._sign(('AWS4' + self.secret_key).encode(), date_stamp)
k_region = self._sign(k_date, self.region)
k_service = self._sign(k_region, self.service)
k_signing = self._sign(k_service, 'aws4_request')
# 计算签名
signature = hmac.new(
k_signing,
string_to_sign.encode(),
hashlib.sha256
).hexdigest()
return signature
def _sign(self, key, msg):
"""HMAC-SHA256签名"""
return hmac.new(key, msg.encode(), hashlib.sha256).digest()
# 使用HMAC认证(简化示例)
# 注意:这只是一个演示,实际AWS签名更复杂
hmac_auth = HMACAuth(
access_key='AKIAIOSFODNN7EXAMPLE',
secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)
# 实际使用可能需要更多配置
# response = requests.get('https://api.example.com/data', auth=hmac_auth)
import requests
from requests.auth import AuthBase
import base64
class MultiAuth(AuthBase):
"""多重认证(支持多种认证方式)"""
def __init__(self, auth_methods=None):
self.auth_methods = auth_methods or []
def add_auth(self, auth_method):
"""添加认证方法"""
self.auth_methods.append(auth_method)
def __call__(self, r):
# 应用所有认证方法
for auth_method in self.auth_methods:
r = auth_method(r)
return r
# 创建组合认证
class APIKeyInHeader(AuthBase):
"""API Key在头部"""
def __init__(self, api_key):
self.api_key = api_key
def __call__(self, r):
r.headers['X-API-Key'] = self.api_key
return r
class APIKeyInParam(AuthBase):
"""API Key在查询参数"""
def __init__(self, api_key):
self.api_key = api_key
def __call__(self, r):
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
parsed_url = urlparse(r.url)
query_params = parse_qs(parsed_url.query)
query_params['api_key'] = self.api_key
new_query = urlencode(query_params, doseq=True)
r.url = urlunparse(parsed_url._replace(query=new_query))
return r
class TimestampAuth(AuthBase):
"""时间戳认证(防止重放攻击)"""
def __init__(self):
import time
self.timestamp = str(int(time.time()))
def __call__(self, r):
r.headers['X-Timestamp'] = self.timestamp
return r
class SignatureAuth(AuthBase):
"""签名认证"""
def __init__(self, secret_key):
self.secret_key = secret_key
def __call__(self, r):
import hashlib
import hmac
# 创建签名字符串
string_to_sign = f"{r.method}\n{r.url}\n{r.headers.get('X-Timestamp', '')}"
# 计算签名
signature = hmac.new(
self.secret_key.encode(),
string_to_sign.encode(),
hashlib.sha256
).hexdigest()
r.headers['X-Signature'] = signature
return r
# 使用多重认证
multi_auth = MultiAuth()
multi_auth.add_auth(APIKeyInHeader('header_api_key_123'))
multi_auth.add_auth(APIKeyInParam('param_api_key_456'))
multi_auth.add_auth(TimestampAuth())
multi_auth.add_auth(SignatureAuth('my_secret_key'))
# 创建请求
session = requests.Session()
session.auth = multi_auth
# 查看请求头部
from requests import Request
req = Request('GET', 'https://api.example.com/data', auth=multi_auth)
prepared = session.prepare_request(req)
print("多重认证请求头部:")
for key, value in prepared.headers.items():
print(f" {key}: {value}")
print(f"\n请求URL: {prepared.url}")
# 实际发送请求
# response = session.send(prepared)
# print(f"响应状态码: {response.status_code}")
import requests
from requests.auth import AuthBase
from enum import Enum
class AuthType(Enum):
"""认证类型枚举"""
BASIC = 'basic'
BEARER = 'bearer'
API_KEY_HEADER = 'api_key_header'
API_KEY_PARAM = 'api_key_param'
CUSTOM = 'custom'
class AuthFactory:
"""认证工厂"""
@staticmethod
def create_auth(auth_type, **kwargs):
"""创建认证对象"""
if auth_type == AuthType.BASIC:
from requests.auth import HTTPBasicAuth
return HTTPBasicAuth(kwargs.get('username'), kwargs.get('password'))
elif auth_type == AuthType.BEARER:
class BearerAuth(AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers['Authorization'] = f'Bearer {self.token}'
return r
return BearerAuth(kwargs.get('token'))
elif auth_type == AuthType.API_KEY_HEADER:
class APIKeyHeaderAuth(AuthBase):
def __init__(self, api_key, header_name='X-API-Key'):
self.api_key = api_key
self.header_name = header_name
def __call__(self, r):
r.headers[self.header_name] = self.api_key
return r
return APIKeyHeaderAuth(
kwargs.get('api_key'),
kwargs.get('header_name', 'X-API-Key')
)
elif auth_type == AuthType.API_KEY_PARAM:
class APIKeyParamAuth(AuthBase):
def __init__(self, api_key, param_name='api_key'):
self.api_key = api_key
self.param_name = param_name
def __call__(self, r):
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
parsed_url = urlparse(r.url)
query_params = parse_qs(parsed_url.query)
query_params[self.param_name] = self.api_key
new_query = urlencode(query_params, doseq=True)
r.url = urlunparse(parsed_url._replace(query=new_query))
return r
return APIKeyParamAuth(
kwargs.get('api_key'),
kwargs.get('param_name', 'api_key')
)
elif auth_type == AuthType.CUSTOM:
# 使用自定义认证类
custom_class = kwargs.get('custom_class')
if custom_class and issubclass(custom_class, AuthBase):
return custom_class(**kwargs.get('params', {}))
else:
raise ValueError("无效的自定义认证类")
else:
raise ValueError(f"不支持的认证类型: {auth_type}")
# 使用认证工厂
# 创建Basic认证
basic_auth = AuthFactory.create_auth(
AuthType.BASIC,
username='admin',
password='secret123'
)
# 创建Bearer认证
bearer_auth = AuthFactory.create_auth(
AuthType.BEARER,
token='eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
)
# 创建API Key头部认证
api_key_auth = AuthFactory.create_auth(
AuthType.API_KEY_HEADER,
api_key='api_key_12345',
header_name='X-API-Key'
)
# 创建API Key参数认证
api_key_param_auth = AuthFactory.create_auth(
AuthType.API_KEY_PARAM,
api_key='api_key_67890',
param_name='api_key'
)
# 测试各种认证
test_url = 'https://httpbin.org/headers'
for auth_name, auth_obj in [
('Basic', basic_auth),
('Bearer', bearer_auth),
('API Key Header', api_key_auth)
]:
try:
response = requests.get(test_url, auth=auth_obj)
print(f"{auth_name}认证测试: 状态码 {response.status_code}")
except Exception as e:
print(f"{auth_name}认证测试失败: {e}")
# 配置文件驱动的认证
class ConfigDrivenAuth:
"""配置文件驱动的认证"""
def __init__(self, config_path='auth_config.json'):
import json
import os
self.config_path = config_path
self.config = self._load_config()
def _load_config(self):
"""加载配置文件"""
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
return json.load(f)
else:
# 默认配置
return {
'default_auth': 'api_key_header',
'auth_methods': {
'basic': {
'username': '${BASIC_USERNAME}',
'password': '${BASIC_PASSWORD}'
},
'bearer': {
'token': '${BEARER_TOKEN}'
},
'api_key_header': {
'api_key': '${API_KEY}',
'header_name': 'X-API-Key'
}
}
}
def _resolve_env_vars(self, value):
"""解析环境变量"""
if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
env_var = value[2:-1]
return os.getenv(env_var, '')
return value
def get_auth(self, auth_type=None):
"""根据配置获取认证对象"""
auth_type = auth_type or self.config.get('default_auth', 'api_key_header')
auth_config = self.config['auth_methods'].get(auth_type, {})
# 解析环境变量
resolved_config = {}
for key, value in auth_config.items():
resolved_config[key] = self._resolve_env_vars(value)
# 映射配置到AuthType
auth_type_map = {
'basic': AuthType.BASIC,
'bearer': AuthType.BEARER,
'api_key_header': AuthType.API_KEY_HEADER,
'api_key_param': AuthType.API_KEY_PARAM
}
if auth_type in auth_type_map:
return AuthFactory.create_auth(auth_type_map[auth_type], **resolved_config)
else:
raise ValueError(f"不支持的认证类型: {auth_type}")
# 使用配置驱动的认证
config_auth = ConfigDrivenAuth()
# 从配置获取认证(会从环境变量读取值)
# 需要设置环境变量:BASIC_USERNAME, BASIC_PASSWORD等
try:
auth_from_config = config_auth.get_auth('basic')
print(f"\n从配置获取Basic认证: {type(auth_from_config).__name__}")
except Exception as e:
print(f"\n配置认证失败(需要设置环境变量): {e}")
使用Session对象可以统一管理认证信息,避免为每个请求重复配置。
import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
# 创建Session并配置认证
session = requests.Session()
# 方法1:使用auth属性
session.auth = HTTPBasicAuth('username', 'password')
# 方法2:使用headers(不推荐用于Basic/Digest,适合Bearer Token)
# session.headers.update({'Authorization': 'Bearer token_here'})
# 方法3:使用自定义认证类
class CustomSessionAuth:
"""Session级别的自定义认证"""
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers['X-Session-Token'] = self.token
return r
session.auth = CustomSessionAuth('session_token_123')
# 现在所有通过该Session的请求都会自动使用认证
response1 = session.get('https://api.example.com/resource1')
response2 = session.post('https://api.example.com/resource2', json={'data': 'test'})
print(f"请求1状态码: {response1.status_code}")
print(f"请求2状态码: {response2.status_code}")
# 临时覆盖Session的认证
response3 = session.get(
'https://api.example.com/public',
auth=None # 这个请求不使用认证
)
print(f"请求3状态码: {response3.status_code}")
# 使用不同的认证
temp_auth = HTTPDigestAuth('guest', 'readonly')
response4 = session.get(
'https://api.example.com/guest-area',
auth=temp_auth # 临时使用Digest认证
)
print(f"请求4状态码: {response4.status_code}")
# 清除Session认证
session.auth = None
print(f"Session认证已清除: {session.auth}")
import requests
from requests.auth import HTTPBasicAuth
class MultiAuthManager:
"""多认证会话管理器"""
def __init__(self):
self.sessions = {} # 存储不同认证的Session
def get_session(self, auth_key, auth_config=None):
"""获取或创建指定认证的Session"""
if auth_key not in self.sessions:
session = requests.Session()
# 配置认证
if auth_config:
self._configure_session_auth(session, auth_config)
self.sessions[auth_key] = session
return self.sessions[auth_key]
def _configure_session_auth(self, session, auth_config):
"""配置Session认证"""
auth_type = auth_config.get('type', 'none')
if auth_type == 'basic':
session.auth = HTTPBasicAuth(
auth_config.get('username'),
auth_config.get('password')
)
elif auth_type == 'bearer':
session.headers.update({
'Authorization': f'Bearer {auth_config.get("token")}'
})
elif auth_type == 'api_key':
session.headers.update({
auth_config.get('header_name', 'X-API-Key'): auth_config.get('api_key')
})
# 设置其他公共头部
session.headers.update({
'User-Agent': 'MultiAuthManager/1.0',
'Accept': 'application/json'
})
def close_all(self):
"""关闭所有Session"""
for session in self.sessions.values():
session.close()
self.sessions.clear()
# 使用多认证管理器
auth_manager = MultiAuthManager()
# 配置不同认证的Session
admin_config = {
'type': 'basic',
'username': 'admin',
'password': 'admin123'
}
api_config = {
'type': 'api_key',
'api_key': 'api_key_12345',
'header_name': 'X-API-Key'
}
user_config = {
'type': 'bearer',
'token': 'user_token_67890'
}
# 获取不同认证的Session
admin_session = auth_manager.get_session('admin', admin_config)
api_session = auth_manager.get_session('api', api_config)
user_session = auth_manager.get_session('user', user_config)
# 使用不同认证的Session访问相应API
try:
# 使用管理员认证访问管理API
admin_response = admin_session.get('https://api.example.com/admin/users')
print(f"管理员请求状态码: {admin_response.status_code}")
# 使用API Key访问数据API
api_response = api_session.get('https://api.example.com/api/data')
print(f"API Key请求状态码: {api_response.status_code}")
# 使用用户Token访问用户API
user_response = user_session.get('https://api.example.com/user/profile')
print(f"用户Token请求状态码: {user_response.status_code}")
finally:
# 清理资源
auth_manager.close_all()
print("所有Session已关闭")
# 上下文管理器版本
class AuthSessionManager:
"""认证会话管理器(上下文管理器)"""
def __init__(self, auth_configs):
self.auth_configs = auth_configs
self.sessions = {}
def __enter__(self):
# 创建所有Session
for key, config in self.auth_configs.items():
session = requests.Session()
self._configure_session(session, config)
self.sessions[key] = session
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# 关闭所有Session
for session in self.sessions.values():
session.close()
self.sessions.clear()
def _configure_session(self, session, config):
"""配置Session"""
if 'headers' in config:
session.headers.update(config['headers'])
if 'auth' in config:
session.auth = config['auth']
def get(self, auth_key):
"""获取指定认证的Session"""
return self.sessions.get(auth_key)
# 使用上下文管理器
auth_configs = {
'github': {
'headers': {
'Authorization': 'token your_github_token',
'Accept': 'application/vnd.github.v3+json'
}
},
'slack': {
'headers': {
'Authorization': 'Bearer your_slack_token'
}
}
}
with AuthSessionManager(auth_configs) as manager:
github_session = manager.get('github')
slack_session = manager.get('slack')
# 使用不同的Session访问不同的API
# github_response = github_session.get('https://api.github.com/user')
# slack_response = slack_session.get('https://slack.com/api/auth.test')
print("在上下文管理器中使用多个认证Session")
print("上下文管理器已退出,所有资源已清理")
import requests
import time
from requests.auth import AuthBase
class StatefulAuthSession:
"""有状态认证Session,处理认证过期和刷新"""
def __init__(self, base_url, auth_provider):
self.base_url = base_url
self.auth_provider = auth_provider # 认证提供者
self.session = requests.Session()
self.is_authenticated = False
self.auth_expiry = None
# 设置默认头部
self.session.headers.update({
'User-Agent': 'StatefulAuthClient/1.0',
'Accept': 'application/json'
})
def authenticate(self):
"""进行认证"""
try:
auth_data = self.auth_provider.authenticate()
# 配置Session认证
if 'token' in auth_data:
self.session.headers['Authorization'] = f'Bearer {auth_data["token"]}'
# 更新认证状态
self.is_authenticated = True
# 设置过期时间
if 'expires_in' in auth_data:
self.auth_expiry = time.time() + auth_data['expires_in']
print("认证成功")
return True
except Exception as e:
print(f"认证失败: {e}")
self.is_authenticated = False
return False
def check_auth(self):
"""检查认证状态"""
if not self.is_authenticated:
return False
# 检查是否过期
if self.auth_expiry and time.time() > self.auth_expiry:
print("认证已过期")
self.is_authenticated = False
return False
return True
def ensure_auth(self):
"""确保有有效的认证"""
if not self.check_auth():
return self.authenticate()
return True
def request(self, method, endpoint, **kwargs):
"""发送请求,自动处理认证"""
# 确保有有效的认证
if not self.ensure_auth():
raise Exception("无法建立有效认证")
url = f"{self.base_url}/{endpoint.lstrip('/')}"
# 发送请求
response = self.session.request(method, url, **kwargs)
# 检查是否需要重新认证
if response.status_code == 401:
print("认证失效,尝试重新认证...")
if self.authenticate():
# 重试请求
response = self.session.request(method, url, **kwargs)
return response
def get(self, endpoint, **kwargs):
return self.request('GET', endpoint, **kwargs)
def post(self, endpoint, **kwargs):
return self.request('POST', endpoint, **kwargs)
def close(self):
"""关闭Session"""
self.session.close()
self.is_authenticated = False
# 示例认证提供者
class MockAuthProvider:
"""模拟认证提供者"""
def authenticate(self):
# 模拟认证过程
import random
# 模拟网络延迟
time.sleep(0.5)
# 生成模拟token
token = f"mock_token_{random.randint(10000, 99999)}"
return {
'token': token,
'expires_in': 30, # 30秒过期,方便测试
'token_type': 'Bearer'
}
# 使用有状态认证Session
if __name__ == '__main__':
auth_provider = MockAuthProvider()
client = StatefulAuthSession('https://api.example.com', auth_provider)
try:
# 首次认证
print("进行首次认证...")
if client.authenticate():
print("认证状态:", client.is_authenticated)
print("过期时间:", client.auth_expiry)
# 发送请求
print("\n发送请求1...")
response1 = client.get('/resource1')
print(f"请求1状态码: {response1.status_code}")
# 等待过期
print("\n等待认证过期...")
time.sleep(35) # 等待超过30秒
# 再次发送请求(应该自动重新认证)
print("\n发送请求2(认证已过期)...")
response2 = client.get('/resource2')
print(f"请求2状态码: {response2.status_code}")
print("认证状态:", client.is_authenticated)
# 模拟401错误
print("\n模拟401错误处理...")
# 这里可以创建一个模拟服务器返回401,然后测试重认证逻辑
finally:
client.close()
print("\nSession已关闭")
认证信息是系统的钥匙,一旦泄露可能导致严重的安全问题。以下最佳实践至关重要。
import os
import requests
from dotenv import load_dotenv # pip install python-dotenv
from requests.auth import HTTPBasicAuth
import keyring # pip install keyring(系统密钥库)
# 方法1:使用python-dotenv加载环境变量
load_dotenv() # 从.env文件加载环境变量
# 从环境变量获取凭证(最常用)
username = os.getenv('API_USERNAME')
password = os.getenv('API_PASSWORD')
api_key = os.getenv('API_KEY')
if not all([username, password]):
raise ValueError("请设置API_USERNAME和API_PASSWORD环境变量")
# 使用环境变量的凭证
auth = HTTPBasicAuth(username, password)
# 方法2:使用系统密钥库(更安全)
def get_credential_from_keyring(service_name, username):
"""从系统密钥库获取凭证"""
try:
return keyring.get_password(service_name, username)
except:
return None
# 存储凭证到密钥库
# keyring.set_password('my_api_service', 'api_user', 'secret_password')
# 从密钥库读取
stored_password = get_credential_from_keyring('my_api_service', 'api_user')
if stored_password:
auth = HTTPBasicAuth('api_user', stored_password)
# 方法3:使用配置文件(配合加密)
import json
import base64
from cryptography.fernet import Fernet # pip install cryptography
class SecureConfig:
"""安全配置管理器"""
def __init__(self, config_path='config.enc', key_path='key.key'):
self.config_path = config_path
self.key_path = key_path
self.key = self._load_or_create_key()
self.cipher = Fernet(self.key)
def _load_or_create_key(self):
"""加载或创建加密密钥"""
if os.path.exists(self.key_path):
with open(self.key_path, 'rb') as f:
return f.read()
else:
key = Fernet.generate_key()
with open(self.key_path, 'wb') as f:
f.write(key)
# 设置文件权限(仅所有者可读)
os.chmod(self.key_path, 0o600)
return key
def save_config(self, config_data):
"""保存加密配置"""
json_str = json.dumps(config_data)
encrypted = self.cipher.encrypt(json_str.encode())
with open(self.config_path, 'wb') as f:
f.write(encrypted)
os.chmod(self.config_path, 0o600)
def load_config(self):
"""加载解密配置"""
if not os.path.exists(self.config_path):
return {}
with open(self.config_path, 'rb') as f:
encrypted = f.read()
decrypted = self.cipher.decrypt(encrypted)
return json.loads(decrypted.decode())
# 使用安全配置
config = SecureConfig()
# 保存配置(加密)
config.save_config({
'api_url': 'https://api.example.com',
'username': 'api_user',
'password': 'secret_password', # 实际中不应该这样存储密码
'api_key': 'key_12345'
})
# 加载配置
loaded_config = config.load_config()
print(f"加载的配置: {loaded_config.get('api_url')}")
# 方法4:使用AWS Secrets Manager或其他云服务
try:
import boto3
from botocore.exceptions import ClientError
def get_secret_from_aws(secret_name, region_name='us-east-1'):
"""从AWS Secrets Manager获取密钥"""
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in response:
return json.loads(response['SecretString'])
else:
decoded_binary_secret = base64.b64decode(
response['SecretBinary']
)
return json.loads(decoded_binary_secret)
except ClientError as e:
print(f"AWS Secrets Manager错误: {e}")
return None
# 使用示例
# aws_secret = get_secret_from_aws('my-api-credentials')
# if aws_secret:
# username = aws_secret.get('username')
# password = aws_secret.get('password')
except ImportError:
print("boto3未安装,跳过AWS Secrets Manager示例")
# 完整的认证客户端示例
class SecureAPIClient:
"""安全的API客户端"""
def __init__(self, config_source='env'):
self.config_source = config_source
self.credentials = self._load_credentials()
self.session = requests.Session()
# 配置Session
self._configure_session()
def _load_credentials(self):
"""根据配置源加载凭证"""
if self.config_source == 'env':
return {
'username': os.getenv('API_USERNAME'),
'password': os.getenv('API_PASSWORD'),
'api_key': os.getenv('API_KEY')
}
elif self.config_source == 'keyring':
return {
'username': 'api_user',
'password': get_credential_from_keyring('my_api', 'api_user')
}
elif self.config_source == 'file':
config = SecureConfig()
return config.load_config()
else:
raise ValueError(f"不支持的配置源: {self.config_source}")
def _configure_session(self):
"""配置Session"""
# 添加认证头部
if self.credentials.get('api_key'):
self.session.headers['X-API-Key'] = self.credentials['api_key']
# 或者使用Basic认证
elif self.credentials.get('username') and self.credentials.get('password'):
self.session.auth = HTTPBasicAuth(
self.credentials['username'],
self.credentials['password']
)
# 其他公共头部
self.session.headers.update({
'User-Agent': 'SecureAPIClient/1.0',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
# 配置安全选项
self.session.verify = True # 启用SSL验证
self.session.timeout = 30 # 设置超时
def request(self, method, url, **kwargs):
"""发送请求"""
# 记录请求(不记录敏感信息)
safe_url = url
if 'api_key' in self.session.headers:
safe_url = url.replace(self.session.headers['X-API-Key'], '***')
print(f"发送请求: {method} {safe_url}")
response = self.session.request(method, url, **kwargs)
# 检查响应
if response.status_code == 401:
print("认证失败,可能需要更新凭证")
elif response.status_code == 403:
print("权限不足")
return response
def close(self):
"""关闭Session"""
self.session.close()
# 使用安全客户端
with SecureAPIClient(config_source='env') as client:
# 发送请求
response = client.request('GET', 'https://api.example.com/data')
print(f"响应状态码: {response.status_code}")
import requests
import time
import logging
from datetime import datetime
from requests.auth import AuthBase
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('auth_audit.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('AuthAudit')
class AuditedAuth(AuthBase):
"""带审计的自定义认证"""
def __init__(self, original_auth, user_id, client_ip='unknown'):
self.original_auth = original_auth
self.user_id = user_id
self.client_ip = client_ip
self.request_count = 0
def __call__(self, r):
self.request_count += 1
# 记录审计信息(不记录敏感信息)
audit_info = {
'timestamp': datetime.now().isoformat(),
'user_id': self.user_id,
'client_ip': self.client_ip,
'method': r.method,
'url': r.url,
'request_number': self.request_count
}
logger.info(f"认证请求: {audit_info}")
# 应用原始认证
return self.original_auth(r)
def get_stats(self):
"""获取统计信息"""
return {
'user_id': self.user_id,
'total_requests': self.request_count,
'last_activity': datetime.now().isoformat()
}
class MonitoredSession:
"""监控Session"""
def __init__(self, session_name, alert_threshold=100):
self.session_name = session_name
self.session = requests.Session()
self.request_count = 0
self.alert_threshold = alert_threshold
self.start_time = time.time()
# 监控指标
self.metrics = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'auth_failures': 0,
'last_request_time': None
}
def request(self, method, url, **kwargs):
"""发送请求并监控"""
self.request_count += 1
self.metrics['total_requests'] += 1
# 检查请求频率
if self.request_count > self.alert_threshold:
logger.warning(
f"高频率请求警告: {self.session_name} "
f"已发送 {self.request_count} 个请求"
)
start_time = time.time()
try:
response = self.session.request(method, url, **kwargs)
elapsed = time.time() - start_time
# 更新指标
if 200 <= response.status_code < 300:
self.metrics['successful_requests'] += 1
else:
self.metrics['failed_requests'] += 1
# 记录认证失败
if response.status_code == 401:
self.metrics['auth_failures'] += 1
logger.error(f"认证失败: {method} {url}")
# 记录慢请求
if elapsed > 5.0: # 超过5秒认为是慢请求
logger.warning(
f"慢请求: {method} {url} 耗时 {elapsed:.2f}秒"
)
self.metrics['last_request_time'] = datetime.now().isoformat()
return response
except Exception as e:
self.metrics['failed_requests'] += 1
logger.error(f"请求异常: {method} {url} - {e}")
raise
def get_metrics(self):
"""获取监控指标"""
uptime = time.time() - self.start_time
requests_per_second = self.metrics['total_requests'] / max(uptime, 1)
return {
**self.metrics,
'session_name': self.session_name,
'uptime_seconds': uptime,
'requests_per_second': requests_per_second,
'success_rate': (
self.metrics['successful_requests'] /
max(self.metrics['total_requests'], 1) * 100
)
}
# 使用审计和监控
# 创建基础认证
basic_auth = HTTPBasicAuth('admin', 'secret123')
# 包装为带审计的认证
audited_auth = AuditedAuth(
original_auth=basic_auth,
user_id='user_12345',
client_ip='192.168.1.100'
)
# 创建监控Session
monitored_session = MonitoredSession(
session_name='AdminAPISession',
alert_threshold=50
)
# 配置Session
monitored_session.session.auth = audited_auth
# 发送请求
try:
response = monitored_session.request(
'GET',
'https://api.example.com/admin/users'
)
print(f"请求状态码: {response.status_code}")
# 获取审计信息
auth_stats = audited_auth.get_stats()
print(f"用户 {auth_stats['user_id']} 发送了 {auth_stats['total_requests']} 个请求")
# 获取监控指标
metrics = monitored_session.get_metrics()
print(f"Session指标: {metrics['success_rate']:.1f}% 成功率")
except Exception as e:
print(f"请求失败: {e}")
# 定期报告
def generate_security_report(sessions):
"""生成安全报告"""
report = {
'generated_at': datetime.now().isoformat(),
'total_sessions': len(sessions),
'sessions': [],
'summary': {
'total_requests': 0,
'auth_failures': 0,
'suspicious_activity': []
}
}
for session in sessions:
metrics = session.get_metrics()
report['sessions'].append(metrics)
report['summary']['total_requests'] += metrics['total_requests']
report['summary']['auth_failures'] += metrics['auth_failures']
# 检测可疑活动
if metrics['auth_failures'] > 10:
report['summary']['suspicious_activity'].append({
'session': metrics['session_name'],
'reason': '过多的认证失败',
'count': metrics['auth_failures']
})
if metrics['requests_per_second'] > 10:
report['summary']['suspicious_activity'].append({
'session': metrics['session_name'],
'reason': '请求频率过高',
'rate': metrics['requests_per_second']
})
return report
# 生成报告
report = generate_security_report([monitored_session])
print(f"\n安全报告: {json.dumps(report, indent=2)}")
安全是一个持续的过程,而不是一次性的任务。 定期审查和更新你的认证机制,关注安全公告,及时修补漏洞。对于生产系统,考虑使用专业的安全审计工具和服务。
Requests库提供了强大而灵活的身份认证支持,从简单的Basic Auth到复杂的OAuth 2.0。关键要点:
AuthBase类处理特殊认证需求记住:无论使用哪种认证方式,安全都是首要考虑因素。始终遵循最小权限原则,安全地存储和管理凭证,使用HTTPS加密传输,并实现适当的监控和审计机制。