Flask用户认证

用户认证是Web应用的核心功能,包括注册、登录、退出、密码管理和权限控制等。

1. 安装必要扩展

# 安装用户认证相关扩展
pip install flask-login  # 用户会话管理
pip install flask-wtf    # 表单处理
pip install email-validator  # 邮箱验证

# 密码安全相关
pip install bcrypt       # 推荐:强大的密码哈希
# 或者使用Werkzeug内置的密码哈希
# from werkzeug.security import generate_password_hash, check_password_hash

2. 基础配置

# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_bcrypt import Bcrypt
from config import Config

# 初始化扩展
db = SQLAlchemy()
login_manager = LoginManager()
bcrypt = Bcrypt()

def create_app(config_class=Config):
    """应用工厂函数"""
    app = Flask(__name__)
    app.config.from_object(config_class)

    # 初始化扩展
    db.init_app(app)
    login_manager.init_app(app)
    bcrypt.init_app(app)

    # 配置Flask-Login
    login_manager.login_view = 'auth.login'  # 未登录时重定向的页面
    login_manager.login_message = '请先登录后访问此页面'
    login_manager.login_message_category = 'warning'
    login_manager.session_protection = 'strong'  # 会话保护级别

    # 注册蓝图
    from app.auth import auth_bp
    app.register_blueprint(auth_bp)

    return app

3. 用户模型设计

# app/models.py
from datetime import datetime
from flask_login import UserMixin
from . import db, bcrypt

class User(UserMixin, db.Model):
    """用户模型"""
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(128), nullable=False)

    # 个人信息
    full_name = db.Column(db.String(100))
    avatar = db.Column(db.String(200))  # 头像URL
    bio = db.Column(db.Text)  # 个人简介
    location = db.Column(db.String(100))  # 所在地
    website = db.Column(db.String(200))  # 个人网站

    # 状态字段
    is_active = db.Column(db.Boolean, default=True)  # 是否激活
    is_verified = db.Column(db.Boolean, default=False)  # 邮箱是否验证
    is_admin = db.Column(db.Boolean, default=False)  # 是否管理员

    # 时间戳
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_login = db.Column(db.DateTime)  # 最后登录时间
    last_seen = db.Column(db.DateTime)  # 最后活跃时间

    # 密码重置相关
    reset_token = db.Column(db.String(100))
    reset_token_expires = db.Column(db.DateTime)

    # 关系
    posts = db.relationship('Post', backref='author', lazy='dynamic')
    comments = db.relationship('Comment', backref='author', lazy='dynamic')

    # 密码处理方法
    @property
    def password(self):
        """密码属性,防止直接读取"""
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        """设置密码时自动加密"""
        self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')

    def verify_password(self, password):
        """验证密码"""
        return bcrypt.check_password_hash(self.password_hash, password)

    # 用户状态方法
    def activate(self):
        """激活用户"""
        self.is_active = True
        db.session.commit()

    def deactivate(self):
        """禁用用户"""
        self.is_active = False
        db.session.commit()

    def generate_reset_token(self, expires_in=3600):
        """生成密码重置令牌"""
        import jwt
        from datetime import datetime, timedelta

        reset_token = jwt.encode(
            {
                'reset_password': self.id,
                'exp': datetime.utcnow() + timedelta(seconds=expires_in)
            },
            current_app.config['SECRET_KEY'],
            algorithm='HS256'
        )
        return reset_token

    @staticmethod
    def verify_reset_token(token):
        """验证重置令牌"""
        import jwt
        try:
            data = jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256']
            )
            user_id = data.get('reset_password')
        except:
            return None

        return User.query.get(user_id)

    # Flask-Login要求的方法
    def get_id(self):
        """返回用户的唯一标识符"""
        return str(self.id)

    def is_authenticated(self):
        """用户是否已认证"""
        return True

    def is_anonymous(self):
        """是否为匿名用户"""
        return False

    def __repr__(self):
        return f'<User {self.username}>'

# 用户加载函数
from app.models import User

@login_manager.user_loader
def load_user(user_id):
    """加载用户(Flask-Login要求)"""
    if user_id is not None:
        return User.query.get(int(user_id))
    return None

4. 认证表单设计

# app/auth/forms.py
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField, BooleanField, TextAreaField
from wtforms.validators import DataRequired, Length, Email, EqualTo, ValidationError
from app.models import User

class LoginForm(FlaskForm):
    """登录表单"""
    username = StringField('用户名或邮箱', validators=[
        DataRequired(message='请输入用户名或邮箱'),
        Length(min=3, max=64, message='长度必须在3-64个字符之间')
    ])
    password = PasswordField('密码', validators=[
        DataRequired(message='请输入密码'),
        Length(min=6, max=128, message='密码长度必须在6-128个字符之间')
    ])
    remember = BooleanField('记住我')
    submit = SubmitField('登录')

class RegistrationForm(FlaskForm):
    """注册表单"""
    username = StringField('用户名', validators=[
        DataRequired(message='用户名不能为空'),
        Length(min=3, max=64, message='用户名长度必须在3-64个字符之间'),
        # 自定义验证器:用户名格式
        lambda form, field: ValidationError('用户名只能包含字母、数字和下划线')
        if not all(c.isalnum() or c == '_' for c in field.data) else None
    ])
    email = StringField('邮箱', validators=[
        DataRequired(message='邮箱不能为空'),
        Email(message='请输入有效的邮箱地址'),
        Length(max=120, message='邮箱地址太长')
    ])
    password = PasswordField('密码', validators=[
        DataRequired(message='密码不能为空'),
        Length(min=8, max=128, message='密码长度必须在8-128个字符之间'),
        # 密码强度验证
        lambda form, field: ValidationError('密码必须包含至少一个大写字母、一个小写字母和一个数字')
        if not (any(c.isupper() for c in field.data) and
                any(c.islower() for c in field.data) and
                any(c.isdigit() for c in field.data)) else None
    ])
    confirm_password = PasswordField('确认密码', validators=[
        DataRequired(message='请确认密码'),
        EqualTo('password', message='两次输入的密码不一致')
    ])
    agree_terms = BooleanField('我同意服务条款', validators=[
        DataRequired(message='必须同意服务条款')
    ])
    submit = SubmitField('注册')

    def validate_username(self, username):
        """验证用户名是否已存在"""
        user = User.query.filter_by(username=username.data).first()
        if user:
            raise ValidationError('该用户名已被使用,请选择其他用户名')

    def validate_email(self, email):
        """验证邮箱是否已存在"""
        user = User.query.filter_by(email=email.data).first()
        if user:
            raise ValidationError('该邮箱已被注册,请使用其他邮箱')

class PasswordResetRequestForm(FlaskForm):
    """密码重置请求表单"""
    email = StringField('邮箱', validators=[
        DataRequired(message='请输入邮箱地址'),
        Email(message='请输入有效的邮箱地址')
    ])
    submit = SubmitField('发送重置链接')

    def validate_email(self, email):
        """验证邮箱是否存在"""
        user = User.query.filter_by(email=email.data).first()
        if not user:
            raise ValidationError('该邮箱未注册')

class PasswordResetForm(FlaskForm):
    """密码重置表单"""
    password = PasswordField('新密码', validators=[
        DataRequired(message='请输入新密码'),
        Length(min=8, max=128, message='密码长度必须在8-128个字符之间')
    ])
    confirm_password = PasswordField('确认新密码', validators=[
        DataRequired(message='请确认新密码'),
        EqualTo('password', message='两次输入的密码不一致')
    ])
    submit = SubmitField('重置密码')

class ProfileForm(FlaskForm):
    """个人资料表单"""
    full_name = StringField('真实姓名', validators=[
        Length(max=100, message='姓名太长')
    ])
    bio = TextAreaField('个人简介', validators=[
        Length(max=500, message='简介不能超过500个字符')
    ])
    location = StringField('所在地', validators=[
        Length(max=100, message='所在地太长')
    ])
    website = StringField('个人网站', validators=[
        Length(max=200, message='网站地址太长')
    ])
    submit = SubmitField('更新资料')

class ChangePasswordForm(FlaskForm):
    """修改密码表单"""
    current_password = PasswordField('当前密码', validators=[
        DataRequired(message='请输入当前密码')
    ])
    new_password = PasswordField('新密码', validators=[
        DataRequired(message='请输入新密码'),
        Length(min=8, max=128, message='密码长度必须在8-128个字符之间')
    ])
    confirm_password = PasswordField('确认新密码', validators=[
        DataRequired(message='请确认新密码'),
        EqualTo('new_password', message='两次输入的密码不一致')
    ])
    submit = SubmitField('修改密码')

class ChangeEmailForm(FlaskForm):
    """修改邮箱表单"""
    email = StringField('新邮箱地址', validators=[
        DataRequired(message='请输入新邮箱地址'),
        Email(message='请输入有效的邮箱地址'),
        Length(max=120, message='邮箱地址太长')
    ])
    password = PasswordField('当前密码', validators=[
        DataRequired(message='请输入当前密码以确认')
    ])
    submit = SubmitField('修改邮箱')

    def validate_email(self, email):
        """验证新邮箱是否已被使用"""
        user = User.query.filter_by(email=email.data).first()
        if user:
            raise ValidationError('该邮箱已被使用,请使用其他邮箱')

5. 认证路由实现

# app/auth/routes.py
from flask import render_template, redirect, url_for, flash, request, current_app
from flask_login import login_user, logout_user, current_user, login_required
from werkzeug.urls import url_parse
from . import auth_bp
from .forms import (
    LoginForm, RegistrationForm, PasswordResetRequestForm,
    PasswordResetForm, ProfileForm, ChangePasswordForm, ChangeEmailForm
)
from app.models import User
from app import db, bcrypt
import jwt
from datetime import datetime, timedelta
from app.email import send_email

@auth_bp.route('/login', methods=['GET', 'POST'])
                            def login():
                            """用户登录"""
                            if current_user.is_authenticated:
                            return redirect(url_for('main.index'))

                            form = LoginForm()

                            if form.validate_on_submit():
                            # 查找用户(支持用户名或邮箱登录)
                            user = User.query.filter(
                            (User.username == form.username.data) |
                            (User.email == form.username.data)
                            ).first()

                            if user is None or not user.verify_password(form.password.data):
                            flash('用户名或密码错误', 'danger')
                            return redirect(url_for('auth.login'))

                            if not user.is_active:
                            flash('您的账户已被禁用,请联系管理员', 'warning')
                            return redirect(url_for('auth.login'))

                            # 登录用户
                            login_user(user, remember=form.remember.data)
                            user.last_login = datetime.utcnow()
                            db.session.commit()

                            # 重定向到next参数指定的页面
                            next_page = request.args.get('next')
                            if not next_page or url_parse(next_page).netloc != '':
                            next_page = url_for('main.index')

                            flash('登录成功!', 'success')
                            return redirect(next_page)

                            return render_template('auth/login.html', form=form)

                            @auth_bp.route('/register', methods=['GET', 'POST'])
                            def register():
                            """用户注册"""
                            if current_user.is_authenticated:
                            return redirect(url_for('main.index'))

                            form = RegistrationForm()

                            if form.validate_on_submit():
                            # 创建新用户
                            user = User(
                            username=form.username.data,
                            email=form.email.data,
                            password=form.password.data  # 自动加密
                            )

                            db.session.add(user)
                            db.session.commit()

                            # 发送欢迎邮件
                            send_email(
                            subject='欢迎注册',
                            recipients=[user.email],
                            text_body=render_template('email/welcome.txt', user=user),
                            html_body=render_template('email/welcome.html', user=user)
                            )

                            flash('注册成功!请登录。', 'success')
                            return redirect(url_for('auth.login'))

                            return render_template('auth/register.html', form=form)

                            @auth_bp.route('/logout')
                            @login_required
                            def logout():
                            """用户退出"""
                            logout_user()
                            flash('您已成功退出登录', 'info')
                            return redirect(url_for('main.index'))

                            @auth_bp.route('/profile', methods=['GET', 'POST'])
                            @login_required
                            def profile():
                            """个人资料页面"""
                            form = ProfileForm()

                            if form.validate_on_submit():
                            current_user.full_name = form.full_name.data
                            current_user.bio = form.bio.data
                            current_user.location = form.location.data
                            current_user.website = form.website.data
                            current_user.updated_at = datetime.utcnow()

                            db.session.commit()
                            flash('个人资料已更新', 'success')
                            return redirect(url_for('auth.profile'))

                            # 填充现有数据
                            elif request.method == 'GET':
                            form.full_name.data = current_user.full_name
                            form.bio.data = current_user.bio
                            form.location.data = current_user.location
                            form.website.data = current_user.website

                            return render_template('auth/profile.html', form=form)

                            @auth_bp.route('/change-password', methods=['GET', 'POST'])
                            @login_required
                            def change_password():
                            """修改密码"""
                            form = ChangePasswordForm()

                            if form.validate_on_submit():
                            # 验证当前密码
                            if not current_user.verify_password(form.current_password.data):
                            flash('当前密码错误', 'danger')
                            return redirect(url_for('auth.change_password'))

                            # 更新密码
                            current_user.password = form.new_password.data
                            db.session.commit()

                            # 发送密码修改通知邮件
                            send_email(
                            subject='密码修改通知',
                            recipients=[current_user.email],
                            text_body=render_template('email/password_changed.txt', user=current_user),
                            html_body=render_template('email/password_changed.html', user=current_user)
                            )

                            flash('密码已成功修改', 'success')
                            return redirect(url_for('auth.profile'))

                            return render_template('auth/change_password.html', form=form)

                            @auth_bp.route('/reset-password-request', methods=['GET', 'POST'])
                            def reset_password_request():
                            """请求重置密码"""
                            if current_user.is_authenticated:
                            return redirect(url_for('main.index'))

                            form = PasswordResetRequestForm()

                            if form.validate_on_submit():
                            user = User.query.filter_by(email=form.email.data).first()

                            if user:
                            # 生成重置令牌
                            token = user.generate_reset_token()

                            # 发送重置邮件
                            send_email(
                            subject='密码重置',
                            recipients=[user.email],
                            text_body=render_template(
                            'email/reset_password.txt',
                            user=user, token=token
                            ),
                            html_body=render_template(
                            'email/reset_password.html',
                            user=user, token=token
                            )
                            )

                            # 无论用户是否存在,都显示相同的信息(防止邮箱枚举攻击)
                            flash('如果该邮箱已注册,重置链接已发送到您的邮箱', 'info')
                            return redirect(url_for('auth.login'))

                            return render_template('auth/reset_password_request.html', form=form)

                            @auth_bp.route('/reset-password/<token>', methods=['GET', 'POST'])
                            def reset_password(token):
                            """重置密码"""
                            if current_user.is_authenticated:
                            return redirect(url_for('main.index'))

                            # 验证令牌
                            user = User.verify_reset_token(token)
                            if not user:
                            flash('重置链接无效或已过期', 'danger')
                            return redirect(url_for('auth.reset_password_request'))

                            form = PasswordResetForm()

                            if form.validate_on_submit():
                            # 更新密码
                            user.password = form.password.data
                            db.session.commit()

                            flash('密码已重置成功,请使用新密码登录', 'success')
                            return redirect(url_for('auth.login'))

                            return render_template('auth/reset_password.html', form=form)

                            @auth_bp.route('/verify-email/<token>')
                            def verify_email(token):
                            """验证邮箱"""
                            if current_user.is_authenticated:
                            return redirect(url_for('main.index'))

                            try:
                            data = jwt.decode(
                            token,
                            current_app.config['SECRET_KEY'],
                            algorithms=['HS256']
                            )
                            user_id = data.get('verify_email')
                            user = User.query.get(user_id)

                            if user and not user.is_verified:
                            user.is_verified = True
                            db.session.commit()
                            flash('邮箱验证成功!', 'success')
                            else:
                            flash('验证链接无效或已过期', 'danger')
                            except:
                            flash('验证链接无效或已过期', 'danger')

                            return redirect(url_for('auth.login'))

                            @auth_bp.before_request
                            def update_last_seen():
                            """在每次请求前更新最后活跃时间"""
                            if current_user.is_authenticated:
                            current_user.last_seen = datetime.utcnow()
                            db.session.commit()

6. 装饰器和权限控制

# app/decorators.py
from functools import wraps
from flask import abort, flash, redirect, url_for
from flask_login import current_user

def login_required(f):
    """登录装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            flash('请先登录', 'warning')
            return redirect(url_for('auth.login'))
        return f(*args, **kwargs)
    return decorated_function

def admin_required(f):
    """管理员权限装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            flash('请先登录', 'warning')
            return redirect(url_for('auth.login'))

        if not current_user.is_admin:
            flash('您没有权限访问此页面', 'danger')
            return redirect(url_for('main.index'))

        return f(*args, **kwargs)
    return decorated_function

def verified_required(f):
    """邮箱验证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            flash('请先登录', 'warning')
            return redirect(url_for('auth.login'))

        if not current_user.is_verified:
            flash('请先验证您的邮箱地址', 'warning')
            return redirect(url_for('auth.unverified'))

        return f(*args, **kwargs)
    return decorated_function

def active_required(f):
    """账户激活装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            flash('请先登录', 'warning')
            return redirect(url_for('auth.login'))

        if not current_user.is_active:
            flash('您的账户已被禁用', 'danger')
            return redirect(url_for('auth.logout'))

        return f(*args, **kwargs)
    return decorated_function

# 在视图函数中使用装饰器
@auth_bp.route('/admin/dashboard')
                            @login_required
                            @admin_required
                            def admin_dashboard():
                            """管理后台仪表板(需要管理员权限)"""
                            return render_template('admin/dashboard.html')

                            @auth_bp.route('/premium/content')
                            @login_required
                            @verified_required
                            def premium_content():
                            """付费内容(需要验证邮箱)"""
                            return render_template('premium/content.html')

7. 模板文件

7.1 登录模板

<!-- templates/auth/login.html -->
@% extends "base.html" %>

@% block title %>登录@% endblock %>

@% block content %>
<div class="row justify-content-center">
    <div class="col-md-6">
        <div class="card">
            <div class="card-header">
                <h4 class="mb-0">用户登录</h4>
            </div>
            <div class="card-body">
                <form method="POST" action="{{ url_for('auth.login') }}">
                    {{ form.hidden_tag() }}

                    <div class="mb-3">
                        {{ form.username.label(class="form-label") }}
                        {{ form.username(class="form-control" + (" is-invalid" if form.username.errors else "")) }}
                        @% if form.username.errors %>
                            @% for error in form.username.errors %>
                                <div class="invalid-feedback">
                                    {{ error }}
                                </div>
                            @% endfor %>
                        @% endif %>
                    </div>

                    <div class="mb-3">
                        {{ form.password.label(class="form-label") }}
                        {{ form.password(class="form-control" + (" is-invalid" if form.password.errors else "")) }}
                        @% if form.password.errors %>
                            @% for error in form.password.errors %>
                                <div class="invalid-feedback">
                                    {{ error }}
                                </div>
                            @% endfor %>
                        @% endif %>
                    </div>

                    <div class="mb-3 form-check">
                        {{ form.remember(class="form-check-input") }}
                        {{ form.remember.label(class="form-check-label") }}
                    </div>

                    <div class="d-grid">
                        {{ form.submit(class="btn btn-primary") }}
                    </div>
                </form>

                <div class="mt-3 text-center">
                    <p>
                        <a href="{{ url_for('auth.reset_password_request') }}">忘记密码?</a>
                    </p>
                    <p>
                        还没有账号? <a href="{{ url_for('auth.register') }}">立即注册</a>
                    </p>
                </div>
            </div>
        </div>
    </div>
</div>
@% endblock %>

7.2 注册模板

<!-- templates/auth/register.html -->
@% extends "base.html" %>

@% block title %>注册@% endblock %>

@% block content %>
<div class="row justify-content-center">
    <div class="col-md-8">
        <div class="card">
            <div class="card-header">
                <h4 class="mb-0">用户注册</h4>
            </div>
            <div class="card-body">
                <form method="POST" action="{{ url_for('auth.register') }}">
                    {{ form.hidden_tag() }}

                    <div class="row">
                        <div class="col-md-6 mb-3">
                            {{ form.username.label(class="form-label") }}
                            {{ form.username(class="form-control" + (" is-invalid" if form.username.errors else "")) }}
                            @% if form.username.errors %>
                                <div class="invalid-feedback">
                                    {{ form.username.errors[0] }}
                                </div>
                            @% endif %>
                            <div class="form-text">用户名长度3-64个字符,只能包含字母、数字和下划线</div>
                        </div>

                        <div class="col-md-6 mb-3">
                            {{ form.email.label(class="form-label") }}
                            {{ form.email(class="form-control" + (" is-invalid" if form.email.errors else "")) }}
                            @% if form.email.errors %>
                                <div class="invalid-feedback">
                                    {{ form.email.errors[0] }}
                                </div>
                            @% endif %>
                        </div>
                    </div>

                    <div class="row">
                        <div class="col-md-6 mb-3">
                            {{ form.password.label(class="form-label") }}
                            {{ form.password(class="form-control" + (" is-invalid" if form.password.errors else "")) }}
                            @% if form.password.errors %>
                                <div class="invalid-feedback">
                                    {{ form.password.errors[0] }}
                                </div>
                            @% endif %>
                            <div class="form-text">密码至少8位,必须包含大写字母、小写字母和数字</div>
                        </div>

                        <div class="col-md-6 mb-3">
                            {{ form.confirm_password.label(class="form-label") }}
                            {{ form.confirm_password(class="form-control" + (" is-invalid" if form.confirm_password.errors else "")) }}
                            @% if form.confirm_password.errors %>
                                <div class="invalid-feedback">
                                    {{ form.confirm_password.errors[0] }}
                                </div>
                            @% endif %>
                        </div>
                    </div>

                    <div class="mb-3 form-check">
                        {{ form.agree_terms(class="form-check-input" + (" is-invalid" if form.agree_terms.errors else "")) }}
                        {{ form.agree_terms.label(class="form-check-label") }}
                        @% if form.agree_terms.errors %>
                            <div class="invalid-feedback">
                                {{ form.agree_terms.errors[0] }}
                            </div>
                        @% endif %>
                    </div>

                    <div class="d-grid">
                        {{ form.submit(class="btn btn-success") }}
                    </div>
                </form>

                <div class="mt-3 text-center">
                    <p>已有账号? <a href="{{ url_for('auth.login') }}">立即登录</a></p>
                </div>
            </div>
        </div>
    </div>
</div>
@% endblock %>

8. 邮件服务

# app/email.py
from flask import render_template, current_app
from flask_mail import Mail, Message
from threading import Thread

mail = Mail()

def send_async_email(app, msg):
    """异步发送邮件"""
    with app.app_context():
        mail.send(msg)

def send_email(subject, recipients, text_body, html_body, sender=None):
    """发送邮件"""
    app = current_app._get_current_object()

    msg = Message(
        subject=subject,
        recipients=recipients,
        sender=sender or current_app.config['MAIL_DEFAULT_SENDER'],
        body=text_body,
        html=html_body
    )

    # 异步发送邮件
    Thread(target=send_async_email, args=(app, msg)).start()

# 配置邮件服务(config.py)
class Config:
    # ... 其他配置
    MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.gmail.com')
    MAIL_PORT = int(os.environ.get('MAIL_PORT', 587))
    MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
    MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER', 'noreply@example.com')
    ADMINS = ['admin@example.com']

9. 会话安全配置

# config.py
import os
from datetime import timedelta

class Config:
    """基础配置"""
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-this'

    # 会话安全配置
    SESSION_COOKIE_NAME = 'flask_session'
    SESSION_COOKIE_HTTPONLY = True  # 防止JavaScript访问
    SESSION_COOKIE_SECURE = True  # 生产环境设置为True,仅HTTPS
    SESSION_COOKIE_SAMESITE = 'Lax'  # 防止CSRF

    # 记住我Cookie配置
    REMEMBER_COOKIE_NAME = 'flask_remember'
    REMEMBER_COOKIE_DURATION = timedelta(days=30)
    REMEMBER_COOKIE_HTTPONLY = True
    REMEMBER_COOKIE_SECURE = True
    REMEMBER_COOKIE_SAMESITE = 'Lax'

    # 防止会话劫持
    PERMANENT_SESSION_LIFETIME = timedelta(days=1)
    SESSION_REFRESH_EACH_REQUEST = True  # 每次请求刷新会话

    # 安全头配置
    @staticmethod
    def init_app(app):
        """初始化安全头"""
        from flask_talisman import Talisman

        # 设置安全头
        csp = {
            'default-src': "'self'",
            'style-src': ["'self'", "'unsafe-inline'", "https://cdn.jsdelivr.net"],
            'script-src': ["'self'", "https://cdn.jsdelivr.net"],
            'img-src': ["'self'", "data:", "https:"],
            'font-src': ["'self'", "https://cdn.jsdelivr.net"],
        }

        Talisman(
            app,
            content_security_policy=csp,
            session_cookie_secure=True,
            session_cookie_http_only=True,
            force_https=False  # 开发环境设为False
        )

10. 测试用例

# tests/test_auth.py
import unittest
from flask import url_for
from app import create_app, db
from app.models import User
from config import TestingConfig

class AuthTestCase(unittest.TestCase):
    def setUp(self):
        """测试前准备"""
        self.app = create_app(TestingConfig)
        self.app_context = self.app.app_context()
        self.app_context.push()
        db.create_all()
        self.client = self.app.test_client()

    def tearDown(self):
        """测试后清理"""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_user_registration(self):
        """测试用户注册"""
        response = self.client.post(url_for('auth.register'), data={
            'username': 'testuser',
            'email': 'test@example.com',
            'password': 'TestPass123',
            'confirm_password': 'TestPass123',
            'agree_terms': True
        })

        # 检查是否重定向到登录页面
        self.assertEqual(response.status_code, 302)
        self.assertIn(url_for('auth.login'), response.location)

        # 检查用户是否创建成功
        user = User.query.filter_by(username='testuser').first()
        self.assertIsNotNone(user)
        self.assertEqual(user.email, 'test@example.com')

    def test_user_login(self):
        """测试用户登录"""
        # 先创建用户
        user = User(username='testuser', email='test@example.com', password='TestPass123')
        db.session.add(user)
        db.session.commit()

        # 测试登录
        response = self.client.post(url_for('auth.login'), data={
            'username': 'testuser',
            'password': 'TestPass123'
        })

        self.assertEqual(response.status_code, 302)

        # 测试错误的密码
        response = self.client.post(url_for('auth.login'), data={
            'username': 'testuser',
            'password': 'WrongPass'
        })
        self.assertIn(b'用户名或密码错误', response.data)

    def test_protected_route(self):
        """测试受保护的路由"""
        # 未登录时访问个人资料页
        response = self.client.get(url_for('auth.profile'), follow_redirects=True)
        self.assertIn(b'请先登录', response.data)

        # 创建并登录用户
        user = User(username='testuser', email='test@example.com', password='TestPass123')
        db.session.add(user)
        db.session.commit()

        self.client.post(url_for('auth.login'), data={
            'username': 'testuser',
            'password': 'TestPass123'
        })

        # 登录后访问个人资料页
        response = self.client.get(url_for('auth.profile'))
        self.assertEqual(response.status_code, 200)

    def test_password_reset(self):
        """测试密码重置"""
        # 创建用户
        user = User(username='testuser', email='test@example.com', password='OldPass123')
        db.session.add(user)
        db.session.commit()

        # 请求重置密码
        response = self.client.post(url_for('auth.reset_password_request'), data={
            'email': 'test@example.com'
        })
        self.assertEqual(response.status_code, 302)

if __name__ == '__main__':
    unittest.main()

11. 生产环境安全建议

  • 使用HTTPS:所有认证相关的请求必须使用HTTPS
  • 强密码策略:要求密码包含大小写字母、数字和特殊字符
  • 会话安全:设置安全的会话Cookie属性(HttpOnly, Secure, SameSite)
  • 防止暴力破解:实施登录尝试限制和验证码
  • 密码过期策略:定期要求用户修改密码
  • 登录通知:发送邮件通知用户新设备登录
  • 防止账户枚举:在登录和注册时返回统一的错误信息
  • 审计日志:记录所有登录尝试和敏感操作