Session(会话)是Web应用中跟踪用户状态的一种机制,允许服务器在多个请求之间存储和检索用户数据。
| Session | Cookie | |
|---|---|---|
| 存储位置 | 服务器端 | 客户端 |
| 安全性 | 较高 | 较低 |
| 存储容量 | 较大 | 较小(4KB) |
| 数据类型 | 任意Python对象 | 字符串 |
| 过期控制 | 灵活 | 固定时间 |
| 用途 | 用户状态、购物车等 | 用户偏好、跟踪等 |
SECRET_KEY来保证安全。
from flask import Flask, session
app = Flask(__name__)
# 必须设置SECRET_KEY,用于签名Session Cookie
app.config['SECRET_KEY'] = 'your-secret-key-here-change-this'
# Session配置选项
app.config['SESSION_COOKIE_NAME'] = 'my_session' # Session Cookie名称
app.config['SESSION_COOKIE_DOMAIN'] = None # Cookie域名
app.config['SESSION_COOKIE_PATH'] = None # Cookie路径
app.config['SESSION_COOKIE_HTTPONLY'] = True # 防止JavaScript访问
app.config['SESSION_COOKIE_SECURE'] = False # 是否仅HTTPS传输
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # 防止CSRF
# Session过期时间配置
app.config['PERMANENT_SESSION_LIFETIME'] = 3600 # 会话有效期(秒)
app.config['SESSION_REFRESH_EACH_REQUEST'] = True # 每次请求刷新过期时间
# 调试配置
@app.route('/debug-session')
def debug_session():
"""查看Session配置"""
return {
'cookie_name': app.config.get('SESSION_COOKIE_NAME'),
'cookie_domain': app.config.get('SESSION_COOKIE_DOMAIN'),
'cookie_secure': app.config.get('SESSION_COOKIE_SECURE'),
'cookie_httponly': app.config.get('SESSION_COOKIE_HTTPONLY'),
'cookie_samesite': app.config.get('SESSION_COOKIE_SAMESITE'),
'permanent_lifetime': app.config.get('PERMANENT_SESSION_LIFETIME')
}
import os
from datetime import timedelta
class Config:
"""基础配置类"""
# 从环境变量获取密钥,避免硬编码
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key'
# Session安全配置
SESSION_COOKIE_NAME = 'app_session'
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# Session过期时间(7天)
PERMANENT_SESSION_LIFETIME = timedelta(days=7)
# 每次请求刷新Session
SESSION_REFRESH_EACH_REQUEST = True
class DevelopmentConfig(Config):
"""开发环境配置"""
DEBUG = True
SESSION_COOKIE_SECURE = False # 开发环境可以不是HTTPS
class ProductionConfig(Config):
"""生产环境配置"""
DEBUG = False
SESSION_COOKIE_SECURE = True # 生产环境必须使用HTTPS
# 更长的Session过期时间
PERMANENT_SESSION_LIFETIME = timedelta(days=30)
# 额外的安全头
@staticmethod
def init_app(app):
"""初始化应用"""
# 设置安全相关的HTTP头
from flask import Flask
from flask_talisman import Talisman
Talisman(app,
content_security_policy=None,
session_cookie_secure=True,
session_cookie_httponly=True,
session_cookie_samesite='Lax')
# 应用工厂模式
def create_app(config_class=DevelopmentConfig):
app = Flask(__name__)
app.config.from_object(config_class)
if isinstance(app.config['PERMANENT_SESSION_LIFETIME'], timedelta):
app.permanent_session_lifetime = app.config['PERMANENT_SESSION_LIFETIME']
return app
# 创建应用
app = create_app(ProductionConfig)
from flask import Flask, session, request, redirect, url_for
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret-key'
@app.route('/')
def index():
"""首页,显示Session信息"""
# 获取Session值
username = session.get('username', '游客')
visit_count = session.get('visit_count', 0)
# 更新访问次数
session['visit_count'] = visit_count + 1
return f"""
<h1>欢迎,{username}</h1>
<p>您已经访问了 {session['visit_count']} 次</p>
<a href="/login">登录</a>
<a href="/logout">退出</a>
"""
@app.route('/login', methods=['GET', 'POST'])
def login():
"""登录页面"""
if request.method == 'POST':
username = request.form.get('username')
# 设置Session值
session['username'] = username
session['logged_in'] = True
session['login_time'] = datetime.now().isoformat()
# 设置永久Session
session.permanent = True
# 存储用户信息
session['user_info'] = {
'username': username,
'ip': request.remote_addr,
'user_agent': request.user_agent.string
}
# 重定向到首页
return redirect(url_for('index'))
return '''
<form method="POST">
<input type="text" name="username" placeholder="用户名">
<button type="submit">登录</button>
</form>
'''
@app.route('/logout')
def logout():
"""退出登录"""
# 清除特定Session值
session.pop('username', None)
session.pop('logged_in', None)
# 或者清除所有Session
# session.clear()
return redirect(url_for('index'))
@app.route('/session-info')
def session_info():
"""查看Session信息"""
if 'logged_in' not in session:
return "未登录"
return {
'username': session.get('username'),
'logged_in': session.get('logged_in', False),
'visit_count': session.get('visit_count', 0),
'login_time': session.get('login_time'),
'session_id': session.sid, # Session ID
'session_permanent': session.permanent,
'all_session_data': dict(session)
}
@app.route('/update-profile', methods=['POST'])
def update_profile():
"""更新用户资料"""
if 'logged_in' not in session:
return redirect(url_for('login'))
# 更新Session中的用户信息
theme = request.form.get('theme', 'light')
language = request.form.get('language', 'zh-CN')
session['theme'] = theme
session['language'] = language
# 修改已存在的字典
if 'user_info' in session:
session['user_info'].update({
'theme': theme,
'language': language,
'updated_at': datetime.now().isoformat()
})
# Session中的字典需要重新赋值来触发修改
session.modified = True
return "资料已更新"
from flask import Flask, session, g
from datetime import datetime, timedelta
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret-key'
@app.before_request
def before_request():
"""在每个请求前执行"""
# 检查Session是否有效
if 'user_id' in session:
# 检查Session是否过期
last_activity = session.get('last_activity')
if last_activity:
last_time = datetime.fromisoformat(last_activity)
if datetime.now() - last_time > timedelta(hours=1):
# Session过期,清除
session.clear()
g.user = None
else:
# 更新最后活动时间
session['last_activity'] = datetime.now().isoformat()
# 将用户信息存储在g对象中
g.user = {
'id': session.get('user_id'),
'username': session.get('username'),
'is_admin': session.get('is_admin', False)
}
else:
g.user = None
@app.route('/cart/add/<int:product_id>')
def add_to_cart(product_id):
"""添加到购物车"""
# 初始化购物车
if 'cart' not in session:
session['cart'] = {}
# 更新购物车
cart = session['cart']
cart[product_id] = cart.get(product_id, 0) + 1
# 重新赋值触发修改
session['cart'] = cart
return f"已添加到购物车,当前数量: {cart[product_id]}"
@app.route('/cart')
def view_cart():
"""查看购物车"""
cart = session.get('cart', {})
# 计算总价
total = 0
items = []
for product_id, quantity in cart.items():
price = get_product_price(product_id) # 假设有获取价格的函数
items.append({
'id': product_id,
'quantity': quantity,
'price': price,
'subtotal': price * quantity
})
total += price * quantity
return {
'items': items,
'total': total,
'item_count': len(cart)
}
@app.route('/session-stats')
def session_stats():
"""Session统计信息"""
# 计算Session大小(近似值)
import sys
session_size = sum(
sys.getsizeof(k) + sys.getsizeof(v)
for k, v in session.items()
)
return {
'session_keys': list(session.keys()),
'session_size_bytes': session_size,
'session_modified': session.modified,
'new_session': session.new
}
@app.route('/flash-message')
def flash_message():
"""使用Session实现Flash消息"""
# 设置Flash消息
session.setdefault('_flashes', []).append(('success', '操作成功!'))
# 在下一个请求中显示Flash消息
return '''
<script>
// 模拟下一个请求
setTimeout(() => {
window.location.href = '/show-flashes';
}, 1000);
</script>
'''
@app.route('/show-flashes')
def show_flashes():
"""显示Flash消息"""
flashes = session.pop('_flashes', []) if 'show_flashes' in request.args else []
html = '<h1>Flash消息</h1>'
for category, message in flashes:
html += f'<div class="alert alert-{category}">{message}</div>'
return html
# 安装Flask-Login
pip install flask-login
# 或者使用requirements.txt
# flask-login==0.6.2
# 初始化Flask-Login
from flask import Flask
from flask_login import LoginManager
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
# 创建LoginManager实例
login_manager = LoginManager()
# 配置LoginManager
login_manager.init_app(app)
# 基本配置
login_manager.login_view = 'login' # 登录页面的视图函数名
login_manager.login_message = '请先登录' # 登录提示消息
login_manager.login_message_category = 'info' # 消息类别
login_manager.refresh_view = 'refresh' # 刷新视图(如果需要)
login_manager.needs_refresh_message = '请重新登录以确认身份'
login_manager.needs_refresh_message_category = 'info'
# 会话保护配置
login_manager.session_protection = 'strong' # 可选值: None, 'basic', 'strong'
# Cookie配置
login_manager.remember_cookie_name = 'remember_token'
login_manager.remember_cookie_duration = timedelta(days=7)
login_manager.remember_cookie_secure = True # 仅HTTPS
login_manager.remember_cookie_httponly = True # 防止JS访问
# 用户加载器(必须实现)
@login_manager.user_loader
def load_user(user_id):
"""根据用户ID加载用户对象"""
from .models import User
return User.query.get(int(user_id))
# 请求加载器(可选)
@login_manager.request_loader
def load_user_from_request(request):
"""从请求中加载用户(用于API认证)"""
# 从请求头获取API密钥
api_key = request.headers.get('X-API-Key')
if api_key:
user = User.query.filter_by(api_key=api_key).first()
if user:
return user
# 从请求参数获取token
token = request.args.get('token')
if token:
user = User.verify_auth_token(token)
if user:
return user
return None
# models.py
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from . import db, login_manager
class User(UserMixin, db.Model):
"""用户模型"""
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True, nullable=False)
email = db.Column(db.String(120), unique=True, index=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
is_active = db.Column(db.Boolean, default=True, nullable=False)
is_admin = db.Column(db.Boolean, default=False, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_login = db.Column(db.DateTime)
# 关系
posts = db.relationship('Post', backref='author', lazy='dynamic')
def __init__(self, username, email, password):
self.username = username
self.email = email
self.password_hash = generate_password_hash(password)
@property
def password(self):
"""密码不可读"""
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
"""设置密码"""
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
"""验证密码"""
return check_password_hash(self.password_hash, password)
def get_id(self):
"""获取用户ID(Flask-Login要求)"""
return str(self.id)
@property
def is_authenticated(self):
"""用户是否已认证(Flask-Login要求)"""
return True
@property
def is_anonymous(self):
"""是否是匿名用户(Flask-Link要求)"""
return False
def generate_auth_token(self, expiration=3600):
"""生成认证令牌"""
s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
return s.dumps({'id': self.id}).decode('utf-8')
@staticmethod
def verify_auth_token(token):
"""验证认证令牌"""
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return None
return User.query.get(data['id'])
def __repr__(self):
return f'<User {self.username}>'
# 用户加载器
@login_manager.user_loader
def load_user(user_id):
"""根据用户ID加载用户"""
return User.query.get(int(user_id))
# 匿名用户模型
class AnonymousUser(AnonymousUserMixin):
"""匿名用户模型"""
def __init__(self):
self.username = '游客'
self.is_admin = False
def can(self, permissions):
return False
def is_administrator(self):
return False
# 设置匿名用户模型
login_manager.anonymous_user = AnonymousUser
# auth.py
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.urls import url_parse
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
"""登录视图"""
# 如果用户已登录,重定向到首页
if current_user.is_authenticated:
return redirect(url_for('main.index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
remember = request.form.get('remember', False) == 'on'
# 验证输入
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('auth/login.html')
# 查找用户
user = User.query.filter_by(username=username).first()
# 验证用户
if user is None or not user.verify_password(password):
flash('用户名或密码错误', 'error')
# 记录登录失败尝试
session['login_attempts'] = session.get('login_attempts', 0) + 1
return render_template('auth/login.html')
# 检查用户是否被禁用
if not user.is_active:
flash('账户已被禁用,请联系管理员', 'error')
return render_template('auth/login.html')
# 登录用户
login_user(user, remember=remember)
# 更新最后登录时间
user.last_login = datetime.utcnow()
db.session.commit()
# 重置登录尝试次数
session.pop('login_attempts', None)
# 记录登录信息
session['login_ip'] = request.remote_addr
session['login_time'] = datetime.now().isoformat()
# 重定向到原始页面或首页
next_page = request.args.get('next')
if not next_page or url_parse(next_page).netloc != '':
next_page = url_for('main.index')
flash('登录成功!', 'success')
return redirect(next_page)
return render_template('auth/login.html')
@auth_bp.route('/logout')
@login_required
def logout():
"""退出登录"""
# 记录退出信息
user_id = current_user.id
username = current_user.username
logout_user()
# 清除Session
session.clear()
flash('您已成功退出登录', 'info')
return redirect(url_for('auth.login'))
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
"""用户注册"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
email = request.form.get('email', '').strip()
password = request.form.get('password', '')
password2 = request.form.get('password2', '')
# 验证表单
errors = []
# 验证用户名
if not username or len(username) < 3:
errors.append('用户名至少3个字符')
elif User.query.filter_by(username=username).first():
errors.append('用户名已存在')
# 验证邮箱
if not email or '@' not in email:
errors.append('请输入有效的邮箱地址')
elif User.query.filter_by(email=email).first():
errors.append('邮箱已注册')
# 验证密码
if len(password) < 6:
errors.append('密码至少6个字符')
elif password != password2:
errors.append('两次输入的密码不一致')
if errors:
for error in errors:
flash(error, 'error')
return render_template('auth/register.html')
# 创建用户
user = User(username=username, email=email, password=password)
try:
db.session.add(user)
db.session.commit()
# 自动登录新用户
login_user(user)
flash('注册成功!', 'success')
# 发送欢迎邮件
send_welcome_email(user)
return redirect(url_for('main.index'))
except Exception as e:
db.session.rollback()
flash(f'注册失败: {str(e)}', 'error')
return render_template('auth/register.html')
# decorators.py
from functools import wraps
from flask import flash, redirect, url_for, request
from flask_login import current_user
from werkzeug.urls import url_parse
def login_required(f):
"""登录要求装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
# 存储原始URL
session['next'] = request.url
flash('请先登录', 'warning')
return redirect(url_for('auth.login', next=request.url))
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""管理员要求装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('请先登录', 'warning')
return redirect(url_for('auth.login', next=request.url))
if not current_user.is_admin:
flash('需要管理员权限', 'error')
return redirect(url_for('main.index'))
return f(*args, **kwargs)
return decorated_function
def logout_required(f):
"""要求用户未登录的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if current_user.is_authenticated:
flash('您已登录', 'info')
return redirect(url_for('main.index'))
return f(*args, **kwargs)
return decorated_function
# 使用装饰器
@main_bp.route('/profile')
@login_required
def profile():
"""个人资料页面"""
return render_template('user/profile.html')
@main_bp.route('/admin/dashboard')
@admin_required
def admin_dashboard():
"""管理员面板"""
return render_template('admin/dashboard.html')
# 登录限制中间件
@app.before_request
def check_login_attempts():
"""检查登录尝试次数"""
if request.endpoint == 'auth.login':
attempts = session.get('login_attempts', 0)
if attempts >= 5:
# 限制登录频率
flash('登录尝试次数过多,请稍后再试', 'error')
return redirect(url_for('main.index'))
# 基于角色的访问控制
def permission_required(permission):
"""权限要求装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.can(permission):
flash('权限不足', 'error')
return redirect(url_for('main.index'))
return f(*args, **kwargs)
return decorated_function
return decorator
# 登录后重定向处理
def redirect_back(default='main.index', **kwargs):
"""重定向回原始页面"""
for target in request.args.get('next'), request.referrer:
if not target:
continue
if url_parse(target).netloc != '':
continue
return redirect(target)
return redirect(url_for(default, **kwargs))
# 在登录视图中使用
@auth_bp.route('/login', methods=['POST'])
def login_post():
# ... 登录验证逻辑
if login_success:
return redirect_back() # 重定向回原始页面
# 配置记住我功能
login_manager = LoginManager()
# 记住我Cookie配置
login_manager.remember_cookie_name = 'remember_token'
login_manager.remember_cookie_duration = timedelta(days=30) # 记住30天
login_manager.remember_cookie_secure = True # 仅HTTPS
login_manager.remember_cookie_httponly = True # 防止JavaScript访问
login_manager.remember_cookie_samesite = 'Lax' # CSRF防护
# 在登录视图中使用记住我功能
@auth_bp.route('/login', methods=['POST'])
def login():
if request.method == 'POST':
# ... 验证用户名密码
if user and user.verify_password(password):
# 获取记住我选项
remember = request.form.get('remember', False) == 'on'
# 登录用户,启用记住我功能
login_user(user, remember=remember)
flash('登录成功!', 'success')
return redirect(url_for('main.index'))
# 用户模型中的记住我令牌
from itsdangerous import URLSafeTimedSerializer
from flask import current_app
class User(UserMixin, db.Model):
# ... 其他字段
remember_token = db.Column(db.String(128))
def get_remember_token(self):
"""获取记住我令牌"""
s = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
return s.dumps({'user_id': self.id})
@staticmethod
def verify_remember_token(token, max_age=2592000): # 30天
"""验证记住我令牌"""
s = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token, max_age=max_age)
user_id = data.get('user_id')
if user_id:
return User.query.get(user_id)
except:
return None
def update_remember_token(self):
"""更新记住我令牌"""
self.remember_token = self.get_remember_token()
db.session.commit()
# 自定义记住我功能
from flask_login import AnonymousUserMixin
class CustomLoginManager(LoginManager):
"""自定义登录管理器"""
def _load_user_from_remember_cookie(self, cookie):
"""从记住我Cookie加载用户"""
if not cookie:
return
# 解码Cookie
data = self._remember_cookie_decode(cookie)
if data:
user_id = data[0]
token = data[1] if len(data) > 1 else None
# 加载用户
user = self._user_callback(user_id)
if user and user.verify_remember_token(token):
return user
return None
# 安全的记住我实现
import secrets
from datetime import datetime, timedelta
from werkzeug.security import generate_password_hash, check_password_hash
class User(UserMixin, db.Model):
# ... 其他字段
# 记住我相关字段
remember_token_hash = db.Column(db.String(128))
remember_token_created_at = db.Column(db.DateTime)
def generate_remember_token(self):
"""生成记住我令牌"""
# 生成随机令牌
token = secrets.token_urlsafe(32)
# 存储令牌哈希
self.remember_token_hash = generate_password_hash(token)
self.remember_token_created_at = datetime.utcnow()
db.session.commit()
return token
def verify_remember_token(self, token):
"""验证记住我令牌"""
if not self.remember_token_hash or not self.remember_token_created_at:
return False
# 检查令牌是否过期(30天)
token_age = datetime.utcnow() - self.remember_token_created_at
if token_age > timedelta(days=30):
return False
# 验证令牌
return check_password_hash(self.remember_token_hash, token)
def invalidate_remember_token(self):
"""使记住我令牌失效"""
self.remember_token_hash = None
self.remember_token_created_at = None
db.session.commit()
# 记住我中间件
@app.before_request
def check_remember_me():
"""检查记住我Cookie"""
if not current_user.is_authenticated:
cookie = request.cookies.get(app.config['REMEMBER_COOKIE_NAME'])
if cookie:
user = User.verify_remember_cookie(cookie)
if user:
login_user(user)
flash('自动登录成功', 'info')
# 更新记住我令牌周期
def refresh_remember_token(user):
"""刷新记住我令牌"""
if user.remember_token_created_at:
token_age = datetime.utcnow() - user.remember_token_created_at
if token_age > timedelta(days=15): # 超过15天刷新
user.generate_remember_token()
# 在敏感操作前重新认证
def confirm_login_required(f):
"""要求重新确认登录的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
return login_manager.unauthorized()
# 检查是否需要重新认证
last_confirmed = session.get('login_confirmed_at')
if last_confirmed:
last_time = datetime.fromisoformat(last_confirmed)
if datetime.now() - last_time > timedelta(minutes=30):
# 需要重新认证
return redirect(url_for('auth.confirm_login'))
return f(*args, **kwargs)
return decorated_function
@auth_bp.route('/confirm-login', methods=['GET', 'POST'])
@login_required
def confirm_login():
"""重新确认登录"""
if request.method == 'POST':
password = request.form.get('password')
if current_user.verify_password(password):
# 更新确认时间
session['login_confirmed_at'] = datetime.now().isoformat()
flash('身份验证成功', 'success')
return redirect_back()
else:
flash('密码错误', 'error')
return render_template('auth/confirm_login.html')
# 安全配置类
class SecurityConfig:
"""安全配置"""
# Session安全配置
SESSION_COOKIE_NAME = '__Secure-session' if not DEBUG else 'session'
SESSION_COOKIE_HTTPONLY = True # 防止XSS
SESSION_COOKIE_SECURE = not DEBUG # 生产环境使用HTTPS
SESSION_COOKIE_SAMESITE = 'Lax' # 防止CSRF
# 记住我Cookie安全配置
REMEMBER_COOKIE_NAME = '__Secure-remember' if not DEBUG else 'remember'
REMEMBER_COOKIE_HTTPONLY = True
REMEMBER_COOKIE_SECURE = not DEBUG
REMEMBER_COOKIE_SAMESITE = 'Lax'
REMEMBER_COOKIE_DURATION = timedelta(days=30)
# 防止会话固定攻击
SESSION_COOKIE_REFRESH_ON_REQUEST = True # 每次请求刷新Session ID
# Session保护级别
SESSION_PROTECTION = 'strong' # Flask-Login会话保护
# 防止暴力破解
LOGIN_ATTEMPT_LIMIT = 5
LOGIN_ATTEMPT_TIMEOUT = 300 # 5分钟
# 安全头
SECURITY_HEADERS = {
'X-Frame-Options': 'DENY',
'X-Content-Type-Options': 'nosniff',
'X-XSS-Protection': '1; mode=block',
'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
'Content-Security-Policy': "default-src 'self'",
'Referrer-Policy': 'strict-origin-when-cross-origin'
}
# 安全中间件
@app.after_request
def set_security_headers(response):
"""设置安全HTTP头"""
for header, value in app.config.get('SECURITY_HEADERS', {}).items():
response.headers[header] = value
return response
@app.before_request
def check_session_security():
"""检查会话安全"""
if current_user.is_authenticated:
# 检查用户代理是否变化
current_ua = request.user_agent.string
stored_ua = session.get('user_agent')
if stored_ua and stored_ua != current_ua:
# 用户代理变化,强制重新登录
logout_user()
session.clear()
flash('安全检测:用户代理变化,请重新登录', 'warning')
return redirect(url_for('auth.login'))
# 检查IP地址是否变化
current_ip = request.remote_addr
stored_ip = session.get('login_ip')
if stored_ip and stored_ip != current_ip:
# IP变化,记录日志但允许(因为用户可能使用移动网络)
app.logger.warning(f'用户 {current_user.id} IP地址变化: {stored_ip} -> {current_ip}')
session['login_ip'] = current_ip
# 存储当前用户代理(如果是首次)
if not stored_ua:
session['user_agent'] = current_ua
# 防止会话固定攻击
@app.before_request
def prevent_session_fixation():
"""防止会话固定攻击"""
if 'user_id' in session:
# 如果用户已登录,生成新的Session ID
if session.get('_new_session_id'):
# 已经有新Session ID,不需要再生成
pass
else:
# 生成新的Session ID
old_session_id = session.sid
session['_new_session_id'] = True
session.modified = True
# 会话超时处理
@app.before_request
def handle_session_timeout():
"""处理会话超时"""
if 'user_id' in session:
last_activity = session.get('last_activity')
if last_activity:
last_time = datetime.fromisoformat(last_activity)
timeout = app.config.get('SESSION_TIMEOUT', 3600) # 默认1小时
if datetime.now() - last_time > timedelta(seconds=timeout):
# 会话超时,清除Session
user_id = session.get('user_id')
session.clear()
# 记录日志
app.logger.info(f'会话超时: 用户 {user_id}')
# 如果用户正在浏览页面,重定向到登录页
if request.endpoint not in ['static', 'auth.login']:
flash('会话已过期,请重新登录', 'warning')
return redirect(url_for('auth.login'))
# 更新最后活动时间
session['last_activity'] = datetime.now().isoformat()
# 防止并发登录
@app.before_request
def prevent_concurrent_login():
"""防止同一用户多处登录"""
if current_user.is_authenticated:
user_id = current_user.id
current_token = session.get('login_token')
# 获取数据库中存储的最新登录令牌
latest_token = get_latest_login_token(user_id)
if latest_token and current_token != latest_token:
# 令牌不匹配,说明在其他地方登录了
logout_user()
session.clear()
flash('您的账户在其他地方登录,您已被登出', 'warning')
return redirect(url_for('auth.login'))
# 登录成功时生成新的登录令牌
def generate_login_token(user_id):
"""生成登录令牌"""
token = secrets.token_urlsafe(32)
store_login_token(user_id, token) # 存储到数据库或Redis
session['login_token'] = token
return token
# 安全注销
def secure_logout():
"""安全的退出登录"""
# 清除记住我令牌
if current_user.is_authenticated:
current_user.invalidate_remember_token()
# 清除登录令牌
session.pop('login_token', None)
# 清除所有Session数据
session.clear()
# Flask-Login登出
logout_user()
# 设置安全相关的响应头
response = redirect(url_for('auth.login'))
response.delete_cookie(app.config['SESSION_COOKIE_NAME'])
response.delete_cookie(app.config['REMEMBER_COOKIE_NAME'])
return response
# 审计日志
@app.after_request
def log_session_activity(response):
"""记录会话活动日志"""
if 'user_id' in session:
log_entry = {
'timestamp': datetime.now().isoformat(),
'user_id': session.get('user_id'),
'endpoint': request.endpoint,
'method': request.method,
'ip': request.remote_addr,
'user_agent': request.user_agent.string,
'status_code': response.status_code
}
# 记录到文件或数据库
app.logger.info(f'SESSION_ACTIVITY: {json.dumps(log_entry)}')
return response
# 使用Redis存储Session
import redis
from flask.sessions import SessionInterface, SessionMixin
from werkzeug.datastructures import CallbackDict
import pickle
import json
from datetime import timedelta
class RedisSession(CallbackDict, SessionMixin):
"""Redis Session类"""
def __init__(self, initial=None, sid=None, new=False):
def on_update(self):
self.modified = True
CallbackDict.__init__(self, initial, on_update)
self.sid = sid
self.new = new
self.modified = False
class RedisSessionInterface(SessionInterface):
"""Redis Session接口"""
serializer = pickle
session_class = RedisSession
def __init__(self, redis_connection, prefix='session:'):
self.redis = redis_connection
self.prefix = prefix
def generate_sid(self):
"""生成Session ID"""
return secrets.token_urlsafe(32)
def get_redis_expiration_time(self, app, session):
"""获取Redis过期时间"""
if session.permanent:
return app.permanent_session_lifetime
return timedelta(days=1)
def open_session(self, app, request):
"""打开Session"""
sid = request.cookies.get(app.config['SESSION_COOKIE_NAME'])
if not sid:
sid = self.generate_sid()
return self.session_class(sid=sid, new=True)
# 从Redis加载Session数据
key = self.prefix + sid
val = self.redis.get(key)
if val is not None:
try:
data = self.serializer.loads(val)
return self.session_class(data, sid=sid)
except:
# 数据损坏,创建新Session
pass
return self.session_class(sid=sid, new=True)
def save_session(self, app, session, response):
"""保存Session到Redis"""
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
# 如果Session为空,从Redis删除
if not session:
if session.modified:
self.redis.delete(self.prefix + session.sid)
response.delete_cookie(app.config['SESSION_COOKIE_NAME'],
domain=domain, path=path)
return
# 计算过期时间
expires = self.get_expiration_time(app, session)
redis_expires = self.get_redis_expiration_time(app, session)
# 保存到Redis
key = self.prefix + session.sid
val = self.serializer.dumps(dict(session))
self.redis.setex(key, int(redis_expires.total_seconds()), val)
# 设置Cookie
response.set_cookie(
app.config['SESSION_COOKIE_NAME'],
session.sid,
expires=expires,
httponly=app.config['SESSION_COOKIE_HTTPONLY'],
domain=domain,
path=path,
secure=app.config['SESSION_COOKIE_SECURE'],
samesite=app.config['SESSION_COOKIE_SAMESITE']
)
# 初始化Redis Session
def create_app():
app = Flask(__name__)
# Redis配置
redis_client = redis.Redis(
host=app.config['REDIS_HOST'],
port=app.config['REDIS_PORT'],
password=app.config['REDIS_PASSWORD'],
decode_responses=False
)
# 设置Session接口
app.session_interface = RedisSessionInterface(
redis_client,
prefix='flask_session:'
)
return app
# 使用JSON序列化的版本
class JSONRedisSessionInterface(RedisSessionInterface):
"""使用JSON序列化的Redis Session接口"""
serializer = json
def open_session(self, app, request):
"""打开Session(JSON版本)"""
sid = request.cookies.get(app.config['SESSION_COOKIE_NAME'])
if not sid:
sid = self.generate_sid()
return self.session_class(sid=sid, new=True)
key = self.prefix + sid
val = self.redis.get(key)
if val is not None:
try:
data = json.loads(val.decode('utf-8'))
return self.session_class(data, sid=sid)
except:
pass
return self.session_class(sid=sid, new=True)
def save_session(self, app, session, response):
"""保存Session(JSON版本)"""
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
if not session:
if session.modified:
self.redis.delete(self.prefix + session.sid)
response.delete_cookie(app.config['SESSION_COOKIE_NAME'],
domain=domain, path=path)
return
expires = self.get_expiration_time(app, session)
redis_expires = self.get_redis_expiration_time(app, session)
key = self.prefix + session.sid
val = json.dumps(dict(session), ensure_ascii=False)
self.redis.setex(key, int(redis_expires.total_seconds()), val)
response.set_cookie(
app.config['SESSION_COOKIE_NAME'],
session.sid,
expires=expires,
httponly=app.config['SESSION_COOKIE_HTTPONLY'],
domain=domain,
path=path,
secure=app.config['SESSION_COOKIE_SECURE'],
samesite=app.config['SESSION_COOKIE_SAMESITE']
)
# Session清理任务
import schedule
import threading
from datetime import datetime, timedelta
def cleanup_expired_sessions():
"""清理过期的Session"""
now = datetime.utcnow()
# 清理数据库中的过期Session(如果有)
expired_sessions = Session.query.filter(
Session.expires_at < now
).all()
for session in expired_sessions:
db.session.delete(session)
db.session.commit()
# 清理Redis中的过期Session
if hasattr(app, 'session_interface') and \
hasattr(app.session_interface, 'redis'):
redis_client = app.session_interface.redis
prefix = app.session_interface.prefix
# 使用SCAN迭代所有Session键
cursor = '0'
while cursor != 0:
cursor, keys = redis_client.scan(
cursor=cursor,
match=f'{prefix}*',
count=100
)
for key in keys:
# 检查TTL
ttl = redis_client.ttl(key)
if ttl < 0: # 没有设置过期时间或已过期
redis_client.delete(key)
# 定时任务线程
def run_scheduler():
"""运行定时任务"""
# 每小时清理一次过期Session
schedule.every().hour.do(cleanup_expired_sessions)
while True:
schedule.run_pending()
time.sleep(60)
# 启动清理线程
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
# Session数据分析
def analyze_session_data():
"""分析Session数据"""
stats = {
'total_sessions': 0,
'active_sessions': 0,
'average_session_duration': 0,
'sessions_by_hour': {}
}
# 统计活跃Session(过去30分钟内有活动的)
active_cutoff = datetime.utcnow() - timedelta(minutes=30)
sessions = Session.query.all()
stats['total_sessions'] = len(sessions)
active_sessions = 0
total_duration = 0
for session in sessions:
# 检查是否活跃
if session.last_activity and session.last_activity > active_cutoff:
active_sessions += 1
# 计算会话时长
if session.created_at and session.last_activity:
duration = (session.last_activity - session.created_at).total_seconds()
total_duration += duration
stats['active_sessions'] = active_sessions
if sessions:
stats['average_session_duration'] = total_duration / len(sessions)
# 按小时统计
for i in range(24):
hour_start = datetime.utcnow().replace(hour=i, minute=0, second=0, microsecond=0)
hour_end = hour_start + timedelta(hours=1)
hour_sessions = Session.query.filter(
Session.created_at >= hour_start,
Session.created_at < hour_end
).count()
stats['sessions_by_hour'][i] = hour_sessions
return stats
# Session监控端点
@app.route('/admin/session-stats')
@admin_required
def session_statistics():
"""Session统计信息"""
stats = analyze_session_data()
# 实时Session查看(仅限开发环境)
if app.debug:
current_sessions = []
for session in Session.query.order_by(Session.last_activity.desc()).limit(50):
current_sessions.append({
'id': session.id,
'user_id': session.user_id,
'ip': session.ip_address,
'created': session.created_at.isoformat(),
'last_activity': session.last_activity.isoformat() if session.last_activity else None,
'user_agent': session.user_agent[:100] if session.user_agent else None
})
stats['current_sessions'] = current_sessions
return jsonify(stats)
# 强制登出用户
@app.route('/admin/force-logout/<int:user_id>', methods=['POST'])
@admin_required
def force_logout_user(user_id):
"""强制登出指定用户"""
# 使该用户的所有Session失效
sessions = Session.query.filter_by(user_id=user_id).all()
for session in sessions:
db.session.delete(session)
# 清除Redis中的Session(如果使用Redis)
if hasattr(app, 'session_interface') and \
hasattr(app.session_interface, 'redis'):
redis_client = app.session_interface.redis
prefix = app.session_interface.prefix
# 查找该用户的所有Session键
pattern = f'{prefix}*'
cursor = '0'
keys_to_delete = []
while cursor != 0:
cursor, keys = redis_client.scan(cursor=cursor, match=pattern, count=100)
for key in keys:
# 获取Session数据
val = redis_client.get(key)
if val:
try:
data = json.loads(val.decode('utf-8'))
if data.get('user_id') == user_id:
keys_to_delete.append(key)
except:
pass
# 删除找到的键
if keys_to_delete:
redis_client.delete(*keys_to_delete)
db.session.commit()
flash(f'用户 {user_id} 已被强制登出', 'success')
return redirect(url_for('admin.session_management'))
# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_migrate import Migrate
from config import Config
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(config_class)
# 初始化扩展
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
# 配置LoginManager
login_manager.login_view = 'auth.login'
login_manager.login_message = '请先登录'
login_manager.login_message_category = 'info'
login_manager.session_protection = 'strong'
# 注册蓝图
from app.auth import bp as auth_bp
app.register_blueprint(auth_bp)
from app.main import bp as main_bp
app.register_blueprint(main_bp)
from app.admin import bp as admin_bp
app.register_blueprint(admin_bp, url_prefix='/admin')
# 注册上下文处理器
@app.context_processor
def inject_user():
"""注入用户信息到模板"""
from flask_login import current_user
return dict(current_user=current_user)
# 注册错误处理器
from app.errors import bp as errors_bp
app.register_blueprint(errors_bp)
return app
# app/models.py
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from app import db, login_manager
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)
email = db.Column(db.String(120), unique=True, index=True)
password_hash = db.Column(db.String(128))
is_active = db.Column(db.Boolean, default=True)
is_admin = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_login = db.Column(db.DateTime)
# 关系
sessions = db.relationship('UserSession', backref='user', lazy='dynamic')
def __repr__(self):
return f'<User {self.username}>'
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class UserSession(db.Model):
"""用户会话记录"""
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
session_id = db.Column(db.String(128), unique=True, index=True)
ip_address = db.Column(db.String(45))
user_agent = db.Column(db.Text)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_activity = db.Column(db.DateTime, default=datetime.utcnow)
expires_at = db.Column(db.DateTime)
def __repr__(self):
return f'<UserSession {self.session_id}>'
@login_manager.user_loader
def load_user(id):
return User.query.get(int(id))
# app/auth/__init__.py
from flask import Blueprint
bp = Blueprint('auth', __name__)
from app.auth import routes
# app/auth/routes.py
from flask import render_template, redirect, url_for, flash, request, session
from flask_login import login_user, logout_user, current_user, login_required
from app.auth import bp
from app.models import User, UserSession
from app import db
import secrets
from datetime import datetime, timedelta
@bp.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('main.index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
remember = request.form.get('remember', False) == 'on'
# 检查登录尝试次数
login_attempts = session.get('login_attempts', 0)
if login_attempts >= 5:
flash('登录尝试次数过多,请稍后再试', 'error')
return render_template('auth/login.html')
user = User.query.filter_by(username=username).first()
if user is None or not user.check_password(password):
session['login_attempts'] = login_attempts + 1
flash('用户名或密码错误', 'error')
return render_template('auth/login.html')
if not user.is_active:
flash('账户已被禁用', 'error')
return render_template('auth/login.html')
# 登录成功
login_user(user, remember=remember)
# 更新最后登录时间
user.last_login = datetime.utcnow()
# 创建会话记录
session_record = UserSession(
user_id=user.id,
session_id=secrets.token_urlsafe(32),
ip_address=request.remote_addr,
user_agent=request.user_agent.string,
expires_at=datetime.utcnow() + timedelta(days=30 if remember else 1)
)
db.session.add(session_record)
db.session.commit()
# 存储会话ID到Session
session['current_session_id'] = session_record.session_id
# 清除登录尝试次数
session.pop('login_attempts', None)
flash('登录成功!', 'success')
next_page = request.args.get('next')
if not next_page or url_parse(next_page).netloc != '':
next_page = url_for('main.index')
return redirect(next_page)
return render_template('auth/login.html')
@bp.route('/logout')
@login_required
def logout():
# 删除当前会话记录
session_id = session.get('current_session_id')
if session_id:
UserSession.query.filter_by(session_id=session_id).delete()
db.session.commit()
logout_user()
session.clear()
flash('您已成功退出登录', 'info')
return redirect(url_for('auth.login'))
@bp.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('main.index'))
if request.method == 'POST':
# 注册逻辑...
pass
return render_template('auth/register.html')
# app/main/routes.py
from flask import render_template, session, g
from flask_login import login_required, current_user
from app.main import bp
from app.models import UserSession
from datetime import datetime
@bp.before_app_request
def before_request():
"""在每个请求前执行"""
if current_user.is_authenticated:
# 更新会话最后活动时间
session_id = session.get('current_session_id')
if session_id:
session_record = UserSession.query.filter_by(
session_id=session_id,
user_id=current_user.id
).first()
if session_record:
session_record.last_activity = datetime.utcnow()
db.session.commit()
# 检查会话是否过期
g.session_expired = False
if session_id:
session_record = UserSession.query.filter_by(session_id=session_id).first()
if session_record and session_record.expires_at < datetime.utcnow():
g.session_expired = True
logout_user()
session.clear()
flash('会话已过期,请重新登录', 'warning')
@bp.route('/')
def index():
return render_template('main/index.html')
@bp.route('/profile')
@login_required
def profile():
return render_template('main/profile.html')
@bp.route('/dashboard')
@login_required
def dashboard():
# 获取用户的活动会话
active_sessions = UserSession.query.filter_by(
user_id=current_user.id
).filter(
UserSession.expires_at > datetime.utcnow()
).order_by(UserSession.last_activity.desc()).all()
return render_template('main/dashboard.html',
active_sessions=active_sessions)
{# templates/auth/login.html #}
{% extends "base.html" %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h3 class="mb-0">用户登录</h3>
</div>
<div class="card-body">
<form method="POST" action="">
{{ form.hidden_tag() if form else '' }}
<div class="mb-3">
<label for="username" class="form-label">用户名</label>
<input type="text" class="form-control" id="username"
name="username" required
value="{{ request.form.username if request.form else '' }}">
</div>
<div class="mb-3">
<label for="password" class="form-label">密码</label>
<input type="password" class="form-control" id="password"
name="password" required>
</div>
<div class="mb-3 form-check">
<input type="checkbox" class="form-check-input"
id="remember" name="remember">
<label class="form-check-label" for="remember">
记住我
</label>
</div>
<div class="d-grid gap-2">
<button type="submit" class="btn btn-primary">登录</button>
</div>
</form>
<div class="mt-3 text-center">
<p class="mb-2">还没有账号?</p>
<a href="{{ url_for('auth.register') }}" class="btn btn-outline-secondary">
立即注册
</a>
</div>
<div class="mt-4">
<h6>安全提示:</h6>
<ul class="small text-muted">
<li>请勿在公共电脑上使用"记住我"功能</li>
<li>定期修改密码以保证账户安全</li>
<li>如发现异常登录,请及时修改密码</li>
</ul>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{# templates/main/dashboard.html #}
{% extends "base.html" %}
{% block content %}
<div class="container">
<div class="row">
<div class="col-md-3">
<div class="card">
<div class="card-body text-center">
<h5 class="card-title">{{ current_user.username }}</h5>
<p class="card-text">
<small class="text-muted">
{{ current_user.email }}
</small>
</p>
<p class="card-text">
<span class="badge bg-{{ 'success' if current_user.is_active else 'danger' }}">
{{ '活跃' if current_user.is_active else '禁用' }}
</span>
{% if current_user.is_admin %}
<span class="badge bg-warning">管理员</span>
{% endif %}
</p>
</div>
</div>
<div class="card mt-3">
<div class="card-body">
<h6>快捷操作</h6>
<div class="d-grid gap-2">
<a href="{{ url_for('main.profile') }}" class="btn btn-outline-primary">
编辑资料
</a>
<a href="{{ url_for('auth.logout') }}" class="btn btn-outline-danger">
退出登录
</a>
</div>
</div>
</div>
</div>
<div class="col-md-9">
<div class="card">
<div class="card-header">
<h5 class="mb-0">当前登录设备</h5>
</div>
<div class="card-body">
{% if active_sessions %}
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>设备信息</th>
<th>IP地址</th>
<th>最后活动</th>
<th>状态</th>
<th>操作</th>
</tr>
</thead>
<tbody>
{% for session in active_sessions %}
<tr class="{{ 'table-primary' if session.session_id == current_session_id else '' }}">
<td>
<small>{{ session.user_agent|truncate(50) }}</small>
</td>
<td>{{ session.ip_address }}</td>
<td>
<small>{{ session.last_activity|datetimeformat }}</small>
</td>
<td>
<span class="badge bg-{{ 'success' if session.expires_at > now else 'warning' }}">
{{ '有效' if session.expires_at > now else '即将过期' }}
</span>
</td>
<td>
{% if session.session_id != current_session_id %}
<button class="btn btn-sm btn-outline-danger"
onclick="logoutSession('{{ session.session_id }}')">
强制退出
</button>
{% else %}
<span class="text-muted">当前设备</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<p class="text-muted">没有活跃的登录会话</p>
{% endif %}
</div>
</div>
</div>
</div>
</div>
<script>
function logoutSession(sessionId) {
if (confirm('确定要强制退出此设备吗?')) {
fetch('/auth/force-logout-session', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': getCsrfToken()
},
body: JSON.stringify({ session_id: sessionId })
})
.then(response => response.json())
.then(data => {
if (data.success) {
alert('操作成功');
location.reload();
} else {
alert('操作失败: ' + data.message);
}
});
}
}
</script>
{% endblock %}
通过本教程,你已经掌握了Flask中用户会话管理的各个方面: