客户端发送给服务器的消息
服务器返回给客户端的消息
获取资源
安全 幂等创建资源
不安全 非幂等替换资源
不安全 幂等删除资源
不安全 幂等部分更新
不安全 幂等获取头部
安全 幂等Flask使用request对象封装HTTP请求的所有信息。需要先从flask模块导入:
from flask import Flask, request
| 属性 | 描述 | 示例 |
|---|---|---|
request.method |
HTTP请求方法 | 'GET', 'POST', 'PUT' |
request.args |
GET请求的查询参数 | ?name=value |
request.form |
POST请求的表单数据 | 表单提交的数据 |
request.json |
JSON格式的请求体 | API请求的数据 |
request.files |
上传的文件 | 文件上传数据 |
request.headers |
HTTP请求头 | User-Agent, Content-Type |
request.cookies |
Cookie数据 | 会话信息 |
request.url |
完整的请求URL | http://example.com/path |
request.path |
请求的路径部分 | /path |
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/process', methods=['GET', 'POST', 'PUT'])
def process_request():
"""处理不同类型的请求"""
# 获取请求方法
method = request.method
# 根据方法处理不同数据
if method == 'GET':
# 获取查询参数
name = request.args.get('name', 'Guest')
age = request.args.get('age', '18')
return f'GET请求: 姓名={name}, 年龄={age}'
elif method == 'POST':
# 检查Content-Type
content_type = request.headers.get('Content-Type', '')
if 'application/json' in content_type:
# 处理JSON数据
data = request.get_json()
return jsonify({
'method': 'POST',
'data': data,
'message': 'JSON数据已接收'
})
elif 'multipart/form-data' in content_type:
# 处理表单数据和文件上传
username = request.form.get('username')
file = request.files.get('avatar')
if file:
filename = file.filename
file.save(f'uploads/{filename}')
return f'POST请求: 用户名={username}, 文件已保存'
else:
# 普通表单数据
username = request.form.get('username', '匿名用户')
email = request.form.get('email', '')
return f'POST请求: 用户名={username}, 邮箱={email}'
elif method == 'PUT':
# 获取原始数据
raw_data = request.get_data(as_text=True)
return f'PUT请求原始数据: {raw_data}'
else:
return f'不支持的请求方法: {method}', 405
if __name__ == '__main__':
app.run(debug=True)
URL示例: /search?q=flask&category=web&page=1
from flask import request, render_template
@app.route('/search')
def search():
"""搜索功能 - GET请求示例"""
# 获取查询参数
query = request.args.get('q', '')
category = request.args.get('category', 'all')
page = request.args.get('page', '1')
per_page = request.args.get('per_page', '10')
try:
page = int(page)
per_page = int(per_page)
except ValueError:
page = 1
per_page = 10
# 模拟搜索逻辑
results = []
if query:
# 这里应该是实际的搜索逻辑
results = [
{'id': 1, 'title': f'{query}教程', 'url': '/tutorial/1'},
{'id': 2, 'title': f'{query}入门指南', 'url': '/guide/1'},
]
# 构建下一页的URL
next_page_url = f'/search?q={query}&category={category}&page={page+1}'
return render_template('search.html',
query=query,
category=category,
page=page,
results=results,
next_page_url=next_page_url)
@app.route('/api/users')
def get_users():
"""API示例 - 安全地获取查询参数"""
# 使用get()方法并提供默认值
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
sort_by = request.args.get('sort_by', 'id')
order = request.args.get('order', 'asc')
# 验证参数
if page < 1:
page = 1
if limit < 1 or limit > 100:
limit = 10
if order not in ['asc', 'desc']:
order = 'asc'
# 模拟数据库查询
users = [
{'id': i, 'name': f'User{i}', 'email': f'user{i}@example.com'}
for i in range(1, limit + 1)
]
# 分页逻辑
start_idx = (page - 1) * limit
end_idx = page * limit
paginated_users = users[start_idx:end_idx]
return jsonify({
'page': page,
'limit': limit,
'total': len(users),
'users': paginated_users,
'has_next': end_idx < len(users)
})
1. 用户填写表单
2. 浏览器发送POST请求
3. 服务器处理并响应
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>用户注册</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<div class="container mt-5">
<div class="row justify-content-center">
<div class="col-md-6">
<h2 class="mb-4 text-center">用户注册</h2>
<!-- 注意:表单的method必须是POST -->
<form method="POST" action="{{ url_for('register') }}">
<div class="mb-3">
<label for="username" class="form-label">用户名</label>
<input type="text" class="form-control" id="username" name="username" required>
</div>
<div class="mb-3">
<label for="email" class="form-label">邮箱地址</label>
<input type="email" class="form-control" id="email" name="email" required>
</div>
<div class="mb-3">
<label for="password" class="form-label">密码</label>
<input type="password" class="form-control" id="password" name="password" required>
</div>
<div class="mb-3">
<label for="confirm_password" class="form-label">确认密码</label>
<input type="password" class="form-control" id="confirm_password" name="confirm_password" required>
</div>
<div class="mb-3 form-check">
<input type="checkbox" class="form-check-input" id="agree" name="agree" required>
<label class="form-check-label" for="agree">同意用户协议</label>
</div>
<button type="submit" class="btn btn-primary w-100">注册</button>
</form>
<!-- 显示错误信息 -->
{% if error %}
<div class="alert alert-danger mt-3">
{{ error }}
</div>
{% endif %}
<!-- 显示成功信息 -->
{% if success %}
<div class="alert alert-success mt-3">
{{ success }}
</div>
{% endif %}
</div>
</div>
</div>
</body>
</html>
from flask import render_template, redirect, url_for, flash
@app.route('/register', methods=['GET', 'POST'])
def register():
"""用户注册 - 处理GET和POST请求"""
if request.method == 'POST':
# 获取表单数据
username = request.form.get('username', '').strip()
email = request.form.get('email', '').strip()
password = request.form.get('password', '')
confirm_password = request.form.get('confirm_password', '')
agree = request.form.get('agree') == 'on'
# 验证数据
errors = []
if not username:
errors.append('用户名不能为空')
elif len(username) < 3:
errors.append('用户名至少3个字符')
if not email:
errors.append('邮箱不能为空')
elif '@' not in email:
errors.append('邮箱格式不正确')
if not password:
errors.append('密码不能为空')
elif len(password) < 6:
errors.append('密码至少6个字符')
elif password != confirm_password:
errors.append('两次输入的密码不一致')
if not agree:
errors.append('必须同意用户协议')
# 处理验证结果
if errors:
return render_template('register.html', error='
'.join(errors))
else:
# 这里应该保存到数据库
# save_user_to_db(username, email, password)
# 使用flash消息
flash('注册成功!请登录。', 'success')
# 重定向到登录页面
return redirect(url_for('login'))
# GET请求 - 显示注册表单
return render_template('register.html')
# 需要在模板中显示flash消息
# 在base.html中添加:
# {% with messages = get_flashed_messages(with_categories=true) %}
# {% if messages %}
# {% for category, message in messages %}
# {{ message }}
# {% endfor %}
# {% endif %}
# {% endwith %}
@app.route('/api/posts', methods=['POST'])
def create_post():
"""创建文章 - JSON API示例"""
# 检查请求头
if not request.is_json:
return jsonify({'error': 'Content-Type必须是application/json'}), 400
# 获取JSON数据
data = request.get_json()
if not data:
return jsonify({'error': '请求体不能为空'}), 400
# 验证必需字段
required_fields = ['title', 'content', 'author_id']
for field in required_fields:
if field not in data:
return jsonify({'error': f'缺少必需字段: {field}'}), 400
# 验证数据
title = data.get('title', '').strip()
content = data.get('content', '').strip()
author_id = data.get('author_id')
if not title:
return jsonify({'error': '标题不能为空'}), 400
if not content:
return jsonify({'error': '内容不能为空'}), 400
try:
author_id = int(author_id)
except (ValueError, TypeError):
return jsonify({'error': '作者ID必须是整数'}), 400
# 模拟保存到数据库
new_post = {
'id': 1001, # 假设这是数据库生成的新ID
'title': title,
'content': content,
'author_id': author_id,
'created_at': '2023-01-01T00:00:00Z',
'updated_at': '2023-01-01T00:00:00Z'
}
# 返回创建成功的响应
return jsonify({
'message': '文章创建成功',
'post': new_post,
'links': {
'self': url_for('get_post', post_id=new_post['id'], _external=True),
'author': url_for('get_user', user_id=author_id, _external=True)
}
}), 201 # 201 Created状态码
完整替换资源
部分更新资源
from flask import request, jsonify
# 模拟的数据库
users_db = {
1: {'id': 1, 'name': '张三', 'email': 'zhangsan@example.com', 'age': 25},
2: {'id': 2, 'name': '李四', 'email': 'lisi@example.com', 'age': 30}
}
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user_put(user_id):
"""使用PUT方法更新用户(完整替换)"""
if user_id not in users_db:
return jsonify({'error': '用户不存在'}), 404
# PUT需要完整的资源数据
data = request.get_json()
required_fields = ['name', 'email', 'age']
for field in required_fields:
if field not in data:
return jsonify({'error': f'PUT请求需要完整数据,缺少字段: {field}'}), 400
# 验证数据
name = data.get('name', '').strip()
email = data.get('email', '').strip()
age = data.get('age')
if not name:
return jsonify({'error': '姓名不能为空'}), 400
if not email or '@' not in email:
return jsonify({'error': '邮箱格式不正确'}), 400
try:
age = int(age)
if age < 0 or age > 150:
return jsonify({'error': '年龄无效'}), 400
except (ValueError, TypeError):
return jsonify({'error': '年龄必须是整数'}), 400
# 完整替换用户数据
users_db[user_id] = {
'id': user_id,
'name': name,
'email': email,
'age': age,
'updated_at': '2023-01-01T00:00:00Z'
}
return jsonify({
'message': '用户已更新',
'user': users_db[user_id]
})
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
def update_user_patch(user_id):
"""使用PATCH方法更新用户(部分更新)"""
if user_id not in users_db:
return jsonify({'error': '用户不存在'}), 404
data = request.get_json()
if not data:
return jsonify({'error': '请求体不能为空'}), 400
current_user = users_db[user_id]
updated_fields = []
# 只更新提供的字段
if 'name' in data:
name = data['name'].strip()
if name:
current_user['name'] = name
updated_fields.append('name')
if 'email' in data:
email = data['email'].strip()
if email and '@' in email:
current_user['email'] = email
updated_fields.append('email')
if 'age' in data:
try:
age = int(data['age'])
if 0 <= age <= 150:
current_user['age'] = age
updated_fields.append('age')
except (ValueError, TypeError):
pass # 忽略无效的年龄值
current_user['updated_at'] = '2023-01-01T00:00:00Z'
return jsonify({
'message': f'用户已部分更新(更新的字段: {", ".join(updated_fields)})',
'user': current_user
})
# 对比两种方法的使用:
# PUT请求需要发送完整数据:
# {
# "name": "张三",
# "email": "zhangsan@new.com",
# "age": 26
# }
# PATCH请求只需要发送要修改的字段:
# {
# "age": 26
# }
# 或
# {
# "email": "zhangsan@new.com"
# }
DELETE操作是不可逆的。在实际应用中,应该:
from flask import jsonify, abort
# 模拟带状态的数据库
posts_db = {
1: {'id': 1, 'title': '文章1', 'content': '内容1', 'deleted': False},
2: {'id': 2, 'title': '文章2', 'content': '内容2', 'deleted': False},
3: {'id': 3, 'title': '文章3', 'content': '内容3', 'deleted': False}
}
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
def delete_post(post_id):
"""删除文章 - 使用软删除"""
if post_id not in posts_db:
return jsonify({'error': '文章不存在'}), 404
post = posts_db[post_id]
if post['deleted']:
return jsonify({'error': '文章已被删除'}), 410 # 410 Gone
# 执行软删除(标记为已删除)
posts_db[post_id]['deleted'] = True
posts_db[post_id]['deleted_at'] = '2023-01-01T00:00:00Z'
return jsonify({
'message': f'文章 {post_id} 已删除',
'deleted_at': posts_db[post_id]['deleted_at']
}), 200
@app.route('/api/posts/<int:post_id>/force', methods=['DELETE'])
def force_delete_post(post_id):
"""强制删除文章 - 物理删除"""
# 在实际应用中,这里应该有管理员权限检查
# if not current_user.is_admin:
# abort(403, description="需要管理员权限")
if post_id not in posts_db:
return jsonify({'error': '文章不存在'}), 404
# 物理删除(从字典中移除)
deleted_post = posts_db.pop(post_id)
# 记录删除日志
log_deletion(post_id, deleted_post['title'])
return jsonify({
'message': f'文章 {post_id} 已永久删除',
'deleted_post': deleted_post
}), 200
def log_deletion(post_id, title):
"""记录删除日志(示例函数)"""
print(f'[DELETE LOG] 文章ID: {post_id}, 标题: {title}, 时间: 2023-01-01T00:00:00Z')
# 恢复已删除的文章
@app.route('/api/posts/<int:post_id>/restore', methods=['POST'])
def restore_post(post_id):
"""恢复已删除的文章"""
if post_id not in posts_db:
return jsonify({'error': '文章不存在'}), 404
post = posts_db[post_id]
if not post['deleted']:
return jsonify({'error': '文章未被删除'}), 400
# 恢复文章
posts_db[post_id]['deleted'] = False
posts_db[post_id].pop('deleted_at', None)
posts_db[post_id]['restored_at'] = '2023-01-01T00:00:00Z'
return jsonify({
'message': f'文章 {post_id} 已恢复',
'restored_at': posts_db[post_id]['restored_at']
}), 200
HEAD方法与GET类似,但服务器只返回响应头,不返回响应体。用于获取资源的元信息。
@app.route('/api/info', methods=['GET', 'HEAD'])
def get_info():
"""同时支持GET和HEAD方法"""
data = {
'version': '1.0.0',
'author': 'Flask Team',
'timestamp': '2023-01-01T00:00:00Z'
}
if request.method == 'HEAD':
# HEAD请求,不返回响应体
response = jsonify({})
response.headers['X-API-Version'] = '1.0.0'
response.headers['Content-Type'] = 'application/json'
response.headers['Content-Length'] = str(len(str(data)))
return response
# GET请求,返回完整数据
return jsonify(data)
OPTIONS方法用于获取资源支持的HTTP方法。
@app.route('/api/users', methods=['GET', 'POST', 'OPTIONS'])
@app.route('/api/users/<int:user_id>', methods=['GET', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'])
def handle_users(user_id=None):
"""处理用户资源,支持OPTIONS方法"""
if request.method == 'OPTIONS':
# 返回支持的HTTP方法
response = jsonify({})
if user_id:
# 单个用户资源支持的方法
allowed_methods = ['GET', 'PUT', 'PATCH', 'DELETE', 'OPTIONS']
else:
# 用户集合支持的方法
allowed_methods = ['GET', 'POST', 'OPTIONS']
response.headers['Allow'] = ', '.join(allowed_methods)
response.headers['Access-Control-Allow-Methods'] = ', '.join(allowed_methods)
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
return response
# 其他方法的处理逻辑...
# [GET, POST, PUT, PATCH, DELETE的实现代码]
return jsonify({'error': '方法未实现'}), 501
| HTTP方法 | URL模式 | 操作 | 描述 | 成功状态码 |
|---|---|---|---|---|
| GET | /api/users |
获取用户列表 | 检索资源集合 | 200 OK |
| GET | /api/users/{id} |
获取单个用户 | 检索单个资源 | 200 OK |
| POST | /api/users |
创建用户 | 创建新资源 | 201 Created |
| PUT | /api/users/{id} |
更新用户 | 完全替换资源 | 200 OK 或 204 No Content |
| PATCH | /api/users/{id} |
部分更新用户 | 部分修改资源 | 200 OK 或 204 No Content |
| DELETE | /api/users/{id} |
删除用户 | 删除资源 | 200 OK 或 204 No Content |
| OPTIONS | /api/users |
获取可用方法 | 查询资源支持的方法 | 200 OK |
from flask import Flask, request, jsonify, url_for, abort
from functools import wraps
app = Flask(__name__)
# 模拟数据库
books_db = {}
book_id_counter = 1
def require_json_content_type(f):
"""装饰器:要求请求的Content-Type为application/json"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not request.is_json:
return jsonify({'error': 'Content-Type必须是application/json'}), 400
return f(*args, **kwargs)
return decorated_function
# 获取所有书籍
@app.route('/api/books', methods=['GET'])
def get_books():
"""获取书籍列表"""
books = list(books_db.values())
# 分页参数
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
# 简单的分页逻辑
start = (page - 1) * per_page
end = start + per_page
paginated_books = books[start:end]
response = {
'books': paginated_books,
'pagination': {
'page': page,
'per_page': per_page,
'total': len(books),
'pages': (len(books) + per_page - 1) // per_page
}
}
return jsonify(response)
# 获取单个书籍
@app.route('/api/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
"""获取单个书籍"""
if book_id not in books_db:
abort(404, description="书籍不存在")
return jsonify(books_db[book_id])
# 创建新书籍
@app.route('/api/books', methods=['POST'])
@require_json_content_type
def create_book():
"""创建新书籍"""
global book_id_counter
data = request.get_json()
# 验证必需字段
required_fields = ['title', 'author', 'year']
for field in required_fields:
if field not in data:
return jsonify({'error': f'缺少必需字段: {field}'}), 400
# 创建新书籍
new_book = {
'id': book_id_counter,
'title': data['title'],
'author': data['author'],
'year': data['year'],
'created_at': '2023-01-01T00:00:00Z',
'updated_at': '2023-01-01T00:00:00Z'
}
books_db[book_id_counter] = new_book
book_id_counter += 1
# 响应头包含新资源的URL
response = jsonify(new_book)
response.status_code = 201
response.headers['Location'] = url_for('get_book', book_id=new_book['id'], _external=True)
return response
# 更新书籍(PUT - 完整替换)
@app.route('/api/books/<int:book_id>', methods=['PUT'])
@require_json_content_type
def update_book_put(book_id):
"""更新书籍(完整替换)"""
if book_id not in books_db:
abort(404, description="书籍不存在")
data = request.get_json()
# PUT需要完整数据
required_fields = ['title', 'author', 'year']
for field in required_fields:
if field not in data:
return jsonify({'error': f'PUT请求需要完整数据,缺少字段: {field}'}), 400
# 更新书籍
books_db[book_id].update({
'title': data['title'],
'author': data['author'],
'year': data['year'],
'updated_at': '2023-01-01T00:00:00Z'
})
return jsonify(books_db[book_id])
# 部分更新书籍(PATCH)
@app.route('/api/books/<int:book_id>', methods=['PATCH'])
@require_json_content_type
def update_book_patch(book_id):
"""部分更新书籍"""
if book_id not in books_db:
abort(404, description="书籍不存在")
data = request.get_json()
if not data:
return jsonify({'error': '请求体不能为空'}), 400
# 只更新提供的字段
book = books_db[book_id]
updated_fields = []
updatable_fields = ['title', 'author', 'year']
for field in updatable_fields:
if field in data:
book[field] = data[field]
updated_fields.append(field)
book['updated_at'] = '2023-01-01T00:00:00Z'
return jsonify({
'book': book,
'updated_fields': updated_fields
})
# 删除书籍
@app.route('/api/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
"""删除书籍"""
if book_id not in books_db:
abort(404, description="书籍不存在")
deleted_book = books_db.pop(book_id)
return jsonify({
'message': '书籍已删除',
'deleted_book': deleted_book
}), 200
# 错误处理
@app.errorhandler(404)
def not_found_error(error):
return jsonify({'error': '资源不存在', 'message': str(error.description)}), 404
@app.errorhandler(400)
def bad_request_error(error):
return jsonify({'error': '请求错误', 'message': str(error.description)}), 400
@app.errorhandler(405)
def method_not_allowed_error(error):
allowed_methods = ', '.join(error.valid_methods)
return jsonify({
'error': '方法不允许',
'message': f'该资源支持的HTTP方法: {allowed_methods}'
}), 405
CSRF(Cross-Site Request Forgery,跨站请求伪造)是一种攻击方式, 攻击者诱导用户在已登录的Web应用中执行非本意的操作。
# 安装Flask-WTF扩展
# pip install flask-wtf
from flask import Flask, render_template, request, jsonify
from flask_wtf.csrf import CSRFProtect
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here' # 必须设置密钥
# 启用CSRF保护
csrf = CSRFProtect(app)
# 对于API,可能需要禁用某些端点的CSRF保护
# csrf.exempt(api_blueprint)
@app.route('/protected-form', methods=['GET', 'POST'])
def protected_form():
"""受CSRF保护的表单"""
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired
class MyForm(FlaskForm):
name = StringField('姓名', validators=[DataRequired()])
submit = SubmitField('提交')
form = MyForm()
if form.validate_on_submit():
return f'表单提交成功: {form.name.data}'
return render_template('form.html', form=form)
# 在模板中,需要添加CSRF令牌
# <form method="post">
# {{ form.csrf_token }}
# {{ form.name.label }} {{ form.name() }}
# {{ form.submit() }}
# </form>
# 对于AJAX请求,需要发送CSRF令牌
@app.route('/api/protected-endpoint', methods=['POST'])
def protected_endpoint():
"""受CSRF保护的API端点"""
# Flask-WTF会自动验证CSRF令牌
data = request.get_json()
return jsonify({'message': '请求成功', 'data': data})
# 获取CSRF令牌(用于AJAX请求)
@app.route('/api/csrf-token', methods=['GET'])
def get_csrf_token():
"""获取CSRF令牌(用于AJAX请求)"""
from flask_wtf.csrf import generate_csrf
token = generate_csrf()
return jsonify({'csrf_token': token})
你已经掌握了Flask中HTTP方法的核心知识: