Session对象不仅可以保持cookies,还可以配置默认参数、连接池、适配器等。
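在进入完整示例之前,先用一个最小的示意回顾 Session 的基本行为:默认请求头对后续所有请求生效,服务器设置的 cookie 会被自动保存并带到下一次请求中(URL 使用 httpbin.org 仅作演示):

```python
import requests

# 使用 with 语句确保底层连接在用完后被释放
with requests.Session() as session:
    # 在 Session 上设置的默认请求头会作用于它发出的所有请求
    session.headers.update({'User-Agent': 'MyClient/1.0'})
    # 第一个请求让服务器设置一个 cookie
    session.get('https://httpbin.org/cookies/set/token/abc123')
    # 第二个请求会自动带上刚才保存的 cookie
    second = session.get('https://httpbin.org/cookies')
    print(second.json())
```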
import requests
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class AdvancedSession:
"""高级Session管理器"""
def __init__(self):
self.session = requests.Session()
self._setup_session()
def _setup_session(self):
"""配置Session的默认参数"""
# 1. 设置默认请求头
self.session.headers.update({
'User-Agent': 'MyAdvancedClient/2.0',
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive'
})
        # 2. 设置默认超时:包装原有的 request 方法,调用方未指定 timeout 时使用默认值
        original_request = self.session.request

        def request_with_timeout(method, url, **kwargs):
            kwargs.setdefault('timeout', (3.05, 30))
            return original_request(method, url, **kwargs)

        self.session.request = request_with_timeout
# 3. 配置重试策略
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT", "DELETE"]
)
# 4. 创建适配器
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10, # 连接池大小
pool_maxsize=10, # 最大连接数
pool_block=False # 是否阻塞等待连接
)
# 5. 挂载适配器到所有URL
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
        # 6. 配置Cookie策略(DefaultCookiePolicy 来自标准库 http.cookiejar)
        from http.cookiejar import DefaultCookiePolicy
        self.session.cookies.set_policy(
            DefaultCookiePolicy(
                blocked_domains=['ads.example.com'],
                allowed_domains=['api.example.com']
            )
        )
def request_with_metrics(self, method, url, **kwargs):
"""带性能监控的请求"""
import time
start_time = time.time()
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
elapsed = time.time() - start_time
# 记录性能指标
metrics = {
'url': url,
'method': method,
'status_code': response.status_code,
'response_time': elapsed,
'content_length': len(response.content),
'from_cache': getattr(response, 'from_cache', False)
}
print(f"请求指标: {json.dumps(metrics, indent=2)}")
return response
except requests.exceptions.RequestException as e:
elapsed = time.time() - start_time
print(f"请求失败: {e} (耗时: {elapsed:.2f}s)")
raise
def close(self):
"""关闭Session并清理资源"""
self.session.close()
print("Session已关闭")
# 使用示例
if __name__ == "__main__":
# 创建高级Session
advanced_session = AdvancedSession()
try:
# 发送请求
response = advanced_session.request_with_metrics(
'GET',
'https://httpbin.org/json'
)
print(f"响应状态码: {response.status_code}")
print(f"响应头: {dict(response.headers)}")
finally:
# 确保关闭Session
advanced_session.close()
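上面的示例通过包装实例的 request 方法注入默认超时;另一种更直接的做法是继承 Session 并重写 request 方法。下面是一个示意实现(TimeoutSession 这个名字是本文虚构的):

```python
import requests

class TimeoutSession(requests.Session):
    """为所有请求提供默认超时的 Session(示意实现)"""

    def __init__(self, timeout=(3.05, 30)):
        super().__init__()
        self.default_timeout = timeout

    def request(self, method, url, **kwargs):
        # 调用方没有显式传入 timeout 时才使用默认值
        kwargs.setdefault('timeout', self.default_timeout)
        return super().request(method, url, **kwargs)

# 用法示例
# with TimeoutSession() as session:
#     response = session.get('https://httpbin.org/get')
```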
import requests
from requests.adapters import HTTPAdapter
import time
import threading
class ConnectionPoolManager:
"""连接池管理器"""
def __init__(self, pool_connections=10, pool_maxsize=10, max_retries=3):
self.session = requests.Session()
# 创建适配器并配置连接池
adapter = HTTPAdapter(
pool_connections=pool_connections, # 每个主机的连接数
pool_maxsize=pool_maxsize, # 连接池最大连接数
pool_block=False, # 连接池满时是否阻塞
max_retries=max_retries # 最大重试次数
)
# 挂载适配器
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
# 存储连接池统计信息
self.stats = {
'total_requests': 0,
'pool_reuses': 0,
'new_connections': 0
}
    def get_connection_pool_stats(self):
        """获取连接池统计信息(依赖 urllib3 的内部属性,仅用于调试观察)"""
        pool_manager = self.session.get_adapter('https://').poolmanager
        stats = {
            'num_pools': len(pool_manager.pools),
            'connection_info': {}
        }
        # pools 是 urllib3 的 RecentlyUsedContainer,键是描述连接池的 PoolKey
        for key in list(pool_manager.pools.keys()):
            pool_obj = pool_manager.pools[key]
            stats['connection_info'][str(key)] = {
                'num_connections': pool_obj.num_connections,
                'num_requests': pool_obj.num_requests,
                'idle_connections': pool_obj.pool.qsize() if pool_obj.pool else 0
            }
        return stats
def make_concurrent_requests(self, url, num_requests=10):
"""并发请求测试"""
from concurrent.futures import ThreadPoolExecutor, as_completed
def make_request(i):
start_time = time.time()
try:
response = self.session.get(f"{url}?request={i}", timeout=5)
elapsed = time.time() - start_time
return {
'request_id': i,
'status': response.status_code,
'time': elapsed,
'success': True
}
except Exception as e:
elapsed = time.time() - start_time
return {
'request_id': i,
'status': 'error',
'error': str(e),
'time': elapsed,
'success': False
}
# 使用线程池并发请求
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(make_request, i) for i in range(num_requests)]
results = []
for future in as_completed(futures):
results.append(future.result())
return results
# 使用示例
if __name__ == "__main__":
# 创建连接池管理器
pool_manager = ConnectionPoolManager(
pool_connections=5,
pool_maxsize=20,
max_retries=2
)
try:
# 测试URL
test_url = "https://httpbin.org/get"
# 进行并发请求测试
print("开始并发请求测试...")
results = pool_manager.make_concurrent_requests(test_url, num_requests=15)
# 分析结果
successful = sum(1 for r in results if r['success'])
avg_time = sum(r['time'] for r in results) / len(results)
print(f"\n测试结果:")
print(f"总请求数: {len(results)}")
print(f"成功请求: {successful}")
print(f"平均响应时间: {avg_time:.3f}秒")
# 获取连接池统计
pool_stats = pool_manager.get_connection_pool_stats()
print(f"\n连接池统计:")
print(f"连接池数量: {pool_stats['num_connections']}")
finally:
pool_manager.session.close()
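连接池带来的收益可以用一个简单的对比直观感受:复用同一个 Session 时,后续请求可以跳过 TCP/TLS 握手。下面是一个粗略的计时示意,具体数值取决于网络状况,仅供参考:

```python
import time
import requests

URL = 'https://httpbin.org/get'

# 每次都创建新连接
start = time.time()
for _ in range(5):
    requests.get(URL, timeout=10)
print(f"独立请求耗时: {time.time() - start:.2f}s")

# 复用 Session 连接池中的连接
start = time.time()
with requests.Session() as session:
    for _ in range(5):
        session.get(URL, timeout=10)
print(f"Session 复用耗时: {time.time() - start:.2f}s")
```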
请求钩子:请求发送前执行,可以修改请求参数。

def add_auth_header(request):
    request.headers['Authorization'] = 'Bearer token'
    return request

响应钩子:收到响应后执行,可以修改响应内容。

def log_response(response, *args, **kwargs):
    print(f"响应状态: {response.status_code}")
    return response

异常钩子:请求异常时执行,可以处理错误或重试。

def retry_on_timeout(request, exception):
    if isinstance(exception, requests.exceptions.Timeout):
        # 在这里实现重试逻辑
        pass
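作为对照,requests 自带的钩子机制只支持 response 一种事件,通过 hooks 参数或 Session.hooks 注册;下面是一个最小示意:

```python
import requests

def log_response(response, *args, **kwargs):
    # requests 原生钩子只有 response 事件,返回非 None 值会替换响应对象
    print(f"响应状态: {response.status_code} ({response.url})")
    return response

session = requests.Session()
# 在 Session 级别注册,对所有请求生效;也可以在单次请求中传 hooks={'response': log_response}
session.hooks['response'].append(log_response)
response = session.get('https://httpbin.org/get')
```

本节后面实现的 HookManager 正是为了补齐 requests 原生钩子之外的请求、异常等事件。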
import requests
import time
import hashlib
import json
from functools import wraps
class HookManager:
"""钩子管理器"""
def __init__(self):
self.hooks = {
'request': [],
'response': [],
'exception': []
}
self.metrics = []
def register_hook(self, event, func):
"""注册钩子函数"""
self.hooks[event].append(func)
return func
def hook_decorator(self, event):
"""创建钩子装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 调用原始函数
result = func(*args, **kwargs)
return result
# 注册钩子
self.register_hook(event, wrapper)
return wrapper
return decorator
    def execute_hooks(self, event, *args, **kwargs):
        """执行指定事件的钩子,把上一个钩子的返回值传递给下一个钩子"""
        result = args[0] if args else None
        extra = args[1:]
        for hook in self.hooks.get(event, []):
            try:
                returned = hook(result, *extra, **kwargs)
                # 钩子返回 None 时保留之前的结果,避免丢失请求/响应对象
                if returned is not None:
                    result = returned
            except Exception as e:
                print(f"钩子执行错误: {e}")
                continue
        return result
# 创建钩子管理器实例
hook_manager = HookManager()
# 使用装饰器注册钩子
@hook_manager.hook_decorator('request')
def add_timestamp(request):
"""添加时间戳到请求头"""
request.headers['X-Request-Timestamp'] = str(time.time())
return request
@hook_manager.hook_decorator('request')
def sign_request(request):
"""请求签名"""
# 创建请求签名
content = request.body if request.body else ''
path = request.path_url
timestamp = request.headers.get('X-Request-Timestamp', '')
# 生成签名
sign_string = f"{path}{timestamp}{content}"
signature = hashlib.sha256(sign_string.encode()).hexdigest()
request.headers['X-Request-Signature'] = signature
return request
@hook_manager.hook_decorator('response')
def cache_response(response, *args, **kwargs):
"""缓存响应"""
request = kwargs.get('request')
if request and response.status_code == 200:
# 简单的内存缓存示例
cache_key = f"{request.method}:{request.url}"
response.cache_key = cache_key
response.cached_at = time.time()
print(f"缓存响应: {cache_key}")
return response
@hook_manager.hook_decorator('response')
def collect_metrics(response, *args, **kwargs):
"""收集性能指标"""
request = kwargs.get('request')
if request:
metrics = {
'url': request.url,
'method': request.method,
'status_code': response.status_code,
'response_time': response.elapsed.total_seconds(),
'timestamp': time.time()
}
hook_manager.metrics.append(metrics)
return response
@hook_manager.hook_decorator('exception')
def handle_timeout(request, exception):
"""处理超时异常"""
if isinstance(exception, requests.exceptions.Timeout):
print(f"请求超时: {request.url}")
# 这里可以实现重试逻辑
return True # 返回True表示已处理异常
return False
# 创建自定义请求函数
def hooked_request(method, url, **kwargs):
"""带有钩子的请求函数"""
# 创建请求对象
from requests import Request
request = Request(method, url, **kwargs)
# 准备请求
prepared_request = request.prepare()
# 执行请求钩子
prepared_request = hook_manager.execute_hooks('request', prepared_request)
# 发送请求
session = requests.Session()
try:
response = session.send(prepared_request)
# 执行响应钩子
response = hook_manager.execute_hooks(
'response',
response,
request=prepared_request
)
return response
except requests.exceptions.RequestException as e:
# 执行异常钩子
handled = hook_manager.execute_hooks(
'exception',
prepared_request,
e
)
if not handled:
raise
# 使用示例
if __name__ == "__main__":
# 发送带有钩子的请求
response = hooked_request(
'GET',
'https://httpbin.org/get',
params={'test': 'hook'}
)
print(f"状态码: {response.status_code}")
print(f"请求头: {dict(response.request.headers)}")
print(f"性能指标: {json.dumps(hook_manager.metrics, indent=2)}")
session = requests.Session()
# 默认配置
session.mount('http://', HTTPAdapter())
session.mount('https://', HTTPAdapter())
class CustomAdapter(HTTPAdapter):
def send(self, request, **kwargs):
# 自定义发送逻辑
response = super().send(request, **kwargs)
# 自定义处理逻辑
return response
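适配器按 URL 前缀挂载,get_adapter 会选择匹配的最长前缀,因此可以为特定主机配置独立的策略。下面的示意为某个主机单独配置重试,api.example.com 只是占位域名:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()

# 仅对 api.example.com 使用更激进的重试策略
api_retry = Retry(total=5, backoff_factor=0.5,
                  status_forcelist=[429, 500, 502, 503, 504])
session.mount('https://api.example.com/', HTTPAdapter(max_retries=api_retry))

# 其余 https 站点仍走默认适配器
session.mount('https://', HTTPAdapter())

# get_adapter 可以查看某个 URL 实际命中的适配器
print(session.get_adapter('https://api.example.com/v1/users'))
```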
import requests
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
import hashlib
class MetricsAdapter(HTTPAdapter):
"""带度量的适配器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metrics = {
'total_requests': 0,
'successful': 0,
'failed': 0,
'total_time': 0,
'cache_hits': 0
}
self.response_cache = {} # 简单的内存缓存
def _cache_key(self, request):
"""生成缓存键"""
key_data = f"{request.method}:{request.url}:{request.body}"
return hashlib.md5(key_data.encode()).hexdigest()
def send(self, request, **kwargs):
"""重写send方法以添加度量"""
start_time = time.time()
# 检查缓存
cache_key = self._cache_key(request)
if cache_key in self.response_cache:
cache_entry = self.response_cache[cache_key]
# 检查缓存是否过期(5分钟)
if time.time() - cache_entry['timestamp'] < 300:
self.metrics['cache_hits'] += 1
print(f"缓存命中: {request.url}")
# 创建缓存的响应
from requests import Response
cached_response = Response()
cached_response.status_code = cache_entry['status_code']
cached_response.headers = cache_entry['headers']
cached_response._content = cache_entry['content']
cached_response.elapsed = cache_entry['elapsed']
cached_response.request = request
cached_response.from_cache = True
return cached_response
try:
# 调用父类的send方法
response = super().send(request, **kwargs)
# 更新度量
self.metrics['total_requests'] += 1
self.metrics['successful'] += 1
elapsed = time.time() - start_time
self.metrics['total_time'] += elapsed
# 缓存成功的GET响应
if request.method == 'GET' and response.status_code == 200:
self.response_cache[cache_key] = {
'status_code': response.status_code,
'headers': dict(response.headers),
'content': response.content,
'elapsed': response.elapsed,
'timestamp': time.time()
}
# 添加自定义响应头
response.headers['X-Request-Metrics'] = json.dumps({
'adapter': 'MetricsAdapter',
'request_time': elapsed,
'cache_hit': False
})
return response
except Exception as e:
self.metrics['total_requests'] += 1
self.metrics['failed'] += 1
raise
class RetryWithJitterAdapter(HTTPAdapter):
"""带有随机退避的重试适配器"""
def __init__(self, *args, **kwargs):
# 获取重试配置
max_retries = kwargs.pop('max_retries', 3)
jitter = kwargs.pop('jitter', 0.1) # 10%的随机抖动
# 创建自定义重试策略
retry_strategy = Retry(
total=max_retries,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
            backoff_jitter=jitter  # 添加随机抖动(该参数需要 urllib3 2.0 及以上版本)
)
kwargs['max_retries'] = retry_strategy
super().__init__(*args, **kwargs)
class TimeoutAdapter(HTTPAdapter):
"""动态超时适配器"""
def __init__(self, *args, **kwargs):
self.default_timeout = kwargs.pop('default_timeout', (3.05, 30))
self.timeout_multiplier = kwargs.pop('timeout_multiplier', 1.5)
super().__init__(*args, **kwargs)
def send(self, request, **kwargs):
"""根据请求类型动态调整超时"""
# 根据请求方法调整超时
if request.method == 'GET':
# GET请求使用默认超时
kwargs['timeout'] = self.default_timeout
elif request.method == 'POST':
# POST请求使用更长的超时
timeout = (
self.default_timeout[0],
self.default_timeout[1] * self.timeout_multiplier
)
kwargs['timeout'] = timeout
return super().send(request, **kwargs)
# 创建带有多适配器的会话
def create_multi_adapter_session():
"""创建配置了多个适配器的会话"""
session = requests.Session()
# 为不同域名使用不同的适配器
api_adapter = MetricsAdapter(
pool_connections=10,
pool_maxsize=20
)
# 外部API使用带抖动的重试适配器
external_adapter = RetryWithJitterAdapter(
max_retries=5,
jitter=0.2,
pool_connections=5,
pool_maxsize=10
)
# 文件下载使用动态超时适配器
download_adapter = TimeoutAdapter(
default_timeout=(5, 60), # 更长的超时
timeout_multiplier=2,
pool_connections=3,
pool_maxsize=5
)
# 挂载适配器到不同的域名
session.mount('https://api.internal.com', api_adapter)
session.mount('https://api.external.com', external_adapter)
session.mount('https://downloads.example.com', download_adapter)
# 默认适配器
session.mount('http://', HTTPAdapter())
session.mount('https://', HTTPAdapter())
return session
# 使用示例
if __name__ == "__main__":
session = create_multi_adapter_session()
try:
# 发送请求
response = session.get('https://api.internal.com/data')
print(f"响应状态码: {response.status_code}")
# 获取适配器度量
adapter = session.get_adapter('https://api.internal.com')
if isinstance(adapter, MetricsAdapter):
print(f"适配器度量: {adapter.metrics}")
finally:
session.close()
- **JWT 认证**:JSON Web Token 认证,自动刷新过期 token
- **OAuth 2.0**:支持授权码、客户端凭证等 OAuth 流程
- **HMAC 签名**:基于请求内容的哈希消息认证码
- **自动刷新**:Token 过期时自动刷新并重试请求
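这些认证方式都遵循同一个接口:继承 requests.auth.AuthBase 并实现 __call__,在其中修改 PreparedRequest。在看完整实现之前,先用一个最小的示意说明这个接口(TokenAuth 是本文虚构的类名):

```python
import requests
from requests.auth import AuthBase

class TokenAuth(AuthBase):
    """最简单的自定义认证:附加固定的 Bearer Token(示意)"""

    def __init__(self, token):
        self.token = token

    def __call__(self, request):
        # request 是 PreparedRequest,可以直接修改其头部
        request.headers['Authorization'] = f'Bearer {self.token}'
        return request

# 用法示例
# response = requests.get('https://httpbin.org/headers', auth=TokenAuth('my-token'))
```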
import requests
import time
import json
from requests.auth import AuthBase
from typing import Optional, Dict, Any
class JWTAuth(AuthBase):
"""JWT认证类"""
def __init__(self, token_url: str, client_id: str, client_secret: str,
scope: Optional[str] = None):
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.token_expiry: Optional[float] = None
self._session = requests.Session()
def _get_token(self, grant_type: str = 'client_credentials',
refresh_token: Optional[str] = None) -> Dict[str, Any]:
"""获取访问令牌"""
data = {
'grant_type': grant_type,
'client_id': self.client_id,
'client_secret': self.client_secret,
}
if grant_type == 'client_credentials' and self.scope:
data['scope'] = self.scope
elif grant_type == 'refresh_token' and refresh_token:
data['refresh_token'] = refresh_token
response = self._session.post(self.token_url, data=data)
response.raise_for_status()
token_data = response.json()
# 解析响应
self.access_token = token_data.get('access_token')
self.refresh_token = token_data.get('refresh_token', self.refresh_token)
# 计算过期时间(提前30秒刷新)
expires_in = token_data.get('expires_in', 3600)
self.token_expiry = time.time() + expires_in - 30
return token_data
def _ensure_valid_token(self):
"""确保令牌有效,必要时刷新"""
if not self.access_token:
# 初次获取令牌
self._get_token()
return
if time.time() >= self.token_expiry:
if self.refresh_token:
# 使用刷新令牌获取新令牌
try:
self._get_token('refresh_token', self.refresh_token)
except requests.exceptions.HTTPError:
# 刷新失败,重新获取令牌
self._get_token()
else:
# 没有刷新令牌,重新获取
self._get_token()
def __call__(self, request):
"""实现认证接口"""
self._ensure_valid_token()
if self.access_token:
request.headers['Authorization'] = f'Bearer {self.access_token}'
return request
def clear_tokens(self):
"""清除所有令牌"""
self.access_token = None
self.refresh_token = None
self.token_expiry = None
class HMACAuth(AuthBase):
"""HMAC签名认证"""
def __init__(self, api_key: str, secret_key: str):
self.api_key = api_key
self.secret_key = secret_key
def __call__(self, request):
"""实现HMAC签名"""
import hmac
import hashlib
import base64
import urllib.parse
# 获取请求时间戳
timestamp = str(int(time.time() * 1000))
# 构建签名字符串
method = request.method.upper()
path = urllib.parse.urlparse(request.url).path
# 获取查询参数
query_params = urllib.parse.urlparse(request.url).query
if query_params:
path = f"{path}?{query_params}"
        # 获取请求体(body 可能是 str 或 bytes)
        body = request.body or ''
        if isinstance(body, bytes):
            body = body.decode('utf-8')
# 构建待签名字符串
sign_string = f"{method}\n{path}\n{timestamp}\n{body}"
# 计算HMAC签名
signature = hmac.new(
self.secret_key.encode('utf-8'),
sign_string.encode('utf-8'),
hashlib.sha256
).digest()
# Base64编码
signature_b64 = base64.b64encode(signature).decode('utf-8')
# 添加认证头
request.headers.update({
'X-API-Key': self.api_key,
'X-Timestamp': timestamp,
'X-Signature': signature_b64
})
return request
class AutoRefreshAuth(AuthBase):
    """自动刷新认证的包装器:遇到 401 时刷新凭据并自动重试一次

    注意:PreparedRequest 并没有 send 方法,这里采用 requests 内置
    HTTPDigestAuth 所用的响应钩子模式来实现 401 重试。
    """
    def __init__(self, auth_instance: AuthBase):
        self.auth_instance = auth_instance

    def _refresh_credentials(self):
        """刷新底层认证的凭据"""
        if hasattr(self.auth_instance, 'refresh'):
            self.auth_instance.refresh()
        elif hasattr(self.auth_instance, 'clear_tokens'):
            # 对于上面的 JWTAuth,清除令牌即可在下次调用时重新获取
            self.auth_instance.clear_tokens()

    def handle_401(self, response, **kwargs):
        """响应钩子:收到 401 时刷新凭据并重发请求(只重试一次)"""
        if response.status_code != 401 or getattr(response.request, '_auth_retried', False):
            return response
        self._refresh_credentials()
        # 读取并释放原始响应,以便复用底层连接
        response.content
        response.close()
        # 复制原请求,重新应用认证后重发
        new_request = response.request.copy()
        new_request._auth_retried = True
        new_request = self.auth_instance(new_request)
        new_response = response.connection.send(new_request, **kwargs)
        new_response.history.append(response)
        new_response.request = new_request
        return new_response

    def __call__(self, request):
        """包装认证调用,并注册 401 处理钩子"""
        request = self.auth_instance(request)
        request.register_hook('response', self.handle_401)
        return request
# 使用示例
if __name__ == "__main__":
# 创建JWT认证
jwt_auth = JWTAuth(
token_url='https://auth.example.com/oauth/token',
client_id='your_client_id',
client_secret='your_client_secret',
scope='api.read api.write'
)
# 创建会话并设置认证
session = requests.Session()
session.auth = jwt_auth
try:
# 发送请求(认证会自动处理)
response = session.get('https://api.example.com/protected')
print(f"响应状态码: {response.status_code}")
finally:
session.close()
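AutoRefreshAuth 可以包装任意 AuthBase 实例。下面的示意把上文的 JWTAuth 包进去,这样遇到 401 时会清除旧令牌并自动重试一次(域名均为占位符):

```python
# 基于上文定义的 JWTAuth 和 AutoRefreshAuth
jwt_auth = JWTAuth(
    token_url='https://auth.example.com/oauth/token',
    client_id='your_client_id',
    client_secret='your_client_secret'
)

session = requests.Session()
session.auth = AutoRefreshAuth(jwt_auth)

# 认证对象也可以只作用于单次请求,而不绑定到整个 Session
# response = requests.get('https://api.example.com/protected', auth=jwt_auth)
```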
| 事件类型 | 触发时机 | 常见用途 | 示例 |
|---|---|---|---|
| `pre_request` | 请求发送前 | 添加签名、验证参数 | `add_auth_header` |
| `post_request` | 收到响应后 | 记录日志、更新缓存 | `log_response` |
| `pre_send` | 发送到网络前 | 压缩数据、添加时间戳 | `compress_body` |
| `post_send` | 网络发送完成 | 统计发送时长 | `record_send_time` |
| `on_error` | 发生错误时 | 错误处理、重试逻辑 | `retry_on_timeout` |
import requests
import time
import json
from typing import Dict, List, Callable, Any
from enum import Enum
class EventType(Enum):
"""事件类型枚举"""
PRE_REQUEST = 'pre_request'
PRE_SEND = 'pre_send'
POST_SEND = 'post_send'
POST_REQUEST = 'post_request'
ON_ERROR = 'on_error'
ON_SUCCESS = 'on_success'
class EventHookSystem:
"""事件钩子系统"""
def __init__(self):
self.hooks: Dict[EventType, List[Callable]] = {
event_type: [] for event_type in EventType
}
self.event_log: List[Dict] = []
    def register_hook(self, event_type: EventType, hook_func: Callable = None):
        """注册事件钩子;不传 hook_func 时返回装饰器,支持 @register_hook(event) 写法"""
        if hook_func is None:
            def decorator(func: Callable) -> Callable:
                self.hooks[event_type].append(func)
                return func
            return decorator
        self.hooks[event_type].append(hook_func)
        return self
    def trigger_event(self, event_type: EventType, **kwargs) -> Any:
        """触发事件并执行所有钩子"""
        event_data = {
            'event_type': event_type.value,
            'timestamp': time.time(),
            'data': kwargs
        }
        # 记录事件
        self.event_log.append(event_data)
        # 默认以 response(其次 request)作为链式传递的结果,
        # 避免某个事件没有注册钩子时返回 None
        result = kwargs.get('response', kwargs.get('request'))
        for hook in self.hooks[event_type]:
            try:
                returned = hook(**kwargs)
                # 钩子返回 None 时保留上一个结果
                if returned is not None:
                    result = returned
            except Exception as e:
                print(f"钩子执行错误 {event_type.value}: {e}")
        return result
def get_event_stats(self) -> Dict[str, Any]:
"""获取事件统计"""
stats = {}
for event_type in EventType:
stats[event_type.value] = len([
event for event in self.event_log
if event['event_type'] == event_type.value
])
return stats
class InstrumentedSession(requests.Session):
"""带有事件钩子的Session"""
def __init__(self):
super().__init__()
self.event_system = EventHookSystem()
self._setup_default_hooks()
def _setup_default_hooks(self):
"""设置默认钩子"""
@self.event_system.register_hook(EventType.PRE_REQUEST)
def log_pre_request(**kwargs):
request = kwargs.get('request')
if request:
print(f"[PRE_REQUEST] {request.method} {request.url}")
return request
@self.event_system.register_hook(EventType.PRE_SEND)
def add_request_id(**kwargs):
request = kwargs.get('request')
if request:
import uuid
request.headers['X-Request-ID'] = str(uuid.uuid4())
return request
@self.event_system.register_hook(EventType.POST_SEND)
def record_send_metrics(**kwargs):
request = kwargs.get('request')
response = kwargs.get('response')
if request and response:
metrics = {
'request_id': request.headers.get('X-Request-ID'),
'url': request.url,
'method': request.method,
'status_code': response.status_code,
'response_time': response.elapsed.total_seconds(),
'content_length': len(response.content) if response.content else 0
}
print(f"[POST_SEND] 请求完成: {json.dumps(metrics, indent=2)}")
return response
@self.event_system.register_hook(EventType.ON_ERROR)
def handle_error_with_retry(**kwargs):
error = kwargs.get('error')
request = kwargs.get('request')
if isinstance(error, requests.exceptions.Timeout) and request:
print(f"[ON_ERROR] 请求超时: {request.url}")
# 这里可以实现重试逻辑
return error
def send(self, request, **kwargs):
"""重写send方法以添加事件钩子"""
# PRE_REQUEST 事件
request = self.event_system.trigger_event(
EventType.PRE_REQUEST,
request=request,
kwargs=kwargs
)
try:
# PRE_SEND 事件
request = self.event_system.trigger_event(
EventType.PRE_SEND,
request=request,
kwargs=kwargs
)
# 发送请求
response = super().send(request, **kwargs)
# POST_SEND 事件
response = self.event_system.trigger_event(
EventType.POST_SEND,
request=request,
response=response,
kwargs=kwargs
)
# POST_REQUEST 事件
response = self.event_system.trigger_event(
EventType.POST_REQUEST,
request=request,
response=response,
kwargs=kwargs
)
# ON_SUCCESS 事件
self.event_system.trigger_event(
EventType.ON_SUCCESS,
request=request,
response=response,
kwargs=kwargs
)
return response
except Exception as e:
# ON_ERROR 事件
self.event_system.trigger_event(
EventType.ON_ERROR,
request=request,
error=e,
kwargs=kwargs
)
raise
# 使用示例
if __name__ == "__main__":
# 创建带有事件钩子的Session
session = InstrumentedSession()
# 添加自定义钩子
@session.event_system.register_hook(EventType.PRE_SEND)
def add_custom_header(**kwargs):
request = kwargs.get('request')
if request:
request.headers['X-Custom-Header'] = 'custom_value'
return request
@session.event_system.register_hook(EventType.POST_REQUEST)
def validate_response(**kwargs):
response = kwargs.get('response')
if response and response.status_code == 200:
            try:
                data = response.json()
                print(f"[VALIDATE] 响应数据验证通过")
            except ValueError:
                print(f"[VALIDATE] 响应不是有效的JSON")
return response
try:
# 发送请求
response = session.get('https://httpbin.org/get')
print(f"最终状态码: {response.status_code}")
# 获取事件统计
stats = session.event_system.get_event_stats()
print(f"事件统计: {json.dumps(stats, indent=2)}")
finally:
session.close()
import requests
import random
import time
from typing import Dict, List, Optional
from urllib.parse import urlparse
class ProxyManager:
"""代理管理器"""
def __init__(self):
self.proxies: Dict[str, List[str]] = {
'http': [],
'https': [],
'socks5': []
}
self.proxy_stats: Dict[str, Dict] = {}
self.blacklist: Dict[str, float] = {} # 黑名单:代理地址 -> 解封时间
def add_proxy(self, proxy_url: str, proxy_type: Optional[str] = None):
"""添加代理"""
if proxy_type is None:
# 自动检测代理类型
if proxy_url.startswith('http://'):
proxy_type = 'http'
elif proxy_url.startswith('https://'):
proxy_type = 'https'
elif proxy_url.startswith('socks5://'):
proxy_type = 'socks5'
else:
proxy_type = 'http' # 默认
if proxy_type in self.proxies:
self.proxies[proxy_type].append(proxy_url)
# 初始化统计信息
self.proxy_stats[proxy_url] = {
'success': 0,
'failure': 0,
'total_time': 0,
'last_used': 0,
'avg_response_time': 0
}
def get_proxy(self, url: str, strategy: str = 'random') -> Optional[str]:
"""根据策略获取代理"""
# 清理黑名单中过期的代理
self._clean_blacklist()
        # 根据URL协议选择代理类型;https 列表为空时退回到 http 代理
        # (http:// 形式的代理同样可以转发 https 请求)
        parsed_url = urlparse(url)
        proxy_type = 'https' if parsed_url.scheme == 'https' else 'http'
        candidates = self.proxies.get(proxy_type) or self.proxies.get('http', [])
        if not candidates:
            return None
        # 过滤掉黑名单中的代理
        available_proxies = [
            p for p in candidates
            if p not in self.blacklist
        ]
        if not available_proxies:
            return None
# 选择策略
if strategy == 'random':
return random.choice(available_proxies)
elif strategy == 'round_robin':
# 简单的轮询(按最后使用时间)
available_proxies.sort(key=lambda p: self.proxy_stats[p]['last_used'])
return available_proxies[0]
elif strategy == 'performance':
# 基于性能的选择
scored_proxies = []
for proxy in available_proxies:
stats = self.proxy_stats[proxy]
# 计算分数(成功率 * 响应时间倒数)
success_rate = stats['success'] / max(stats['success'] + stats['failure'], 1)
avg_time = stats['avg_response_time'] if stats['avg_response_time'] > 0 else 1
# 分数越高越好
score = success_rate * (1 / avg_time)
scored_proxies.append((score, proxy))
# 选择分数最高的代理
scored_proxies.sort(reverse=True)
return scored_proxies[0][1] if scored_proxies else None
return None
def record_proxy_result(self, proxy_url: str, success: bool, response_time: float):
"""记录代理使用结果"""
if proxy_url not in self.proxy_stats:
return
stats = self.proxy_stats[proxy_url]
if success:
stats['success'] += 1
stats['total_time'] += response_time
stats['avg_response_time'] = stats['total_time'] / stats['success']
else:
stats['failure'] += 1
# 失败次数过多,加入黑名单
failure_rate = stats['failure'] / (stats['success'] + stats['failure'])
if failure_rate > 0.5 and stats['failure'] > 3:
# 加入黑名单5分钟
self.blacklist[proxy_url] = time.time() + 300
stats['last_used'] = time.time()
def _clean_blacklist(self):
"""清理过期的黑名单条目"""
current_time = time.time()
expired = [proxy for proxy, expire_time in self.blacklist.items()
if expire_time < current_time]
for proxy in expired:
del self.blacklist[proxy]
def get_proxy_config(self, url: str, strategy: str = 'random') -> Dict[str, str]:
"""获取代理配置字典"""
proxy_url = self.get_proxy(url, strategy)
if not proxy_url:
return {}
parsed_proxy = urlparse(proxy_url)
# 构建代理配置
config = {}
if parsed_proxy.scheme in ['http', 'https']:
config['http'] = proxy_url
config['https'] = proxy_url
        elif parsed_proxy.scheme == 'socks5':
            # SOCKS 代理需要安装 requests[socks](即 PySocks)才能被 requests 使用
            config['http'] = proxy_url
            config['https'] = proxy_url
return config
class ProxyAwareSession(requests.Session):
"""支持代理管理的Session"""
def __init__(self, proxy_manager: Optional[ProxyManager] = None):
super().__init__()
self.proxy_manager = proxy_manager or ProxyManager()
self.proxy_strategy = 'performance'
# 添加一些默认代理(示例)
self.proxy_manager.add_proxy('http://proxy1.example.com:8080')
self.proxy_manager.add_proxy('http://proxy2.example.com:8080')
self.proxy_manager.add_proxy('socks5://socks-proxy.example.com:1080')
def request(self, method, url, **kwargs):
"""重写request方法以支持代理管理"""
# 获取代理配置
proxy_config = self.proxy_manager.get_proxy_config(url, self.proxy_strategy)
if proxy_config:
kwargs['proxies'] = proxy_config
selected_proxy = list(proxy_config.values())[0]
else:
selected_proxy = None
# 记录开始时间
start_time = time.time()
try:
response = super().request(method, url, **kwargs)
response_time = time.time() - start_time
# 记录代理使用结果
if selected_proxy:
self.proxy_manager.record_proxy_result(
selected_proxy,
True,
response_time
)
return response
except Exception as e:
response_time = time.time() - start_time
# 记录代理使用结果
if selected_proxy:
self.proxy_manager.record_proxy_result(
selected_proxy,
False,
response_time
)
raise
# 使用示例
if __name__ == "__main__":
# 创建代理感知的Session
session = ProxyAwareSession()
try:
# 发送请求(会自动使用代理)
response = session.get('https://httpbin.org/ip')
print(f"状态码: {response.status_code}")
print(f"IP信息: {response.text}")
# 查看代理统计
print(f"\n代理统计:")
for proxy, stats in session.proxy_manager.proxy_stats.items():
print(f"{proxy}: {stats}")
finally:
session.close()
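除了自定义代理管理器,requests 本身也支持通过 session.proxies 或环境变量配置代理。下面是原生用法的示意,代理地址均为占位符:

```python
import requests

session = requests.Session()

# 方式一:直接在 Session 上配置代理
session.proxies.update({
    'http': 'http://proxy.example.com:8080',
    'https': 'http://proxy.example.com:8080',
})

# 方式二:依赖环境变量 HTTP_PROXY / HTTPS_PROXY / NO_PROXY
# trust_env 默认为 True,设为 False 可以忽略环境变量中的代理设置
session.trust_env = True

# 单次请求也可以用 proxies 参数覆盖 Session 级配置
# socks5:// 代理需要额外安装 requests[socks]
# response = session.get('https://httpbin.org/ip',
#                        proxies={'https': 'socks5://127.0.0.1:1080'})
```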
import requests
import ssl
import certifi
from OpenSSL import crypto
from requests.adapters import HTTPAdapter
import hashlib
class SSLConfigManager:
"""SSL配置管理器"""
    def __init__(self):
        self.ssl_context = ssl.create_default_context()
        # 加载系统证书
        self.ssl_context.load_default_certs()
        # 额外加载certifi的证书
        self.ssl_context.load_verify_locations(certifi.where())
        # 证书锁定时使用的期望指纹(可选)
        self.pinned_fingerprint = None
def add_custom_ca(self, ca_cert_path: str):
"""添加自定义CA证书"""
self.ssl_context.load_verify_locations(ca_cert_path)
def set_cipher_suites(self, ciphers: str):
"""设置加密套件"""
self.ssl_context.set_ciphers(ciphers)
def set_tls_version(self, min_version: ssl.TLSVersion, max_version: ssl.TLSVersion = None):
"""设置TLS版本范围"""
self.ssl_context.minimum_version = min_version
if max_version:
self.ssl_context.maximum_version = max_version
    def enable_certificate_pinning(self, expected_fingerprint: str):
        """启用证书锁定

        标准库的 ssl.SSLContext 并没有 set_verify_callback 这类回调接口,
        因此这里只记录期望的 SHA256 指纹,由下方的 SSLAwareAdapter 通过
        urllib3 的 assert_fingerprint 参数在建立连接时校验服务器证书。
        """
        self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.ssl_context.check_hostname = True
        self.pinned_fingerprint = expected_fingerprint
class SSLAwareAdapter(HTTPAdapter):
"""支持SSL高级配置的适配器"""
def __init__(self, ssl_config: SSLConfigManager, **kwargs):
self.ssl_config = ssl_config
super().__init__(**kwargs)
    def init_poolmanager(self, *args, **kwargs):
        """初始化连接池管理器"""
        kwargs['ssl_context'] = self.ssl_config.ssl_context
        # 如果配置了证书锁定,交给 urllib3 校验证书指纹
        if getattr(self.ssl_config, 'pinned_fingerprint', None):
            kwargs['assert_fingerprint'] = self.ssl_config.pinned_fingerprint
        return super().init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
"""为代理创建连接池管理器"""
kwargs['ssl_context'] = self.ssl_config.ssl_context
return super().proxy_manager_for(*args, **kwargs)
def create_secure_session():
"""创建安全的Session"""
# 创建SSL配置
ssl_config = SSLConfigManager()
# 配置TLS版本(强制使用TLS 1.2+)
ssl_config.set_tls_version(ssl.TLSVersion.TLSv1_2)
# 设置强加密套件
strong_ciphers = (
'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:'
'ECDH+AESGCM:DH+AESGCM:ECDH+AES:DH+AES:'
'RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS'
)
ssl_config.set_cipher_suites(strong_ciphers)
# 添加自定义CA证书(如果需要)
# ssl_config.add_custom_ca('/path/to/custom-ca.pem')
# 创建适配器
adapter = SSLAwareAdapter(ssl_config)
# 创建Session
session = requests.Session()
# 挂载适配器
session.mount('https://', adapter)
# 配置Session
session.verify = True # 启用SSL验证
session.cert = None # 客户端证书(如果需要)
return session
def create_insecure_session_for_testing():
"""创建用于测试的不安全Session(仅用于开发环境)"""
    import urllib3
    # 禁用"未验证的HTTPS请求"警告(仅限开发/测试环境)
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 创建不验证SSL的Session
session = requests.Session()
# 创建不验证SSL的适配器
class InsecureAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = ssl._create_unverified_context()
return super().init_poolmanager(*args, **kwargs)
adapter = InsecureAdapter()
session.mount('https://', adapter)
session.verify = False
return session
def extract_certificate_info(url: str):
"""提取SSL证书信息"""
import socket
import ssl
# 解析主机名和端口
from urllib.parse import urlparse
parsed = urlparse(url)
hostname = parsed.hostname
port = parsed.port or (443 if parsed.scheme == 'https' else 80)
# 创建SSL上下文
context = ssl.create_default_context()
# 连接并获取证书
with socket.create_connection((hostname, port)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert(binary_form=True)
# 解析证书
x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert)
# 提取信息
subject = dict(x509.get_subject().get_components())
issuer = dict(x509.get_issuer().get_components())
# 计算指纹
cert_hash = hashlib.sha256(cert).hexdigest()
return {
'subject': subject,
'issuer': issuer,
'fingerprint_sha256': cert_hash,
'serial_number': x509.get_serial_number(),
'version': x509.get_version(),
'not_before': x509.get_notBefore().decode('utf-8'),
'not_after': x509.get_notAfter().decode('utf-8'),
'signature_algorithm': x509.get_signature_algorithm().decode('utf-8')
}
# 使用示例
if __name__ == "__main__":
# 创建安全Session
secure_session = create_secure_session()
try:
# 发送安全请求
response = secure_session.get('https://httpbin.org/headers')
print(f"安全请求状态码: {response.status_code}")
# 提取证书信息
cert_info = extract_certificate_info('https://httpbin.org')
print(f"\n证书信息:")
print(f"主题: {cert_info['subject']}")
print(f"SHA256指纹: {cert_info['fingerprint_sha256']}")
print(f"有效期: {cert_info['not_before']} 到 {cert_info['not_after']}")
finally:
secure_session.close()
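除了自定义 SSLContext,requests 也内置了常用的 TLS 配置项:verify 可以指向自定义 CA 证书,cert 用于双向 TLS 的客户端证书。下面是示意用法,证书路径均为占位符:

```python
import requests

session = requests.Session()

# 使用自定义 CA 证书校验服务器
session.verify = '/path/to/custom-ca.pem'

# 双向 TLS:提供客户端证书和私钥
session.cert = ('/path/to/client.crt', '/path/to/client.key')

# 也可以在单次请求级别覆盖
# response = session.get('https://internal.example.com/api',
#                        verify='/path/to/custom-ca.pem',
#                        cert=('/path/to/client.crt', '/path/to/client.key'))
```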
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import threading
class PerformanceOptimizer:
"""性能优化器"""
def __init__(self):
self.session_cache = {}
self.lock = threading.Lock()
def get_optimized_session(self, name='default'):
"""获取优化的Session(带缓存)"""
with self.lock:
if name not in self.session_cache:
session = self._create_optimized_session()
self.session_cache[name] = session
return self.session_cache[name]
def _create_optimized_session(self):
"""创建性能优化的Session"""
session = requests.Session()
# 1. 优化连接池
adapter = HTTPAdapter(
pool_connections=20, # 增加连接池大小
pool_maxsize=100, # 增加最大连接数
pool_block=True, # 阻塞等待可用连接
max_retries=Retry( # 优化重试策略
total=2,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504]
)
)
session.mount('http://', adapter)
session.mount('https://', adapter)
# 2. 优化请求头
session.headers.update({
'Accept-Encoding': 'gzip, deflate, br', # 启用压缩
'Connection': 'keep-alive', # 保持连接
'Accept': 'application/json', # 明确期望的响应类型
})
# 3. 启用响应体流式处理(对于大响应)
session.stream = False # 默认关闭,需要时手动开启
return session
def benchmark(self, url, num_requests=100, concurrency=10):
"""性能基准测试"""
session = self.get_optimized_session()
def make_request(i):
start_time = time.time()
try:
response = session.get(f"{url}?request={i}", timeout=10)
elapsed = time.time() - start_time
return {
'success': True,
'status_code': response.status_code,
'time': elapsed,
'size': len(response.content)
}
except Exception as e:
elapsed = time.time() - start_time
return {
'success': False,
'error': str(e),
'time': elapsed
}
# 并发执行
start_total = time.time()
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = [executor.submit(make_request, i) for i in range(num_requests)]
results = []
for future in as_completed(futures):
results.append(future.result())
total_time = time.time() - start_total
# 分析结果
successful = sum(1 for r in results if r['success'])
avg_time = sum(r['time'] for r in results) / len(results)
requests_per_second = num_requests / total_time
# 获取连接池统计
adapter = session.get_adapter('https://')
pool_stats = {
'num_connections': len(adapter.poolmanager.pools),
'pool_connections': adapter._pool_connections,
'pool_maxsize': adapter._pool_maxsize
}
return {
'total_requests': num_requests,
'successful': successful,
'failed': num_requests - successful,
'total_time': total_time,
'avg_time_per_request': avg_time,
'requests_per_second': requests_per_second,
'pool_stats': pool_stats,
'results': results
}
class DNSResolver:
"""DNS解析优化器"""
def __init__(self, ttl=300): # 5分钟TTL
self.cache = {}
self.ttl = ttl
self.lock = threading.Lock()
def resolve(self, hostname):
"""解析主机名(带缓存)"""
with self.lock:
current_time = time.time()
# 检查缓存
if hostname in self.cache:
ip, timestamp = self.cache[hostname]
if current_time - timestamp < self.ttl:
return ip
# 解析DNS
import socket
try:
ip = socket.gethostbyname(hostname)
self.cache[hostname] = (ip, current_time)
return ip
except socket.gaierror:
return None
def create_compressed_session():
    """创建支持响应压缩的Session

    注意:requests/urllib3 会自动解压 gzip 和 deflate 编码的响应体,
    不需要(也不应该)再手动调用 gzip.decompress,这里只负责声明
    支持的压缩格式并记录压缩信息。
    """
    class CompressedAdapter(HTTPAdapter):
        def send(self, request, **kwargs):
            # 显式声明支持的压缩格式
            request.headers['Accept-Encoding'] = 'gzip, deflate'
            response = super().send(request, **kwargs)
            encoding = response.headers.get('content-encoding')
            if encoding:
                # response.content 已经是解压后的数据
                print(f"响应使用 {encoding} 压缩传输")
            return response

    session = requests.Session()
    adapter = CompressedAdapter()
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
# 使用示例
if __name__ == "__main__":
# 创建性能优化器
optimizer = PerformanceOptimizer()
# 运行基准测试
print("开始性能基准测试...")
results = optimizer.benchmark(
url='https://httpbin.org/get',
num_requests=50,
concurrency=5
)
print(f"\n性能测试结果:")
print(f"总请求数: {results['total_requests']}")
print(f"成功请求: {results['successful']}")
print(f"总耗时: {results['total_time']:.2f}秒")
print(f"平均响应时间: {results['avg_time_per_request']:.3f}秒")
print(f"每秒请求数: {results['requests_per_second']:.1f}")
print(f"连接池统计: {results['pool_stats']}")
# 清理
for session in optimizer.session_cache.values():
session.close()
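性能优化中另一个常用手段是流式处理大响应:stream=True 时响应体不会被一次性读入内存,适合大文件下载。下面是一个示意实现(download_file 是本文虚构的函数名):

```python
import requests

def download_file(url, dest_path, chunk_size=8192):
    """流式下载大文件的示意实现"""
    with requests.Session() as session:
        # stream=True 时响应体按需读取,不会一次性载入内存
        with session.get(url, stream=True, timeout=(5, 60)) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # 过滤 keep-alive 空块
                        f.write(chunk)
    return dest_path

# download_file('https://httpbin.org/bytes/102400', '/tmp/sample.bin')
```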
- **微服务调用**:服务间通信,带负载均衡和熔断
- **电商数据同步**:多平台商品同步,带缓存和重试
- **监控指标上报**:批量发送指标,带压缩和队列
- **分布式爬虫**:分布式爬取,带代理轮换和反爬策略
import requests
import time
import json
import hashlib
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor
import threading
@dataclass
class ClientConfig:
"""客户端配置"""
base_url: str
timeout: tuple = (3.05, 30)
max_retries: int = 3
pool_size: int = 10
rate_limit: Optional[int] = None # 每秒请求数限制
cache_ttl: int = 300 # 缓存过期时间(秒)
class CircuitBreaker:
"""熔断器模式"""
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def record_failure(self):
"""记录失败"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
print(f"熔断器打开: 失败次数 {self.failure_count}")
def record_success(self):
"""记录成功"""
self.failure_count = 0
if self.state == 'HALF_OPEN':
self.state = 'CLOSED'
print("熔断器关闭")
def allow_request(self) -> bool:
"""是否允许请求"""
if self.state == 'CLOSED':
return True
elif self.state == 'OPEN':
# 检查是否应该进入半开状态
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
print("熔断器进入半开状态")
return True
return False
elif self.state == 'HALF_OPEN':
return True
return False
class RateLimiter:
"""速率限制器"""
def __init__(self, requests_per_second: int):
self.requests_per_second = requests_per_second
self.request_times: List[float] = []
self.lock = threading.Lock()
def acquire(self):
"""获取请求许可"""
with self.lock:
current_time = time.time()
# 清理过期的请求记录
self.request_times = [
t for t in self.request_times
if current_time - t < 1
]
# 检查是否超过限制
if len(self.request_times) >= self.requests_per_second:
# 计算需要等待的时间
oldest_request = self.request_times[0]
wait_time = 1 - (current_time - oldest_request)
if wait_time > 0:
time.sleep(wait_time)
current_time = time.time()
# 添加当前请求
self.request_times.append(current_time)
return current_time
class ResponseCache:
"""响应缓存"""
def __init__(self, ttl: int = 300):
self.cache: Dict[str, Dict] = {}
self.ttl = ttl
self.lock = threading.Lock()
def get_key(self, method: str, url: str, params: Optional[Dict] = None,
data: Optional[Dict] = None) -> str:
"""生成缓存键"""
key_data = f"{method}:{url}:{params}:{data}"
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, key: str) -> Optional[Dict]:
"""获取缓存"""
with self.lock:
if key in self.cache:
entry = self.cache[key]
if time.time() - entry['timestamp'] < self.ttl:
return entry['response']
else:
# 清理过期缓存
del self.cache[key]
return None
def set(self, key: str, response: Dict):
"""设置缓存"""
with self.lock:
self.cache[key] = {
'response': response,
'timestamp': time.time()
}
class EnterpriseAPIClient:
"""企业级API客户端"""
def __init__(self, config: ClientConfig):
self.config = config
self.session = self._create_session()
self.circuit_breaker = CircuitBreaker()
self.rate_limiter = RateLimiter(config.rate_limit) if config.rate_limit else None
self.response_cache = ResponseCache(config.cache_ttl)
self.metrics = {
'total_requests': 0,
'successful': 0,
'failed': 0,
'cache_hits': 0,
'total_time': 0
}
def _create_session(self):
"""创建优化过的Session"""
session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=self.config.max_retries,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT", "DELETE"]
)
# 配置适配器
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=self.config.pool_size,
pool_maxsize=self.config.pool_size * 2
)
session.mount('http://', adapter)
session.mount('https://', adapter)
# 设置默认头部
session.headers.update({
'User-Agent': 'EnterpriseAPIClient/2.0',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
return session
def request(self, method: str, endpoint: str, use_cache: bool = False,
**kwargs) -> Optional[Dict[str, Any]]:
"""发送请求"""
start_time = time.time()
self.metrics['total_requests'] += 1
# 检查熔断器
if not self.circuit_breaker.allow_request():
print("熔断器阻止请求")
return None
# 应用速率限制
if self.rate_limiter:
self.rate_limiter.acquire()
url = f"{self.config.base_url}{endpoint}"
# 生成缓存键(仅对GET请求)
cache_key = None
if use_cache and method.upper() == 'GET':
cache_key = self.response_cache.get_key(method, url, kwargs.get('params'))
cached_response = self.response_cache.get(cache_key)
if cached_response:
self.metrics['cache_hits'] += 1
print(f"缓存命中: {endpoint}")
return cached_response
try:
# 发送请求
response = self.session.request(
method,
url,
timeout=self.config.timeout,
**kwargs
)
response_time = time.time() - start_time
self.metrics['total_time'] += response_time
# 检查HTTP状态
response.raise_for_status()
# 解析响应
if response.content:
data = response.json()
else:
data = {}
# 记录成功
self.circuit_breaker.record_success()
self.metrics['successful'] += 1
# 缓存响应(仅对成功的GET请求)
if use_cache and method.upper() == 'GET' and cache_key:
self.response_cache.set(cache_key, data)
# 记录详细日志
log_data = {
'method': method,
'url': url,
'status_code': response.status_code,
'response_time': response_time,
'timestamp': time.time()
}
print(f"请求成功: {json.dumps(log_data, indent=2)}")
return data
except requests.exceptions.RequestException as e:
response_time = time.time() - start_time
self.metrics['total_time'] += response_time
self.metrics['failed'] += 1
# 记录失败
self.circuit_breaker.record_failure()
# 错误处理
error_info = {
'error': str(e),
'method': method,
'url': url,
'response_time': response_time
}
            if e.response is not None:  # 注意:不能写 if e.response,4xx/5xx 响应对象的布尔值为 False
error_info['status_code'] = e.response.status_code
error_info['response_body'] = e.response.text[:500]
print(f"请求失败: {json.dumps(error_info, indent=2)}")
return None
def get_metrics(self) -> Dict[str, Any]:
"""获取客户端指标"""
metrics = self.metrics.copy()
if metrics['total_requests'] > 0:
metrics['success_rate'] = metrics['successful'] / metrics['total_requests']
metrics['avg_response_time'] = metrics['total_time'] / metrics['total_requests']
else:
metrics['success_rate'] = 0
metrics['avg_response_time'] = 0
# 添加熔断器状态
metrics['circuit_breaker'] = {
'state': self.circuit_breaker.state,
'failure_count': self.circuit_breaker.failure_count
}
return metrics
def close(self):
"""关闭客户端"""
self.session.close()
print("API客户端已关闭")
# 使用示例
if __name__ == "__main__":
# 配置客户端
config = ClientConfig(
base_url='https://api.example.com/v1',
timeout=(5, 30),
max_retries=3,
pool_size=10,
rate_limit=10, # 每秒10个请求
cache_ttl=300 # 5分钟缓存
)
# 创建客户端
client = EnterpriseAPIClient(config)
try:
# 发送请求
data = client.request(
'GET',
'/users',
use_cache=True,
params={'limit': 10}
)
if data:
print(f"获取到 {len(data.get('users', []))} 个用户")
# 获取指标
metrics = client.get_metrics()
print(f"\n客户端指标:")
for key, value in metrics.items():
if isinstance(value, dict):
print(f" {key}:")
for k, v in value.items():
print(f" {k}: {v}")
else:
print(f" {key}: {value}")
finally:
client.close()
Requests库的高级功能为构建企业级HTTP客户端提供了强大的工具集。通过合理运用这些高级特性,可以:

- 利用连接池复用和响应压缩提升吞吐与响应速度;
- 借助重试、熔断和速率限制提高调用的可靠性;
- 通过自定义认证、SSL/TLS 配置与证书锁定加强安全性;
- 借助钩子、适配器和事件系统让客户端易于扩展、监控和维护。
掌握这些高级用法,你就能构建出高性能、高可用、易维护的企业级HTTP客户端,满足各种复杂的业务需求。