Flask用户认证
用户认证是Web应用的核心功能,包括注册、登录、退出、密码管理和权限控制等。
1. 安装必要扩展
# 安装用户认证相关扩展
pip install flask-login # 用户会话管理
pip install flask-wtf # 表单处理
pip install email-validator # 邮箱验证
# 密码安全相关
pip install bcrypt # 推荐:强大的密码哈希
# 或者使用Werkzeug内置的密码哈希
# from werkzeug.security import generate_password_hash, check_password_hash
2. 基础配置
# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_bcrypt import Bcrypt
from config import Config
# 初始化扩展
db = SQLAlchemy()
login_manager = LoginManager()
bcrypt = Bcrypt()
def create_app(config_class=Config):
"""应用工厂函数"""
app = Flask(__name__)
app.config.from_object(config_class)
# 初始化扩展
db.init_app(app)
login_manager.init_app(app)
bcrypt.init_app(app)
# 配置Flask-Login
login_manager.login_view = 'auth.login' # 未登录时重定向的页面
login_manager.login_message = '请先登录后访问此页面'
login_manager.login_message_category = 'warning'
login_manager.session_protection = 'strong' # 会话保护级别
# 注册蓝图
from app.auth import auth_bp
app.register_blueprint(auth_bp)
return app
3. 用户模型设计
# app/models.py
from datetime import datetime
from flask_login import UserMixin
from . import db, bcrypt
class User(UserMixin, db.Model):
"""用户模型"""
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(128), nullable=False)
# 个人信息
full_name = db.Column(db.String(100))
avatar = db.Column(db.String(200)) # 头像URL
bio = db.Column(db.Text) # 个人简介
location = db.Column(db.String(100)) # 所在地
website = db.Column(db.String(200)) # 个人网站
# 状态字段
is_active = db.Column(db.Boolean, default=True) # 是否激活
is_verified = db.Column(db.Boolean, default=False) # 邮箱是否验证
is_admin = db.Column(db.Boolean, default=False) # 是否管理员
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
last_login = db.Column(db.DateTime) # 最后登录时间
last_seen = db.Column(db.DateTime) # 最后活跃时间
# 密码重置相关
reset_token = db.Column(db.String(100))
reset_token_expires = db.Column(db.DateTime)
# 关系
posts = db.relationship('Post', backref='author', lazy='dynamic')
comments = db.relationship('Comment', backref='author', lazy='dynamic')
# 密码处理方法
@property
def password(self):
"""密码属性,防止直接读取"""
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
"""设置密码时自动加密"""
self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
def verify_password(self, password):
"""验证密码"""
return bcrypt.check_password_hash(self.password_hash, password)
# 用户状态方法
def activate(self):
"""激活用户"""
self.is_active = True
db.session.commit()
def deactivate(self):
"""禁用用户"""
self.is_active = False
db.session.commit()
def generate_reset_token(self, expires_in=3600):
"""生成密码重置令牌"""
import jwt
from datetime import datetime, timedelta
reset_token = jwt.encode(
{
'reset_password': self.id,
'exp': datetime.utcnow() + timedelta(seconds=expires_in)
},
current_app.config['SECRET_KEY'],
algorithm='HS256'
)
return reset_token
@staticmethod
def verify_reset_token(token):
"""验证重置令牌"""
import jwt
try:
data = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256']
)
user_id = data.get('reset_password')
except:
return None
return User.query.get(user_id)
# Flask-Login要求的方法
def get_id(self):
"""返回用户的唯一标识符"""
return str(self.id)
def is_authenticated(self):
"""用户是否已认证"""
return True
def is_anonymous(self):
"""是否为匿名用户"""
return False
def __repr__(self):
return f'<User {self.username}>'
# 用户加载函数
from app.models import User
@login_manager.user_loader
def load_user(user_id):
"""加载用户(Flask-Login要求)"""
if user_id is not None:
return User.query.get(int(user_id))
return None
4. 认证表单设计
# app/auth/forms.py
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField, BooleanField, TextAreaField
from wtforms.validators import DataRequired, Length, Email, EqualTo, ValidationError
from app.models import User
class LoginForm(FlaskForm):
"""登录表单"""
username = StringField('用户名或邮箱', validators=[
DataRequired(message='请输入用户名或邮箱'),
Length(min=3, max=64, message='长度必须在3-64个字符之间')
])
password = PasswordField('密码', validators=[
DataRequired(message='请输入密码'),
Length(min=6, max=128, message='密码长度必须在6-128个字符之间')
])
remember = BooleanField('记住我')
submit = SubmitField('登录')
class RegistrationForm(FlaskForm):
"""注册表单"""
username = StringField('用户名', validators=[
DataRequired(message='用户名不能为空'),
Length(min=3, max=64, message='用户名长度必须在3-64个字符之间'),
# 自定义验证器:用户名格式
lambda form, field: ValidationError('用户名只能包含字母、数字和下划线')
if not all(c.isalnum() or c == '_' for c in field.data) else None
])
email = StringField('邮箱', validators=[
DataRequired(message='邮箱不能为空'),
Email(message='请输入有效的邮箱地址'),
Length(max=120, message='邮箱地址太长')
])
password = PasswordField('密码', validators=[
DataRequired(message='密码不能为空'),
Length(min=8, max=128, message='密码长度必须在8-128个字符之间'),
# 密码强度验证
lambda form, field: ValidationError('密码必须包含至少一个大写字母、一个小写字母和一个数字')
if not (any(c.isupper() for c in field.data) and
any(c.islower() for c in field.data) and
any(c.isdigit() for c in field.data)) else None
])
confirm_password = PasswordField('确认密码', validators=[
DataRequired(message='请确认密码'),
EqualTo('password', message='两次输入的密码不一致')
])
agree_terms = BooleanField('我同意服务条款', validators=[
DataRequired(message='必须同意服务条款')
])
submit = SubmitField('注册')
def validate_username(self, username):
"""验证用户名是否已存在"""
user = User.query.filter_by(username=username.data).first()
if user:
raise ValidationError('该用户名已被使用,请选择其他用户名')
def validate_email(self, email):
"""验证邮箱是否已存在"""
user = User.query.filter_by(email=email.data).first()
if user:
raise ValidationError('该邮箱已被注册,请使用其他邮箱')
class PasswordResetRequestForm(FlaskForm):
"""密码重置请求表单"""
email = StringField('邮箱', validators=[
DataRequired(message='请输入邮箱地址'),
Email(message='请输入有效的邮箱地址')
])
submit = SubmitField('发送重置链接')
def validate_email(self, email):
"""验证邮箱是否存在"""
user = User.query.filter_by(email=email.data).first()
if not user:
raise ValidationError('该邮箱未注册')
class PasswordResetForm(FlaskForm):
"""密码重置表单"""
password = PasswordField('新密码', validators=[
DataRequired(message='请输入新密码'),
Length(min=8, max=128, message='密码长度必须在8-128个字符之间')
])
confirm_password = PasswordField('确认新密码', validators=[
DataRequired(message='请确认新密码'),
EqualTo('password', message='两次输入的密码不一致')
])
submit = SubmitField('重置密码')
class ProfileForm(FlaskForm):
"""个人资料表单"""
full_name = StringField('真实姓名', validators=[
Length(max=100, message='姓名太长')
])
bio = TextAreaField('个人简介', validators=[
Length(max=500, message='简介不能超过500个字符')
])
location = StringField('所在地', validators=[
Length(max=100, message='所在地太长')
])
website = StringField('个人网站', validators=[
Length(max=200, message='网站地址太长')
])
submit = SubmitField('更新资料')
class ChangePasswordForm(FlaskForm):
"""修改密码表单"""
current_password = PasswordField('当前密码', validators=[
DataRequired(message='请输入当前密码')
])
new_password = PasswordField('新密码', validators=[
DataRequired(message='请输入新密码'),
Length(min=8, max=128, message='密码长度必须在8-128个字符之间')
])
confirm_password = PasswordField('确认新密码', validators=[
DataRequired(message='请确认新密码'),
EqualTo('new_password', message='两次输入的密码不一致')
])
submit = SubmitField('修改密码')
class ChangeEmailForm(FlaskForm):
"""修改邮箱表单"""
email = StringField('新邮箱地址', validators=[
DataRequired(message='请输入新邮箱地址'),
Email(message='请输入有效的邮箱地址'),
Length(max=120, message='邮箱地址太长')
])
password = PasswordField('当前密码', validators=[
DataRequired(message='请输入当前密码以确认')
])
submit = SubmitField('修改邮箱')
def validate_email(self, email):
"""验证新邮箱是否已被使用"""
user = User.query.filter_by(email=email.data).first()
if user:
raise ValidationError('该邮箱已被使用,请使用其他邮箱')
5. 认证路由实现
# app/auth/routes.py
from flask import render_template, redirect, url_for, flash, request, current_app
from flask_login import login_user, logout_user, current_user, login_required
from werkzeug.urls import url_parse
from . import auth_bp
from .forms import (
LoginForm, RegistrationForm, PasswordResetRequestForm,
PasswordResetForm, ProfileForm, ChangePasswordForm, ChangeEmailForm
)
from app.models import User
from app import db, bcrypt
import jwt
from datetime import datetime, timedelta
from app.email import send_email
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
"""用户登录"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = LoginForm()
if form.validate_on_submit():
# 查找用户(支持用户名或邮箱登录)
user = User.query.filter(
(User.username == form.username.data) |
(User.email == form.username.data)
).first()
if user is None or not user.verify_password(form.password.data):
flash('用户名或密码错误', 'danger')
return redirect(url_for('auth.login'))
if not user.is_active:
flash('您的账户已被禁用,请联系管理员', 'warning')
return redirect(url_for('auth.login'))
# 登录用户
login_user(user, remember=form.remember.data)
user.last_login = datetime.utcnow()
db.session.commit()
# 重定向到next参数指定的页面
next_page = request.args.get('next')
if not next_page or url_parse(next_page).netloc != '':
next_page = url_for('main.index')
flash('登录成功!', 'success')
return redirect(next_page)
return render_template('auth/login.html', form=form)
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
"""用户注册"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = RegistrationForm()
if form.validate_on_submit():
# 创建新用户
user = User(
username=form.username.data,
email=form.email.data,
password=form.password.data # 自动加密
)
db.session.add(user)
db.session.commit()
# 发送欢迎邮件
send_email(
subject='欢迎注册',
recipients=[user.email],
text_body=render_template('email/welcome.txt', user=user),
html_body=render_template('email/welcome.html', user=user)
)
flash('注册成功!请登录。', 'success')
return redirect(url_for('auth.login'))
return render_template('auth/register.html', form=form)
@auth_bp.route('/logout')
@login_required
def logout():
"""用户退出"""
logout_user()
flash('您已成功退出登录', 'info')
return redirect(url_for('main.index'))
@auth_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
"""个人资料页面"""
form = ProfileForm()
if form.validate_on_submit():
current_user.full_name = form.full_name.data
current_user.bio = form.bio.data
current_user.location = form.location.data
current_user.website = form.website.data
current_user.updated_at = datetime.utcnow()
db.session.commit()
flash('个人资料已更新', 'success')
return redirect(url_for('auth.profile'))
# 填充现有数据
elif request.method == 'GET':
form.full_name.data = current_user.full_name
form.bio.data = current_user.bio
form.location.data = current_user.location
form.website.data = current_user.website
return render_template('auth/profile.html', form=form)
@auth_bp.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
"""修改密码"""
form = ChangePasswordForm()
if form.validate_on_submit():
# 验证当前密码
if not current_user.verify_password(form.current_password.data):
flash('当前密码错误', 'danger')
return redirect(url_for('auth.change_password'))
# 更新密码
current_user.password = form.new_password.data
db.session.commit()
# 发送密码修改通知邮件
send_email(
subject='密码修改通知',
recipients=[current_user.email],
text_body=render_template('email/password_changed.txt', user=current_user),
html_body=render_template('email/password_changed.html', user=current_user)
)
flash('密码已成功修改', 'success')
return redirect(url_for('auth.profile'))
return render_template('auth/change_password.html', form=form)
@auth_bp.route('/reset-password-request', methods=['GET', 'POST'])
def reset_password_request():
"""请求重置密码"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = PasswordResetRequestForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user:
# 生成重置令牌
token = user.generate_reset_token()
# 发送重置邮件
send_email(
subject='密码重置',
recipients=[user.email],
text_body=render_template(
'email/reset_password.txt',
user=user, token=token
),
html_body=render_template(
'email/reset_password.html',
user=user, token=token
)
)
# 无论用户是否存在,都显示相同的信息(防止邮箱枚举攻击)
flash('如果该邮箱已注册,重置链接已发送到您的邮箱', 'info')
return redirect(url_for('auth.login'))
return render_template('auth/reset_password_request.html', form=form)
@auth_bp.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
"""重置密码"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
# 验证令牌
user = User.verify_reset_token(token)
if not user:
flash('重置链接无效或已过期', 'danger')
return redirect(url_for('auth.reset_password_request'))
form = PasswordResetForm()
if form.validate_on_submit():
# 更新密码
user.password = form.password.data
db.session.commit()
flash('密码已重置成功,请使用新密码登录', 'success')
return redirect(url_for('auth.login'))
return render_template('auth/reset_password.html', form=form)
@auth_bp.route('/verify-email/<token>')
def verify_email(token):
"""验证邮箱"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
try:
data = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256']
)
user_id = data.get('verify_email')
user = User.query.get(user_id)
if user and not user.is_verified:
user.is_verified = True
db.session.commit()
flash('邮箱验证成功!', 'success')
else:
flash('验证链接无效或已过期', 'danger')
except:
flash('验证链接无效或已过期', 'danger')
return redirect(url_for('auth.login'))
@auth_bp.before_request
def update_last_seen():
"""在每次请求前更新最后活跃时间"""
if current_user.is_authenticated:
current_user.last_seen = datetime.utcnow()
db.session.commit()
6. 装饰器和权限控制
# app/decorators.py
from functools import wraps
from flask import abort, flash, redirect, url_for
from flask_login import current_user
def login_required(f):
"""登录装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('请先登录', 'warning')
return redirect(url_for('auth.login'))
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""管理员权限装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('请先登录', 'warning')
return redirect(url_for('auth.login'))
if not current_user.is_admin:
flash('您没有权限访问此页面', 'danger')
return redirect(url_for('main.index'))
return f(*args, **kwargs)
return decorated_function
def verified_required(f):
"""邮箱验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('请先登录', 'warning')
return redirect(url_for('auth.login'))
if not current_user.is_verified:
flash('请先验证您的邮箱地址', 'warning')
return redirect(url_for('auth.unverified'))
return f(*args, **kwargs)
return decorated_function
def active_required(f):
"""账户激活装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('请先登录', 'warning')
return redirect(url_for('auth.login'))
if not current_user.is_active:
flash('您的账户已被禁用', 'danger')
return redirect(url_for('auth.logout'))
return f(*args, **kwargs)
return decorated_function
# 在视图函数中使用装饰器
@auth_bp.route('/admin/dashboard')
@login_required
@admin_required
def admin_dashboard():
"""管理后台仪表板(需要管理员权限)"""
return render_template('admin/dashboard.html')
@auth_bp.route('/premium/content')
@login_required
@verified_required
def premium_content():
"""付费内容(需要验证邮箱)"""
return render_template('premium/content.html')
7. 模板文件
7.1 登录模板
<!-- templates/auth/login.html -->
@% extends "base.html" %>
@% block title %>登录@% endblock %>
@% block content %>
<div class="row justify-content-center">
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h4 class="mb-0">用户登录</h4>
</div>
<div class="card-body">
<form method="POST" action="{{ url_for('auth.login') }}">
{{ form.hidden_tag() }}
<div class="mb-3">
{{ form.username.label(class="form-label") }}
{{ form.username(class="form-control" + (" is-invalid" if form.username.errors else "")) }}
@% if form.username.errors %>
@% for error in form.username.errors %>
<div class="invalid-feedback">
{{ error }}
</div>
@% endfor %>
@% endif %>
</div>
<div class="mb-3">
{{ form.password.label(class="form-label") }}
{{ form.password(class="form-control" + (" is-invalid" if form.password.errors else "")) }}
@% if form.password.errors %>
@% for error in form.password.errors %>
<div class="invalid-feedback">
{{ error }}
</div>
@% endfor %>
@% endif %>
</div>
<div class="mb-3 form-check">
{{ form.remember(class="form-check-input") }}
{{ form.remember.label(class="form-check-label") }}
</div>
<div class="d-grid">
{{ form.submit(class="btn btn-primary") }}
</div>
</form>
<div class="mt-3 text-center">
<p>
<a href="{{ url_for('auth.reset_password_request') }}">忘记密码?</a>
</p>
<p>
还没有账号? <a href="{{ url_for('auth.register') }}">立即注册</a>
</p>
</div>
</div>
</div>
</div>
</div>
@% endblock %>
7.2 注册模板
<!-- templates/auth/register.html -->
@% extends "base.html" %>
@% block title %>注册@% endblock %>
@% block content %>
<div class="row justify-content-center">
<div class="col-md-8">
<div class="card">
<div class="card-header">
<h4 class="mb-0">用户注册</h4>
</div>
<div class="card-body">
<form method="POST" action="{{ url_for('auth.register') }}">
{{ form.hidden_tag() }}
<div class="row">
<div class="col-md-6 mb-3">
{{ form.username.label(class="form-label") }}
{{ form.username(class="form-control" + (" is-invalid" if form.username.errors else "")) }}
@% if form.username.errors %>
<div class="invalid-feedback">
{{ form.username.errors[0] }}
</div>
@% endif %>
<div class="form-text">用户名长度3-64个字符,只能包含字母、数字和下划线</div>
</div>
<div class="col-md-6 mb-3">
{{ form.email.label(class="form-label") }}
{{ form.email(class="form-control" + (" is-invalid" if form.email.errors else "")) }}
@% if form.email.errors %>
<div class="invalid-feedback">
{{ form.email.errors[0] }}
</div>
@% endif %>
</div>
</div>
<div class="row">
<div class="col-md-6 mb-3">
{{ form.password.label(class="form-label") }}
{{ form.password(class="form-control" + (" is-invalid" if form.password.errors else "")) }}
@% if form.password.errors %>
<div class="invalid-feedback">
{{ form.password.errors[0] }}
</div>
@% endif %>
<div class="form-text">密码至少8位,必须包含大写字母、小写字母和数字</div>
</div>
<div class="col-md-6 mb-3">
{{ form.confirm_password.label(class="form-label") }}
{{ form.confirm_password(class="form-control" + (" is-invalid" if form.confirm_password.errors else "")) }}
@% if form.confirm_password.errors %>
<div class="invalid-feedback">
{{ form.confirm_password.errors[0] }}
</div>
@% endif %>
</div>
</div>
<div class="mb-3 form-check">
{{ form.agree_terms(class="form-check-input" + (" is-invalid" if form.agree_terms.errors else "")) }}
{{ form.agree_terms.label(class="form-check-label") }}
@% if form.agree_terms.errors %>
<div class="invalid-feedback">
{{ form.agree_terms.errors[0] }}
</div>
@% endif %>
</div>
<div class="d-grid">
{{ form.submit(class="btn btn-success") }}
</div>
</form>
<div class="mt-3 text-center">
<p>已有账号? <a href="{{ url_for('auth.login') }}">立即登录</a></p>
</div>
</div>
</div>
</div>
</div>
@% endblock %>
8. 邮件服务
# app/email.py
from flask import render_template, current_app
from flask_mail import Mail, Message
from threading import Thread
mail = Mail()
def send_async_email(app, msg):
"""异步发送邮件"""
with app.app_context():
mail.send(msg)
def send_email(subject, recipients, text_body, html_body, sender=None):
"""发送邮件"""
app = current_app._get_current_object()
msg = Message(
subject=subject,
recipients=recipients,
sender=sender or current_app.config['MAIL_DEFAULT_SENDER'],
body=text_body,
html=html_body
)
# 异步发送邮件
Thread(target=send_async_email, args=(app, msg)).start()
# 配置邮件服务(config.py)
class Config:
# ... 其他配置
MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.gmail.com')
MAIL_PORT = int(os.environ.get('MAIL_PORT', 587))
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER', 'noreply@example.com')
ADMINS = ['admin@example.com']
9. 会话安全配置
# config.py
import os
from datetime import timedelta
class Config:
"""基础配置"""
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-this'
# 会话安全配置
SESSION_COOKIE_NAME = 'flask_session'
SESSION_COOKIE_HTTPONLY = True # 防止JavaScript访问
SESSION_COOKIE_SECURE = True # 生产环境设置为True,仅HTTPS
SESSION_COOKIE_SAMESITE = 'Lax' # 防止CSRF
# 记住我Cookie配置
REMEMBER_COOKIE_NAME = 'flask_remember'
REMEMBER_COOKIE_DURATION = timedelta(days=30)
REMEMBER_COOKIE_HTTPONLY = True
REMEMBER_COOKIE_SECURE = True
REMEMBER_COOKIE_SAMESITE = 'Lax'
# 防止会话劫持
PERMANENT_SESSION_LIFETIME = timedelta(days=1)
SESSION_REFRESH_EACH_REQUEST = True # 每次请求刷新会话
# 安全头配置
@staticmethod
def init_app(app):
"""初始化安全头"""
from flask_talisman import Talisman
# 设置安全头
csp = {
'default-src': "'self'",
'style-src': ["'self'", "'unsafe-inline'", "https://cdn.jsdelivr.net"],
'script-src': ["'self'", "https://cdn.jsdelivr.net"],
'img-src': ["'self'", "data:", "https:"],
'font-src': ["'self'", "https://cdn.jsdelivr.net"],
}
Talisman(
app,
content_security_policy=csp,
session_cookie_secure=True,
session_cookie_http_only=True,
force_https=False # 开发环境设为False
)
10. 测试用例
# tests/test_auth.py
import unittest
from flask import url_for
from app import create_app, db
from app.models import User
from config import TestingConfig
class AuthTestCase(unittest.TestCase):
def setUp(self):
"""测试前准备"""
self.app = create_app(TestingConfig)
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
self.client = self.app.test_client()
def tearDown(self):
"""测试后清理"""
db.session.remove()
db.drop_all()
self.app_context.pop()
def test_user_registration(self):
"""测试用户注册"""
response = self.client.post(url_for('auth.register'), data={
'username': 'testuser',
'email': 'test@example.com',
'password': 'TestPass123',
'confirm_password': 'TestPass123',
'agree_terms': True
})
# 检查是否重定向到登录页面
self.assertEqual(response.status_code, 302)
self.assertIn(url_for('auth.login'), response.location)
# 检查用户是否创建成功
user = User.query.filter_by(username='testuser').first()
self.assertIsNotNone(user)
self.assertEqual(user.email, 'test@example.com')
def test_user_login(self):
"""测试用户登录"""
# 先创建用户
user = User(username='testuser', email='test@example.com', password='TestPass123')
db.session.add(user)
db.session.commit()
# 测试登录
response = self.client.post(url_for('auth.login'), data={
'username': 'testuser',
'password': 'TestPass123'
})
self.assertEqual(response.status_code, 302)
# 测试错误的密码
response = self.client.post(url_for('auth.login'), data={
'username': 'testuser',
'password': 'WrongPass'
})
self.assertIn(b'用户名或密码错误', response.data)
def test_protected_route(self):
"""测试受保护的路由"""
# 未登录时访问个人资料页
response = self.client.get(url_for('auth.profile'), follow_redirects=True)
self.assertIn(b'请先登录', response.data)
# 创建并登录用户
user = User(username='testuser', email='test@example.com', password='TestPass123')
db.session.add(user)
db.session.commit()
self.client.post(url_for('auth.login'), data={
'username': 'testuser',
'password': 'TestPass123'
})
# 登录后访问个人资料页
response = self.client.get(url_for('auth.profile'))
self.assertEqual(response.status_code, 200)
def test_password_reset(self):
"""测试密码重置"""
# 创建用户
user = User(username='testuser', email='test@example.com', password='OldPass123')
db.session.add(user)
db.session.commit()
# 请求重置密码
response = self.client.post(url_for('auth.reset_password_request'), data={
'email': 'test@example.com'
})
self.assertEqual(response.status_code, 302)
if __name__ == '__main__':
unittest.main()
11. 生产环境安全建议
-
使用HTTPS:所有认证相关的请求必须使用HTTPS
-
强密码策略:要求密码包含大小写字母、数字和特殊字符
-
会话安全:设置安全的会话Cookie属性(HttpOnly, Secure, SameSite)
-
防止暴力破解:实施登录尝试限制和验证码
-
密码过期策略:定期要求用户修改密码
-
登录通知:发送邮件通知用户新设备登录
-
防止账户枚举:在登录和注册时返回统一的错误信息
-
审计日志:记录所有登录尝试和敏感操作