在Web应用中,错误是不可避免的。良好的错误处理可以提供:
| 状态码 | 名称 | 说明 | Flask处理方式 |
|---|---|---|---|
| 400 | Bad Request | 请求参数错误 | abort(400) |
| 401 | Unauthorized | 未授权访问 | abort(401) |
| 403 | Forbidden | 禁止访问 | abort(403) |
| 404 | Not Found | 资源不存在 | abort(404) |
| 405 | Method Not Allowed | 请求方法不允许 | 自动处理 |
| 500 | Internal Server Error | 服务器内部错误 | 未捕获的异常 |
| 502 | Bad Gateway | 网关错误 | 代理服务器问题 |
| 503 | Service Unavailable | 服务不可用 | abort(503) |
# 基础错误处理示例
from flask import Flask, render_template, request, abort
app = Flask(__name__)
@app.route('/user/<int:user_id>')
def show_user(user_id):
# 模拟数据库查询
user = get_user_from_db(user_id)
if not user:
# 用户不存在,抛出404错误
abort(404)
if not user.is_active:
# 用户未激活,抛出403错误
abort(403, description="用户账户已被禁用")
return render_template('user.html', user=user)
@app.route('/api/data')
def get_data():
# 检查必要的查询参数
if 'token' not in request.args:
abort(400, description="缺少token参数")
token = request.args.get('token')
if not validate_token(token):
abort(401, description="无效的token")
# 返回数据
return {'data': 'some data'}
def get_user_from_db(user_id):
"""模拟从数据库获取用户"""
# 这里应该是数据库查询逻辑
return None # 模拟用户不存在
def validate_token(token):
"""验证token"""
return token == 'valid_token' # 简化验证逻辑
# 自定义错误处理器
from flask import Flask, render_template, jsonify
app = Flask(__name__)
# 404错误处理器
@app.errorhandler(404)
def page_not_found(error):
# 如果是API请求,返回JSON
if request.path.startswith('/api/'):
return jsonify({
'error': 'Not Found',
'message': '请求的资源不存在',
'path': request.path,
'status_code': 404
}), 404
# 否则返回HTML页面
return render_template('errors/404.html', error=error), 404
# 403错误处理器
@app.errorhandler(403)
def forbidden(error):
if request.path.startswith('/api/'):
return jsonify({
'error': 'Forbidden',
'message': '您没有权限访问此资源',
'status_code': 403
}), 403
return render_template('errors/403.html', error=error), 403
# 500错误处理器
@app.errorhandler(500)
def internal_server_error(error):
app.logger.error(f"服务器内部错误: {error}")
if request.path.startswith('/api/'):
return jsonify({
'error': 'Internal Server Error',
'message': '服务器内部错误,请稍后再试',
'status_code': 500
}), 500
return render_template('errors/500.html', error=error), 500
# 处理所有异常
@app.errorhandler(Exception)
def handle_all_exceptions(error):
# 记录完整的错误信息
app.logger.exception(f"未处理的异常: {error}")
# 根据错误类型返回不同的响应
if isinstance(error, ValueError):
return jsonify({
'error': 'Bad Request',
'message': str(error),
'status_code': 400
}), 400
# 默认返回500错误
return internal_server_error(error)
<!-- templates/errors/404.html -->
{% raw %}{% extends "base.html" %}{% endraw %}
{% raw %}{% block title %}{% endraw %}404 - 页面未找到{% raw %}{% endblock %}{% endraw %}
{% raw %}{% block content %}{% endraw %}
<div class="container mt-5">
<div class="row justify-content-center">
<div class="col-md-8 text-center">
<div class="error-template">
<h1>
<i class="fas fa-map-signs fa-3x text-warning"></i>
</h1>
<h1>404</h1>
<h2>页面未找到</h2>
<div class="error-details mb-4">
抱歉,您请求的页面不存在。请检查URL是否正确。
</div>
<div class="error-actions">
<a href="{{ url_for('index') }}" class="btn btn-primary btn-lg">
<i class="fas fa-home"></i> 返回首页
</a>
<a href="javascript:history.back()" class="btn btn-secondary btn-lg ms-2">
<i class="fas fa-arrow-left"></i> 返回上一页
</a>
</div>
<div class="mt-4 text-muted">
<small>
错误路径: <code>{% raw %}{{ request.path }}{% endraw %}</code>
</small>
</div>
</div>
</div>
</div>
</div>
{% raw %}{% endblock %}{% endraw %}
<!-- templates/errors/500.html -->
{% raw %}{% extends "base.html" %}{% endraw %}
{% raw %}{% block title %}{% endraw %}500 - 服务器错误{% raw %}{% endblock %}{% endraw %}
{% raw %}{% block content %}{% endraw %}
<div class="container mt-5">
<div class="row justify-content-center">
<div class="col-md-8 text-center">
<div class="error-template">
<h1>
<i class="fas fa-exclamation-triangle fa-3x text-danger"></i>
</h1>
<h1>500</h1>
<h2>服务器内部错误</h2>
<div class="error-details mb-4">
抱歉,服务器遇到了意外错误。我们的工程师已收到通知,正在处理中。
</div>
<div class="error-actions">
<a href="{{ url_for('index') }}" class="btn btn-primary btn-lg">
<i class="fas fa-home"></i> 返回首页
</a>
<button onclick="location.reload()" class="btn btn-warning btn-lg ms-2">
<i class="fas fa-redo"></i> 重新加载
</button>
</div>
<div class="mt-4">
<div class="alert alert-info">
<h6><i class="fas fa-info-circle"></i> 需要帮助?</h6>
<p class="mb-0">
如果问题持续存在,请联系技术支持:
<a href="mailto:support@example.com">support@example.com</a>
</p>
</div>
</div>
</div>
</div>
</div>
</div>
{% raw %}{% endblock %}{% endraw %}
# 上下文感知的错误处理
from flask import Flask, request, jsonify, render_template
import functools
app = Flask(__name__)
def error_response(error_code, message, error_details=None):
"""创建统一的错误响应"""
response = {
'success': False,
'error': {
'code': error_code,
'message': message,
'timestamp': datetime.now().isoformat(),
'request_id': getattr(request, 'request_id', None)
}
}
# 只在调试模式下添加详细信息
if app.debug and error_details:
response['error']['details'] = str(error_details)
return response
def api_error_handler(func):
"""API错误处理装饰器"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except ValueError as e:
app.logger.warning(f"请求参数错误: {e}")
return jsonify(error_response(400, "请求参数错误", e)), 400
except PermissionError as e:
app.logger.warning(f"权限错误: {e}")
return jsonify(error_response(403, "没有访问权限", e)), 403
except Exception as e:
app.logger.exception(f"API处理异常: {e}")
return jsonify(error_response(500, "服务器内部错误", e)), 500
return wrapper
# 使用装饰器处理API错误
@app.route('/api/users/<int:user_id>')
@api_error_handler
def get_user_api(user_id):
# 模拟可能抛出的异常
if user_id < 0:
raise ValueError("用户ID不能为负数")
if user_id == 13:
raise PermissionError("禁止访问该用户")
user = {'id': user_id, 'name': '张三'}
return jsonify({'success': True, 'data': user})
# 全局异常处理器
@app.errorhandler(Exception)
def global_error_handler(error):
# 根据请求类型返回不同的响应
if request.path.startswith('/api/'):
return jsonify(error_response(500, "服务器内部错误", error)), 500
else:
# 对于Web页面,记录错误并显示通用错误页面
app.logger.exception(f"全局异常: {error}")
return render_template('errors/generic.html', error=error), 500
# 自定义异常类
class AppError(Exception):
"""应用基础异常类"""
def __init__(self, message, status_code=400, payload=None):
super().__init__()
self.message = message
self.status_code = status_code
self.payload = payload
def to_dict(self):
rv = dict(self.payload or {})
rv['message'] = self.message
rv['code'] = self.status_code
return rv
class ValidationError(AppError):
"""数据验证错误"""
def __init__(self, message="数据验证失败", errors=None):
payload = {'errors': errors} if errors else None
super().__init__(message, 400, payload)
class AuthenticationError(AppError):
"""认证错误"""
def __init__(self, message="认证失败"):
super().__init__(message, 401)
class AuthorizationError(AppError):
"""授权错误"""
def __init__(self, message="没有操作权限"):
super().__init__(message, 403)
class ResourceNotFoundError(AppError):
"""资源不存在错误"""
def __init__(self, resource_name, resource_id=None):
message = f"{resource_name}不存在"
if resource_id:
message = f"{resource_name}(ID: {resource_id})不存在"
super().__init__(message, 404)
class BusinessError(AppError):
"""业务逻辑错误"""
def __init__(self, message="业务逻辑错误"):
super().__init__(message, 422) # 422 Unprocessable Entity
# 注册自定义异常处理器
@app.errorhandler(AppError)
def handle_app_error(error):
"""处理自定义应用异常"""
response = jsonify({
'success': False,
'error': error.to_dict()
})
response.status_code = error.status_code
return response
# 使用自定义异常
@app.route('/api/products/<int:product_id>')
def get_product(product_id):
product = get_product_from_db(product_id)
if not product:
raise ResourceNotFoundError("产品", product_id)
if not product.is_available:
raise BusinessError("产品已下架")
return jsonify({'success': True, 'data': product})
@app.route('/api/orders', methods=['POST'])
def create_order():
data = request.get_json()
# 验证数据
if 'items' not in data or not data['items']:
raise ValidationError("订单必须包含商品",
{'items': '不能为空'})
if 'user_id' not in data:
raise AuthenticationError("需要用户认证")
# 检查用户权限
user = get_current_user()
if not user.can_create_order():
raise AuthorizationError("用户没有创建订单的权限")
# 创建订单逻辑
try:
order = create_order_in_db(data)
return jsonify({'success': True, 'data': order})
except Exception as e:
raise BusinessError(f"创建订单失败: {str(e)}")
# 日志配置
import logging
from logging.handlers import RotatingFileHandler, SMTPHandler
import os
def setup_logging(app):
"""配置应用日志"""
# 设置日志级别
if app.debug:
log_level = logging.DEBUG
else:
log_level = logging.INFO
# 清除默认处理器
app.logger.handlers.clear()
# 创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s '
'[in %(pathname)s:%(lineno)d]'
)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
app.logger.addHandler(console_handler)
# 文件处理器(按大小轮转)
if not app.debug:
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'app.log'),
maxBytes=10*1024*1024, # 10MB
backupCount=10
)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
app.logger.addHandler(file_handler)
# 错误日志单独记录
error_handler = RotatingFileHandler(
os.path.join(log_dir, 'error.log'),
maxBytes=10*1024*1024,
backupCount=10
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
app.logger.addHandler(error_handler)
# 邮件处理器(生产环境)
if not app.debug and app.config.get('MAIL_SERVER'):
mail_handler = SMTPHandler(
mailhost=(app.config['MAIL_SERVER'], app.config['MAIL_PORT']),
fromaddr=app.config['MAIL_FROM'],
toaddrs=app.config['ADMINS'],
subject='应用错误',
credentials=(app.config['MAIL_USERNAME'], app.config['MAIL_PASSWORD']),
secure=() if app.config.get('MAIL_USE_TLS') else None
)
mail_handler.setLevel(logging.ERROR)
mail_handler.setFormatter(formatter)
app.logger.addHandler(mail_handler)
app.logger.setLevel(log_level)
# 禁用Flask默认的日志处理器
logging.getLogger('werkzeug').setLevel(logging.WARNING)
# 在应用中设置日志
app = Flask(__name__)
setup_logging(app)
# 使用日志
@app.route('/some-endpoint')
def some_endpoint():
app.logger.debug('这是一个调试信息')
app.logger.info('用户访问了端点')
app.logger.warning('这是一个警告')
try:
# 一些可能失败的代码
result = do_something_risky()
app.logger.info(f'操作成功: {result}')
return '成功'
except Exception as e:
app.logger.error(f'操作失败: {e}', exc_info=True)
return '失败', 500
# 结构化日志(使用JSON格式)
import json
from datetime import datetime
class JsonFormatter(logging.Formatter):
"""JSON格式的日志格式化器"""
def format(self, record):
log_record = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
'process_id': record.process,
'thread_id': record.thread
}
# 添加额外字段
if hasattr(record, 'request_id'):
log_record['request_id'] = record.request_id
if hasattr(record, 'user_id'):
log_record['user_id'] = record.user_id
if hasattr(record, 'path'):
log_record['path'] = record.path
# 异常信息
if record.exc_info:
log_record['exception'] = self.formatException(record.exc_info)
return json.dumps(log_record)
def create_structured_logger(name):
"""创建结构化日志记录器"""
logger = logging.getLogger(name)
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
# 在请求上下文中添加日志信息
@app.before_request
def before_request():
"""为每个请求添加请求ID和用户信息"""
request.request_id = str(uuid.uuid4())
request.start_time = datetime.now()
# 为日志添加额外字段
import logging
logger = logging.getLogger('app')
# 创建日志适配器
extra = {
'request_id': request.request_id,
'path': request.path,
'method': request.method,
'ip': request.remote_addr,
'user_agent': request.user_agent.string[:200]
}
if hasattr(g, 'user'):
extra['user_id'] = g.user.id
# 使用适配器记录日志
adapter = logging.LoggerAdapter(logger, extra)
adapter.info(f"请求开始: {request.path}")
# 保存适配器到g对象
g.logger = adapter
@app.after_request
def after_request(response):
"""请求结束后记录日志"""
if hasattr(g, 'logger'):
duration = (datetime.now() - request.start_time).total_seconds()
g.logger.info(f"请求结束", extra={
'status_code': response.status_code,
'duration': duration,
'response_size': len(response.get_data())
})
return response
永远不要在生产环境中启用调试模式!调试模式会暴露敏感信息并允许执行任意代码。
# 调试模式配置
app = Flask(__name__)
# 开发环境配置
class DevelopmentConfig:
DEBUG = True
SECRET_KEY = 'dev-secret-key'
SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'
SQLALCHEMY_ECHO = True # 打印SQL语句
# Flask-DebugToolbar配置
DEBUG_TB_ENABLED = True
DEBUG_TB_INTERCEPT_REDIRECTS = False
# 生产环境配置
class ProductionConfig:
DEBUG = False
SECRET_KEY = os.environ.get('SECRET_KEY') # 从环境变量读取
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
# 生产环境特定的错误处理
PROPAGATE_EXCEPTIONS = False # 不向上传播异常
TRAP_HTTP_EXCEPTIONS = True # 捕获HTTP异常
# 根据环境选择配置
if os.environ.get('FLASK_ENV') == 'production':
app.config.from_object(ProductionConfig)
else:
app.config.from_object(DevelopmentConfig)
# 使用Flask-DebugToolbar(仅开发环境)
if app.debug:
from flask_debugtoolbar import DebugToolbarExtension
toolbar = DebugToolbarExtension(app)
# 自定义调试处理器
@app.route('/_debug/errors')
@login_required
@admin_required
def debug_errors():
"""仅管理员可访问的错误调试页面"""
if not app.debug:
abort(404)
# 获取最近的错误日志
errors = get_recent_errors()
return render_template('debug/errors.html', errors=errors)
@app.route('/_debug/raise-error')
def debug_raise_error():
"""用于测试的错误抛出端点"""
if not app.debug:
abort(404)
error_type = request.args.get('type', 'value')
if error_type == 'value':
raise ValueError("这是一个测试的ValueError")
elif error_type == 'division':
result = 1 / 0 # ZeroDivisionError
elif error_type == 'index':
arr = []
return arr[10] # IndexError
else:
raise Exception("这是一个测试的通用异常")
return "不会执行到这里"
# Sentry错误监控集成
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
def init_sentry(app):
"""初始化Sentry错误监控"""
if app.config.get('SENTRY_DSN'):
sentry_sdk.init(
dsn=app.config['SENTRY_DSN'],
integrations=[FlaskIntegration()],
# 环境设置
environment=app.config.get('ENVIRONMENT', 'production'),
release=app.config.get('VERSION', '1.0.0'),
# 采样率
traces_sample_rate=app.config.get('SENTRY_TRACES_SAMPLE_RATE', 0.1),
# 忽略某些错误
ignore_errors=[
KeyboardInterrupt,
NotImplementedError
],
# 用户信息
send_default_pii=app.config.get('SENTRY_SEND_DEFAULT_PII', False),
# 调试信息
debug=app.debug,
# 附加数据
before_send=before_send_event,
before_breadcrumb=before_breadcrumb
)
def before_send_event(event, hint):
"""发送事件前的处理"""
# 过滤敏感信息
if 'request' in event:
# 过滤密码字段
if 'data' in event['request']:
if isinstance(event['request']['data'], dict):
for key in ['password', 'token', 'secret']:
if key in event['request']['data']:
event['request']['data'][key] = '[FILTERED]'
return event
def before_breadcrumb(crumb, hint):
"""面包屑处理"""
return crumb
# 在应用中初始化
app = Flask(__name__)
app.config.from_pyfile('config.py')
init_sentry(app)
# 手动记录错误
@app.route('/api/some-endpoint')
def some_endpoint():
try:
# 业务逻辑
result = do_something()
return jsonify(result)
except Exception as e:
# 手动捕获并发送到Sentry
sentry_sdk.capture_exception(e)
# 附加额外信息
with sentry_sdk.configure_scope() as scope:
scope.set_tag("endpoint", "some_endpoint")
scope.set_user({"id": get_current_user_id()})
scope.set_extra("request_data", request.get_json())
return jsonify({'error': '处理失败'}), 500
# 测试Sentry
@app.route('/_debug/test-sentry')
def test_sentry():
"""测试Sentry集成"""
if not app.debug:
abort(404)
# 测试不同类型的错误
test_type = request.args.get('type', 'exception')
if test_type == 'exception':
raise Exception("Sentry测试: 通用异常")
elif test_type == 'capture':
sentry_sdk.capture_message("Sentry测试: 手动消息", level="info")
return "消息已发送到Sentry"
elif test_type == 'performance':
# 性能监控
with sentry_sdk.start_transaction(op="test", name="test_transaction") as transaction:
# 模拟一些操作
import time
time.sleep(0.1)
# 设置结果
transaction.set_status("ok")
return "性能跟踪已发送"
return "未知测试类型"
# error_handler.py - 企业级错误处理模块
import logging
import json
import traceback
from datetime import datetime
from functools import wraps
from typing import Dict, Any, Optional, Callable
from flask import Flask, request, jsonify, render_template, g
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
class ErrorHandler:
"""错误处理器类"""
def __init__(self, app: Optional[Flask] = None):
self.app = app
self.logger = logging.getLogger(__name__)
if app:
self.init_app(app)
def init_app(self, app: Flask):
"""初始化应用"""
self.app = app
# 设置错误处理器
self.setup_error_handlers()
# 设置上下文处理器
self.setup_context_processors()
# 初始化Sentry
self.init_sentry()
# 注册请求钩子
self.register_request_hooks()
def setup_error_handlers(self):
"""设置错误处理器"""
app = self.app
# HTTP错误处理器
@app.errorhandler(400)
def bad_request(error):
return self.error_response(
400, "请求参数错误", error.description
)
@app.errorhandler(401)
def unauthorized(error):
return self.error_response(
401, "需要身份验证", error.description
)
@app.errorhandler(403)
def forbidden(error):
return self.error_response(
403, "没有访问权限", error.description
)
@app.errorhandler(404)
def not_found(error):
return self.error_response(
404, "资源不存在", error.description
)
@app.errorhandler(405)
def method_not_allowed(error):
return self.error_response(
405, "请求方法不允许", error.description
)
@app.errorhandler(429)
def too_many_requests(error):
return self.error_response(
429, "请求过于频繁", error.description
)
@app.errorhandler(500)
def internal_server_error(error):
# 记录完整错误信息
self.log_exception(error)
return self.error_response(
500, "服务器内部错误"
)
# 通用异常处理器
@app.errorhandler(Exception)
def handle_exception(error):
# 记录异常
self.log_exception(error)
# 发送到Sentry
if self.sentry_enabled:
sentry_sdk.capture_exception(error)
# 返回错误响应
status_code = getattr(error, 'status_code', 500)
message = getattr(error, 'message', '服务器内部错误')
return self.error_response(status_code, message)
def setup_context_processors(self):
"""设置上下文处理器"""
@self.app.context_processor
def inject_error_context():
"""注入错误上下文"""
return {
'is_error_page': request.path.startswith('/error/'),
'error_codes': self.get_common_error_codes()
}
def init_sentry(self):
"""初始化Sentry"""
dsn = self.app.config.get('SENTRY_DSN')
if dsn and not self.app.debug:
sentry_sdk.init(
dsn=dsn,
integrations=[FlaskIntegration()],
environment=self.app.config.get('ENVIRONMENT', 'production'),
release=self.app.config.get('VERSION', '1.0.0'),
traces_sample_rate=0.1,
before_send=self.before_send_to_sentry
)
self.sentry_enabled = True
else:
self.sentry_enabled = False
def before_send_to_sentry(self, event, hint):
"""发送到Sentry前的处理"""
# 过滤敏感信息
self.filter_sensitive_data(event)
# 添加应用特定信息
event['tags']['app_name'] = self.app.name
event['tags']['environment'] = self.app.config.get('ENVIRONMENT')
return event
def filter_sensitive_data(self, event):
"""过滤敏感数据"""
sensitive_keys = [
'password', 'token', 'secret', 'key',
'authorization', 'cookie', 'credit_card',
'ssn', 'phone', 'email'
]
if 'request' in event:
request_data = event['request']
# 过滤headers
if 'headers' in request_data:
for key in list(request_data['headers'].keys()):
if any(s in key.lower() for s in sensitive_keys):
request_data['headers'][key] = '[FILTERED]'
# 过滤data
if 'data' in request_data:
if isinstance(request_data['data'], dict):
for key in list(request_data['data'].keys()):
if any(s in key.lower() for s in sensitive_keys):
request_data['data'][key] = '[FILTERED]'
def register_request_hooks(self):
"""注册请求钩子"""
app = self.app
@app.before_request
def before_request():
"""请求开始前"""
# 生成请求ID
request.request_id = datetime.now().strftime('%Y%m%d%H%M%S%f')
# 记录请求开始
self.logger.info(
"请求开始",
extra=self.get_request_context()
)
@app.after_request
def after_request(response):
"""请求结束后"""
# 记录请求结束
duration = (datetime.now() - g.start_time).total_seconds()
self.logger.info(
"请求结束",
extra={
**self.get_request_context(),
'status_code': response.status_code,
'duration': duration,
'response_size': len(response.get_data())
}
)
# 添加请求ID到响应头
response.headers['X-Request-ID'] = request.request_id
return response
@app.teardown_request
def teardown_request(exception):
"""请求结束时清理"""
if exception:
self.log_exception(exception)
def error_response(self, status_code: int, message: str,
details: Optional[str] = None) -> tuple:
"""生成错误响应"""
# 构建错误响应数据
error_data = {
'error': {
'code': status_code,
'message': message,
'request_id': getattr(request, 'request_id', None),
'timestamp': datetime.now().isoformat()
}
}
# 添加详细信息(仅调试模式)
if details and self.app.debug:
error_data['error']['details'] = details
# 根据请求类型返回不同的响应
if request.path.startswith('/api/') or request.is_json:
return jsonify(error_data), status_code
else:
# 返回HTML错误页面
return render_template(
f'errors/{status_code}.html',
error=error_data['error']
), status_code
def log_exception(self, exception: Exception):
"""记录异常"""
# 获取异常信息
exc_type = type(exception).__name__
exc_message = str(exception)
exc_traceback = traceback.format_exc()
# 构建日志数据
log_data = {
'exception_type': exc_type,
'exception_message': exc_message,
'exception_traceback': exc_traceback,
**self.get_request_context()
}
# 记录错误
self.logger.error(
f"未处理异常: {exc_type}: {exc_message}",
extra=log_data,
exc_info=True
)
def get_request_context(self) -> Dict[str, Any]:
"""获取请求上下文"""
context = {
'request_id': getattr(request, 'request_id', 'unknown'),
'path': request.path,
'method': request.method,
'remote_addr': request.remote_addr,
'user_agent': request.user_agent.string[:200] if request.user_agent else None
}
# 添加用户信息
if hasattr(g, 'user') and g.user:
context['user_id'] = g.user.id
context['user_role'] = g.user.role
return context
def get_common_error_codes(self) -> Dict[int, str]:
"""获取常见错误码"""
return {
400: "请求参数错误",
401: "需要身份验证",
403: "没有访问权限",
404: "资源不存在",
405: "请求方法不允许",
429: "请求过于频繁",
500: "服务器内部错误",
502: "网关错误",
503: "服务不可用",
504: "网关超时"
}
def api_error_handler(self, func: Callable):
"""API错误处理装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# 获取错误状态码
status_code = getattr(e, 'status_code', 500)
# 记录错误
self.log_exception(e)
# 返回错误响应
return self.error_response(
status_code,
str(e) or "服务器内部错误"
)
return wrapper
# 使用示例
app = Flask(__name__)
# 配置应用
app.config.update(
SECRET_KEY='your-secret-key',
ENVIRONMENT='production',
SENTRY_DSN='your-sentry-dsn-here' # 生产环境设置
)
# 初始化错误处理器
error_handler = ErrorHandler(app)
# 定义自定义异常
class BusinessException(Exception):
def __init__(self, message, status_code=400):
super().__init__(message)
self.status_code = status_code
self.message = message
# 使用错误处理器
@app.route('/api/test-error')
@error_handler.api_error_handler
def test_error():
error_type = request.args.get('type', 'business')
if error_type == 'business':
raise BusinessException("业务逻辑错误", 400)
elif error_type == 'validation':
raise ValueError("数据验证失败")
elif error_type == 'server':
# 模拟服务器错误
result = 1 / 0
else:
return jsonify({'success': True, 'data': '正常响应'})
return jsonify({'success': True})
# 错误页面路由
@app.route('/error/<int:code>')
def show_error_page(code):
"""显示错误页面"""
return render_template(f'errors/{code}.html'), code
@app.route('/error/logs')
@login_required
@admin_required
def error_logs():
"""错误日志查看(仅管理员)"""
logs = self.get_error_logs()
return render_template('admin/error_logs.html', logs=logs)
if __name__ == '__main__':
app.run(debug=True)
# 健康检查端点
@app.route('/health')
def health_check():
"""健康检查端点"""
checks = {
'database': check_database(),
'cache': check_cache(),
'external_service': check_external_service(),
'disk_space': check_disk_space()
}
# 检查所有组件
all_healthy = all(checks.values())
if all_healthy:
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'checks': checks
}), 200
else:
# 记录不健康的组件
unhealthy = [name for name, status in checks.items() if not status]
app.logger.error(f"健康检查失败: {unhealthy}")
return jsonify({
'status': 'unhealthy',
'timestamp': datetime.now().isoformat(),
'checks': checks,
'unhealthy_components': unhealthy
}), 503
@app.route('/health/liveness')
def liveness_probe():
"""存活探针"""
return jsonify({'status': 'alive'}), 200
@app.route('/health/readiness')
def readiness_probe():
"""就绪探针"""
# 检查应用是否准备好接收流量
if is_app_ready():
return jsonify({'status': 'ready'}), 200
else:
return jsonify({'status': 'not ready'}), 503
可能的原因:
# 正确的注册顺序
app = Flask(__name__)
# 先注册错误处理器
@app.errorhandler(404)
def handle_404(e):
return "404错误页面", 404
# 然后注册路由
@app.route('/')
def index():
return "首页"
# 错误的顺序 - 处理器不会被调用
@app.route('/wrong')
def wrong():
abort(404)
@app.errorhandler(404) # 这个处理器不会被执行
def handle_404_too_late(e):
return "太晚了", 404
生产环境调试策略:
示例请求ID追踪:
# 添加请求ID中间件
@app.before_request
def assign_request_id():
request.id = str(uuid.uuid4())
g.request_id = request.id
# 在日志中添加请求ID
import logging
logger = logging.getLogger('app')
logger = logging.LoggerAdapter(logger, {'request_id': request.id})
g.logger = logger
# 在错误响应中包含请求ID
@app.errorhandler(Exception)
def handle_exception(e):
logger = getattr(g, 'logger', logging.getLogger('app'))
logger.error(f"异常: {e}", exc_info=True)
return jsonify({
'error': '服务器错误',
'request_id': getattr(g, 'request_id', 'unknown'),
'support_reference': datetime.now().strftime('%Y%m%d-%H%M%S')
}), 500
数据库错误处理策略:
# 数据库错误处理
import psycopg2
from sqlalchemy.exc import SQLAlchemyError
def handle_database_error(func):
"""数据库错误处理装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except psycopg2.OperationalError as e:
# 数据库连接错误
app.logger.error(f"数据库连接错误: {e}")
# 尝试重连
if reconnect_database():
try:
return func(*args, **kwargs)
except Exception as retry_error:
app.logger.error(f"重连后再次失败: {retry_error}")
# 返回503服务不可用
abort(503, description="数据库暂时不可用")
except SQLAlchemyError as e:
# SQLAlchemy错误
app.logger.error(f"数据库操作错误: {e}")
# 区分不同类型的数据库错误
if "duplicate key" in str(e):
abort(409, description="资源已存在")
elif "foreign key" in str(e):
abort(400, description="关联数据不存在")
else:
abort(500, description="数据库操作失败")
return wrapper
# 使用装饰器
@app.route('/api/data')
@handle_database_error
def get_data():
# 数据库查询
data = db.session.query(MyModel).all()
return jsonify(data)
# 重连机制
def reconnect_database():
"""尝试重新连接数据库"""
max_retries = 3
for attempt in range(max_retries):
try:
db.session.rollback()
db.session.close()
db.engine.dispose()
# 创建新连接
db.session = db.create_scoped_session()
db.session.execute("SELECT 1") # 测试连接
app.logger.info(f"数据库重连成功 (尝试 {attempt + 1})")
return True
except Exception as e:
app.logger.warning(f"数据库重连失败 (尝试 {attempt + 1}): {e}")
time.sleep(1 * (attempt + 1)) # 指数退避
return False