从开始连接到建立连接的最大时间
从连接建立到接收完整响应的最大时间
超时设置是HTTP客户端的重要功能,它定义了客户端等待服务器响应的最长时间。合理的超时设置可以:
ConnectTimeout - 连接超时异常ReadTimeout - 读取超时异常Timeout - 通用超时异常Requests通过timeout参数设置超时时间,支持多种格式。
import requests
# 设置总超时时间(连接+读取)
timeout_seconds = 5
try:
response = requests.get(
'https://httpbin.org/delay/3', # 这个端点会延迟3秒
timeout=timeout_seconds
)
print(f"请求成功: {response.status_code}")
except requests.exceptions.Timeout:
print(f"请求超时({timeout_seconds}秒)")
# 测试不同延迟
delays = [2, 6, 10]
print("\n=== 测试不同延迟 ===")
for delay in delays:
url = f'https://httpbin.org/delay/{delay}'
try:
response = requests.get(url, timeout=5)
print(f"延迟{delay}秒: ✓ 成功 ({response.status_code})")
except requests.exceptions.Timeout:
print(f"延迟{delay}秒: ✗ 超时")
except Exception as e:
print(f"延迟{delay}秒: ✗ 其他错误: {e}")
import requests
# 分别设置连接超时和读取超时
# 格式: (连接超时, 读取超时)
timeout_settings = (2.5, 5) # 连接超时2.5秒,读取超时5秒
try:
response = requests.get(
'https://httpbin.org/delay/3',
timeout=timeout_settings
)
print(f"请求成功: {response.status_code}")
except requests.exceptions.ConnectTimeout:
print("连接超时:无法在2.5秒内建立连接")
except requests.exceptions.ReadTimeout:
print("读取超时:连接已建立,但在5秒内未收到完整响应")
except requests.exceptions.Timeout:
print("请求超时(通用超时异常)")
# 测试不同场景
test_cases = [
('连接慢的情况', 'https://httpbin.org/delay/1', (0.5, 10)),
('响应慢的情况', 'https://httpbin.org/delay/10', (2, 3)),
('正常情况', 'https://httpbin.org/delay/2', (3, 5)),
]
print("\n=== 测试不同场景 ===")
for name, url, timeout in test_cases:
try:
response = requests.get(url, timeout=timeout)
print(f"{name}: ✓ 成功")
except requests.exceptions.ConnectTimeout:
print(f"{name}: ✗ 连接超时")
except requests.exceptions.ReadTimeout:
print(f"{name}: ✗ 读取超时")
except Exception as e:
print(f"{name}: ✗ 其他错误: {type(e).__name__}")
import requests
# 1. 整数(秒)
timeout_int = 5
print(f"整数超时: {timeout_int}秒")
# 2. 浮点数(支持小数秒)
timeout_float = 2.5
print(f"浮点数超时: {timeout_float}秒")
# 3. 元组(连接超时, 读取超时)
timeout_tuple = (3, 7)
print(f"元组超时: 连接{timeout_tuple[0]}秒, 读取{timeout_tuple[1]}秒")
# 4. 列表(也可以工作,但不推荐)
timeout_list = [3, 7]
print(f"列表超时(不推荐): {timeout_list}")
# 5. None(不超时 - 慎用!)
# timeout_none = None # 危险!可能导致程序永远等待
# 测试各种格式
test_url = 'https://httpbin.org/delay/2'
timeout_formats = [
('整数', 3),
('浮点数', 2.5),
('元组', (1, 4)),
('元组(只有连接超时)', (3, None)), # 只设置连接超时
('元组(只有读取超时)', (None, 3)), # 只设置读取超时
]
print("\n=== 测试不同超时格式 ===")
for name, timeout in timeout_formats:
try:
response = requests.get(test_url, timeout=timeout)
print(f"{name}: ✓ 成功 (超时设置: {timeout})")
except requests.exceptions.Timeout:
print(f"{name}: ✗ 超时 (超时设置: {timeout})")
except Exception as e:
print(f"{name}: ✗ 错误: {type(e).__name__} (超时设置: {timeout})")
# 6. 使用Session设置默认超时
print("\n=== Session默认超时 ===")
session = requests.Session()
# 为Session设置默认超时
session.request = lambda method, url, **kwargs: requests.Session.request(
session, method, url,
timeout=kwargs.pop('timeout', 10), # 默认10秒超时
**kwargs
)
try:
response = session.get('https://httpbin.org/delay/5')
print(f"Session默认超时: ✓ 成功")
except requests.exceptions.Timeout:
print(f"Session默认超时: ✗ 超时")
session.close()
timeout=None - 这会导致请求无限期等待| 超时类型 | 异常类 | 触发时机 | 典型场景 |
|---|---|---|---|
| 连接超时 | ConnectTimeout |
建立TCP连接的时间超过设定值 | 服务器宕机、网络不通、防火墙阻挡 |
| 读取超时 | ReadTimeout |
从连接建立到接收完整响应的时间超过设定值 | 服务器处理慢、网络延迟高、响应体过大 |
| 总超时 | Timeout |
单个超时值时的通用超时 | 简单的超时控制,不区分连接和读取 |
import requests
import socket
def test_connect_timeout():
"""测试连接超时"""
print("=== 连接超时测试 ===")
# 1. 测试不可达的主机(快速失败)
unreachable_hosts = [
'https://192.0.2.1', # 测试地址,通常不可达
'https://10.255.255.1', # 私有地址,通常不可达
'https://nonexistent.example.com', # 不存在的域名
]
for url in unreachable_hosts:
print(f"\n测试: {url}")
try:
# 设置很短的连接超时
response = requests.get(url, timeout=(0.1, 5))
print(f" ✓ 成功(不应该发生)")
except requests.exceptions.ConnectTimeout:
print(f" ✗ 连接超时(预期)")
except socket.gaierror:
print(f" ✗ 域名解析失败")
except Exception as e:
print(f" ✗ 其他错误: {type(e).__name__}")
# 2. 测试连接超时与读取超时的区别
print(f"\n=== 连接超时 vs 读取超时 ===")
# 模拟慢连接(实际测试需要能控制连接时间的服务器)
test_cases = [
('正常连接,快速响应', 'https://httpbin.org/delay/1', (3, 3)),
('慢连接,快速响应', 'https://httpbin.org/delay/1', (0.5, 3)), # 连接超时会触发
('正常连接,慢响应', 'https://httpbin.org/delay/5', (3, 2)), # 读取超时会触发
]
for name, url, timeout in test_cases:
print(f"\n测试: {name}")
print(f" 超时设置: 连接{timeout[0]}秒, 读取{timeout[1]}秒")
try:
response = requests.get(url, timeout=timeout)
print(f" ✓ 成功: {response.status_code}")
except requests.exceptions.ConnectTimeout:
print(f" ✗ 连接超时")
except requests.exceptions.ReadTimeout:
print(f" ✗ 读取超时")
except Exception as e:
print(f" ✗ 其他错误: {type(e).__name__}")
# 连接超时的根本原因
def analyze_connect_timeout_causes():
"""分析连接超时的可能原因"""
causes = {
'DNS解析失败': '域名无法解析为IP地址',
'网络不可达': '目标服务器网络不通',
'防火墙阻挡': '防火墙阻止了连接',
'服务器宕机': '目标服务器没有运行',
'端口关闭': '目标端口没有服务监听',
'连接队列满': '服务器连接队列已满',
'路由问题': '网络路由错误',
}
print("=== 连接超时可能原因 ===")
for cause, description in causes.items():
print(f" • {cause}: {description}")
# 诊断连接问题
def diagnose_connection(url):
"""诊断连接问题"""
from urllib.parse import urlparse
import socket
parsed = urlparse(url)
hostname = parsed.hostname
port = parsed.port or (443 if parsed.scheme == 'https' else 80)
print(f"诊断连接: {url}")
print(f" 主机: {hostname}")
print(f" 端口: {port}")
# 1. 测试DNS解析
try:
ip_address = socket.gethostbyname(hostname)
print(f" ✓ DNS解析成功: {ip_address}")
except socket.gaierror:
print(f" ✗ DNS解析失败")
return
# 2. 测试端口连通性
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2) # 2秒超时
result = sock.connect_ex((ip_address, port))
sock.close()
if result == 0:
print(f" ✓ 端口 {port} 可达")
else:
print(f" ✗ 端口 {port} 不可达 (错误代码: {result})")
except socket.timeout:
print(f" ✗ 连接超时")
except Exception as e:
print(f" ✗ 连接错误: {e}")
# 运行测试
if __name__ == "__main__":
test_connect_timeout()
analyze_connect_timeout_causes()
print("\n")
diagnose_connection('https://httpbin.org')
import requests
def test_read_timeout():
"""测试读取超时"""
print("=== 读取超时测试 ===")
# 1. 测试慢响应
print("\n1. 测试慢响应接口:")
# httpbin的/delay端点可以模拟慢响应
delays = [1, 3, 6, 10]
read_timeout = 5 # 读取超时5秒
for delay in delays:
url = f'https://httpbin.org/delay/{delay}'
print(f"\n 延迟 {delay} 秒, 读取超时 {read_timeout} 秒:")
try:
# 设置较长的连接超时,较短的读取超时
response = requests.get(url, timeout=(10, read_timeout))
print(f" ✓ 成功: {response.status_code}")
except requests.exceptions.ReadTimeout:
print(f" ✗ 读取超时 (响应时间 > {read_timeout}秒)")
except Exception as e:
print(f" ✗ 其他错误: {type(e).__name__}")
# 2. 测试大文件下载(读取超时)
print("\n2. 测试大文件下载(流式):")
# 使用stream=True和分块读取
def download_with_timeout(url, read_timeout, chunk_size=1024):
"""带读取超时的下载"""
try:
response = requests.get(url, stream=True, timeout=(10, read_timeout))
total_downloaded = 0
start_time = time.time()
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
total_downloaded += len(chunk)
elapsed = time.time() - start_time
# 检查是否超时
if elapsed > read_timeout:
print(f" ✗ 读取超时: 已下载 {total_downloaded} 字节")
return False
# 模拟处理(这里只是统计)
pass
print(f" ✓ 下载完成: {total_downloaded} 字节")
return True
except requests.exceptions.ReadTimeout:
print(f" ✗ 读取超时异常")
return False
# 测试下载(使用一个返回较大响应的端点)
test_url = 'https://httpbin.org/stream-bytes/100000' # 100KB数据
print(f" 下载测试: {test_url}")
download_with_timeout(test_url, read_timeout=2)
# 3. 读取超时的根本原因分析
print("\n3. 读取超时可能原因:")
causes = [
"服务器处理请求时间过长",
"网络延迟高,数据传输慢",
"响应体过大,下载时间长",
"服务器发送数据间隔过长",
"中间代理或CDN延迟",
"客户端处理能力不足",
"服务器负载过高",
]
for i, cause in enumerate(causes, 1):
print(f" {i}. {cause}")
# 处理读取超时的策略
def handle_read_timeout_strategies():
"""处理读取超时的策略"""
strategies = {
'增加读取超时时间': '对于已知的慢接口,适当增加超时时间',
'分块请求': '对于大文件,使用Range头分块下载',
'压缩传输': '使用gzip等压缩减少传输数据量',
'流式处理': '使用stream=True边下载边处理',
'进度监控': '监控下载进度,超时前取消',
'降级处理': '超时后使用缓存数据或默认值',
'异步处理': '使用异步请求避免阻塞',
}
print("=== 读取超时处理策略 ===")
for strategy, description in strategies.items():
print(f" • {strategy}: {description}")
# 读取超时监控
class ReadTimeoutMonitor:
"""读取超时监控器"""
def __init__(self, warning_threshold=0.8):
"""
warning_threshold: 警告阈值(超时时间的百分比)
"""
self.warning_threshold = warning_threshold
self.timeout_history = []
def monitor_request(self, func, *args, **kwargs):
"""监控请求的执行时间"""
import time
start_time = time.time()
try:
result = func(*args, **kwargs)
elapsed = time.time() - start_time
# 检查是否接近超时
timeout = kwargs.get('timeout')
if timeout and isinstance(timeout, tuple) and len(timeout) > 1:
read_timeout = timeout[1]
if read_timeout and elapsed > read_timeout * self.warning_threshold:
print(f"⚠ 警告: 请求用时 {elapsed:.2f}秒,接近读取超时({read_timeout}秒)")
self.timeout_history.append({
'success': True,
'elapsed': elapsed,
'timeout': timeout
})
return result
except requests.exceptions.ReadTimeout as e:
elapsed = time.time() - start_time
self.timeout_history.append({
'success': False,
'elapsed': elapsed,
'error': 'ReadTimeout'
})
raise e
def get_statistics(self):
"""获取统计信息"""
if not self.timeout_history:
return None
successful = [h for h in self.timeout_history if h['success']]
failed = [h for h in self.timeout_history if not h['success']]
stats = {
'total_requests': len(self.timeout_history),
'successful_requests': len(successful),
'failed_requests': len(failed),
'success_rate': len(successful) / len(self.timeout_history) * 100 if self.timeout_history else 0,
'avg_elapsed_time': sum(h['elapsed'] for h in successful) / len(successful) if successful else 0,
}
return stats
# 运行测试
import time
if __name__ == "__main__":
test_read_timeout()
handle_read_timeout_strategies()
# 测试监控器
print("\n=== 读取超时监控测试 ===")
monitor = ReadTimeoutMonitor(warning_threshold=0.7)
# 包装requests.get函数
def monitored_get(url, **kwargs):
return monitor.monitor_request(requests.get, url, **kwargs)
# 测试几个请求
try:
response = monitored_get('https://httpbin.org/delay/1', timeout=(3, 2))
print(f"请求1: ✓ 成功")
except Exception as e:
print(f"请求1: ✗ {type(e).__name__}")
try:
response = monitored_get('https://httpbin.org/delay/3', timeout=(3, 2))
print(f"请求2: ✓ 成功")
except Exception as e:
print(f"请求2: ✗ {type(e).__name__}")
# 查看统计
stats = monitor.get_statistics()
if stats:
print(f"\n监控统计:")
for key, value in stats.items():
print(f" {key}: {value:.2f}" if isinstance(value, float) else f" {key}: {value}")
正确处理超时异常对于构建健壮的应用至关重要。
import requests
from requests.exceptions import Timeout, ConnectTimeout, ReadTimeout
def make_request_with_timeout(url, timeout=5):
"""带超时处理的请求函数"""
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status() # 检查HTTP错误
return {'success': True, 'data': response}
except ConnectTimeout:
print(f"连接超时: 无法在{timeout if isinstance(timeout, (int, float)) else timeout[0]}秒内连接到服务器")
return {'success': False, 'error': 'ConnectTimeout', 'type': 'connection'}
except ReadTimeout:
print(f"读取超时: 连接已建立,但在{timeout if isinstance(timeout, (int, float)) else timeout[1]}秒内未收到完整响应")
return {'success': False, 'error': 'ReadTimeout', 'type': 'read'}
except Timeout:
print(f"请求超时: 总时间超过{timeout}秒")
return {'success': False, 'error': 'Timeout', 'type': 'general'}
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误: {http_err}")
return {'success': False, 'error': 'HTTPError', 'details': str(http_err)}
except requests.exceptions.ConnectionError as conn_err:
print(f"连接错误: {conn_err}")
return {'success': False, 'error': 'ConnectionError'}
except requests.exceptions.RequestException as req_err:
print(f"请求异常: {req_err}")
return {'success': False, 'error': 'RequestException'}
except Exception as e:
print(f"未知错误: {type(e).__name__}: {e}")
return {'success': False, 'error': 'UnknownError'}
# 测试不同类型的超时
print("=== 超时异常处理测试 ===")
test_cases = [
('正常请求', 'https://httpbin.org/delay/1', 3),
('连接超时', 'https://192.0.2.1', (0.1, 5)), # 不可达地址
('读取超时', 'https://httpbin.org/delay/6', (3, 2)), # 响应慢
]
for name, url, timeout in test_cases:
print(f"\n测试: {name}")
print(f" URL: {url}")
print(f" 超时: {timeout}")
result = make_request_with_timeout(url, timeout)
print(f" 结果: {'成功' if result['success'] else '失败'}")
if not result['success']:
print(f" 错误类型: {result.get('error')}")
print(f" 错误详情: {result.get('details', '无')}")
import requests
import contextlib
from requests.exceptions import Timeout
class TimeoutContext:
"""超时上下文管理器"""
def __init__(self, timeout):
self.timeout = timeout
self.response = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# 清理资源
if self.response:
self.response.close()
# 处理超时异常
if exc_type is Timeout:
print(f"在上下文管理器中捕获超时异常: {exc_val}")
# 可以在这里记录日志、发送通知等
return True # 抑制异常传播
return False # 不抑制其他异常
def request(self, url, method='GET', **kwargs):
"""在上下文中执行请求"""
kwargs['timeout'] = self.timeout
self.response = requests.request(method, url, **kwargs)
return self.response
# 使用上下文管理器处理超时
print("=== 使用上下文管理器 ===")
# 方法1:使用自定义上下文管理器
with TimeoutContext(timeout=2) as ctx:
try:
response = ctx.request('https://httpbin.org/delay/3')
print(f"请求成功: {response.status_code}")
except Exception as e:
print(f"请求异常: {type(e).__name__}")
# 方法2:使用contextlib简化
@contextlib.contextmanager
def timeout_manager(timeout):
"""超时上下文管理器(简化版)"""
try:
yield
except Timeout as e:
print(f"上下文管理器捕获超时: {e}")
# 可以进行重试、降级等处理
raise # 重新抛出异常
# 使用装饰器处理超时
def timeout_handler(timeout=5):
"""超时处理装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
try:
# 如果函数有timeout参数,使用它,否则使用装饰器的timeout
if 'timeout' not in kwargs:
kwargs['timeout'] = timeout
return func(*args, **kwargs)
except Timeout as e:
print(f"装饰器捕获超时异常: {e}")
# 可以在这里添加重试逻辑
return None
except Exception as e:
print(f"装饰器捕获其他异常: {e}")
raise
return wrapper
return decorator
# 使用装饰器
@timeout_handler(timeout=2)
def fetch_data(url):
"""获取数据的函数"""
response = requests.get(url)
return response.json()
print("\n=== 使用装饰器 ===")
result = fetch_data('https://httpbin.org/delay/3')
if result:
print(f"获取数据成功")
else:
print(f"获取数据失败(超时)")
# 异常链和详细错误信息
def detailed_timeout_handling():
"""详细的超时处理"""
try:
response = requests.get('https://httpbin.org/delay/10', timeout=(3, 5))
return response
except ConnectTimeout as e:
print(f"连接超时详情:")
print(f" 异常类型: {type(e).__name__}")
print(f" 请求URL: {e.request.url if e.request else '未知'}")
print(f" 连接超时设置: {e.timeout if hasattr(e, 'timeout') else '未知'}")
# 获取更多上下文信息
import traceback
print(f" 异常追踪:")
traceback.print_exc()
raise
except ReadTimeout as e:
print(f"读取超时详情:")
print(f" 异常类型: {type(e).__name__}")
# 可能已经部分接收了响应
if e.response is not None:
print(f" 已接收字节: {len(e.response.content) if e.response else 0}")
raise
print("\n=== 详细异常处理 ===")
try:
detailed_timeout_handling()
except Exception as e:
print(f"异常传播到外层: {type(e).__name__}")
import requests
from requests.exceptions import Timeout, ConnectionError, RequestException
from typing import Dict, Any, Optional
class TimeoutExceptionHandler:
"""超时异常处理器"""
@staticmethod
def handle_timeout_exception(e: Exception, context: Optional[Dict] = None) -> Dict[str, Any]:
"""处理超时异常"""
context = context or {}
# 基础信息
result = {
'exception_type': type(e).__name__,
'exception_message': str(e),
'context': context,
'handled': True,
'action_taken': None,
'retry_recommended': False,
'severity': 'medium'
}
# 根据异常类型分类处理
if isinstance(e, requests.exceptions.ConnectTimeout):
result.update({
'error_type': 'connect_timeout',
'description': '无法建立网络连接',
'suggested_action': '检查网络连接、服务器状态和防火墙设置',
'retry_recommended': True,
'severity': 'high' if context.get('critical', False) else 'medium'
})
elif isinstance(e, requests.exceptions.ReadTimeout):
result.update({
'error_type': 'read_timeout',
'description': '连接已建立,但服务器响应太慢',
'suggested_action': '增加读取超时时间、优化服务器性能或使用异步处理',
'retry_recommended': context.get('idempotent', True), # 幂等操作可重试
'severity': 'medium'
})
elif isinstance(e, requests.exceptions.Timeout):
result.update({
'error_type': 'general_timeout',
'description': '请求总时间超时',
'suggested_action': '调整超时设置或优化请求逻辑',
'retry_recommended': True,
'severity': 'medium'
})
elif isinstance(e, requests.exceptions.ConnectionError):
result.update({
'error_type': 'connection_error',
'description': '网络连接错误',
'suggested_action': '检查网络连接和服务器状态',
'retry_recommended': True,
'severity': 'high'
})
else:
result.update({
'error_type': 'other_exception',
'description': '其他类型的异常',
'suggested_action': '查看异常详情进行处理',
'handled': False,
'retry_recommended': False,
'severity': 'unknown'
})
# 记录日志
TimeoutExceptionHandler._log_exception(result)
return result
@staticmethod
def _log_exception(exception_info: Dict):
"""记录异常日志"""
log_entry = (
f"Timeout Exception - "
f"Type: {exception_info['error_type']}, "
f"Severity: {exception_info['severity']}, "
f"Context: {exception_info['context']}"
)
print(f"📝 日志: {log_entry}")
@staticmethod
def should_retry(exception_info: Dict, retry_count: int = 0, max_retries: int = 3) -> bool:
"""判断是否应该重试"""
if retry_count >= max_retries:
return False
if not exception_info['retry_recommended']:
return False
# 根据严重程度决定是否重试
if exception_info['severity'] == 'high':
return retry_count < 2 # 严重错误最多重试2次
return True
@staticmethod
def get_backoff_time(exception_info: Dict, retry_count: int) -> float:
"""获取重试等待时间(指数退避)"""
base_delay = 1.0 # 基础延迟1秒
# 根据错误类型调整基础延迟
if exception_info['error_type'] == 'connect_timeout':
base_delay = 2.0
elif exception_info['error_type'] == 'read_timeout':
base_delay = 3.0
# 指数退避
backoff_time = base_delay * (2 ** retry_count)
# 限制最大等待时间
max_backoff = 30.0
return min(backoff_time, max_backoff)
# 使用异常处理器
print("=== 超时异常分类处理 ===")
# 模拟不同异常
exception_cases = [
(requests.exceptions.ConnectTimeout("Connection timeout"),
{'url': 'https://api.example.com/data', 'critical': True}),
(requests.exceptions.ReadTimeout("Read timeout"),
{'url': 'https://api.example.com/slow', 'idempotent': True}),
(requests.exceptions.Timeout("General timeout"),
{'url': 'https://api.example.com/normal'}),
(requests.exceptions.ConnectionError("Connection error"),
{'url': 'https://api.example.com/unreachable'}),
(ValueError("Other error"),
{'url': 'https://api.example.com/error'}),
]
for exception, context in exception_cases:
print(f"\n处理异常: {type(exception).__name__}")
result = TimeoutExceptionHandler.handle_timeout_exception(exception, context)
print(f" 错误类型: {result['error_type']}")
print(f" 描述: {result['description']}")
print(f" 建议操作: {result['suggested_action']}")
print(f" 推荐重试: {result['retry_recommended']}")
print(f" 严重程度: {result['severity']}")
# 测试重试逻辑
retry_count = 1
should_retry = TimeoutExceptionHandler.should_retry(result, retry_count)
print(f" 第{retry_count}次重试: {'是' if should_retry else '否'}")
if should_retry:
backoff = TimeoutExceptionHandler.get_backoff_time(result, retry_count)
print(f" 重试等待: {backoff:.1f}秒")
# 完整的异常处理流程
def robust_request(url, timeout=5, max_retries=3):
"""健壮的请求函数,包含完整的异常处理"""
retry_count = 0
last_exception = None
while retry_count <= max_retries:
try:
print(f"\n尝试请求 (重试 {retry_count}/{max_retries}): {url}")
response = requests.get(url, timeout=timeout)
response.raise_for_status()
print(f"✓ 请求成功")
return {'success': True, 'data': response, 'retries': retry_count}
except Exception as e:
last_exception = e
# 处理异常
exception_info = TimeoutExceptionHandler.handle_timeout_exception(
e, {'url': url, 'retry_count': retry_count}
)
# 判断是否重试
if TimeoutExceptionHandler.should_retry(exception_info, retry_count, max_retries):
retry_count += 1
# 计算等待时间
wait_time = TimeoutExceptionHandler.get_backoff_time(exception_info, retry_count)
print(f" 等待 {wait_time:.1f} 秒后重试...")
import time
time.sleep(wait_time)
continue
else:
print(f"✗ 达到最大重试次数或不需要重试")
break
# 所有重试都失败
return {
'success': False,
'error': str(last_exception) if last_exception else 'Unknown error',
'error_type': type(last_exception).__name__ if last_exception else 'Unknown',
'retries': retry_count
}
print("\n=== 完整的异常处理流程 ===")
result = robust_request('https://httpbin.org/delay/6', timeout=2, max_retries=2)
print(f"最终结果: {'成功' if result['success'] else '失败'}")
print(f"重试次数: {result['retries']}")
if not result['success']:
print(f"错误: {result['error']}")
当请求超时时,合理的重试策略可以提高请求的成功率。
import requests
import time
from requests.exceptions import Timeout, ConnectionError
def retry_request_basic(url, max_retries=3, timeout=5):
"""基本的重试实现"""
retry_count = 0
while retry_count <= max_retries:
try:
print(f"尝试 {retry_count + 1}/{max_retries + 1}: {url}")
response = requests.get(url, timeout=timeout)
# 检查状态码,只对特定状态码重试
if response.status_code >= 500:
print(f" 服务器错误 ({response.status_code}),重试...")
raise ConnectionError(f"Server error: {response.status_code}")
print(f"✓ 请求成功")
return response
except (Timeout, ConnectionError) as e:
retry_count += 1
if retry_count <= max_retries:
print(f" 请求失败: {type(e).__name__},等待后重试...")
# 简单的固定等待
wait_time = 1 # 固定等待1秒
time.sleep(wait_time)
else:
print(f"✗ 达到最大重试次数")
raise
# 理论上不会执行到这里
raise Exception("Unexpected error")
# 测试基本重试
print("=== 基本重试测试 ===")
try:
# 测试超时重试
response = retry_request_basic(
'https://httpbin.org/delay/3',
max_retries=2,
timeout=2
)
print(f"最终成功: {response.status_code}")
except Exception as e:
print(f"最终失败: {type(e).__name__}: {e}")
# 更智能的重试:指数退避
def retry_with_backoff(url, max_retries=3, timeout=5, backoff_factor=1):
"""带指数退避的重试"""
retry_count = 0
while retry_count <= max_retries:
try:
print(f"尝试 {retry_count + 1}/{max_retries + 1}")
response = requests.get(url, timeout=timeout)
print(f"✓ 请求成功")
return response
except (Timeout, ConnectionError) as e:
retry_count += 1
if retry_count <= max_retries:
# 指数退避:等待时间 = backoff_factor * (2^(retry_count-1))
wait_time = backoff_factor * (2 ** (retry_count - 1))
print(f" 请求失败: {type(e).__name__},等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
else:
print(f"✗ 达到最大重试次数")
raise
print("\n=== 指数退避重试测试 ===")
try:
response = retry_with_backoff(
'https://httpbin.org/delay/4',
max_retries=3,
timeout=2,
backoff_factor=1
)
print(f"最终成功: {response.status_code}")
except Exception as e:
print(f"最终失败: {type(e).__name__}: {e}")
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# 创建重试策略
retry_strategy = Retry(
total=3, # 总重试次数
backoff_factor=1, # 退避因子
status_forcelist=[429, 500, 502, 503, 504], # 对这些状态码重试
allowed_methods=["GET", "POST"], # 只对GET和POST方法重试
)
# 创建适配器并设置重试策略
adapter = HTTPAdapter(max_retries=retry_strategy)
# 创建Session并挂载适配器
session = requests.Session()
# 为http和https都挂载适配器
session.mount("http://", adapter)
session.mount("https://", adapter)
print("=== 使用urllib3重试策略 ===")
# 测试重试
try:
# 这个端点会返回500错误,触发重试
response = session.get("https://httpbin.org/status/500")
print(f"请求结果: {response.status_code}")
except Exception as e:
print(f"请求失败: {e}")
# 更复杂的重试策略
def create_custom_retry_strategy():
"""创建自定义重试策略"""
return Retry(
total=5, # 总重试次数
connect=3, # 连接错误重试次数
read=2, # 读取错误重试次数
redirect=2, # 重定向次数
status=3, # 状态码重试次数
backoff_factor=0.5, # 退避因子
status_forcelist=[500, 502, 503, 504, 429],
allowed_methods=frozenset(['GET', 'POST', 'PUT', 'DELETE']),
raise_on_status=False, # 不对状态码抛出异常
respect_retry_after_header=True, # 尊重Retry-After头
)
# 自定义会话工厂
def create_retry_session(retries=3, backoff_factor=1):
"""创建带重试策略的Session"""
session = requests.Session()
retry_strategy = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504, 429],
allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# 使用自定义Session
print("\n=== 使用自定义重试Session ===")
retry_session = create_retry_session(retries=2, backoff_factor=1)
try:
# 测试一个会超时的请求
response = retry_session.get(
'https://httpbin.org/delay/5',
timeout=2
)
print(f"请求成功: {response.status_code}")
except Exception as e:
print(f"请求失败: {type(e).__name__}")
retry_session.close()
import requests
import time
import random
from typing import Callable, Optional, Dict, Any
from requests.exceptions import RequestException
class SmartRetryStrategy:
"""智能重试策略"""
def __init__(self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
jitter: bool = True,
retry_on_status: list = None,
retry_on_exception: list = None):
"""
初始化重试策略
Args:
max_retries: 最大重试次数
base_delay: 基础延迟时间(秒)
max_delay: 最大延迟时间(秒)
jitter: 是否添加随机抖动
retry_on_status: 需要重试的状态码列表
retry_on_exception: 需要重试的异常类型列表
"""
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.jitter = jitter
self.retry_on_status = retry_on_status or [429, 500, 502, 503, 504]
self.retry_on_exception = retry_on_exception or [
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError
]
self.stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'retry_attempts': 0
}
def should_retry(self,
exception: Optional[Exception] = None,
response: Optional[requests.Response] = None) -> bool:
"""判断是否应该重试"""
# 检查异常
if exception:
for exc_type in self.retry_on_exception:
if isinstance(exception, exc_type):
return True
# 检查响应状态码
if response and response.status_code in self.retry_on_status:
return True
return False
def calculate_delay(self, retry_count: int) -> float:
"""计算延迟时间(指数退避 + 随机抖动)"""
# 指数退避
delay = min(self.base_delay * (2 ** retry_count), self.max_delay)
# 添加随机抖动
if self.jitter:
jitter_amount = random.uniform(0, delay * 0.1) # 最多10%的抖动
delay += jitter_amount
return delay
def execute_with_retry(self,
request_func: Callable,
*args,
**kwargs) -> Dict[str, Any]:
"""执行请求并应用重试策略"""
retry_count = 0
last_exception = None
last_response = None
while retry_count <= self.max_retries:
self.stats['total_requests'] += 1
try:
print(f"尝试 {retry_count + 1}/{self.max_retries + 1}")
response = request_func(*args, **kwargs)
last_response = response
# 检查响应状态码
if response.status_code >= 200 and response.status_code < 300:
self.stats['successful_requests'] += 1
print(f"✓ 请求成功: {response.status_code}")
return {
'success': True,
'response': response,
'retries': retry_count,
'exception': None
}
# 检查是否需要重试(基于状态码)
if self.should_retry(response=response):
print(f"⚠ 需要重试的状态码: {response.status_code}")
raise requests.exceptions.HTTPError(f"Status code: {response.status_code}")
else:
# 不需要重试的其他错误状态码
self.stats['failed_requests'] += 1
return {
'success': False,
'response': response,
'retries': retry_count,
'exception': None,
'error': f"HTTP {response.status_code}"
}
except Exception as e:
last_exception = e
self.stats['failed_requests'] += 1
# 检查是否需要重试(基于异常)
if self.should_retry(exception=e) and retry_count < self.max_retries:
retry_count += 1
self.stats['retry_attempts'] += 1
# 计算等待时间
delay = self.calculate_delay(retry_count)
print(f" 请求失败: {type(e).__name__},等待 {delay:.2f} 秒后重试...")
time.sleep(delay)
continue
else:
# 不需要重试或达到最大重试次数
print(f"✗ 请求失败: {type(e).__name__}: {e}")
return {
'success': False,
'response': last_response,
'retries': retry_count,
'exception': e,
'error': str(e)
}
# 达到最大重试次数
return {
'success': False,
'response': last_response,
'retries': retry_count,
'exception': last_exception,
'error': f"Max retries ({self.max_retries}) exceeded"
}
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
success_rate = (self.stats['successful_requests'] / self.stats['total_requests'] * 100
if self.stats['total_requests'] > 0 else 0)
return {
**self.stats,
'success_rate': f"{success_rate:.1f}%"
}
# 使用智能重试策略
print("=== 智能重试策略测试 ===")
# 创建重试策略
retry_strategy = SmartRetryStrategy(
max_retries=3,
base_delay=1.0,
max_delay=10.0,
jitter=True,
retry_on_status=[429, 500, 502, 503, 504],
retry_on_exception=[
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError
]
)
# 包装requests.get函数
def make_request(url, timeout=5):
"""包装请求函数"""
return requests.get(url, timeout=timeout)
# 测试不同场景
test_cases = [
('正常请求', 'https://httpbin.org/status/200', 5),
('服务器错误', 'https://httpbin.org/status/500', 5),
('超时请求', 'https://httpbin.org/delay/6', 2),
('连接错误', 'https://192.0.2.1', 2),
]
for name, url, timeout in test_cases:
print(f"\n测试: {name}")
print(f" URL: {url}")
print(f" 超时: {timeout}秒")
result = retry_strategy.execute_with_retry(
make_request, url, timeout=timeout
)
print(f" 结果: {'成功' if result['success'] else '失败'}")
print(f" 重试次数: {result['retries']}")
if result['exception']:
print(f" 异常: {type(result['exception']).__name__}")
# 查看统计信息
print(f"\n=== 重试统计 ===")
stats = retry_strategy.get_statistics()
for key, value in stats.items():
print(f" {key}: {value}")
# 基于响应内容的动态重试
class ContentAwareRetryStrategy(SmartRetryStrategy):
"""基于响应内容的智能重试策略"""
def should_retry(self,
exception: Optional[Exception] = None,
response: Optional[requests.Response] = None) -> bool:
"""扩展父类方法,添加基于内容的判断"""
# 先调用父类方法
if super().should_retry(exception, response):
return True
# 检查响应内容
if response:
try:
content = response.json()
# 如果响应中有特定错误码,可能需要重试
if isinstance(content, dict):
error_code = content.get('error_code')
# 假设某些错误码需要重试
retry_error_codes = ['rate_limited', 'temporary_error', 'busy']
if error_code in retry_error_codes:
return True
# 检查错误消息
error_message = content.get('error', '').lower()
if any(phrase in error_message for phrase in ['try again', 'retry', 'busy', 'overload']):
return True
except (ValueError, AttributeError):
# 不是JSON或无法解析
pass
return False
print(f"\n=== 基于内容的智能重试 ===")
content_retry = ContentAwareRetryStrategy(max_retries=2)
# 模拟一个返回JSON错误的请求
def mock_request_with_json_error():
"""模拟返回JSON错误的请求"""
from unittest.mock import Mock
response = Mock(spec=requests.Response)
response.status_code = 200
response.json.return_value = {
'error_code': 'rate_limited',
'error': 'Rate limit exceeded, please try again later'
}
return response
# 测试
result = content_retry.execute_with_retry(mock_request_with_json_error)
print(f"基于内容的判断结果: {'重试' if not result['success'] else '不重试'}")
import requests
import time
from typing import Dict, Any, Optional
from statistics import mean, median
class AdaptiveTimeoutManager:
"""自适应超时管理器"""
def __init__(self,
initial_timeout: float = 5.0,
min_timeout: float = 1.0,
max_timeout: float = 30.0,
history_size: int = 100):
"""
初始化自适应超时管理器
Args:
initial_timeout: 初始超时时间
min_timeout: 最小超时时间
max_timeout: 最大超时时间
history_size: 历史记录大小
"""
self.initial_timeout = initial_timeout
self.min_timeout = min_timeout
self.max_timeout = max_timeout
self.history = [] # 存储(响应时间, 成功与否)的列表
self.history_size = history_size
self.success_count = 0
self.failure_count = 0
def record_request(self, response_time: float, success: bool):
"""记录请求结果"""
self.history.append((response_time, success))
# 保持历史记录大小
if len(self.history) > self.history_size:
self.history.pop(0)
if success:
self.success_count += 1
else:
self.failure_count += 1
def calculate_timeout(self, percentile: float = 95.0) -> float:
"""计算合适的超时时间"""
if not self.history:
return self.initial_timeout
# 只考虑成功的请求
successful_times = [time for time, success in self.history if success]
if not successful_times:
return self.initial_timeout
# 按响应时间排序
sorted_times = sorted(successful_times)
# 计算百分位数
index = int(len(sorted_times) * percentile / 100.0)
index = min(index, len(sorted_times) - 1)
percentile_time = sorted_times[index]
# 添加安全边际(20%)
recommended_timeout = percentile_time * 1.2
# 限制在最小和最大超时之间
return max(self.min_timeout, min(recommended_timeout, self.max_timeout))
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
if not self.history:
return {
'total_requests': 0,
'success_rate': 0,
'current_timeout': self.initial_timeout
}
successful_times = [time for time, success in self.history if success]
stats = {
'total_requests': len(self.history),
'successful_requests': len(successful_times),
'failed_requests': len(self.history) - len(successful_times),
'success_rate': len(successful_times) / len(self.history) * 100,
'current_timeout': self.calculate_timeout(),
}
if successful_times:
stats.update({
'avg_response_time': mean(successful_times),
'median_response_time': median(successful_times),
'min_response_time': min(successful_times),
'max_response_time': max(successful_times),
'p95_response_time': sorted(successful_times)[int(len(successful_times) * 0.95)],
})
return stats
def adaptive_request(self,
url: str,
method: str = 'GET',
**kwargs) -> Optional[requests.Response]:
"""执行自适应超时请求"""
# 计算当前超时时间
current_timeout = self.calculate_timeout()
# 确保使用计算出的超时时间
if 'timeout' not in kwargs:
kwargs['timeout'] = current_timeout
print(f"自适应超时: {current_timeout:.2f}秒")
start_time = time.time()
try:
response = requests.request(method, url, **kwargs)
response_time = time.time() - start_time
# 记录成功请求
self.record_request(response_time, True)
print(f"✓ 请求成功: {response_time:.2f}秒")
return response
except requests.exceptions.Timeout:
response_time = time.time() - start_time
# 记录失败请求
self.record_request(response_time, False)
print(f"✗ 请求超时: {response_time:.2f}秒")
# 根据失败情况调整超时
failure_rate = self.failure_count / (self.success_count + self.failure_count)
if failure_rate > 0.3: # 失败率超过30%
# 增加超时时间
new_timeout = min(current_timeout * 1.5, self.max_timeout)
print(f" 失败率高,增加超时到: {new_timeout:.2f}秒")
raise
except Exception as e:
response_time = time.time() - start_time
self.record_request(response_time, False)
print(f"✗ 请求异常: {type(e).__name__}")
raise
# 使用自适应超时管理器
print("=== 自适应超时管理器测试 ===")
timeout_manager = AdaptiveTimeoutManager(
initial_timeout=2.0,
min_timeout=1.0,
max_timeout=10.0,
history_size=10
)
# 模拟一系列请求
test_urls = [
('快速响应', 'https://httpbin.org/delay/0.5'),
('中等响应', 'https://httpbin.org/delay/2'),
('慢响应', 'https://httpbin.org/delay/4'),
('超慢响应', 'https://httpbin.org/delay/8'),
]
print("初始统计:")
stats = timeout_manager.get_statistics()
for key, value in stats.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
# 执行一系列请求
print("\n执行请求序列:")
for i, (name, url) in enumerate(test_urls, 1):
print(f"\n请求 {i}: {name}")
try:
response = timeout_manager.adaptive_request(url)
print(f" 状态码: {response.status_code}")
except requests.exceptions.Timeout:
print(f" 超时")
except Exception as e:
print(f" 错误: {type(e).__name__}")
# 查看最终统计
print(f"\n最终统计:")
final_stats = timeout_manager.get_statistics()
for key, value in final_stats.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
# 基于服务质量的自适应超时
class QoSTimeoutManager(AdaptiveTimeoutManager):
"""基于服务质量的自适应超时管理器"""
def calculate_timeout(self,
priority: str = 'normal',
service_level: str = 'standard') -> float:
"""根据优先级和服务等级计算超时时间"""
base_timeout = super().calculate_timeout()
# 根据优先级调整
priority_factors = {
'high': 0.7, # 高优先级,更短的超时
'normal': 1.0, # 正常优先级
'low': 1.5, # 低优先级,更长的超时
}
# 根据服务等级调整
service_factors = {
'premium': 0.8, # 高级服务,更短的超时
'standard': 1.0, # 标准服务
'economy': 1.3, # 经济服务,更长的超时
}
factor = priority_factors.get(priority, 1.0) * service_factors.get(service_level, 1.0)
adjusted_timeout = base_timeout * factor
return max(self.min_timeout, min(adjusted_timeout, self.max_timeout))
print(f"\n=== 基于服务质量的自适应超时 ===")
qos_manager = QoSTimeoutManager(
initial_timeout=5.0,
min_timeout=1.0,
max_timeout=20.0
)
# 测试不同优先级和服务等级
test_scenarios = [
('高优先级,高级服务', 'high', 'premium'),
('正常优先级,标准服务', 'normal', 'standard'),
('低优先级,经济服务', 'low', 'economy'),
]
for name, priority, service_level in test_scenarios:
timeout = qos_manager.calculate_timeout(priority, service_level)
print(f"{name}: {timeout:.2f}秒")
import requests
import threading
from typing import Dict, Any, Optional
from contextlib import contextmanager
class GlobalTimeoutConfig:
"""全局超时配置管理器"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
"""单例模式"""
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
"""初始化配置"""
self.default_timeout = 10.0 # 默认超时10秒
self.endpoint_timeouts = {} # 端点特定的超时配置
self.domain_timeouts = {} # 域名级别的超时配置
self.environment_timeouts = {} # 环境特定的超时配置
self.current_environment = 'production'
# 加载配置
self._load_default_config()
def _load_default_config(self):
"""加载默认配置"""
# 端点特定配置
self.endpoint_timeouts = {
'api.example.com/login': 5.0,
'api.example.com/upload': 30.0,
'api.example.com/report': 60.0,
}
# 域名级别配置
self.domain_timeouts = {
'api.example.com': 10.0,
'cdn.example.com': 5.0,
'storage.example.com': 30.0,
}
# 环境配置
self.environment_timeouts = {
'development': {
'default': 30.0,
'api.example.com': 15.0,
},
'testing': {
'default': 20.0,
'api.example.com': 10.0,
},
'production': {
'default': 10.0,
'api.example.com': 5.0,
}
}
def get_timeout_for_url(self, url: str) -> float:
"""获取URL对应的超时时间"""
from urllib.parse import urlparse
parsed = urlparse(url)
hostname = parsed.hostname
path = parsed.path
if not hostname:
return self.get_default_timeout()
# 1. 检查端点特定配置
endpoint_key = f"{hostname}{path}"
if endpoint_key in self.endpoint_timeouts:
return self.endpoint_timeouts[endpoint_key]
# 2. 检查域名级别配置
if hostname in self.domain_timeouts:
return self.domain_timeouts[hostname]
# 3. 检查环境配置
env_config = self.environment_timeouts.get(self.current_environment, {})
if hostname in env_config:
return env_config[hostname]
# 4. 使用环境默认配置
if 'default' in env_config:
return env_config['default']
# 5. 使用全局默认配置
return self.default_timeout
def get_default_timeout(self) -> float:
"""获取默认超时时间"""
env_config = self.environment_timeouts.get(self.current_environment, {})
return env_config.get('default', self.default_timeout)
def set_environment(self, environment: str):
"""设置当前环境"""
self.current_environment = environment
def configure_endpoint(self, endpoint: str, timeout: float):
"""配置端点超时"""
self.endpoint_timeouts[endpoint] = timeout
def configure_domain(self, domain: str, timeout: float):
"""配置域名超时"""
self.domain_timeouts[domain] = timeout
def get_config_summary(self) -> Dict[str, Any]:
"""获取配置摘要"""
return {
'current_environment': self.current_environment,
'default_timeout': self.get_default_timeout(),
'endpoint_configs': len(self.endpoint_timeouts),
'domain_configs': len(self.domain_timeouts),
'environment_configs': {
env: len(configs) for env, configs in self.environment_timeouts.items()
}
}
# 全局超时装饰器
def with_global_timeout(func):
"""使用全局超时配置的装饰器"""
def wrapper(*args, **kwargs):
# 获取URL参数
url = None
for arg in args:
if isinstance(arg, str) and arg.startswith(('http://', 'https://')):
url = arg
break
if 'url' in kwargs:
url = kwargs['url']
# 如果没有timeout参数,使用全局配置
if 'timeout' not in kwargs and url:
config = GlobalTimeoutConfig()
timeout = config.get_timeout_for_url(url)
kwargs['timeout'] = timeout
print(f"使用全局超时配置: {timeout}秒 (URL: {url})")
return func(*args, **kwargs)
return wrapper
# 应用全局超时的请求函数
@with_global_timeout
def global_timeout_request(url, **kwargs):
"""使用全局超时配置的请求函数"""
return requests.get(url, **kwargs)
# 使用示例
print("=== 全局超时配置测试 ===")
# 获取配置实例
config = GlobalTimeoutConfig()
# 查看当前配置
summary = config.get_config_summary()
print("当前配置摘要:")
for key, value in summary.items():
print(f" {key}: {value}")
# 测试不同URL的超时
test_urls = [
'https://api.example.com/login',
'https://api.example.com/upload',
'https://cdn.example.com/image.jpg',
'https://unknown.com/data',
]
print("\n测试URL超时配置:")
for url in test_urls:
timeout = config.get_timeout_for_url(url)
print(f" {url}: {timeout}秒")
# 测试不同环境
print("\n测试不同环境:")
environments = ['development', 'testing', 'production']
for env in environments:
config.set_environment(env)
timeout = config.get_timeout_for_url('https://api.example.com/data')
print(f" {env}: {timeout}秒")
# 重置为生产环境
config.set_environment('production')
# 使用装饰器
print("\n使用全局超时装饰器:")
try:
# 注意:这里使用模拟URL,实际不会真正请求
# response = global_timeout_request('https://api.example.com/login')
# print(f"请求成功")
print("装饰器测试完成")
except Exception as e:
print(f"错误: {e}")
# 线程安全的全局配置管理器
class ThreadSafeTimeoutConfig(GlobalTimeoutConfig):
"""线程安全的全局超时配置管理器"""
def __init__(self):
self._lock = threading.RLock()
super()._initialize()
def get_timeout_for_url(self, url: str) -> float:
"""线程安全的获取超时时间"""
with self._lock:
return super().get_timeout_for_url(url)
def configure_endpoint(self, endpoint: str, timeout: float):
"""线程安全的配置端点超时"""
with self._lock:
super().configure_endpoint(endpoint, timeout)
def configure_domain(self, domain: str, timeout: float):
"""线程安全的配置域名超时"""
with self._lock:
super().configure_domain(domain, timeout)
def set_environment(self, environment: str):
"""线程安全的设置环境"""
with self._lock:
super().set_environment(environment)
# 上下文管理器:临时修改超时配置
@contextmanager
def temporary_timeout_config(environment: Optional[str] = None,
endpoint_configs: Optional[Dict] = None,
domain_configs: Optional[Dict] = None):
"""临时修改超时配置的上下文管理器"""
config = GlobalTimeoutConfig()
# 保存当前配置
original_environment = config.current_environment
original_endpoints = config.endpoint_timeouts.copy()
original_domains = config.domain_timeouts.copy()
try:
# 应用临时配置
if environment:
config.set_environment(environment)
if endpoint_configs:
for endpoint, timeout in endpoint_configs.items():
config.configure_endpoint(endpoint, timeout)
if domain_configs:
for domain, timeout in domain_configs.items():
config.configure_domain(domain, timeout)
yield config
finally:
# 恢复原始配置
config.current_environment = original_environment
config.endpoint_timeouts = original_endpoints
config.domain_timeouts = original_domains
print(f"\n=== 使用上下文管理器临时修改配置 ===")
# 保存原始配置
original_summary = config.get_config_summary()
print(f"原始配置: {original_summary['default_timeout']}秒")
# 临时修改配置
with temporary_timeout_config(
environment='development',
endpoint_configs={'api.example.com/test': 60.0}
) as temp_config:
temp_summary = temp_config.get_config_summary()
print(f"临时配置: {temp_summary['default_timeout']}秒")
print(f"测试端点超时: {temp_config.get_timeout_for_url('https://api.example.com/test')}秒")
# 验证配置已恢复
final_summary = config.get_config_summary()
print(f"恢复后配置: {final_summary['default_timeout']}秒")
timeout=None| 场景 | 连接超时 | 读取超时 | 说明 |
|---|---|---|---|
| 内部API调用 | 1-3秒 | 5-10秒 | 内部网络环境相对稳定 |
| 外部API调用 | 3-5秒 | 10-30秒 | 考虑网络波动和第三方服务性能 |
| 用户认证 | 2-3秒 | 5-8秒 | 需要快速响应,避免用户等待 |
| 文件上传 | 5-10秒 | 30-60秒 | 大文件需要较长时间 |
| 文件下载 | 3-5秒 | 30-300秒 | 根据文件大小动态调整 |
| 实时通信 | 1-2秒 | 2-5秒 | 需要低延迟 | tr>
| 批量处理 | 5-10秒 | 60-300秒 | 处理时间可能较长 |
import requests
import os
from typing import Dict, Any
from dataclasses import dataclass, field
from enum import Enum
class Environment(Enum):
"""环境枚举"""
DEVELOPMENT = "development"
TESTING = "testing"
STAGING = "staging"
PRODUCTION = "production"
class ServiceType(Enum):
"""服务类型枚举"""
INTERNAL_API = "internal_api"
EXTERNAL_API = "external_api"
DATABASE = "database"
CACHE = "cache"
STORAGE = "storage"
QUEUE = "queue"
@dataclass
class TimeoutProfile:
"""超时配置模板"""
connect_timeout: float
read_timeout: float
max_retries: int = 3
backoff_factor: float = 1.0
def as_tuple(self):
"""转换为元组格式"""
return (self.connect_timeout, self.read_timeout)
class ProductionTimeoutConfig:
"""生产环境超时配置"""
def __init__(self):
# 基础配置
self.environment = self._get_environment()
# 服务类型配置模板
self.service_profiles = {
ServiceType.INTERNAL_API: TimeoutProfile(
connect_timeout=2.0,
read_timeout=10.0,
max_retries=2
),
ServiceType.EXTERNAL_API: TimeoutProfile(
connect_timeout=5.0,
read_timeout=30.0,
max_retries=3
),
ServiceType.DATABASE: TimeoutProfile(
connect_timeout=3.0,
read_timeout=10.0,
max_retries=1 # 数据库连接失败通常不需要重试
),
ServiceType.CACHE: TimeoutProfile(
connect_timeout=1.0,
read_timeout=3.0,
max_retries=2
),
}
# 特定服务配置(覆盖模板)
self.specific_service_configs = {
'auth-service.internal.com': TimeoutProfile(
connect_timeout=1.0,
read_timeout=5.0,
max_retries=2
),
'upload-service.internal.com': TimeoutProfile(
connect_timeout=5.0,
read_timeout=60.0,
max_retries=2
),
'payment-gateway.com': TimeoutProfile(
connect_timeout=3.0,
read_timeout=15.0,
max_retries=3
),
}
# 环境特定调整因子
self.environment_factors = {
Environment.DEVELOPMENT: 2.0, # 开发环境更宽松
Environment.TESTING: 1.5, # 测试环境稍宽松
Environment.STAGING: 1.2, # 预发环境接近生产
Environment.PRODUCTION: 1.0, # 生产环境
}
def _get_environment(self) -> Environment:
"""从环境变量获取当前环境"""
env_str = os.getenv('ENVIRONMENT', 'development').upper()
try:
return Environment[env_str]
except KeyError:
return Environment.DEVELOPMENT
def get_timeout_for_service(self,
service_url: str,
service_type: Optional[ServiceType] = None) -> Dict[str, Any]:
"""获取服务的超时配置"""
from urllib.parse import urlparse
parsed = urlparse(service_url)
hostname = parsed.hostname
# 1. 检查特定服务配置
if hostname and hostname in self.specific_service_configs:
profile = self.specific_service_configs[hostname]
elif service_type and service_type in self.service_profiles:
# 2. 使用服务类型模板
profile = self.service_profiles[service_type]
else:
# 3. 使用默认外部API配置
profile = self.service_profiles[ServiceType.EXTERNAL_API]
# 应用环境因子
env_factor = self.environment_factors.get(self.environment, 1.0)
adjusted_profile = TimeoutProfile(
connect_timeout=profile.connect_timeout * env_factor,
read_timeout=profile.read_timeout * env_factor,
max_retries=profile.max_retries,
backoff_factor=profile.backoff_factor
)
return {
'timeout': adjusted_profile.as_tuple(),
'max_retries': adjusted_profile.max_retries,
'backoff_factor': adjusted_profile.backoff_factor,
'service_host': hostname,
'service_type': service_type.name if service_type else 'unknown',
'environment': self.environment.value,
'environment_factor': env_factor
}
# 生产环境请求工厂
class ProductionRequestFactory:
"""生产环境请求工厂"""
def __init__(self):
self.config = ProductionTimeoutConfig()
self.session_cache = {}
def create_session(self, service_type: ServiceType) -> requests.Session:
"""创建适合服务类型的Session"""
if service_type in self.session_cache:
return self.session_cache[service_type]
# 获取配置
# 这里使用一个示例URL来获取配置模板
example_url = f"https://example.{service_type.value}"
config = self.config.get_timeout_for_service(example_url, service_type)
# 创建Session
session = requests.Session()
# 配置重试策略
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
retry_strategy = Retry(
total=config['max_retries'],
backoff_factor=config['backoff_factor'],
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT", "DELETE"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
# 缓存Session
self.session_cache[service_type] = session
return session
def make_request(self,
url: str,
service_type: Optional[ServiceType] = None,
method: str = 'GET',
**kwargs) -> Dict[str, Any]:
"""执行生产环境请求"""
# 获取配置
config = self.config.get_timeout_for_service(url, service_type)
# 创建或获取Session
session = self.create_session(service_type or ServiceType.EXTERNAL_API)
# 准备请求参数
request_kwargs = kwargs.copy()
# 设置超时(如果未指定)
if 'timeout' not in request_kwargs:
request_kwargs['timeout'] = config['timeout']
print(f"执行请求:")
print(f" URL: {url}")
print(f" 服务类型: {config['service_type']}")
print(f" 环境: {config['environment']}")
print(f" 超时: 连接{config['timeout'][0]:.1f}秒, 读取{config['timeout'][1]:.1f}秒")
print(f" 最大重试: {config['max_retries']}")
try:
response = session.request(method, url, **request_kwargs)
response.raise_for_status()
return {
'success': True,
'response': response,
'config': config
}
except Exception as e:
return {
'success': False,
'error': str(e),
'error_type': type(e).__name__,
'config': config
}
# 使用示例
print("=== 生产环境超时配置示例 ===")
# 创建工厂
factory = ProductionRequestFactory()
# 测试不同服务类型
test_services = [
('内部认证服务', 'https://auth-service.internal.com/login', ServiceType.INTERNAL_API),
('外部支付网关', 'https://payment-gateway.com/charge', ServiceType.EXTERNAL_API),
('数据库查询', 'https://database.internal.com/query', ServiceType.DATABASE),
]
print("\n测试不同服务类型的超时配置:")
for name, url, service_type in test_services:
print(f"\n{name}:")
# 获取配置(不实际发送请求)
config = factory.config.get_timeout_for_service(url, service_type)
print(f" 连接超时: {config['timeout'][0]:.1f}秒")
print(f" 读取超时: {config['timeout'][1]:.1f}秒")
print(f" 最大重试: {config['max_retries']}")
print(f" 环境因子: {config['environment_factor']}")
# 测试不同环境
print(f"\n=== 测试不同环境的超时配置 ===")
environments = [Environment.DEVELOPMENT, Environment.TESTING, Environment.PRODUCTION]
for env in environments:
# 临时修改环境
original_env = factory.config.environment
factory.config.environment = env
config = factory.config.get_timeout_for_service(
'https://api.example.com/data',
ServiceType.EXTERNAL_API
)
print(f"\n{env.value}环境:")
print(f" 连接超时: {config['timeout'][0]:.1f}秒")
print(f" 读取超时: {config['timeout'][1]:.1f}秒")
# 恢复环境
factory.config.environment = original_env
# 清理Session
for session in factory.session_cache.values():
session.close()
print(f"\n✓ 生产环境超时配置示例完成")
本章详细介绍了Requests库中超时设置的各个方面:
timeout参数的各种格式关键要点: