Flask用户会话管理

本教程将详细介绍Flask中用户会话管理的各个方面,包括Session基础、用户认证系统、Flask-Login扩展和安全最佳实践。

1. Session基础概念

什么是Session

Session(会话)是Web应用中跟踪用户状态的一种机制,允许服务器在多个请求之间存储和检索用户数据。

  • 无状态协议:HTTP是无状态的,Session提供状态
  • 服务器端存储:数据存储在服务器端
  • 客户端标识:通过Session ID识别用户
  • 临时存储:会话数据是临时的
  • 安全存储:敏感数据不应存储在客户端
Session vs Cookie
Session Cookie
存储位置 服务器端 客户端
安全性 较高 较低
存储容量 较大 较小(4KB)
数据类型 任意Python对象 字符串
过期控制 灵活 固定时间
用途 用户状态、购物车等 用户偏好、跟踪等

2. Flask Session配置

安全提醒:Flask的Session使用客户端签名的Cookie存储,必须设置SECRET_KEY来保证安全。

2.1 基本配置

基础配置
from flask import Flask, session

app = Flask(__name__)

# 必须设置SECRET_KEY,用于签名Session Cookie
app.config['SECRET_KEY'] = 'your-secret-key-here-change-this'

# Session配置选项
app.config['SESSION_COOKIE_NAME'] = 'my_session'  # Session Cookie名称
app.config['SESSION_COOKIE_DOMAIN'] = None        # Cookie域名
app.config['SESSION_COOKIE_PATH'] = None          # Cookie路径
app.config['SESSION_COOKIE_HTTPONLY'] = True      # 防止JavaScript访问
app.config['SESSION_COOKIE_SECURE'] = False       # 是否仅HTTPS传输
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'     # 防止CSRF

# Session过期时间配置
app.config['PERMANENT_SESSION_LIFETIME'] = 3600   # 会话有效期(秒)
app.config['SESSION_REFRESH_EACH_REQUEST'] = True # 每次请求刷新过期时间

# 调试配置
@app.route('/debug-session')
def debug_session():
    """查看Session配置"""
    return {
        'cookie_name': app.config.get('SESSION_COOKIE_NAME'),
        'cookie_domain': app.config.get('SESSION_COOKIE_DOMAIN'),
        'cookie_secure': app.config.get('SESSION_COOKIE_SECURE'),
        'cookie_httponly': app.config.get('SESSION_COOKIE_HTTPONLY'),
        'cookie_samesite': app.config.get('SESSION_COOKIE_SAMESITE'),
        'permanent_lifetime': app.config.get('PERMANENT_SESSION_LIFETIME')
    }
生产环境配置
import os
from datetime import timedelta

class Config:
    """基础配置类"""
    # 从环境变量获取密钥,避免硬编码
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key'

    # Session安全配置
    SESSION_COOKIE_NAME = 'app_session'
    SESSION_COOKIE_HTTPONLY = True
    SESSION_COOKIE_SAMESITE = 'Lax'

    # Session过期时间(7天)
    PERMANENT_SESSION_LIFETIME = timedelta(days=7)

    # 每次请求刷新Session
    SESSION_REFRESH_EACH_REQUEST = True

class DevelopmentConfig(Config):
    """开发环境配置"""
    DEBUG = True
    SESSION_COOKIE_SECURE = False  # 开发环境可以不是HTTPS

class ProductionConfig(Config):
    """生产环境配置"""
    DEBUG = False
    SESSION_COOKIE_SECURE = True   # 生产环境必须使用HTTPS

    # 更长的Session过期时间
    PERMANENT_SESSION_LIFETIME = timedelta(days=30)

    # 额外的安全头
    @staticmethod
    def init_app(app):
        """初始化应用"""
        # 设置安全相关的HTTP头
        from flask import Flask
        from flask_talisman import Talisman

        Talisman(app,
                content_security_policy=None,
                session_cookie_secure=True,
                session_cookie_httponly=True,
                session_cookie_samesite='Lax')

# 应用工厂模式
def create_app(config_class=DevelopmentConfig):
    app = Flask(__name__)
    app.config.from_object(config_class)

    if isinstance(app.config['PERMANENT_SESSION_LIFETIME'], timedelta):
        app.permanent_session_lifetime = app.config['PERMANENT_SESSION_LIFETIME']

    return app

# 创建应用
app = create_app(ProductionConfig)

3. 使用Session对象

3.1 基本操作

Session操作示例
from flask import Flask, session, request, redirect, url_for

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret-key'

@app.route('/')
def index():
    """首页,显示Session信息"""
    # 获取Session值
    username = session.get('username', '游客')
    visit_count = session.get('visit_count', 0)

    # 更新访问次数
    session['visit_count'] = visit_count + 1

    return f"""
    <h1>欢迎,{username}</h1>
    <p>您已经访问了 {session['visit_count']} 次</p>
    <a href="/login">登录</a>
    <a href="/logout">退出</a>
    """

@app.route('/login', methods=['GET', 'POST'])
def login():
    """登录页面"""
    if request.method == 'POST':
        username = request.form.get('username')

        # 设置Session值
        session['username'] = username
        session['logged_in'] = True
        session['login_time'] = datetime.now().isoformat()

        # 设置永久Session
        session.permanent = True

        # 存储用户信息
        session['user_info'] = {
            'username': username,
            'ip': request.remote_addr,
            'user_agent': request.user_agent.string
        }

        # 重定向到首页
        return redirect(url_for('index'))

    return '''
    <form method="POST">
        <input type="text" name="username" placeholder="用户名">
        <button type="submit">登录</button>
    </form>
    '''

@app.route('/logout')
def logout():
    """退出登录"""
    # 清除特定Session值
    session.pop('username', None)
    session.pop('logged_in', None)

    # 或者清除所有Session
    # session.clear()

    return redirect(url_for('index'))

@app.route('/session-info')
def session_info():
    """查看Session信息"""
    if 'logged_in' not in session:
        return "未登录"

    return {
        'username': session.get('username'),
        'logged_in': session.get('logged_in', False),
        'visit_count': session.get('visit_count', 0),
        'login_time': session.get('login_time'),
        'session_id': session.sid,  # Session ID
        'session_permanent': session.permanent,
        'all_session_data': dict(session)
    }

@app.route('/update-profile', methods=['POST'])
def update_profile():
    """更新用户资料"""
    if 'logged_in' not in session:
        return redirect(url_for('login'))

    # 更新Session中的用户信息
    theme = request.form.get('theme', 'light')
    language = request.form.get('language', 'zh-CN')

    session['theme'] = theme
    session['language'] = language

    # 修改已存在的字典
    if 'user_info' in session:
        session['user_info'].update({
            'theme': theme,
            'language': language,
            'updated_at': datetime.now().isoformat()
        })
        # Session中的字典需要重新赋值来触发修改
        session.modified = True

    return "资料已更新"
高级Session操作
from flask import Flask, session, g
from datetime import datetime, timedelta

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret-key'

@app.before_request
def before_request():
    """在每个请求前执行"""
    # 检查Session是否有效
    if 'user_id' in session:
        # 检查Session是否过期
        last_activity = session.get('last_activity')
        if last_activity:
            last_time = datetime.fromisoformat(last_activity)
            if datetime.now() - last_time > timedelta(hours=1):
                # Session过期,清除
                session.clear()
                g.user = None
            else:
                # 更新最后活动时间
                session['last_activity'] = datetime.now().isoformat()

        # 将用户信息存储在g对象中
        g.user = {
            'id': session.get('user_id'),
            'username': session.get('username'),
            'is_admin': session.get('is_admin', False)
        }
    else:
        g.user = None

@app.route('/cart/add/<int:product_id>')
def add_to_cart(product_id):
    """添加到购物车"""
    # 初始化购物车
    if 'cart' not in session:
        session['cart'] = {}

    # 更新购物车
    cart = session['cart']
    cart[product_id] = cart.get(product_id, 0) + 1

    # 重新赋值触发修改
    session['cart'] = cart

    return f"已添加到购物车,当前数量: {cart[product_id]}"

@app.route('/cart')
def view_cart():
    """查看购物车"""
    cart = session.get('cart', {})

    # 计算总价
    total = 0
    items = []
    for product_id, quantity in cart.items():
        price = get_product_price(product_id)  # 假设有获取价格的函数
        items.append({
            'id': product_id,
            'quantity': quantity,
            'price': price,
            'subtotal': price * quantity
        })
        total += price * quantity

    return {
        'items': items,
        'total': total,
        'item_count': len(cart)
    }

@app.route('/session-stats')
def session_stats():
    """Session统计信息"""
    # 计算Session大小(近似值)
    import sys
    session_size = sum(
        sys.getsizeof(k) + sys.getsizeof(v)
        for k, v in session.items()
    )

    return {
        'session_keys': list(session.keys()),
        'session_size_bytes': session_size,
        'session_modified': session.modified,
        'new_session': session.new
    }

@app.route('/flash-message')
def flash_message():
    """使用Session实现Flash消息"""
    # 设置Flash消息
    session.setdefault('_flashes', []).append(('success', '操作成功!'))

    # 在下一个请求中显示Flash消息
    return '''
    <script>
        // 模拟下一个请求
        setTimeout(() => {
            window.location.href = '/show-flashes';
        }, 1000);
    </script>
    '''

@app.route('/show-flashes')
def show_flashes():
    """显示Flash消息"""
    flashes = session.pop('_flashes', []) if 'show_flashes' in request.args else []

    html = '<h1>Flash消息</h1>'
    for category, message in flashes:
        html += f'<div class="alert alert-{category}">{message}</div>'

    return html

4. Flask-Login扩展

4.1 安装和配置

Flask-Login是一个用户会话管理扩展,提供了用户认证、登录状态管理、"记住我"功能等。
安装和初始化
# 安装Flask-Login
pip install flask-login

# 或者使用requirements.txt
# flask-login==0.6.2
# 初始化Flask-Login
from flask import Flask
from flask_login import LoginManager

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

# 创建LoginManager实例
login_manager = LoginManager()

# 配置LoginManager
login_manager.init_app(app)

# 基本配置
login_manager.login_view = 'login'  # 登录页面的视图函数名
login_manager.login_message = '请先登录'  # 登录提示消息
login_manager.login_message_category = 'info'  # 消息类别
login_manager.refresh_view = 'refresh'  # 刷新视图(如果需要)
login_manager.needs_refresh_message = '请重新登录以确认身份'
login_manager.needs_refresh_message_category = 'info'

# 会话保护配置
login_manager.session_protection = 'strong'  # 可选值: None, 'basic', 'strong'

# Cookie配置
login_manager.remember_cookie_name = 'remember_token'
login_manager.remember_cookie_duration = timedelta(days=7)
login_manager.remember_cookie_secure = True  # 仅HTTPS
login_manager.remember_cookie_httponly = True  # 防止JS访问

# 用户加载器(必须实现)
@login_manager.user_loader
def load_user(user_id):
    """根据用户ID加载用户对象"""
    from .models import User
    return User.query.get(int(user_id))

# 请求加载器(可选)
@login_manager.request_loader
def load_user_from_request(request):
    """从请求中加载用户(用于API认证)"""
    # 从请求头获取API密钥
    api_key = request.headers.get('X-API-Key')
    if api_key:
        user = User.query.filter_by(api_key=api_key).first()
        if user:
            return user

    # 从请求参数获取token
    token = request.args.get('token')
    if token:
        user = User.verify_auth_token(token)
        if user:
            return user

    return None
用户模型要求
# models.py
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from . import db, login_manager

class User(UserMixin, db.Model):
    """用户模型"""
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True, nullable=False)
    email = db.Column(db.String(120), unique=True, index=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    is_admin = db.Column(db.Boolean, default=False, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime)

    # 关系
    posts = db.relationship('Post', backref='author', lazy='dynamic')

    def __init__(self, username, email, password):
        self.username = username
        self.email = email
        self.password_hash = generate_password_hash(password)

    @property
    def password(self):
        """密码不可读"""
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        """设置密码"""
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """验证密码"""
        return check_password_hash(self.password_hash, password)

    def get_id(self):
        """获取用户ID(Flask-Login要求)"""
        return str(self.id)

    @property
    def is_authenticated(self):
        """用户是否已认证(Flask-Login要求)"""
        return True

    @property
    def is_anonymous(self):
        """是否是匿名用户(Flask-Link要求)"""
        return False

    def generate_auth_token(self, expiration=3600):
        """生成认证令牌"""
        s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id}).decode('utf-8')

    @staticmethod
    def verify_auth_token(token):
        """验证认证令牌"""
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except:
            return None
        return User.query.get(data['id'])

    def __repr__(self):
        return f'<User {self.username}>'

# 用户加载器
@login_manager.user_loader
def load_user(user_id):
    """根据用户ID加载用户"""
    return User.query.get(int(user_id))

# 匿名用户模型
class AnonymousUser(AnonymousUserMixin):
    """匿名用户模型"""

    def __init__(self):
        self.username = '游客'
        self.is_admin = False

    def can(self, permissions):
        return False

    def is_administrator(self):
        return False

# 设置匿名用户模型
login_manager.anonymous_user = AnonymousUser

5. 用户认证系统

5.1 登录和登出

登录功能实现
# auth.py
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.urls import url_parse

auth_bp = Blueprint('auth', __name__, url_prefix='/auth')

@auth_bp.route('/login', methods=['GET', 'POST'])
                                            def login():
                                            """登录视图"""
                                            # 如果用户已登录,重定向到首页
                                            if current_user.is_authenticated:
                                            return redirect(url_for('main.index'))

                                            if request.method == 'POST':
                                            username = request.form.get('username', '').strip()
                                            password = request.form.get('password', '')
                                            remember = request.form.get('remember', False) == 'on'

                                            # 验证输入
                                            if not username or not password:
                                            flash('请输入用户名和密码', 'error')
                                            return render_template('auth/login.html')

                                            # 查找用户
                                            user = User.query.filter_by(username=username).first()

                                            # 验证用户
                                            if user is None or not user.verify_password(password):
                                            flash('用户名或密码错误', 'error')
                                            # 记录登录失败尝试
                                            session['login_attempts'] = session.get('login_attempts', 0) + 1
                                            return render_template('auth/login.html')

                                            # 检查用户是否被禁用
                                            if not user.is_active:
                                            flash('账户已被禁用,请联系管理员', 'error')
                                            return render_template('auth/login.html')

                                            # 登录用户
                                            login_user(user, remember=remember)

                                            # 更新最后登录时间
                                            user.last_login = datetime.utcnow()
                                            db.session.commit()

                                            # 重置登录尝试次数
                                            session.pop('login_attempts', None)

                                            # 记录登录信息
                                            session['login_ip'] = request.remote_addr
                                            session['login_time'] = datetime.now().isoformat()

                                            # 重定向到原始页面或首页
                                            next_page = request.args.get('next')
                                            if not next_page or url_parse(next_page).netloc != '':
                                            next_page = url_for('main.index')

                                            flash('登录成功!', 'success')
                                            return redirect(next_page)

                                            return render_template('auth/login.html')

                                            @auth_bp.route('/logout')
                                            @login_required
                                            def logout():
                                            """退出登录"""
                                            # 记录退出信息
                                            user_id = current_user.id
                                            username = current_user.username

                                            logout_user()

                                            # 清除Session
                                            session.clear()

                                            flash('您已成功退出登录', 'info')
                                            return redirect(url_for('auth.login'))

                                            @auth_bp.route('/register', methods=['GET', 'POST'])
                                            def register():
                                            """用户注册"""
                                            if current_user.is_authenticated:
                                            return redirect(url_for('main.index'))

                                            if request.method == 'POST':
                                            username = request.form.get('username', '').strip()
                                            email = request.form.get('email', '').strip()
                                            password = request.form.get('password', '')
                                            password2 = request.form.get('password2', '')

                                            # 验证表单
                                            errors = []

                                            # 验证用户名
                                            if not username or len(username) < 3:
                                            errors.append('用户名至少3个字符')
                                            elif User.query.filter_by(username=username).first():
                                            errors.append('用户名已存在')

                                            # 验证邮箱
                                            if not email or '@' not in email:
                                            errors.append('请输入有效的邮箱地址')
                                            elif User.query.filter_by(email=email).first():
                                            errors.append('邮箱已注册')

                                            # 验证密码
                                            if len(password) < 6:
                                            errors.append('密码至少6个字符')
                                            elif password != password2:
                                            errors.append('两次输入的密码不一致')

                                            if errors:
                                            for error in errors:
                                            flash(error, 'error')
                                            return render_template('auth/register.html')

                                            # 创建用户
                                            user = User(username=username, email=email, password=password)

                                            try:
                                            db.session.add(user)
                                            db.session.commit()

                                            # 自动登录新用户
                                            login_user(user)
                                            flash('注册成功!', 'success')

                                            # 发送欢迎邮件
                                            send_welcome_email(user)

                                            return redirect(url_for('main.index'))
                                            except Exception as e:
                                            db.session.rollback()
                                            flash(f'注册失败: {str(e)}', 'error')

                                            return render_template('auth/register.html')
登录限制和装饰器
# decorators.py
from functools import wraps
from flask import flash, redirect, url_for, request
from flask_login import current_user
from werkzeug.urls import url_parse

def login_required(f):
    """登录要求装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            # 存储原始URL
            session['next'] = request.url
            flash('请先登录', 'warning')
            return redirect(url_for('auth.login', next=request.url))
        return f(*args, **kwargs)
    return decorated_function

def admin_required(f):
    """管理员要求装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            flash('请先登录', 'warning')
            return redirect(url_for('auth.login', next=request.url))
        if not current_user.is_admin:
            flash('需要管理员权限', 'error')
            return redirect(url_for('main.index'))
        return f(*args, **kwargs)
    return decorated_function

def logout_required(f):
    """要求用户未登录的装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_user.is_authenticated:
            flash('您已登录', 'info')
            return redirect(url_for('main.index'))
        return f(*args, **kwargs)
    return decorated_function

# 使用装饰器
@main_bp.route('/profile')
@login_required
def profile():
    """个人资料页面"""
    return render_template('user/profile.html')

@main_bp.route('/admin/dashboard')
@admin_required
def admin_dashboard():
    """管理员面板"""
    return render_template('admin/dashboard.html')

# 登录限制中间件
@app.before_request
def check_login_attempts():
    """检查登录尝试次数"""
    if request.endpoint == 'auth.login':
        attempts = session.get('login_attempts', 0)
        if attempts >= 5:
            # 限制登录频率
            flash('登录尝试次数过多,请稍后再试', 'error')
            return redirect(url_for('main.index'))

# 基于角色的访问控制
def permission_required(permission):
    """权限要求装饰器"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not current_user.can(permission):
                flash('权限不足', 'error')
                return redirect(url_for('main.index'))
            return f(*args, **kwargs)
        return decorated_function
    return decorator

# 登录后重定向处理
def redirect_back(default='main.index', **kwargs):
    """重定向回原始页面"""
    for target in request.args.get('next'), request.referrer:
        if not target:
            continue
        if url_parse(target).netloc != '':
            continue
        return redirect(target)
    return redirect(url_for(default, **kwargs))

# 在登录视图中使用
@auth_bp.route('/login', methods=['POST'])
                                            def login_post():
                                            # ... 登录验证逻辑
                                            if login_success:
                                            return redirect_back()  # 重定向回原始页面

6. "记住我"功能

6.1 实现记住我功能

Flask-Login记住我
# 配置记住我功能
login_manager = LoginManager()

# 记住我Cookie配置
login_manager.remember_cookie_name = 'remember_token'
login_manager.remember_cookie_duration = timedelta(days=30)  # 记住30天
login_manager.remember_cookie_secure = True  # 仅HTTPS
login_manager.remember_cookie_httponly = True  # 防止JavaScript访问
login_manager.remember_cookie_samesite = 'Lax'  # CSRF防护

# 在登录视图中使用记住我功能
@auth_bp.route('/login', methods=['POST'])
                                            def login():
                                            if request.method == 'POST':
                                            # ... 验证用户名密码

                                            if user and user.verify_password(password):
                                            # 获取记住我选项
                                            remember = request.form.get('remember', False) == 'on'

                                            # 登录用户,启用记住我功能
                                            login_user(user, remember=remember)

                                            flash('登录成功!', 'success')
                                            return redirect(url_for('main.index'))

                                            # 用户模型中的记住我令牌
                                            from itsdangerous import URLSafeTimedSerializer
                                            from flask import current_app

                                            class User(UserMixin, db.Model):
                                            # ... 其他字段

                                            remember_token = db.Column(db.String(128))

                                            def get_remember_token(self):
                                            """获取记住我令牌"""
                                            s = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
                                            return s.dumps({'user_id': self.id})

                                            @staticmethod
                                            def verify_remember_token(token, max_age=2592000):  # 30天
                                            """验证记住我令牌"""
                                            s = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
                                            try:
                                            data = s.loads(token, max_age=max_age)
                                            user_id = data.get('user_id')
                                            if user_id:
                                            return User.query.get(user_id)
                                            except:
                                            return None

                                            def update_remember_token(self):
                                            """更新记住我令牌"""
                                            self.remember_token = self.get_remember_token()
                                            db.session.commit()

                                            # 自定义记住我功能
                                            from flask_login import AnonymousUserMixin

                                            class CustomLoginManager(LoginManager):
                                            """自定义登录管理器"""

                                            def _load_user_from_remember_cookie(self, cookie):
                                            """从记住我Cookie加载用户"""
                                            if not cookie:
                                            return

                                            # 解码Cookie
                                            data = self._remember_cookie_decode(cookie)

                                            if data:
                                            user_id = data[0]
                                            token = data[1] if len(data) > 1 else None

                                            # 加载用户
                                            user = self._user_callback(user_id)

                                            if user and user.verify_remember_token(token):
                                            return user

                                            return None
记住我安全性
# 安全的记住我实现
import secrets
from datetime import datetime, timedelta
from werkzeug.security import generate_password_hash, check_password_hash

class User(UserMixin, db.Model):
    # ... 其他字段

    # 记住我相关字段
    remember_token_hash = db.Column(db.String(128))
    remember_token_created_at = db.Column(db.DateTime)

    def generate_remember_token(self):
        """生成记住我令牌"""
        # 生成随机令牌
        token = secrets.token_urlsafe(32)

        # 存储令牌哈希
        self.remember_token_hash = generate_password_hash(token)
        self.remember_token_created_at = datetime.utcnow()
        db.session.commit()

        return token

    def verify_remember_token(self, token):
        """验证记住我令牌"""
        if not self.remember_token_hash or not self.remember_token_created_at:
            return False

        # 检查令牌是否过期(30天)
        token_age = datetime.utcnow() - self.remember_token_created_at
        if token_age > timedelta(days=30):
            return False

        # 验证令牌
        return check_password_hash(self.remember_token_hash, token)

    def invalidate_remember_token(self):
        """使记住我令牌失效"""
        self.remember_token_hash = None
        self.remember_token_created_at = None
        db.session.commit()

# 记住我中间件
@app.before_request
def check_remember_me():
    """检查记住我Cookie"""
    if not current_user.is_authenticated:
        cookie = request.cookies.get(app.config['REMEMBER_COOKIE_NAME'])
        if cookie:
            user = User.verify_remember_cookie(cookie)
            if user:
                login_user(user)
                flash('自动登录成功', 'info')

# 更新记住我令牌周期
def refresh_remember_token(user):
    """刷新记住我令牌"""
    if user.remember_token_created_at:
        token_age = datetime.utcnow() - user.remember_token_created_at
        if token_age > timedelta(days=15):  # 超过15天刷新
            user.generate_remember_token()

# 在敏感操作前重新认证
def confirm_login_required(f):
    """要求重新确认登录的装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            return login_manager.unauthorized()

        # 检查是否需要重新认证
        last_confirmed = session.get('login_confirmed_at')
        if last_confirmed:
            last_time = datetime.fromisoformat(last_confirmed)
            if datetime.now() - last_time > timedelta(minutes=30):
                # 需要重新认证
                return redirect(url_for('auth.confirm_login'))

        return f(*args, **kwargs)
    return decorated_function

@auth_bp.route('/confirm-login', methods=['GET', 'POST'])
                                            @login_required
                                            def confirm_login():
                                            """重新确认登录"""
                                            if request.method == 'POST':
                                            password = request.form.get('password')
                                            if current_user.verify_password(password):
                                            # 更新确认时间
                                            session['login_confirmed_at'] = datetime.now().isoformat()
                                            flash('身份验证成功', 'success')
                                            return redirect_back()
                                            else:
                                            flash('密码错误', 'error')

                                            return render_template('auth/confirm_login.html')

7. 会话安全

安全警告:会话安全是Web应用安全的核心,必须采取适当的安全措施防止会话劫持、固定攻击等。

7.1 会话安全配置

安全配置示例
# 安全配置类
class SecurityConfig:
    """安全配置"""

    # Session安全配置
    SESSION_COOKIE_NAME = '__Secure-session' if not DEBUG else 'session'
    SESSION_COOKIE_HTTPONLY = True  # 防止XSS
    SESSION_COOKIE_SECURE = not DEBUG  # 生产环境使用HTTPS
    SESSION_COOKIE_SAMESITE = 'Lax'  # 防止CSRF

    # 记住我Cookie安全配置
    REMEMBER_COOKIE_NAME = '__Secure-remember' if not DEBUG else 'remember'
    REMEMBER_COOKIE_HTTPONLY = True
    REMEMBER_COOKIE_SECURE = not DEBUG
    REMEMBER_COOKIE_SAMESITE = 'Lax'
    REMEMBER_COOKIE_DURATION = timedelta(days=30)

    # 防止会话固定攻击
    SESSION_COOKIE_REFRESH_ON_REQUEST = True  # 每次请求刷新Session ID

    # Session保护级别
    SESSION_PROTECTION = 'strong'  # Flask-Login会话保护

    # 防止暴力破解
    LOGIN_ATTEMPT_LIMIT = 5
    LOGIN_ATTEMPT_TIMEOUT = 300  # 5分钟

    # 安全头
    SECURITY_HEADERS = {
        'X-Frame-Options': 'DENY',
        'X-Content-Type-Options': 'nosniff',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Content-Security-Policy': "default-src 'self'",
        'Referrer-Policy': 'strict-origin-when-cross-origin'
    }

# 安全中间件
@app.after_request
def set_security_headers(response):
    """设置安全HTTP头"""
    for header, value in app.config.get('SECURITY_HEADERS', {}).items():
        response.headers[header] = value
    return response

@app.before_request
def check_session_security():
    """检查会话安全"""
    if current_user.is_authenticated:
        # 检查用户代理是否变化
        current_ua = request.user_agent.string
        stored_ua = session.get('user_agent')

        if stored_ua and stored_ua != current_ua:
            # 用户代理变化,强制重新登录
            logout_user()
            session.clear()
            flash('安全检测:用户代理变化,请重新登录', 'warning')
            return redirect(url_for('auth.login'))

        # 检查IP地址是否变化
        current_ip = request.remote_addr
        stored_ip = session.get('login_ip')

        if stored_ip and stored_ip != current_ip:
            # IP变化,记录日志但允许(因为用户可能使用移动网络)
            app.logger.warning(f'用户 {current_user.id} IP地址变化: {stored_ip} -> {current_ip}')
            session['login_ip'] = current_ip

        # 存储当前用户代理(如果是首次)
        if not stored_ua:
            session['user_agent'] = current_ua
防止会话攻击
# 防止会话固定攻击
@app.before_request
def prevent_session_fixation():
    """防止会话固定攻击"""
    if 'user_id' in session:
        # 如果用户已登录,生成新的Session ID
        if session.get('_new_session_id'):
            # 已经有新Session ID,不需要再生成
            pass
        else:
            # 生成新的Session ID
            old_session_id = session.sid
            session['_new_session_id'] = True
            session.modified = True

# 会话超时处理
@app.before_request
def handle_session_timeout():
    """处理会话超时"""
    if 'user_id' in session:
        last_activity = session.get('last_activity')
        if last_activity:
            last_time = datetime.fromisoformat(last_activity)
            timeout = app.config.get('SESSION_TIMEOUT', 3600)  # 默认1小时

            if datetime.now() - last_time > timedelta(seconds=timeout):
                # 会话超时,清除Session
                user_id = session.get('user_id')
                session.clear()

                # 记录日志
                app.logger.info(f'会话超时: 用户 {user_id}')

                # 如果用户正在浏览页面,重定向到登录页
                if request.endpoint not in ['static', 'auth.login']:
                    flash('会话已过期,请重新登录', 'warning')
                    return redirect(url_for('auth.login'))

        # 更新最后活动时间
        session['last_activity'] = datetime.now().isoformat()

# 防止并发登录
@app.before_request
def prevent_concurrent_login():
    """防止同一用户多处登录"""
    if current_user.is_authenticated:
        user_id = current_user.id
        current_token = session.get('login_token')

        # 获取数据库中存储的最新登录令牌
        latest_token = get_latest_login_token(user_id)

        if latest_token and current_token != latest_token:
            # 令牌不匹配,说明在其他地方登录了
            logout_user()
            session.clear()
            flash('您的账户在其他地方登录,您已被登出', 'warning')
            return redirect(url_for('auth.login'))

# 登录成功时生成新的登录令牌
def generate_login_token(user_id):
    """生成登录令牌"""
    token = secrets.token_urlsafe(32)
    store_login_token(user_id, token)  # 存储到数据库或Redis
    session['login_token'] = token
    return token

# 安全注销
def secure_logout():
    """安全的退出登录"""
    # 清除记住我令牌
    if current_user.is_authenticated:
        current_user.invalidate_remember_token()

    # 清除登录令牌
    session.pop('login_token', None)

    # 清除所有Session数据
    session.clear()

    # Flask-Login登出
    logout_user()

    # 设置安全相关的响应头
    response = redirect(url_for('auth.login'))
    response.delete_cookie(app.config['SESSION_COOKIE_NAME'])
    response.delete_cookie(app.config['REMEMBER_COOKIE_NAME'])

    return response

# 审计日志
@app.after_request
def log_session_activity(response):
    """记录会话活动日志"""
    if 'user_id' in session:
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'user_id': session.get('user_id'),
            'endpoint': request.endpoint,
            'method': request.method,
            'ip': request.remote_addr,
            'user_agent': request.user_agent.string,
            'status_code': response.status_code
        }
        # 记录到文件或数据库
        app.logger.info(f'SESSION_ACTIVITY: {json.dumps(log_entry)}')

    return response

8. 高级技巧

8.1 分布式Session存储

Redis Session存储
# 使用Redis存储Session
import redis
from flask.sessions import SessionInterface, SessionMixin
from werkzeug.datastructures import CallbackDict
import pickle
import json
from datetime import timedelta

class RedisSession(CallbackDict, SessionMixin):
    """Redis Session类"""

    def __init__(self, initial=None, sid=None, new=False):
        def on_update(self):
            self.modified = True

        CallbackDict.__init__(self, initial, on_update)
        self.sid = sid
        self.new = new
        self.modified = False

class RedisSessionInterface(SessionInterface):
    """Redis Session接口"""

    serializer = pickle
    session_class = RedisSession

    def __init__(self, redis_connection, prefix='session:'):
        self.redis = redis_connection
        self.prefix = prefix

    def generate_sid(self):
        """生成Session ID"""
        return secrets.token_urlsafe(32)

    def get_redis_expiration_time(self, app, session):
        """获取Redis过期时间"""
        if session.permanent:
            return app.permanent_session_lifetime
        return timedelta(days=1)

    def open_session(self, app, request):
        """打开Session"""
        sid = request.cookies.get(app.config['SESSION_COOKIE_NAME'])
        if not sid:
            sid = self.generate_sid()
            return self.session_class(sid=sid, new=True)

        # 从Redis加载Session数据
        key = self.prefix + sid
        val = self.redis.get(key)

        if val is not None:
            try:
                data = self.serializer.loads(val)
                return self.session_class(data, sid=sid)
            except:
                # 数据损坏,创建新Session
                pass

        return self.session_class(sid=sid, new=True)

    def save_session(self, app, session, response):
        """保存Session到Redis"""
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)

        # 如果Session为空,从Redis删除
        if not session:
            if session.modified:
                self.redis.delete(self.prefix + session.sid)
                response.delete_cookie(app.config['SESSION_COOKIE_NAME'],
                                      domain=domain, path=path)
            return

        # 计算过期时间
        expires = self.get_expiration_time(app, session)
        redis_expires = self.get_redis_expiration_time(app, session)

        # 保存到Redis
        key = self.prefix + session.sid
        val = self.serializer.dumps(dict(session))
        self.redis.setex(key, int(redis_expires.total_seconds()), val)

        # 设置Cookie
        response.set_cookie(
            app.config['SESSION_COOKIE_NAME'],
            session.sid,
            expires=expires,
            httponly=app.config['SESSION_COOKIE_HTTPONLY'],
            domain=domain,
            path=path,
            secure=app.config['SESSION_COOKIE_SECURE'],
            samesite=app.config['SESSION_COOKIE_SAMESITE']
        )

# 初始化Redis Session
def create_app():
    app = Flask(__name__)

    # Redis配置
    redis_client = redis.Redis(
        host=app.config['REDIS_HOST'],
        port=app.config['REDIS_PORT'],
        password=app.config['REDIS_PASSWORD'],
        decode_responses=False
    )

    # 设置Session接口
    app.session_interface = RedisSessionInterface(
        redis_client,
        prefix='flask_session:'
    )

    return app

# 使用JSON序列化的版本
class JSONRedisSessionInterface(RedisSessionInterface):
    """使用JSON序列化的Redis Session接口"""

    serializer = json

    def open_session(self, app, request):
        """打开Session(JSON版本)"""
        sid = request.cookies.get(app.config['SESSION_COOKIE_NAME'])
        if not sid:
            sid = self.generate_sid()
            return self.session_class(sid=sid, new=True)

        key = self.prefix + sid
        val = self.redis.get(key)

        if val is not None:
            try:
                data = json.loads(val.decode('utf-8'))
                return self.session_class(data, sid=sid)
            except:
                pass

        return self.session_class(sid=sid, new=True)

    def save_session(self, app, session, response):
        """保存Session(JSON版本)"""
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)

        if not session:
            if session.modified:
                self.redis.delete(self.prefix + session.sid)
                response.delete_cookie(app.config['SESSION_COOKIE_NAME'],
                                      domain=domain, path=path)
            return

        expires = self.get_expiration_time(app, session)
        redis_expires = self.get_redis_expiration_time(app, session)

        key = self.prefix + session.sid
        val = json.dumps(dict(session), ensure_ascii=False)
        self.redis.setex(key, int(redis_expires.total_seconds()), val)

        response.set_cookie(
            app.config['SESSION_COOKIE_NAME'],
            session.sid,
            expires=expires,
            httponly=app.config['SESSION_COOKIE_HTTPONLY'],
            domain=domain,
            path=path,
            secure=app.config['SESSION_COOKIE_SECURE'],
            samesite=app.config['SESSION_COOKIE_SAMESITE']
        )
Session共享和清理
# Session清理任务
import schedule
import threading
from datetime import datetime, timedelta

def cleanup_expired_sessions():
    """清理过期的Session"""
    now = datetime.utcnow()

    # 清理数据库中的过期Session(如果有)
    expired_sessions = Session.query.filter(
        Session.expires_at < now
    ).all()

    for session in expired_sessions:
        db.session.delete(session)

    db.session.commit()

    # 清理Redis中的过期Session
    if hasattr(app, 'session_interface') and \
       hasattr(app.session_interface, 'redis'):

        redis_client = app.session_interface.redis
        prefix = app.session_interface.prefix

        # 使用SCAN迭代所有Session键
        cursor = '0'
        while cursor != 0:
            cursor, keys = redis_client.scan(
                cursor=cursor,
                match=f'{prefix}*',
                count=100
            )

            for key in keys:
                # 检查TTL
                ttl = redis_client.ttl(key)
                if ttl < 0:  # 没有设置过期时间或已过期
                    redis_client.delete(key)

# 定时任务线程
def run_scheduler():
    """运行定时任务"""
    # 每小时清理一次过期Session
    schedule.every().hour.do(cleanup_expired_sessions)

    while True:
        schedule.run_pending()
        time.sleep(60)

# 启动清理线程
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()

# Session数据分析
def analyze_session_data():
    """分析Session数据"""
    stats = {
        'total_sessions': 0,
        'active_sessions': 0,
        'average_session_duration': 0,
        'sessions_by_hour': {}
    }

    # 统计活跃Session(过去30分钟内有活动的)
    active_cutoff = datetime.utcnow() - timedelta(minutes=30)

    sessions = Session.query.all()
    stats['total_sessions'] = len(sessions)

    active_sessions = 0
    total_duration = 0

    for session in sessions:
        # 检查是否活跃
        if session.last_activity and session.last_activity > active_cutoff:
            active_sessions += 1

        # 计算会话时长
        if session.created_at and session.last_activity:
            duration = (session.last_activity - session.created_at).total_seconds()
            total_duration += duration

    stats['active_sessions'] = active_sessions

    if sessions:
        stats['average_session_duration'] = total_duration / len(sessions)

    # 按小时统计
    for i in range(24):
        hour_start = datetime.utcnow().replace(hour=i, minute=0, second=0, microsecond=0)
        hour_end = hour_start + timedelta(hours=1)

        hour_sessions = Session.query.filter(
            Session.created_at >= hour_start,
            Session.created_at < hour_end
        ).count()

        stats['sessions_by_hour'][i] = hour_sessions

    return stats

# Session监控端点
@app.route('/admin/session-stats')
@admin_required
def session_statistics():
    """Session统计信息"""
    stats = analyze_session_data()

    # 实时Session查看(仅限开发环境)
    if app.debug:
        current_sessions = []
        for session in Session.query.order_by(Session.last_activity.desc()).limit(50):
            current_sessions.append({
                'id': session.id,
                'user_id': session.user_id,
                'ip': session.ip_address,
                'created': session.created_at.isoformat(),
                'last_activity': session.last_activity.isoformat() if session.last_activity else None,
                'user_agent': session.user_agent[:100] if session.user_agent else None
            })

        stats['current_sessions'] = current_sessions

    return jsonify(stats)

# 强制登出用户
@app.route('/admin/force-logout/<int:user_id>', methods=['POST'])
@admin_required
def force_logout_user(user_id):
    """强制登出指定用户"""
    # 使该用户的所有Session失效
    sessions = Session.query.filter_by(user_id=user_id).all()

    for session in sessions:
        db.session.delete(session)

    # 清除Redis中的Session(如果使用Redis)
    if hasattr(app, 'session_interface') and \
       hasattr(app.session_interface, 'redis'):

        redis_client = app.session_interface.redis
        prefix = app.session_interface.prefix

        # 查找该用户的所有Session键
        pattern = f'{prefix}*'
        cursor = '0'
        keys_to_delete = []

        while cursor != 0:
            cursor, keys = redis_client.scan(cursor=cursor, match=pattern, count=100)
            for key in keys:
                # 获取Session数据
                val = redis_client.get(key)
                if val:
                    try:
                        data = json.loads(val.decode('utf-8'))
                        if data.get('user_id') == user_id:
                            keys_to_delete.append(key)
                    except:
                        pass

        # 删除找到的键
        if keys_to_delete:
            redis_client.delete(*keys_to_delete)

    db.session.commit()

    flash(f'用户 {user_id} 已被强制登出', 'success')
    return redirect(url_for('admin.session_management'))

9. 最佳实践

推荐做法
  • 使用Flask-Login:提供标准化的用户认证
  • 设置强密钥:SECRET_KEY要足够复杂
  • 启用HTTPS:生产环境必须使用HTTPS
  • 设置合理过期时间:平衡安全性和用户体验
  • 使用安全Cookie属性:HttpOnly、Secure、SameSite
  • 限制Session大小:避免存储过大对象
  • 定期清理Session:防止Session数据堆积
  • 记录登录活动:用于安全审计
避免做法
  • 存储敏感数据:密码、密钥等不应存储在Session
  • 使用弱SECRET_KEY:避免使用简单或默认密钥
  • 忽略Cookie安全属性:必须设置HttpOnly和Secure
  • 过长过期时间:增加会话劫持风险
  • 不验证用户输入:所有输入都应验证
  • 信任客户端数据:客户端数据可能被篡改
  • 暴露Session信息:不要在错误信息中暴露Session详情
  • 忽略并发控制:防止同一用户多处登录

10. 完整示例:用户管理系统

完整的用户会话管理系统
# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_migrate import Migrate
from config import Config

db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)

    # 初始化扩展
    db.init_app(app)
    migrate.init_app(app, db)
    login_manager.init_app(app)

    # 配置LoginManager
    login_manager.login_view = 'auth.login'
    login_manager.login_message = '请先登录'
    login_manager.login_message_category = 'info'
    login_manager.session_protection = 'strong'

    # 注册蓝图
    from app.auth import bp as auth_bp
    app.register_blueprint(auth_bp)

    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    from app.admin import bp as admin_bp
    app.register_blueprint(admin_bp, url_prefix='/admin')

    # 注册上下文处理器
    @app.context_processor
    def inject_user():
        """注入用户信息到模板"""
        from flask_login import current_user
        return dict(current_user=current_user)

    # 注册错误处理器
    from app.errors import bp as errors_bp
    app.register_blueprint(errors_bp)

    return app

# app/models.py
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from app import db, login_manager

class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    email = db.Column(db.String(120), unique=True, index=True)
    password_hash = db.Column(db.String(128))
    is_active = db.Column(db.Boolean, default=True)
    is_admin = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime)

    # 关系
    sessions = db.relationship('UserSession', backref='user', lazy='dynamic')

    def __repr__(self):
        return f'<User {self.username}>'

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

class UserSession(db.Model):
    """用户会话记录"""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    session_id = db.Column(db.String(128), unique=True, index=True)
    ip_address = db.Column(db.String(45))
    user_agent = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_activity = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime)

    def __repr__(self):
        return f'<UserSession {self.session_id}>'

@login_manager.user_loader
def load_user(id):
    return User.query.get(int(id))
# app/auth/__init__.py
from flask import Blueprint

bp = Blueprint('auth', __name__)

from app.auth import routes

# app/auth/routes.py
from flask import render_template, redirect, url_for, flash, request, session
from flask_login import login_user, logout_user, current_user, login_required
from app.auth import bp
from app.models import User, UserSession
from app import db
import secrets
from datetime import datetime, timedelta

@bp.route('/login', methods=['GET', 'POST'])
def login():
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))

    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'

        # 检查登录尝试次数
        login_attempts = session.get('login_attempts', 0)
        if login_attempts >= 5:
            flash('登录尝试次数过多,请稍后再试', 'error')
            return render_template('auth/login.html')

        user = User.query.filter_by(username=username).first()

        if user is None or not user.check_password(password):
            session['login_attempts'] = login_attempts + 1
            flash('用户名或密码错误', 'error')
            return render_template('auth/login.html')

        if not user.is_active:
            flash('账户已被禁用', 'error')
            return render_template('auth/login.html')

        # 登录成功
        login_user(user, remember=remember)

        # 更新最后登录时间
        user.last_login = datetime.utcnow()

        # 创建会话记录
        session_record = UserSession(
            user_id=user.id,
            session_id=secrets.token_urlsafe(32),
            ip_address=request.remote_addr,
            user_agent=request.user_agent.string,
            expires_at=datetime.utcnow() + timedelta(days=30 if remember else 1)
        )
        db.session.add(session_record)
        db.session.commit()

        # 存储会话ID到Session
        session['current_session_id'] = session_record.session_id

        # 清除登录尝试次数
        session.pop('login_attempts', None)

        flash('登录成功!', 'success')

        next_page = request.args.get('next')
        if not next_page or url_parse(next_page).netloc != '':
            next_page = url_for('main.index')

        return redirect(next_page)

    return render_template('auth/login.html')

@bp.route('/logout')
@login_required
def logout():
    # 删除当前会话记录
    session_id = session.get('current_session_id')
    if session_id:
        UserSession.query.filter_by(session_id=session_id).delete()
        db.session.commit()

    logout_user()
    session.clear()

    flash('您已成功退出登录', 'info')
    return redirect(url_for('auth.login'))

@bp.route('/register', methods=['GET', 'POST'])
def register():
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))

    if request.method == 'POST':
        # 注册逻辑...
        pass

    return render_template('auth/register.html')

# app/main/routes.py
from flask import render_template, session, g
from flask_login import login_required, current_user
from app.main import bp
from app.models import UserSession
from datetime import datetime

@bp.before_app_request
def before_request():
    """在每个请求前执行"""
    if current_user.is_authenticated:
        # 更新会话最后活动时间
        session_id = session.get('current_session_id')
        if session_id:
            session_record = UserSession.query.filter_by(
                session_id=session_id,
                user_id=current_user.id
            ).first()

            if session_record:
                session_record.last_activity = datetime.utcnow()
                db.session.commit()

        # 检查会话是否过期
        g.session_expired = False
        if session_id:
            session_record = UserSession.query.filter_by(session_id=session_id).first()
            if session_record and session_record.expires_at < datetime.utcnow():
                g.session_expired = True
                logout_user()
                session.clear()
                flash('会话已过期,请重新登录', 'warning')

@bp.route('/')
def index():
    return render_template('main/index.html')

@bp.route('/profile')
@login_required
def profile():
    return render_template('main/profile.html')

@bp.route('/dashboard')
@login_required
def dashboard():
    # 获取用户的活动会话
    active_sessions = UserSession.query.filter_by(
        user_id=current_user.id
    ).filter(
        UserSession.expires_at > datetime.utcnow()
    ).order_by(UserSession.last_activity.desc()).all()

    return render_template('main/dashboard.html',
                          active_sessions=active_sessions)
{# templates/auth/login.html #}
{% extends "base.html" %}

{% block content %}
<div class="row justify-content-center">
    <div class="col-md-6">
        <div class="card">
            <div class="card-header">
                <h3 class="mb-0">用户登录</h3>
            </div>
            <div class="card-body">
                <form method="POST" action="">
                    {{ form.hidden_tag() if form else '' }}

                    <div class="mb-3">
                        <label for="username" class="form-label">用户名</label>
                        <input type="text" class="form-control" id="username"
                               name="username" required
                               value="{{ request.form.username if request.form else '' }}">
                    </div>

                    <div class="mb-3">
                        <label for="password" class="form-label">密码</label>
                        <input type="password" class="form-control" id="password"
                               name="password" required>
                    </div>

                    <div class="mb-3 form-check">
                        <input type="checkbox" class="form-check-input"
                               id="remember" name="remember">
                        <label class="form-check-label" for="remember">
                            记住我
                        </label>
                    </div>

                    <div class="d-grid gap-2">
                        <button type="submit" class="btn btn-primary">登录</button>
                    </div>
                </form>

                <div class="mt-3 text-center">
                    <p class="mb-2">还没有账号?</p>
                    <a href="{{ url_for('auth.register') }}" class="btn btn-outline-secondary">
                        立即注册
                    </a>
                </div>

                <div class="mt-4">
                    <h6>安全提示:</h6>
                    <ul class="small text-muted">
                        <li>请勿在公共电脑上使用"记住我"功能</li>
                        <li>定期修改密码以保证账户安全</li>
                        <li>如发现异常登录,请及时修改密码</li>
                    </ul>
                </div>
            </div>
        </div>
    </div>
</div>
{% endblock %}
{# templates/main/dashboard.html #}
{% extends "base.html" %}

{% block content %}
<div class="container">
    <div class="row">
        <div class="col-md-3">
            <div class="card">
                <div class="card-body text-center">
                    <h5 class="card-title">{{ current_user.username }}</h5>
                    <p class="card-text">
                        <small class="text-muted">
                            {{ current_user.email }}
                        </small>
                    </p>
                    <p class="card-text">
                        <span class="badge bg-{{ 'success' if current_user.is_active else 'danger' }}">
                            {{ '活跃' if current_user.is_active else '禁用' }}
                        </span>
                        {% if current_user.is_admin %}
                            <span class="badge bg-warning">管理员</span>
                        {% endif %}
                    </p>
                </div>
            </div>

            <div class="card mt-3">
                <div class="card-body">
                    <h6>快捷操作</h6>
                    <div class="d-grid gap-2">
                        <a href="{{ url_for('main.profile') }}" class="btn btn-outline-primary">
                            编辑资料
                        </a>
                        <a href="{{ url_for('auth.logout') }}" class="btn btn-outline-danger">
                            退出登录
                        </a>
                    </div>
                </div>
            </div>
        </div>

        <div class="col-md-9">
            <div class="card">
                <div class="card-header">
                    <h5 class="mb-0">当前登录设备</h5>
                </div>
                <div class="card-body">
                    {% if active_sessions %}
                        <div class="table-responsive">
                            <table class="table table-hover">
                                <thead>
                                    <tr>
                                        <th>设备信息</th>
                                        <th>IP地址</th>
                                        <th>最后活动</th>
                                        <th>状态</th>
                                        <th>操作</th>
                                    </tr>
                                </thead>
                                <tbody>
                                    {% for session in active_sessions %}
                                        <tr class="{{ 'table-primary' if session.session_id == current_session_id else '' }}">
                                            <td>
                                                <small>{{ session.user_agent|truncate(50) }}</small>
                                            </td>
                                            <td>{{ session.ip_address }}</td>
                                            <td>
                                                <small>{{ session.last_activity|datetimeformat }}</small>
                                            </td>
                                            <td>
                                                <span class="badge bg-{{ 'success' if session.expires_at > now else 'warning' }}">
                                                    {{ '有效' if session.expires_at > now else '即将过期' }}
                                                </span>
                                            </td>
                                            <td>
                                                {% if session.session_id != current_session_id %}
                                                    <button class="btn btn-sm btn-outline-danger"
                                                            onclick="logoutSession('{{ session.session_id }}')">
                                                        强制退出
                                                    </button>
                                                {% else %}
                                                    <span class="text-muted">当前设备</span>
                                                {% endif %}
                                            </td>
                                        </tr>
                                    {% endfor %}
                                </tbody>
                            </table>
                        </div>
                    {% else %}
                        <p class="text-muted">没有活跃的登录会话</p>
                    {% endif %}
                </div>
            </div>
        </div>
    </div>
</div>

<script>
function logoutSession(sessionId) {
    if (confirm('确定要强制退出此设备吗?')) {
        fetch('/auth/force-logout-session', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'X-CSRFToken': getCsrfToken()
            },
            body: JSON.stringify({ session_id: sessionId })
        })
        .then(response => response.json())
        .then(data => {
            if (data.success) {
                alert('操作成功');
                location.reload();
            } else {
                alert('操作失败: ' + data.message);
            }
        });
    }
}
</script>
{% endblock %}
系统特点
  • 完整的用户生命周期:注册→登录→会话管理→登出
  • 会话跟踪:记录用户登录设备和活动
  • 安全控制:登录尝试限制、会话过期、强制登出
  • 多设备管理:查看和管理所有登录设备
  • 生产就绪:错误处理、日志记录、数据库迁移
学习总结

通过本教程,你已经掌握了Flask中用户会话管理的各个方面:

  • Session基础:理解Session的概念和用途
  • Flask Session配置:安全配置和最佳实践
  • Session操作:存储、获取、修改和删除Session数据
  • Flask-Login扩展:用户认证和会话管理
  • 用户认证系统:登录、注册、权限控制
  • 记住我功能:长期会话和自动登录
  • 会话安全:防止会话劫持、固定攻击等
  • 高级技巧:分布式Session、Redis存储、会话清理
  • 最佳实践:安全配置、性能优化、错误处理
  • 完整系统:用户管理系统的完整实现
  • 模板集成:在HTML中显示用户状态和会话信息
  • 生产部署:安全配置和性能考虑
下一步学习建议
  • 深入学习Flask数据库集成
  • 掌握Flask REST API开发
  • 学习Flask邮件发送功能
  • 了解Flask缓存和性能优化
  • 实践Flask项目部署与运维