Requests 超时设置

超时设置是网络请求中至关重要的一环。合理的超时设置可以防止程序无限期等待,提高系统的稳定性和响应性。
HTTP请求超时时间线
连接阶段
读取阶段
开始连接
连接建立
开始读取
请求完成
连接超时
读取超时
连接超时

从开始连接到建立连接的最大时间

读取超时

从连接建立到接收完整响应的最大时间

超时设置概述

超时设置是HTTP客户端的重要功能,它定义了客户端等待服务器响应的最长时间。合理的超时设置可以:

为什么需要超时 4
  • 防止无限等待 - 避免程序因网络问题而卡死
  • 资源释放 - 及时释放连接和内存资源
  • 用户体验 - 快速失败,提供更好的用户体验
  • 系统稳定性 - 防止单个慢请求拖垮整个系统
超时类型 2
  • 连接超时 - 建立连接的最大时间
  • 读取超时 - 接收响应的最大时间
超时异常 3
  • ConnectTimeout - 连接超时异常
  • ReadTimeout - 读取超时异常
  • Timeout - 通用超时异常
处理策略 4
  • 重试机制
  • 降级处理
  • 超时监控
  • 动态调整
超时设置工作流程
1
客户端发起请求
设置连接超时和读取超时
2
尝试建立连接
如果超过连接超时时间,抛出ConnectTimeout异常
3
连接成功,等待响应
如果超过读取超时时间,抛出ReadTimeout异常
4
处理响应或异常
根据结果进行相应处理
超时设置的重要性:
  • 生产环境必须设置超时 - 没有超时设置的程序是不完整的
  • 超时时间需要合理 - 太短会导致频繁失败,太长会影响用户体验
  • 不同场景不同超时 - API调用、文件下载、实时通信需要不同的超时策略
  • 监控和调整 - 需要监控超时情况并动态调整超时时间

基本超时设置

Requests通过timeout参数设置超时时间,支持多种格式。

单个超时值
import requests

# 设置总超时时间(连接+读取)
timeout_seconds = 5

try:
    response = requests.get(
        'https://httpbin.org/delay/3',  # 这个端点会延迟3秒
        timeout=timeout_seconds
    )
    print(f"请求成功: {response.status_code}")

except requests.exceptions.Timeout:
    print(f"请求超时({timeout_seconds}秒)")

# 测试不同延迟
delays = [2, 6, 10]

print("\n=== 测试不同延迟 ===")
for delay in delays:
    url = f'https://httpbin.org/delay/{delay}'

    try:
        response = requests.get(url, timeout=5)
        print(f"延迟{delay}秒: ✓ 成功 ({response.status_code})")
    except requests.exceptions.Timeout:
        print(f"延迟{delay}秒: ✗ 超时")
    except Exception as e:
        print(f"延迟{delay}秒: ✗ 其他错误: {e}")
连接和读取分别设置
import requests

# 分别设置连接超时和读取超时
# 格式: (连接超时, 读取超时)
timeout_settings = (2.5, 5)  # 连接超时2.5秒,读取超时5秒

try:
    response = requests.get(
        'https://httpbin.org/delay/3',
        timeout=timeout_settings
    )
    print(f"请求成功: {response.status_code}")

except requests.exceptions.ConnectTimeout:
    print("连接超时:无法在2.5秒内建立连接")

except requests.exceptions.ReadTimeout:
    print("读取超时:连接已建立,但在5秒内未收到完整响应")

except requests.exceptions.Timeout:
    print("请求超时(通用超时异常)")

# 测试不同场景
test_cases = [
    ('连接慢的情况', 'https://httpbin.org/delay/1', (0.5, 10)),
    ('响应慢的情况', 'https://httpbin.org/delay/10', (2, 3)),
    ('正常情况', 'https://httpbin.org/delay/2', (3, 5)),
]

print("\n=== 测试不同场景 ===")
for name, url, timeout in test_cases:
    try:
        response = requests.get(url, timeout=timeout)
        print(f"{name}: ✓ 成功")
    except requests.exceptions.ConnectTimeout:
        print(f"{name}: ✗ 连接超时")
    except requests.exceptions.ReadTimeout:
        print(f"{name}: ✗ 读取超时")
    except Exception as e:
        print(f"{name}: ✗ 其他错误: {type(e).__name__}")

超时设置的多种格式

import requests

# 1. 整数(秒)
timeout_int = 5
print(f"整数超时: {timeout_int}秒")

# 2. 浮点数(支持小数秒)
timeout_float = 2.5
print(f"浮点数超时: {timeout_float}秒")

# 3. 元组(连接超时, 读取超时)
timeout_tuple = (3, 7)
print(f"元组超时: 连接{timeout_tuple[0]}秒, 读取{timeout_tuple[1]}秒")

# 4. 列表(也可以工作,但不推荐)
timeout_list = [3, 7]
print(f"列表超时(不推荐): {timeout_list}")

# 5. None(不超时 - 慎用!)
# timeout_none = None  # 危险!可能导致程序永远等待

# 测试各种格式
test_url = 'https://httpbin.org/delay/2'

timeout_formats = [
    ('整数', 3),
    ('浮点数', 2.5),
    ('元组', (1, 4)),
    ('元组(只有连接超时)', (3, None)),  # 只设置连接超时
    ('元组(只有读取超时)', (None, 3)),  # 只设置读取超时
]

print("\n=== 测试不同超时格式 ===")
for name, timeout in timeout_formats:
    try:
        response = requests.get(test_url, timeout=timeout)
        print(f"{name}: ✓ 成功 (超时设置: {timeout})")
    except requests.exceptions.Timeout:
        print(f"{name}: ✗ 超时 (超时设置: {timeout})")
    except Exception as e:
        print(f"{name}: ✗ 错误: {type(e).__name__} (超时设置: {timeout})")

# 6. 使用Session设置默认超时
print("\n=== Session默认超时 ===")
session = requests.Session()

# 为Session设置默认超时
session.request = lambda method, url, **kwargs: requests.Session.request(
    session, method, url,
    timeout=kwargs.pop('timeout', 10),  # 默认10秒超时
    **kwargs
)

try:
    response = session.get('https://httpbin.org/delay/5')
    print(f"Session默认超时: ✓ 成功")
except requests.exceptions.Timeout:
    print(f"Session默认超时: ✗ 超时")

session.close()
重要警告:
  • 永远不要设置timeout=None - 这会导致请求无限期等待
  • 生产环境必须设置超时 - 即使你认为服务很快
  • 考虑网络波动 - 设置合理的超时时间,考虑网络不稳定性
  • 测试超时行为 - 在实际网络环境中测试超时设置

超时类型详解

超时类型 异常类 触发时机 典型场景
连接超时 ConnectTimeout 建立TCP连接的时间超过设定值 服务器宕机、网络不通、防火墙阻挡
读取超时 ReadTimeout 从连接建立到接收完整响应的时间超过设定值 服务器处理慢、网络延迟高、响应体过大
总超时 Timeout 单个超时值时的通用超时 简单的超时控制,不区分连接和读取

连接超时 (ConnectTimeout)

import requests
import socket

def test_connect_timeout():
    """测试连接超时"""

    print("=== 连接超时测试 ===")

    # 1. 测试不可达的主机(快速失败)
    unreachable_hosts = [
        'https://192.0.2.1',  # 测试地址,通常不可达
        'https://10.255.255.1',  # 私有地址,通常不可达
        'https://nonexistent.example.com',  # 不存在的域名
    ]

    for url in unreachable_hosts:
        print(f"\n测试: {url}")
        try:
            # 设置很短的连接超时
            response = requests.get(url, timeout=(0.1, 5))
            print(f"  ✓ 成功(不应该发生)")
        except requests.exceptions.ConnectTimeout:
            print(f"  ✗ 连接超时(预期)")
        except socket.gaierror:
            print(f"  ✗ 域名解析失败")
        except Exception as e:
            print(f"  ✗ 其他错误: {type(e).__name__}")

    # 2. 测试连接超时与读取超时的区别
    print(f"\n=== 连接超时 vs 读取超时 ===")

    # 模拟慢连接(实际测试需要能控制连接时间的服务器)
    test_cases = [
        ('正常连接,快速响应', 'https://httpbin.org/delay/1', (3, 3)),
        ('慢连接,快速响应', 'https://httpbin.org/delay/1', (0.5, 3)),  # 连接超时会触发
        ('正常连接,慢响应', 'https://httpbin.org/delay/5', (3, 2)),  # 读取超时会触发
    ]

    for name, url, timeout in test_cases:
        print(f"\n测试: {name}")
        print(f"  超时设置: 连接{timeout[0]}秒, 读取{timeout[1]}秒")

        try:
            response = requests.get(url, timeout=timeout)
            print(f"  ✓ 成功: {response.status_code}")
        except requests.exceptions.ConnectTimeout:
            print(f"  ✗ 连接超时")
        except requests.exceptions.ReadTimeout:
            print(f"  ✗ 读取超时")
        except Exception as e:
            print(f"  ✗ 其他错误: {type(e).__name__}")

# 连接超时的根本原因
def analyze_connect_timeout_causes():
    """分析连接超时的可能原因"""

    causes = {
        'DNS解析失败': '域名无法解析为IP地址',
        '网络不可达': '目标服务器网络不通',
        '防火墙阻挡': '防火墙阻止了连接',
        '服务器宕机': '目标服务器没有运行',
        '端口关闭': '目标端口没有服务监听',
        '连接队列满': '服务器连接队列已满',
        '路由问题': '网络路由错误',
    }

    print("=== 连接超时可能原因 ===")
    for cause, description in causes.items():
        print(f"  • {cause}: {description}")

# 诊断连接问题
def diagnose_connection(url):
    """诊断连接问题"""
    from urllib.parse import urlparse
    import socket

    parsed = urlparse(url)
    hostname = parsed.hostname
    port = parsed.port or (443 if parsed.scheme == 'https' else 80)

    print(f"诊断连接: {url}")
    print(f"  主机: {hostname}")
    print(f"  端口: {port}")

    # 1. 测试DNS解析
    try:
        ip_address = socket.gethostbyname(hostname)
        print(f"  ✓ DNS解析成功: {ip_address}")
    except socket.gaierror:
        print(f"  ✗ DNS解析失败")
        return

    # 2. 测试端口连通性
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)  # 2秒超时
        result = sock.connect_ex((ip_address, port))
        sock.close()

        if result == 0:
            print(f"  ✓ 端口 {port} 可达")
        else:
            print(f"  ✗ 端口 {port} 不可达 (错误代码: {result})")
    except socket.timeout:
        print(f"  ✗ 连接超时")
    except Exception as e:
        print(f"  ✗ 连接错误: {e}")

# 运行测试
if __name__ == "__main__":
    test_connect_timeout()
    analyze_connect_timeout_causes()
    print("\n")
    diagnose_connection('https://httpbin.org')

读取超时 (ReadTimeout)

import requests

def test_read_timeout():
    """测试读取超时"""

    print("=== 读取超时测试 ===")

    # 1. 测试慢响应
    print("\n1. 测试慢响应接口:")

    # httpbin的/delay端点可以模拟慢响应
    delays = [1, 3, 6, 10]
    read_timeout = 5  # 读取超时5秒

    for delay in delays:
        url = f'https://httpbin.org/delay/{delay}'

        print(f"\n  延迟 {delay} 秒, 读取超时 {read_timeout} 秒:")

        try:
            # 设置较长的连接超时,较短的读取超时
            response = requests.get(url, timeout=(10, read_timeout))
            print(f"    ✓ 成功: {response.status_code}")
        except requests.exceptions.ReadTimeout:
            print(f"    ✗ 读取超时 (响应时间 > {read_timeout}秒)")
        except Exception as e:
            print(f"    ✗ 其他错误: {type(e).__name__}")

    # 2. 测试大文件下载(读取超时)
    print("\n2. 测试大文件下载(流式):")

    # 使用stream=True和分块读取
    def download_with_timeout(url, read_timeout, chunk_size=1024):
        """带读取超时的下载"""
        try:
            response = requests.get(url, stream=True, timeout=(10, read_timeout))

            total_downloaded = 0
            start_time = time.time()

            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    total_downloaded += len(chunk)
                    elapsed = time.time() - start_time

                    # 检查是否超时
                    if elapsed > read_timeout:
                        print(f"    ✗ 读取超时: 已下载 {total_downloaded} 字节")
                        return False

                    # 模拟处理(这里只是统计)
                    pass

            print(f"    ✓ 下载完成: {total_downloaded} 字节")
            return True

        except requests.exceptions.ReadTimeout:
            print(f"    ✗ 读取超时异常")
            return False

    # 测试下载(使用一个返回较大响应的端点)
    test_url = 'https://httpbin.org/stream-bytes/100000'  # 100KB数据
    print(f"  下载测试: {test_url}")
    download_with_timeout(test_url, read_timeout=2)

    # 3. 读取超时的根本原因分析
    print("\n3. 读取超时可能原因:")

    causes = [
        "服务器处理请求时间过长",
        "网络延迟高,数据传输慢",
        "响应体过大,下载时间长",
        "服务器发送数据间隔过长",
        "中间代理或CDN延迟",
        "客户端处理能力不足",
        "服务器负载过高",
    ]

    for i, cause in enumerate(causes, 1):
        print(f"  {i}. {cause}")

# 处理读取超时的策略
def handle_read_timeout_strategies():
    """处理读取超时的策略"""

    strategies = {
        '增加读取超时时间': '对于已知的慢接口,适当增加超时时间',
        '分块请求': '对于大文件,使用Range头分块下载',
        '压缩传输': '使用gzip等压缩减少传输数据量',
        '流式处理': '使用stream=True边下载边处理',
        '进度监控': '监控下载进度,超时前取消',
        '降级处理': '超时后使用缓存数据或默认值',
        '异步处理': '使用异步请求避免阻塞',
    }

    print("=== 读取超时处理策略 ===")
    for strategy, description in strategies.items():
        print(f"  • {strategy}: {description}")

# 读取超时监控
class ReadTimeoutMonitor:
    """读取超时监控器"""

    def __init__(self, warning_threshold=0.8):
        """
        warning_threshold: 警告阈值(超时时间的百分比)
        """
        self.warning_threshold = warning_threshold
        self.timeout_history = []

    def monitor_request(self, func, *args, **kwargs):
        """监控请求的执行时间"""
        import time

        start_time = time.time()

        try:
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time

            # 检查是否接近超时
            timeout = kwargs.get('timeout')
            if timeout and isinstance(timeout, tuple) and len(timeout) > 1:
                read_timeout = timeout[1]
                if read_timeout and elapsed > read_timeout * self.warning_threshold:
                    print(f"⚠ 警告: 请求用时 {elapsed:.2f}秒,接近读取超时({read_timeout}秒)")

            self.timeout_history.append({
                'success': True,
                'elapsed': elapsed,
                'timeout': timeout
            })

            return result

        except requests.exceptions.ReadTimeout as e:
            elapsed = time.time() - start_time
            self.timeout_history.append({
                'success': False,
                'elapsed': elapsed,
                'error': 'ReadTimeout'
            })
            raise e

    def get_statistics(self):
        """获取统计信息"""
        if not self.timeout_history:
            return None

        successful = [h for h in self.timeout_history if h['success']]
        failed = [h for h in self.timeout_history if not h['success']]

        stats = {
            'total_requests': len(self.timeout_history),
            'successful_requests': len(successful),
            'failed_requests': len(failed),
            'success_rate': len(successful) / len(self.timeout_history) * 100 if self.timeout_history else 0,
            'avg_elapsed_time': sum(h['elapsed'] for h in successful) / len(successful) if successful else 0,
        }

        return stats

# 运行测试
import time
if __name__ == "__main__":
    test_read_timeout()
    handle_read_timeout_strategies()

    # 测试监控器
    print("\n=== 读取超时监控测试 ===")
    monitor = ReadTimeoutMonitor(warning_threshold=0.7)

    # 包装requests.get函数
    def monitored_get(url, **kwargs):
        return monitor.monitor_request(requests.get, url, **kwargs)

    # 测试几个请求
    try:
        response = monitored_get('https://httpbin.org/delay/1', timeout=(3, 2))
        print(f"请求1: ✓ 成功")
    except Exception as e:
        print(f"请求1: ✗ {type(e).__name__}")

    try:
        response = monitored_get('https://httpbin.org/delay/3', timeout=(3, 2))
        print(f"请求2: ✓ 成功")
    except Exception as e:
        print(f"请求2: ✗ {type(e).__name__}")

    # 查看统计
    stats = monitor.get_statistics()
    if stats:
        print(f"\n监控统计:")
        for key, value in stats.items():
            print(f"  {key}: {value:.2f}" if isinstance(value, float) else f"  {key}: {value}")

超时异常处理

正确处理超时异常对于构建健壮的应用至关重要。

基本异常处理
import requests
from requests.exceptions import Timeout, ConnectTimeout, ReadTimeout

def make_request_with_timeout(url, timeout=5):
    """带超时处理的请求函数"""

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # 检查HTTP错误
        return {'success': True, 'data': response}

    except ConnectTimeout:
        print(f"连接超时: 无法在{timeout if isinstance(timeout, (int, float)) else timeout[0]}秒内连接到服务器")
        return {'success': False, 'error': 'ConnectTimeout', 'type': 'connection'}

    except ReadTimeout:
        print(f"读取超时: 连接已建立,但在{timeout if isinstance(timeout, (int, float)) else timeout[1]}秒内未收到完整响应")
        return {'success': False, 'error': 'ReadTimeout', 'type': 'read'}

    except Timeout:
        print(f"请求超时: 总时间超过{timeout}秒")
        return {'success': False, 'error': 'Timeout', 'type': 'general'}

    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP错误: {http_err}")
        return {'success': False, 'error': 'HTTPError', 'details': str(http_err)}

    except requests.exceptions.ConnectionError as conn_err:
        print(f"连接错误: {conn_err}")
        return {'success': False, 'error': 'ConnectionError'}

    except requests.exceptions.RequestException as req_err:
        print(f"请求异常: {req_err}")
        return {'success': False, 'error': 'RequestException'}

    except Exception as e:
        print(f"未知错误: {type(e).__name__}: {e}")
        return {'success': False, 'error': 'UnknownError'}

# 测试不同类型的超时
print("=== 超时异常处理测试 ===")

test_cases = [
    ('正常请求', 'https://httpbin.org/delay/1', 3),
    ('连接超时', 'https://192.0.2.1', (0.1, 5)),  # 不可达地址
    ('读取超时', 'https://httpbin.org/delay/6', (3, 2)),  # 响应慢
]

for name, url, timeout in test_cases:
    print(f"\n测试: {name}")
    print(f"  URL: {url}")
    print(f"  超时: {timeout}")

    result = make_request_with_timeout(url, timeout)
    print(f"  结果: {'成功' if result['success'] else '失败'}")

    if not result['success']:
        print(f"  错误类型: {result.get('error')}")
        print(f"  错误详情: {result.get('details', '无')}")
上下文管理器
import requests
import contextlib
from requests.exceptions import Timeout

class TimeoutContext:
    """超时上下文管理器"""

    def __init__(self, timeout):
        self.timeout = timeout
        self.response = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 清理资源
        if self.response:
            self.response.close()

        # 处理超时异常
        if exc_type is Timeout:
            print(f"在上下文管理器中捕获超时异常: {exc_val}")
            # 可以在这里记录日志、发送通知等
            return True  # 抑制异常传播

        return False  # 不抑制其他异常

    def request(self, url, method='GET', **kwargs):
        """在上下文中执行请求"""
        kwargs['timeout'] = self.timeout
        self.response = requests.request(method, url, **kwargs)
        return self.response

# 使用上下文管理器处理超时
print("=== 使用上下文管理器 ===")

# 方法1:使用自定义上下文管理器
with TimeoutContext(timeout=2) as ctx:
    try:
        response = ctx.request('https://httpbin.org/delay/3')
        print(f"请求成功: {response.status_code}")
    except Exception as e:
        print(f"请求异常: {type(e).__name__}")

# 方法2:使用contextlib简化
@contextlib.contextmanager
def timeout_manager(timeout):
    """超时上下文管理器(简化版)"""
    try:
        yield
    except Timeout as e:
        print(f"上下文管理器捕获超时: {e}")
        # 可以进行重试、降级等处理
        raise  # 重新抛出异常

# 使用装饰器处理超时
def timeout_handler(timeout=5):
    """超时处理装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            try:
                # 如果函数有timeout参数,使用它,否则使用装饰器的timeout
                if 'timeout' not in kwargs:
                    kwargs['timeout'] = timeout

                return func(*args, **kwargs)

            except Timeout as e:
                print(f"装饰器捕获超时异常: {e}")
                # 可以在这里添加重试逻辑
                return None

            except Exception as e:
                print(f"装饰器捕获其他异常: {e}")
                raise

        return wrapper
    return decorator

# 使用装饰器
@timeout_handler(timeout=2)
def fetch_data(url):
    """获取数据的函数"""
    response = requests.get(url)
    return response.json()

print("\n=== 使用装饰器 ===")
result = fetch_data('https://httpbin.org/delay/3')
if result:
    print(f"获取数据成功")
else:
    print(f"获取数据失败(超时)")

# 异常链和详细错误信息
def detailed_timeout_handling():
    """详细的超时处理"""
    try:
        response = requests.get('https://httpbin.org/delay/10', timeout=(3, 5))
        return response

    except ConnectTimeout as e:
        print(f"连接超时详情:")
        print(f"  异常类型: {type(e).__name__}")
        print(f"  请求URL: {e.request.url if e.request else '未知'}")
        print(f"  连接超时设置: {e.timeout if hasattr(e, 'timeout') else '未知'}")

        # 获取更多上下文信息
        import traceback
        print(f"  异常追踪:")
        traceback.print_exc()

        raise

    except ReadTimeout as e:
        print(f"读取超时详情:")
        print(f"  异常类型: {type(e).__name__}")

        # 可能已经部分接收了响应
        if e.response is not None:
            print(f"  已接收字节: {len(e.response.content) if e.response else 0}")

        raise

print("\n=== 详细异常处理 ===")
try:
    detailed_timeout_handling()
except Exception as e:
    print(f"异常传播到外层: {type(e).__name__}")

超时异常的分类处理

import requests
from requests.exceptions import Timeout, ConnectionError, RequestException
from typing import Dict, Any, Optional

class TimeoutExceptionHandler:
    """超时异常处理器"""

    @staticmethod
    def handle_timeout_exception(e: Exception, context: Optional[Dict] = None) -> Dict[str, Any]:
        """处理超时异常"""

        context = context or {}

        # 基础信息
        result = {
            'exception_type': type(e).__name__,
            'exception_message': str(e),
            'context': context,
            'handled': True,
            'action_taken': None,
            'retry_recommended': False,
            'severity': 'medium'
        }

        # 根据异常类型分类处理
        if isinstance(e, requests.exceptions.ConnectTimeout):
            result.update({
                'error_type': 'connect_timeout',
                'description': '无法建立网络连接',
                'suggested_action': '检查网络连接、服务器状态和防火墙设置',
                'retry_recommended': True,
                'severity': 'high' if context.get('critical', False) else 'medium'
            })

        elif isinstance(e, requests.exceptions.ReadTimeout):
            result.update({
                'error_type': 'read_timeout',
                'description': '连接已建立,但服务器响应太慢',
                'suggested_action': '增加读取超时时间、优化服务器性能或使用异步处理',
                'retry_recommended': context.get('idempotent', True),  # 幂等操作可重试
                'severity': 'medium'
            })

        elif isinstance(e, requests.exceptions.Timeout):
            result.update({
                'error_type': 'general_timeout',
                'description': '请求总时间超时',
                'suggested_action': '调整超时设置或优化请求逻辑',
                'retry_recommended': True,
                'severity': 'medium'
            })

        elif isinstance(e, requests.exceptions.ConnectionError):
            result.update({
                'error_type': 'connection_error',
                'description': '网络连接错误',
                'suggested_action': '检查网络连接和服务器状态',
                'retry_recommended': True,
                'severity': 'high'
            })

        else:
            result.update({
                'error_type': 'other_exception',
                'description': '其他类型的异常',
                'suggested_action': '查看异常详情进行处理',
                'handled': False,
                'retry_recommended': False,
                'severity': 'unknown'
            })

        # 记录日志
        TimeoutExceptionHandler._log_exception(result)

        return result

    @staticmethod
    def _log_exception(exception_info: Dict):
        """记录异常日志"""
        log_entry = (
            f"Timeout Exception - "
            f"Type: {exception_info['error_type']}, "
            f"Severity: {exception_info['severity']}, "
            f"Context: {exception_info['context']}"
        )
        print(f"📝 日志: {log_entry}")

    @staticmethod
    def should_retry(exception_info: Dict, retry_count: int = 0, max_retries: int = 3) -> bool:
        """判断是否应该重试"""
        if retry_count >= max_retries:
            return False

        if not exception_info['retry_recommended']:
            return False

        # 根据严重程度决定是否重试
        if exception_info['severity'] == 'high':
            return retry_count < 2  # 严重错误最多重试2次

        return True

    @staticmethod
    def get_backoff_time(exception_info: Dict, retry_count: int) -> float:
        """获取重试等待时间(指数退避)"""
        base_delay = 1.0  # 基础延迟1秒

        # 根据错误类型调整基础延迟
        if exception_info['error_type'] == 'connect_timeout':
            base_delay = 2.0
        elif exception_info['error_type'] == 'read_timeout':
            base_delay = 3.0

        # 指数退避
        backoff_time = base_delay * (2 ** retry_count)

        # 限制最大等待时间
        max_backoff = 30.0
        return min(backoff_time, max_backoff)

# 使用异常处理器
print("=== 超时异常分类处理 ===")

# 模拟不同异常
exception_cases = [
    (requests.exceptions.ConnectTimeout("Connection timeout"),
     {'url': 'https://api.example.com/data', 'critical': True}),

    (requests.exceptions.ReadTimeout("Read timeout"),
     {'url': 'https://api.example.com/slow', 'idempotent': True}),

    (requests.exceptions.Timeout("General timeout"),
     {'url': 'https://api.example.com/normal'}),

    (requests.exceptions.ConnectionError("Connection error"),
     {'url': 'https://api.example.com/unreachable'}),

    (ValueError("Other error"),
     {'url': 'https://api.example.com/error'}),
]

for exception, context in exception_cases:
    print(f"\n处理异常: {type(exception).__name__}")

    result = TimeoutExceptionHandler.handle_timeout_exception(exception, context)

    print(f"  错误类型: {result['error_type']}")
    print(f"  描述: {result['description']}")
    print(f"  建议操作: {result['suggested_action']}")
    print(f"  推荐重试: {result['retry_recommended']}")
    print(f"  严重程度: {result['severity']}")

    # 测试重试逻辑
    retry_count = 1
    should_retry = TimeoutExceptionHandler.should_retry(result, retry_count)
    print(f"  第{retry_count}次重试: {'是' if should_retry else '否'}")

    if should_retry:
        backoff = TimeoutExceptionHandler.get_backoff_time(result, retry_count)
        print(f"  重试等待: {backoff:.1f}秒")

# 完整的异常处理流程
def robust_request(url, timeout=5, max_retries=3):
    """健壮的请求函数,包含完整的异常处理"""

    retry_count = 0
    last_exception = None

    while retry_count <= max_retries:
        try:
            print(f"\n尝试请求 (重试 {retry_count}/{max_retries}): {url}")

            response = requests.get(url, timeout=timeout)
            response.raise_for_status()

            print(f"✓ 请求成功")
            return {'success': True, 'data': response, 'retries': retry_count}

        except Exception as e:
            last_exception = e

            # 处理异常
            exception_info = TimeoutExceptionHandler.handle_timeout_exception(
                e, {'url': url, 'retry_count': retry_count}
            )

            # 判断是否重试
            if TimeoutExceptionHandler.should_retry(exception_info, retry_count, max_retries):
                retry_count += 1

                # 计算等待时间
                wait_time = TimeoutExceptionHandler.get_backoff_time(exception_info, retry_count)
                print(f"  等待 {wait_time:.1f} 秒后重试...")
                import time
                time.sleep(wait_time)

                continue

            else:
                print(f"✗ 达到最大重试次数或不需要重试")
                break

    # 所有重试都失败
    return {
        'success': False,
        'error': str(last_exception) if last_exception else 'Unknown error',
        'error_type': type(last_exception).__name__ if last_exception else 'Unknown',
        'retries': retry_count
    }

print("\n=== 完整的异常处理流程 ===")
result = robust_request('https://httpbin.org/delay/6', timeout=2, max_retries=2)
print(f"最终结果: {'成功' if result['success'] else '失败'}")
print(f"重试次数: {result['retries']}")
if not result['success']:
    print(f"错误: {result['error']}")

重试策略

当请求超时时,合理的重试策略可以提高请求的成功率。

基本重试实现
import requests
import time
from requests.exceptions import Timeout, ConnectionError

def retry_request_basic(url, max_retries=3, timeout=5):
    """基本的重试实现"""

    retry_count = 0

    while retry_count <= max_retries:
        try:
            print(f"尝试 {retry_count + 1}/{max_retries + 1}: {url}")

            response = requests.get(url, timeout=timeout)

            # 检查状态码,只对特定状态码重试
            if response.status_code >= 500:
                print(f"  服务器错误 ({response.status_code}),重试...")
                raise ConnectionError(f"Server error: {response.status_code}")

            print(f"✓ 请求成功")
            return response

        except (Timeout, ConnectionError) as e:
            retry_count += 1

            if retry_count <= max_retries:
                print(f"  请求失败: {type(e).__name__},等待后重试...")

                # 简单的固定等待
                wait_time = 1  # 固定等待1秒
                time.sleep(wait_time)
            else:
                print(f"✗ 达到最大重试次数")
                raise

    # 理论上不会执行到这里
    raise Exception("Unexpected error")

# 测试基本重试
print("=== 基本重试测试 ===")
try:
    # 测试超时重试
    response = retry_request_basic(
        'https://httpbin.org/delay/3',
        max_retries=2,
        timeout=2
    )
    print(f"最终成功: {response.status_code}")
except Exception as e:
    print(f"最终失败: {type(e).__name__}: {e}")

# 更智能的重试:指数退避
def retry_with_backoff(url, max_retries=3, timeout=5, backoff_factor=1):
    """带指数退避的重试"""

    retry_count = 0

    while retry_count <= max_retries:
        try:
            print(f"尝试 {retry_count + 1}/{max_retries + 1}")

            response = requests.get(url, timeout=timeout)
            print(f"✓ 请求成功")
            return response

        except (Timeout, ConnectionError) as e:
            retry_count += 1

            if retry_count <= max_retries:
                # 指数退避:等待时间 = backoff_factor * (2^(retry_count-1))
                wait_time = backoff_factor * (2 ** (retry_count - 1))
                print(f"  请求失败: {type(e).__name__},等待 {wait_time} 秒后重试...")
                time.sleep(wait_time)
            else:
                print(f"✗ 达到最大重试次数")
                raise

print("\n=== 指数退避重试测试 ===")
try:
    response = retry_with_backoff(
        'https://httpbin.org/delay/4',
        max_retries=3,
        timeout=2,
        backoff_factor=1
    )
    print(f"最终成功: {response.status_code}")
except Exception as e:
    print(f"最终失败: {type(e).__name__}: {e}")
使用urllib3重试
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# 创建重试策略
retry_strategy = Retry(
    total=3,  # 总重试次数
    backoff_factor=1,  # 退避因子
    status_forcelist=[429, 500, 502, 503, 504],  # 对这些状态码重试
    allowed_methods=["GET", "POST"],  # 只对GET和POST方法重试
)

# 创建适配器并设置重试策略
adapter = HTTPAdapter(max_retries=retry_strategy)

# 创建Session并挂载适配器
session = requests.Session()

# 为http和https都挂载适配器
session.mount("http://", adapter)
session.mount("https://", adapter)

print("=== 使用urllib3重试策略 ===")

# 测试重试
try:
    # 这个端点会返回500错误,触发重试
    response = session.get("https://httpbin.org/status/500")
    print(f"请求结果: {response.status_code}")
except Exception as e:
    print(f"请求失败: {e}")

# 更复杂的重试策略
def create_custom_retry_strategy():
    """创建自定义重试策略"""

    return Retry(
        total=5,  # 总重试次数
        connect=3,  # 连接错误重试次数
        read=2,  # 读取错误重试次数
        redirect=2,  # 重定向次数
        status=3,  # 状态码重试次数
        backoff_factor=0.5,  # 退避因子
        status_forcelist=[500, 502, 503, 504, 429],
        allowed_methods=frozenset(['GET', 'POST', 'PUT', 'DELETE']),
        raise_on_status=False,  # 不对状态码抛出异常
        respect_retry_after_header=True,  # 尊重Retry-After头
    )

# 自定义会话工厂
def create_retry_session(retries=3, backoff_factor=1):
    """创建带重试策略的Session"""
    session = requests.Session()

    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[500, 502, 503, 504, 429],
        allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"]
    )

    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    return session

# 使用自定义Session
print("\n=== 使用自定义重试Session ===")
retry_session = create_retry_session(retries=2, backoff_factor=1)

try:
    # 测试一个会超时的请求
    response = retry_session.get(
        'https://httpbin.org/delay/5',
        timeout=2
    )
    print(f"请求成功: {response.status_code}")
except Exception as e:
    print(f"请求失败: {type(e).__name__}")

retry_session.close()

高级重试策略实现

import requests
import time
import random
from typing import Callable, Optional, Dict, Any
from requests.exceptions import RequestException

class SmartRetryStrategy:
    """智能重试策略"""

    def __init__(self,
                 max_retries: int = 3,
                 base_delay: float = 1.0,
                 max_delay: float = 30.0,
                 jitter: bool = True,
                 retry_on_status: list = None,
                 retry_on_exception: list = None):
        """
        初始化重试策略

        Args:
            max_retries: 最大重试次数
            base_delay: 基础延迟时间(秒)
            max_delay: 最大延迟时间(秒)
            jitter: 是否添加随机抖动
            retry_on_status: 需要重试的状态码列表
            retry_on_exception: 需要重试的异常类型列表
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

        self.retry_on_status = retry_on_status or [429, 500, 502, 503, 504]
        self.retry_on_exception = retry_on_exception or [
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
            requests.exceptions.ConnectionError
        ]

        self.stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'retry_attempts': 0
        }

    def should_retry(self,
                    exception: Optional[Exception] = None,
                    response: Optional[requests.Response] = None) -> bool:
        """判断是否应该重试"""

        # 检查异常
        if exception:
            for exc_type in self.retry_on_exception:
                if isinstance(exception, exc_type):
                    return True

        # 检查响应状态码
        if response and response.status_code in self.retry_on_status:
            return True

        return False

    def calculate_delay(self, retry_count: int) -> float:
        """计算延迟时间(指数退避 + 随机抖动)"""

        # 指数退避
        delay = min(self.base_delay * (2 ** retry_count), self.max_delay)

        # 添加随机抖动
        if self.jitter:
            jitter_amount = random.uniform(0, delay * 0.1)  # 最多10%的抖动
            delay += jitter_amount

        return delay

    def execute_with_retry(self,
                          request_func: Callable,
                          *args,
                          **kwargs) -> Dict[str, Any]:
        """执行请求并应用重试策略"""

        retry_count = 0
        last_exception = None
        last_response = None

        while retry_count <= self.max_retries:
            self.stats['total_requests'] += 1

            try:
                print(f"尝试 {retry_count + 1}/{self.max_retries + 1}")

                response = request_func(*args, **kwargs)
                last_response = response

                # 检查响应状态码
                if response.status_code >= 200 and response.status_code < 300:
                    self.stats['successful_requests'] += 1
                    print(f"✓ 请求成功: {response.status_code}")
                    return {
                        'success': True,
                        'response': response,
                        'retries': retry_count,
                        'exception': None
                    }

                # 检查是否需要重试(基于状态码)
                if self.should_retry(response=response):
                    print(f"⚠ 需要重试的状态码: {response.status_code}")
                    raise requests.exceptions.HTTPError(f"Status code: {response.status_code}")

                else:
                    # 不需要重试的其他错误状态码
                    self.stats['failed_requests'] += 1
                    return {
                        'success': False,
                        'response': response,
                        'retries': retry_count,
                        'exception': None,
                        'error': f"HTTP {response.status_code}"
                    }

            except Exception as e:
                last_exception = e
                self.stats['failed_requests'] += 1

                # 检查是否需要重试(基于异常)
                if self.should_retry(exception=e) and retry_count < self.max_retries:
                    retry_count += 1
                    self.stats['retry_attempts'] += 1

                    # 计算等待时间
                    delay = self.calculate_delay(retry_count)
                    print(f"  请求失败: {type(e).__name__},等待 {delay:.2f} 秒后重试...")

                    time.sleep(delay)
                    continue

                else:
                    # 不需要重试或达到最大重试次数
                    print(f"✗ 请求失败: {type(e).__name__}: {e}")
                    return {
                        'success': False,
                        'response': last_response,
                        'retries': retry_count,
                        'exception': e,
                        'error': str(e)
                    }

        # 达到最大重试次数
        return {
            'success': False,
            'response': last_response,
            'retries': retry_count,
            'exception': last_exception,
            'error': f"Max retries ({self.max_retries}) exceeded"
        }

    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        success_rate = (self.stats['successful_requests'] / self.stats['total_requests'] * 100
                       if self.stats['total_requests'] > 0 else 0)

        return {
            **self.stats,
            'success_rate': f"{success_rate:.1f}%"
        }

# 使用智能重试策略
print("=== 智能重试策略测试 ===")

# 创建重试策略
retry_strategy = SmartRetryStrategy(
    max_retries=3,
    base_delay=1.0,
    max_delay=10.0,
    jitter=True,
    retry_on_status=[429, 500, 502, 503, 504],
    retry_on_exception=[
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ReadTimeout,
        requests.exceptions.ConnectionError
    ]
)

# 包装requests.get函数
def make_request(url, timeout=5):
    """包装请求函数"""
    return requests.get(url, timeout=timeout)

# 测试不同场景
test_cases = [
    ('正常请求', 'https://httpbin.org/status/200', 5),
    ('服务器错误', 'https://httpbin.org/status/500', 5),
    ('超时请求', 'https://httpbin.org/delay/6', 2),
    ('连接错误', 'https://192.0.2.1', 2),
]

for name, url, timeout in test_cases:
    print(f"\n测试: {name}")
    print(f"  URL: {url}")
    print(f"  超时: {timeout}秒")

    result = retry_strategy.execute_with_retry(
        make_request, url, timeout=timeout
    )

    print(f"  结果: {'成功' if result['success'] else '失败'}")
    print(f"  重试次数: {result['retries']}")

    if result['exception']:
        print(f"  异常: {type(result['exception']).__name__}")

# 查看统计信息
print(f"\n=== 重试统计 ===")
stats = retry_strategy.get_statistics()
for key, value in stats.items():
    print(f"  {key}: {value}")

# 基于响应内容的动态重试
class ContentAwareRetryStrategy(SmartRetryStrategy):
    """基于响应内容的智能重试策略"""

    def should_retry(self,
                    exception: Optional[Exception] = None,
                    response: Optional[requests.Response] = None) -> bool:
        """扩展父类方法,添加基于内容的判断"""

        # 先调用父类方法
        if super().should_retry(exception, response):
            return True

        # 检查响应内容
        if response:
            try:
                content = response.json()

                # 如果响应中有特定错误码,可能需要重试
                if isinstance(content, dict):
                    error_code = content.get('error_code')

                    # 假设某些错误码需要重试
                    retry_error_codes = ['rate_limited', 'temporary_error', 'busy']
                    if error_code in retry_error_codes:
                        return True

                    # 检查错误消息
                    error_message = content.get('error', '').lower()
                    if any(phrase in error_message for phrase in ['try again', 'retry', 'busy', 'overload']):
                        return True

            except (ValueError, AttributeError):
                # 不是JSON或无法解析
                pass

        return False

print(f"\n=== 基于内容的智能重试 ===")
content_retry = ContentAwareRetryStrategy(max_retries=2)

# 模拟一个返回JSON错误的请求
def mock_request_with_json_error():
    """模拟返回JSON错误的请求"""
    from unittest.mock import Mock
    response = Mock(spec=requests.Response)
    response.status_code = 200
    response.json.return_value = {
        'error_code': 'rate_limited',
        'error': 'Rate limit exceeded, please try again later'
    }
    return response

# 测试
result = content_retry.execute_with_retry(mock_request_with_json_error)
print(f"基于内容的判断结果: {'重试' if not result['success'] else '不重试'}")

高级超时配置

1. 动态超时调整

import requests
import time
from typing import Dict, Any, Optional
from statistics import mean, median

class AdaptiveTimeoutManager:
    """自适应超时管理器"""

    def __init__(self,
                 initial_timeout: float = 5.0,
                 min_timeout: float = 1.0,
                 max_timeout: float = 30.0,
                 history_size: int = 100):
        """
        初始化自适应超时管理器

        Args:
            initial_timeout: 初始超时时间
            min_timeout: 最小超时时间
            max_timeout: 最大超时时间
            history_size: 历史记录大小
        """
        self.initial_timeout = initial_timeout
        self.min_timeout = min_timeout
        self.max_timeout = max_timeout

        self.history = []  # 存储(响应时间, 成功与否)的列表
        self.history_size = history_size

        self.success_count = 0
        self.failure_count = 0

    def record_request(self, response_time: float, success: bool):
        """记录请求结果"""
        self.history.append((response_time, success))

        # 保持历史记录大小
        if len(self.history) > self.history_size:
            self.history.pop(0)

        if success:
            self.success_count += 1
        else:
            self.failure_count += 1

    def calculate_timeout(self, percentile: float = 95.0) -> float:
        """计算合适的超时时间"""

        if not self.history:
            return self.initial_timeout

        # 只考虑成功的请求
        successful_times = [time for time, success in self.history if success]

        if not successful_times:
            return self.initial_timeout

        # 按响应时间排序
        sorted_times = sorted(successful_times)

        # 计算百分位数
        index = int(len(sorted_times) * percentile / 100.0)
        index = min(index, len(sorted_times) - 1)

        percentile_time = sorted_times[index]

        # 添加安全边际(20%)
        recommended_timeout = percentile_time * 1.2

        # 限制在最小和最大超时之间
        return max(self.min_timeout, min(recommended_timeout, self.max_timeout))

    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""

        if not self.history:
            return {
                'total_requests': 0,
                'success_rate': 0,
                'current_timeout': self.initial_timeout
            }

        successful_times = [time for time, success in self.history if success]

        stats = {
            'total_requests': len(self.history),
            'successful_requests': len(successful_times),
            'failed_requests': len(self.history) - len(successful_times),
            'success_rate': len(successful_times) / len(self.history) * 100,
            'current_timeout': self.calculate_timeout(),
        }

        if successful_times:
            stats.update({
                'avg_response_time': mean(successful_times),
                'median_response_time': median(successful_times),
                'min_response_time': min(successful_times),
                'max_response_time': max(successful_times),
                'p95_response_time': sorted(successful_times)[int(len(successful_times) * 0.95)],
            })

        return stats

    def adaptive_request(self,
                        url: str,
                        method: str = 'GET',
                        **kwargs) -> Optional[requests.Response]:
        """执行自适应超时请求"""

        # 计算当前超时时间
        current_timeout = self.calculate_timeout()

        # 确保使用计算出的超时时间
        if 'timeout' not in kwargs:
            kwargs['timeout'] = current_timeout

        print(f"自适应超时: {current_timeout:.2f}秒")

        start_time = time.time()

        try:
            response = requests.request(method, url, **kwargs)
            response_time = time.time() - start_time

            # 记录成功请求
            self.record_request(response_time, True)

            print(f"✓ 请求成功: {response_time:.2f}秒")
            return response

        except requests.exceptions.Timeout:
            response_time = time.time() - start_time

            # 记录失败请求
            self.record_request(response_time, False)

            print(f"✗ 请求超时: {response_time:.2f}秒")

            # 根据失败情况调整超时
            failure_rate = self.failure_count / (self.success_count + self.failure_count)

            if failure_rate > 0.3:  # 失败率超过30%
                # 增加超时时间
                new_timeout = min(current_timeout * 1.5, self.max_timeout)
                print(f"  失败率高,增加超时到: {new_timeout:.2f}秒")

            raise

        except Exception as e:
            response_time = time.time() - start_time
            self.record_request(response_time, False)

            print(f"✗ 请求异常: {type(e).__name__}")
            raise

# 使用自适应超时管理器
print("=== 自适应超时管理器测试 ===")

timeout_manager = AdaptiveTimeoutManager(
    initial_timeout=2.0,
    min_timeout=1.0,
    max_timeout=10.0,
    history_size=10
)

# 模拟一系列请求
test_urls = [
    ('快速响应', 'https://httpbin.org/delay/0.5'),
    ('中等响应', 'https://httpbin.org/delay/2'),
    ('慢响应', 'https://httpbin.org/delay/4'),
    ('超慢响应', 'https://httpbin.org/delay/8'),
]

print("初始统计:")
stats = timeout_manager.get_statistics()
for key, value in stats.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.2f}")
    else:
        print(f"  {key}: {value}")

# 执行一系列请求
print("\n执行请求序列:")
for i, (name, url) in enumerate(test_urls, 1):
    print(f"\n请求 {i}: {name}")

    try:
        response = timeout_manager.adaptive_request(url)
        print(f"  状态码: {response.status_code}")
    except requests.exceptions.Timeout:
        print(f"  超时")
    except Exception as e:
        print(f"  错误: {type(e).__name__}")

# 查看最终统计
print(f"\n最终统计:")
final_stats = timeout_manager.get_statistics()
for key, value in final_stats.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.2f}")
    else:
        print(f"  {key}: {value}")

# 基于服务质量的自适应超时
class QoSTimeoutManager(AdaptiveTimeoutManager):
    """基于服务质量的自适应超时管理器"""

    def calculate_timeout(self,
                         priority: str = 'normal',
                         service_level: str = 'standard') -> float:
        """根据优先级和服务等级计算超时时间"""

        base_timeout = super().calculate_timeout()

        # 根据优先级调整
        priority_factors = {
            'high': 0.7,    # 高优先级,更短的超时
            'normal': 1.0,  # 正常优先级
            'low': 1.5,     # 低优先级,更长的超时
        }

        # 根据服务等级调整
        service_factors = {
            'premium': 0.8,     # 高级服务,更短的超时
            'standard': 1.0,    # 标准服务
            'economy': 1.3,     # 经济服务,更长的超时
        }

        factor = priority_factors.get(priority, 1.0) * service_factors.get(service_level, 1.0)

        adjusted_timeout = base_timeout * factor

        return max(self.min_timeout, min(adjusted_timeout, self.max_timeout))

print(f"\n=== 基于服务质量的自适应超时 ===")

qos_manager = QoSTimeoutManager(
    initial_timeout=5.0,
    min_timeout=1.0,
    max_timeout=20.0
)

# 测试不同优先级和服务等级
test_scenarios = [
    ('高优先级,高级服务', 'high', 'premium'),
    ('正常优先级,标准服务', 'normal', 'standard'),
    ('低优先级,经济服务', 'low', 'economy'),
]

for name, priority, service_level in test_scenarios:
    timeout = qos_manager.calculate_timeout(priority, service_level)
    print(f"{name}: {timeout:.2f}秒")

2. 全局超时配置

import requests
import threading
from typing import Dict, Any, Optional
from contextlib import contextmanager

class GlobalTimeoutConfig:
    """全局超时配置管理器"""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """单例模式"""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialize()
        return cls._instance

    def _initialize(self):
        """初始化配置"""
        self.default_timeout = 10.0  # 默认超时10秒
        self.endpoint_timeouts = {}  # 端点特定的超时配置
        self.domain_timeouts = {}    # 域名级别的超时配置
        self.environment_timeouts = {}  # 环境特定的超时配置
        self.current_environment = 'production'

        # 加载配置
        self._load_default_config()

    def _load_default_config(self):
        """加载默认配置"""
        # 端点特定配置
        self.endpoint_timeouts = {
            'api.example.com/login': 5.0,
            'api.example.com/upload': 30.0,
            'api.example.com/report': 60.0,
        }

        # 域名级别配置
        self.domain_timeouts = {
            'api.example.com': 10.0,
            'cdn.example.com': 5.0,
            'storage.example.com': 30.0,
        }

        # 环境配置
        self.environment_timeouts = {
            'development': {
                'default': 30.0,
                'api.example.com': 15.0,
            },
            'testing': {
                'default': 20.0,
                'api.example.com': 10.0,
            },
            'production': {
                'default': 10.0,
                'api.example.com': 5.0,
            }
        }

    def get_timeout_for_url(self, url: str) -> float:
        """获取URL对应的超时时间"""

        from urllib.parse import urlparse

        parsed = urlparse(url)
        hostname = parsed.hostname
        path = parsed.path

        if not hostname:
            return self.get_default_timeout()

        # 1. 检查端点特定配置
        endpoint_key = f"{hostname}{path}"
        if endpoint_key in self.endpoint_timeouts:
            return self.endpoint_timeouts[endpoint_key]

        # 2. 检查域名级别配置
        if hostname in self.domain_timeouts:
            return self.domain_timeouts[hostname]

        # 3. 检查环境配置
        env_config = self.environment_timeouts.get(self.current_environment, {})
        if hostname in env_config:
            return env_config[hostname]

        # 4. 使用环境默认配置
        if 'default' in env_config:
            return env_config['default']

        # 5. 使用全局默认配置
        return self.default_timeout

    def get_default_timeout(self) -> float:
        """获取默认超时时间"""
        env_config = self.environment_timeouts.get(self.current_environment, {})
        return env_config.get('default', self.default_timeout)

    def set_environment(self, environment: str):
        """设置当前环境"""
        self.current_environment = environment

    def configure_endpoint(self, endpoint: str, timeout: float):
        """配置端点超时"""
        self.endpoint_timeouts[endpoint] = timeout

    def configure_domain(self, domain: str, timeout: float):
        """配置域名超时"""
        self.domain_timeouts[domain] = timeout

    def get_config_summary(self) -> Dict[str, Any]:
        """获取配置摘要"""
        return {
            'current_environment': self.current_environment,
            'default_timeout': self.get_default_timeout(),
            'endpoint_configs': len(self.endpoint_timeouts),
            'domain_configs': len(self.domain_timeouts),
            'environment_configs': {
                env: len(configs) for env, configs in self.environment_timeouts.items()
            }
        }

# 全局超时装饰器
def with_global_timeout(func):
    """使用全局超时配置的装饰器"""

    def wrapper(*args, **kwargs):
        # 获取URL参数
        url = None
        for arg in args:
            if isinstance(arg, str) and arg.startswith(('http://', 'https://')):
                url = arg
                break

        if 'url' in kwargs:
            url = kwargs['url']

        # 如果没有timeout参数,使用全局配置
        if 'timeout' not in kwargs and url:
            config = GlobalTimeoutConfig()
            timeout = config.get_timeout_for_url(url)
            kwargs['timeout'] = timeout

            print(f"使用全局超时配置: {timeout}秒 (URL: {url})")

        return func(*args, **kwargs)

    return wrapper

# 应用全局超时的请求函数
@with_global_timeout
def global_timeout_request(url, **kwargs):
    """使用全局超时配置的请求函数"""
    return requests.get(url, **kwargs)

# 使用示例
print("=== 全局超时配置测试 ===")

# 获取配置实例
config = GlobalTimeoutConfig()

# 查看当前配置
summary = config.get_config_summary()
print("当前配置摘要:")
for key, value in summary.items():
    print(f"  {key}: {value}")

# 测试不同URL的超时
test_urls = [
    'https://api.example.com/login',
    'https://api.example.com/upload',
    'https://cdn.example.com/image.jpg',
    'https://unknown.com/data',
]

print("\n测试URL超时配置:")
for url in test_urls:
    timeout = config.get_timeout_for_url(url)
    print(f"  {url}: {timeout}秒")

# 测试不同环境
print("\n测试不同环境:")
environments = ['development', 'testing', 'production']

for env in environments:
    config.set_environment(env)
    timeout = config.get_timeout_for_url('https://api.example.com/data')
    print(f"  {env}: {timeout}秒")

# 重置为生产环境
config.set_environment('production')

# 使用装饰器
print("\n使用全局超时装饰器:")
try:
    # 注意:这里使用模拟URL,实际不会真正请求
    # response = global_timeout_request('https://api.example.com/login')
    # print(f"请求成功")
    print("装饰器测试完成")
except Exception as e:
    print(f"错误: {e}")

# 线程安全的全局配置管理器
class ThreadSafeTimeoutConfig(GlobalTimeoutConfig):
    """线程安全的全局超时配置管理器"""

    def __init__(self):
        self._lock = threading.RLock()
        super()._initialize()

    def get_timeout_for_url(self, url: str) -> float:
        """线程安全的获取超时时间"""
        with self._lock:
            return super().get_timeout_for_url(url)

    def configure_endpoint(self, endpoint: str, timeout: float):
        """线程安全的配置端点超时"""
        with self._lock:
            super().configure_endpoint(endpoint, timeout)

    def configure_domain(self, domain: str, timeout: float):
        """线程安全的配置域名超时"""
        with self._lock:
            super().configure_domain(domain, timeout)

    def set_environment(self, environment: str):
        """线程安全的设置环境"""
        with self._lock:
            super().set_environment(environment)

# 上下文管理器:临时修改超时配置
@contextmanager
def temporary_timeout_config(environment: Optional[str] = None,
                           endpoint_configs: Optional[Dict] = None,
                           domain_configs: Optional[Dict] = None):
    """临时修改超时配置的上下文管理器"""

    config = GlobalTimeoutConfig()

    # 保存当前配置
    original_environment = config.current_environment
    original_endpoints = config.endpoint_timeouts.copy()
    original_domains = config.domain_timeouts.copy()

    try:
        # 应用临时配置
        if environment:
            config.set_environment(environment)

        if endpoint_configs:
            for endpoint, timeout in endpoint_configs.items():
                config.configure_endpoint(endpoint, timeout)

        if domain_configs:
            for domain, timeout in domain_configs.items():
                config.configure_domain(domain, timeout)

        yield config

    finally:
        # 恢复原始配置
        config.current_environment = original_environment
        config.endpoint_timeouts = original_endpoints
        config.domain_timeouts = original_domains

print(f"\n=== 使用上下文管理器临时修改配置 ===")

# 保存原始配置
original_summary = config.get_config_summary()
print(f"原始配置: {original_summary['default_timeout']}秒")

# 临时修改配置
with temporary_timeout_config(
    environment='development',
    endpoint_configs={'api.example.com/test': 60.0}
) as temp_config:

    temp_summary = temp_config.get_config_summary()
    print(f"临时配置: {temp_summary['default_timeout']}秒")
    print(f"测试端点超时: {temp_config.get_timeout_for_url('https://api.example.com/test')}秒")

# 验证配置已恢复
final_summary = config.get_config_summary()
print(f"恢复后配置: {final_summary['default_timeout']}秒")

最佳实践

超时设置最佳实践:
  • 永远设置超时 - 生产环境中必须设置超时,永远不要使用timeout=None
  • 区分连接超时和读取超时 - 使用元组格式分别设置,便于问题诊断
  • 合理的超时值 - 根据服务特点和网络环境设置合理的超时时间
  • 监控和调整 - 监控超时情况,根据实际情况动态调整超时设置
  • 实现重试机制 - 对于临时性错误,实现合理的重试策略
  • 优雅降级 - 超时后提供备用方案或友好的错误提示
  • 记录超时日志 - 记录超时发生的频率和模式,用于问题分析
  • 测试超时行为 - 在测试环境中验证超时设置的有效性

超时设置的建议值

tr>
场景 连接超时 读取超时 说明
内部API调用 1-3秒 5-10秒 内部网络环境相对稳定
外部API调用 3-5秒 10-30秒 考虑网络波动和第三方服务性能
用户认证 2-3秒 5-8秒 需要快速响应,避免用户等待
文件上传 5-10秒 30-60秒 大文件需要较长时间
文件下载 3-5秒 30-300秒 根据文件大小动态调整
实时通信 1-2秒 2-5秒 需要低延迟
批量处理 5-10秒 60-300秒 处理时间可能较长

生产环境超时配置示例

import requests
import os
from typing import Dict, Any
from dataclasses import dataclass, field
from enum import Enum

class Environment(Enum):
    """环境枚举"""
    DEVELOPMENT = "development"
    TESTING = "testing"
    STAGING = "staging"
    PRODUCTION = "production"

class ServiceType(Enum):
    """服务类型枚举"""
    INTERNAL_API = "internal_api"
    EXTERNAL_API = "external_api"
    DATABASE = "database"
    CACHE = "cache"
    STORAGE = "storage"
    QUEUE = "queue"

@dataclass
class TimeoutProfile:
    """超时配置模板"""
    connect_timeout: float
    read_timeout: float
    max_retries: int = 3
    backoff_factor: float = 1.0

    def as_tuple(self):
        """转换为元组格式"""
        return (self.connect_timeout, self.read_timeout)

class ProductionTimeoutConfig:
    """生产环境超时配置"""

    def __init__(self):
        # 基础配置
        self.environment = self._get_environment()

        # 服务类型配置模板
        self.service_profiles = {
            ServiceType.INTERNAL_API: TimeoutProfile(
                connect_timeout=2.0,
                read_timeout=10.0,
                max_retries=2
            ),
            ServiceType.EXTERNAL_API: TimeoutProfile(
                connect_timeout=5.0,
                read_timeout=30.0,
                max_retries=3
            ),
            ServiceType.DATABASE: TimeoutProfile(
                connect_timeout=3.0,
                read_timeout=10.0,
                max_retries=1  # 数据库连接失败通常不需要重试
            ),
            ServiceType.CACHE: TimeoutProfile(
                connect_timeout=1.0,
                read_timeout=3.0,
                max_retries=2
            ),
        }

        # 特定服务配置(覆盖模板)
        self.specific_service_configs = {
            'auth-service.internal.com': TimeoutProfile(
                connect_timeout=1.0,
                read_timeout=5.0,
                max_retries=2
            ),
            'upload-service.internal.com': TimeoutProfile(
                connect_timeout=5.0,
                read_timeout=60.0,
                max_retries=2
            ),
            'payment-gateway.com': TimeoutProfile(
                connect_timeout=3.0,
                read_timeout=15.0,
                max_retries=3
            ),
        }

        # 环境特定调整因子
        self.environment_factors = {
            Environment.DEVELOPMENT: 2.0,   # 开发环境更宽松
            Environment.TESTING: 1.5,       # 测试环境稍宽松
            Environment.STAGING: 1.2,       # 预发环境接近生产
            Environment.PRODUCTION: 1.0,    # 生产环境
        }

    def _get_environment(self) -> Environment:
        """从环境变量获取当前环境"""
        env_str = os.getenv('ENVIRONMENT', 'development').upper()
        try:
            return Environment[env_str]
        except KeyError:
            return Environment.DEVELOPMENT

    def get_timeout_for_service(self,
                               service_url: str,
                               service_type: Optional[ServiceType] = None) -> Dict[str, Any]:
        """获取服务的超时配置"""

        from urllib.parse import urlparse

        parsed = urlparse(service_url)
        hostname = parsed.hostname

        # 1. 检查特定服务配置
        if hostname and hostname in self.specific_service_configs:
            profile = self.specific_service_configs[hostname]
        elif service_type and service_type in self.service_profiles:
            # 2. 使用服务类型模板
            profile = self.service_profiles[service_type]
        else:
            # 3. 使用默认外部API配置
            profile = self.service_profiles[ServiceType.EXTERNAL_API]

        # 应用环境因子
        env_factor = self.environment_factors.get(self.environment, 1.0)

        adjusted_profile = TimeoutProfile(
            connect_timeout=profile.connect_timeout * env_factor,
            read_timeout=profile.read_timeout * env_factor,
            max_retries=profile.max_retries,
            backoff_factor=profile.backoff_factor
        )

        return {
            'timeout': adjusted_profile.as_tuple(),
            'max_retries': adjusted_profile.max_retries,
            'backoff_factor': adjusted_profile.backoff_factor,
            'service_host': hostname,
            'service_type': service_type.name if service_type else 'unknown',
            'environment': self.environment.value,
            'environment_factor': env_factor
        }

# 生产环境请求工厂
class ProductionRequestFactory:
    """生产环境请求工厂"""

    def __init__(self):
        self.config = ProductionTimeoutConfig()
        self.session_cache = {}

    def create_session(self, service_type: ServiceType) -> requests.Session:
        """创建适合服务类型的Session"""

        if service_type in self.session_cache:
            return self.session_cache[service_type]

        # 获取配置
        # 这里使用一个示例URL来获取配置模板
        example_url = f"https://example.{service_type.value}"
        config = self.config.get_timeout_for_service(example_url, service_type)

        # 创建Session
        session = requests.Session()

        # 配置重试策略
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        retry_strategy = Retry(
            total=config['max_retries'],
            backoff_factor=config['backoff_factor'],
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "DELETE"]
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)

        # 缓存Session
        self.session_cache[service_type] = session

        return session

    def make_request(self,
                    url: str,
                    service_type: Optional[ServiceType] = None,
                    method: str = 'GET',
                    **kwargs) -> Dict[str, Any]:
        """执行生产环境请求"""

        # 获取配置
        config = self.config.get_timeout_for_service(url, service_type)

        # 创建或获取Session
        session = self.create_session(service_type or ServiceType.EXTERNAL_API)

        # 准备请求参数
        request_kwargs = kwargs.copy()

        # 设置超时(如果未指定)
        if 'timeout' not in request_kwargs:
            request_kwargs['timeout'] = config['timeout']

        print(f"执行请求:")
        print(f"  URL: {url}")
        print(f"  服务类型: {config['service_type']}")
        print(f"  环境: {config['environment']}")
        print(f"  超时: 连接{config['timeout'][0]:.1f}秒, 读取{config['timeout'][1]:.1f}秒")
        print(f"  最大重试: {config['max_retries']}")

        try:
            response = session.request(method, url, **request_kwargs)
            response.raise_for_status()

            return {
                'success': True,
                'response': response,
                'config': config
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'error_type': type(e).__name__,
                'config': config
            }

# 使用示例
print("=== 生产环境超时配置示例 ===")

# 创建工厂
factory = ProductionRequestFactory()

# 测试不同服务类型
test_services = [
    ('内部认证服务', 'https://auth-service.internal.com/login', ServiceType.INTERNAL_API),
    ('外部支付网关', 'https://payment-gateway.com/charge', ServiceType.EXTERNAL_API),
    ('数据库查询', 'https://database.internal.com/query', ServiceType.DATABASE),
]

print("\n测试不同服务类型的超时配置:")
for name, url, service_type in test_services:
    print(f"\n{name}:")

    # 获取配置(不实际发送请求)
    config = factory.config.get_timeout_for_service(url, service_type)

    print(f"  连接超时: {config['timeout'][0]:.1f}秒")
    print(f"  读取超时: {config['timeout'][1]:.1f}秒")
    print(f"  最大重试: {config['max_retries']}")
    print(f"  环境因子: {config['environment_factor']}")

# 测试不同环境
print(f"\n=== 测试不同环境的超时配置 ===")

environments = [Environment.DEVELOPMENT, Environment.TESTING, Environment.PRODUCTION]

for env in environments:
    # 临时修改环境
    original_env = factory.config.environment
    factory.config.environment = env

    config = factory.config.get_timeout_for_service(
        'https://api.example.com/data',
        ServiceType.EXTERNAL_API
    )

    print(f"\n{env.value}环境:")
    print(f"  连接超时: {config['timeout'][0]:.1f}秒")
    print(f"  读取超时: {config['timeout'][1]:.1f}秒")

    # 恢复环境
    factory.config.environment = original_env

# 清理Session
for session in factory.session_cache.values():
    session.close()

print(f"\n✓ 生产环境超时配置示例完成")

总结

本章详细介绍了Requests库中超时设置的各个方面:

  1. 超时设置概述 - 超时的重要性、类型和工作流程
  2. 基本超时设置 - 使用timeout参数的各种格式
  3. 超时类型详解 - 连接超时和读取超时的区别、触发时机和处理
  4. 超时异常处理 - 捕获和处理各种超时异常,实现健壮的错误处理
  5. 重试策略 - 实现合理的重试机制,包括指数退避和智能重试
  6. 高级超时配置 - 自适应超时、全局配置、生产环境配置
  7. 最佳实践 - 超时设置的建议值和生产环境配置示例

关键要点:

  • 永远设置超时,区分连接超时和读取超时
  • 根据服务类型和网络环境设置合理的超时值
  • 实现完善的异常处理和重试机制
  • 监控超时情况并动态调整超时设置
  • 在生产环境中使用配置化的超时管理