FlaskRESTful API基础
RESTful API是一种基于HTTP协议的软件架构风格,用于构建可伸缩的Web服务。
1. RESTful API核心概念
- 客户端-服务器:关注点分离
- 无状态:每个请求包含所有必要信息
- 可缓存:响应必须标记为可缓存或不可缓存
- 统一接口:简化系统架构
- 分层系统:提高可扩展性
- 按需代码:可选约束
| HTTP方法 |
CRUD操作 |
幂等性 |
| GET |
读取 |
是 |
| POST |
创建 |
否 |
| PUT |
更新/替换 |
是 |
| PATCH |
部分更新 |
否 |
| DELETE |
删除 |
是 |
2. 基础RESTful API实现
2.1 简单的图书API
from flask import Flask, jsonify, request, abort, make_response
from flask.views import MethodView
from functools import wraps
app = Flask(__name__)
# 模拟数据存储
books = [
{
'id': 1,
'title': 'Python编程从入门到实践',
'author': 'Eric Matthes',
'isbn': '9787115428028',
'price': 89.00,
'stock': 10
},
{
'id': 2,
'title': 'Flask Web开发实战',
'author': '李辉',
'isbn': '9787115481771',
'price': 79.00,
'stock': 5
}
]
# 辅助函数
def validate_book_data(data, require_all=True):
"""验证图书数据"""
required_fields = ['title', 'author', 'isbn']
if require_all:
for field in required_fields:
if field not in data:
return False, f'缺少必填字段: {field}'
# 验证数据类型
if 'price' in data and (not isinstance(data['price'], (int, float)) or data['price'] < 0):
return False, '价格必须是正数'
if 'stock' in data and (not isinstance(data['stock'], int) or data['stock'] < 0):
return False, '库存必须是正整数'
return True, '验证通过'
def find_book(book_id):
"""查找图书"""
for book in books:
if book['id'] == book_id:
return book
return None
def generate_book_id():
"""生成图书ID"""
if not books:
return 1
return max(book['id'] for book in books) + 1
# 错误处理
@app.errorhandler(404)
def not_found(error):
"""404错误处理"""
return jsonify({'error': '资源不存在'}), 404
@app.errorhandler(400)
def bad_request(error):
"""400错误处理"""
return jsonify({'error': '请求数据无效'}), 400
@app.errorhandler(405)
def method_not_allowed(error):
"""405错误处理"""
return jsonify({'error': '方法不允许'}), 405
# 使用MethodView的RESTful API
class BookAPI(MethodView):
"""图书资源API"""
def get(self, book_id=None):
"""获取图书列表或单个图书"""
if book_id is None:
# 查询参数处理
author = request.args.get('author')
min_price = request.args.get('min_price', type=float)
max_price = request.args.get('max_price', type=float)
result = books
# 过滤
if author:
result = [book for book in result if book['author'] == author]
if min_price is not None:
result = [book for book in result if book['price'] >= min_price]
if max_price is not None:
result = [book for book in result if book['price'] <= max_price]
return jsonify({
'data': result,
'count': len(result),
'total': len(books)
})
else:
# 获取单个图书
book = find_book(book_id)
if not book:
abort(404)
return jsonify({'data': book})
def post(self):
"""创建新图书"""
if not request.json:
abort(400)
data = request.json
# 验证数据
is_valid, message = validate_book_data(data)
if not is_valid:
return jsonify({'error': message}), 400
# 检查ISBN是否已存在
if any(book['isbn'] == data['isbn'] for book in books):
return jsonify({'error': 'ISBN已存在'}), 409
# 创建新图书
new_book = {
'id': generate_book_id(),
'title': data['title'],
'author': data['author'],
'isbn': data['isbn'],
'price': data.get('price', 0.0),
'stock': data.get('stock', 0)
}
books.append(new_book)
# 返回创建的资源
response = jsonify({'data': new_book})
response.status_code = 201
response.headers['Location'] = f'/api/books/{new_book["id"]}'
return response
def put(self, book_id):
"""完全更新图书"""
if not request.json:
abort(400)
book = find_book(book_id)
if not book:
abort(404)
data = request.json
# 验证数据
is_valid, message = validate_book_data(data)
if not is_valid:
return jsonify({'error': message}), 400
# 更新图书
book.update({
'title': data['title'],
'author': data['author'],
'isbn': data['isbn'],
'price': data.get('price', 0.0),
'stock': data.get('stock', 0)
})
return jsonify({'data': book})
def patch(self, book_id):
"""部分更新图书"""
if not request.json:
abort(400)
book = find_book(book_id)
if not book:
abort(404)
data = request.json
# 验证数据
is_valid, message = validate_book_data(data, require_all=False)
if not is_valid:
return jsonify({'error': message}), 400
# 部分更新
for key, value in data.items():
if key in ['title', 'author', 'isbn', 'price', 'stock']:
book[key] = value
return jsonify({'data': book})
def delete(self, book_id):
"""删除图书"""
book = find_book(book_id)
if not book:
abort(404)
books.remove(book)
# 204 No Content
return '', 204
# 注册路由
book_view = BookAPI.as_view('book_api')
app.add_url_rule('/api/books', view_func=book_view, methods=['GET', 'POST'])
app.add_url_rule('/api/books/<int:book_id>', view_func=book_view,
methods=['GET', 'PUT', 'PATCH', 'DELETE'])
if __name__ == '__main__':
app.run(debug=True)
3. 使用Flask-RESTful扩展
# 安装扩展
# pip install flask-restful
from flask import Flask
from flask_restful import Api, Resource, reqparse, fields, marshal_with
from werkzeug.exceptions import HTTPException
app = Flask(__name__)
api = Api(app)
# 请求参数解析器
book_parser = reqparse.RequestParser()
book_parser.add_argument('title', type=str, required=True, help='书名不能为空')
book_parser.add_argument('author', type=str, required=True, help='作者不能为空')
book_parser.add_argument('isbn', type=str, required=True, help='ISBN不能为空')
book_parser.add_argument('price', type=float, default=0.0)
book_parser.add_argument('stock', type=int, default=0)
# 响应字段格式化
book_fields = {
'id': fields.Integer,
'title': fields.String,
'author': fields.String,
'isbn': fields.String,
'price': fields.Float,
'stock': fields.Integer,
'links': fields.Url('book_resource', absolute=True)
}
book_list_fields = {
'books': fields.List(fields.Nested(book_fields)),
'count': fields.Integer,
'total': fields.Integer,
'next': fields.Url('book_list_resource', absolute=True),
'prev': fields.Url('book_list_resource', absolute=True)
}
# 自定义错误处理
class BookNotFound(HTTPException):
code = 404
description = '图书不存在'
class ValidationError(HTTPException):
code = 400
description = '数据验证失败'
# 资源类
class BookListResource(Resource):
"""图书列表资源"""
@marshal_with(book_list_fields)
def get(self):
"""获取图书列表"""
# 分页参数
parser = reqparse.RequestParser()
parser.add_argument('page', type=int, default=1)
parser.add_argument('per_page', type=int, default=10)
parser.add_argument('author', type=str)
args = parser.parse_args()
page = args['page']
per_page = args['per_page']
author = args['author']
# 过滤
result = books
if author:
result = [book for book in result if book['author'] == author]
# 分页
start = (page - 1) * per_page
end = start + per_page
paginated_books = result[start:end]
return {
'books': paginated_books,
'count': len(paginated_books),
'total': len(result),
'next': f'/api/v2/books?page={page + 1}&per_page={per_page}' if end < len(result) else None,
'prev': f'/api/v2/books?page={page - 1}&per_page={per_page}' if start > 0 else None
}
@marshal_with(book_fields)
def post(self):
"""创建新图书"""
args = book_parser.parse_args()
# 验证ISBN唯一性
if any(book['isbn'] == args['isbn'] for book in books):
raise ValidationError('ISBN已存在')
# 创建新图书
new_book = {
'id': generate_book_id(),
'title': args['title'],
'author': args['author'],
'isbn': args['isbn'],
'price': args['price'],
'stock': args['stock']
}
books.append(new_book)
return new_book, 201
class BookResource(Resource):
"""单个图书资源"""
@marshal_with(book_fields)
def get(self, book_id):
"""获取单个图书"""
book = find_book(book_id)
if not book:
raise BookNotFound('图书不存在')
return book
@marshal_with(book_fields)
def put(self, book_id):
"""更新图书"""
book = find_book(book_id)
if not book:
raise BookNotFound('图书不存在')
args = book_parser.parse_args()
# 检查ISBN唯一性(排除当前图书)
if any(b['isbn'] == args['isbn'] for b in books if b['id'] != book_id):
raise ValidationError('ISBN已存在')
# 更新图书
book.update({
'title': args['title'],
'author': args['author'],
'isbn': args['isbn'],
'price': args['price'],
'stock': args['stock']
})
return book
def delete(self, book_id):
"""删除图书"""
book = find_book(book_id)
if not book:
raise BookNotFound('图书不存在')
books.remove(book)
return '', 204
# 注册资源
api.add_resource(BookListResource, '/api/v2/books')
api.add_resource(BookResource, '/api/v2/books/<int:book_id>')
# 错误处理
api.handle_error = lambda e: {
'error': e.description if hasattr(e, 'description') else str(e),
'code': e.code if hasattr(e, 'code') else 500
}, e.code if hasattr(e, 'code') else 500
4. RESTful API设计最佳实践
4.1 URL设计规范
# 好的URL设计示例
"""
资源集合:
GET /api/books # 获取图书列表
POST /api/books # 创建新图书
单个资源:
GET /api/books/1 # 获取ID为1的图书
PUT /api/books/1 # 更新图书1
PATCH /api/books/1 # 部分更新图书1
DELETE /api/books/1 # 删除图书1
子资源:
GET /api/books/1/authors # 获取图书1的作者
GET /api/books/1/comments # 获取图书1的评论
POST /api/books/1/comments # 为图书1添加评论
过滤、排序、分页:
GET /api/books?author=John&page=2&per_page=20&sort=price
关系资源:
GET /api/users/1/books # 获取用户1的图书
"""
# 避免的URL设计
"""
❌ 不使用动词:
GET /api/getBooks
POST /api/createBook
PUT /api/updateBook
❌ 不使用大写:
GET /api/Books
❌ 不使用文件扩展名:
GET /api/books.json
GET /api/books.xml
"""
# 版本控制
"""
路径版本控制:
GET /api/v1/books
GET /api/v2/books
头部版本控制:
GET /api/books
Accept: application/vnd.myapi.v1+json
Accept: application/vnd.myapi.v2+json
查询参数版本控制(不推荐):
GET /api/books?version=1
"""
4.2 响应格式规范
// 成功的响应格式
{
"data": {
"id": 1,
"title": "Python编程",
"author": "Eric Matthes",
"price": 89.00
},
"meta": {
"request_id": "req_123456",
"timestamp": "2023-10-01T12:00:00Z"
},
"links": {
"self": "/api/books/1",
"related": "/api/books/1/authors"
}
}
// 列表响应
{
"data": [
{
"id": 1,
"title": "Python编程",
"author": "Eric Matthes"
},
{
"id": 2,
"title": "Flask实战",
"author": "李辉"
}
],
"pagination": {
"page": 1,
"per_page": 20,
"total": 100,
"total_pages": 5,
"next": "/api/books?page=2",
"prev": null
},
"meta": {
"count": 2
}
}
// 错误响应
{
"error": {
"code": "VALIDATION_ERROR",
"message": "数据验证失败",
"details": [
{
"field": "email",
"message": "邮箱格式不正确"
},
{
"field": "password",
"message": "密码长度至少8个字符"
}
]
},
"meta": {
"request_id": "req_123456",
"timestamp": "2023-10-01T12:00:00Z"
}
}
5. 认证与授权
from flask import g
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth, MultiAuth
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from functools import wraps
app.config['SECRET_KEY'] = 'your-secret-key-here'
basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth(scheme='Bearer')
multi_auth = MultiAuth(basic_auth, token_auth)
# 用户模型(简化)
class User:
def __init__(self, id, username, password_hash):
self.id = id
self.username = username
self.password_hash = password_hash
def verify_password(self, password):
return self.password_hash == password
def generate_auth_token(self, expires_in=3600):
"""生成认证令牌"""
s = Serializer(app.config['SECRET_KEY'], expires_in=expires_in)
return s.dumps({'id': self.id}).decode('utf-8')
@staticmethod
def verify_auth_token(token):
"""验证认证令牌"""
s = Serializer(app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return None
return User.query.get(data['id'])
# 模拟用户数据库
users = [
User(1, 'admin', 'admin123'),
User(2, 'user', 'user123')
]
def find_user_by_username(username):
return next((u for u in users if u.username == username), None)
def find_user_by_id(user_id):
return next((u for u in users if u.id == user_id), None)
# Basic认证
@basic_auth.verify_password
def verify_password(username, password):
"""验证用户名和密码"""
if username and password:
user = find_user_by_username(username)
if user and user.verify_password(password):
g.current_user = user
return user
return None
@basic_auth.error_handler
def basic_auth_error():
"""Basic认证错误处理"""
return jsonify({
'error': {
'code': 'UNAUTHORIZED',
'message': '认证失败'
}
}), 401
# Token认证
@token_auth.verify_token
def verify_token(token):
"""验证令牌"""
if token:
user = User.verify_auth_token(token)
if user:
g.current_user = user
return user
return None
@token_auth.error_handler
def token_auth_error():
"""Token认证错误处理"""
return jsonify({
'error': {
'code': 'UNAUTHORIZED',
'message': '令牌无效或已过期'
}
}), 401
# 权限装饰器
def role_required(role):
"""要求特定角色"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not hasattr(g, 'current_user') or not g.current_user:
return jsonify({'error': '需要认证'}), 401
# 检查用户角色(简化版)
if g.current_user.username != 'admin':
return jsonify({'error': '权限不足'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
# 认证相关的API端点
@app.route('/api/auth/login', methods=['POST'])
def login():
"""登录并获取令牌"""
if not request.json or not 'username' in request.json or not 'password' in request.json:
return jsonify({'error': '需要用户名和密码'}), 400
username = request.json.get('username')
password = request.json.get('password')
user = find_user_by_username(username)
if not user or not user.verify_password(password):
return jsonify({'error': '用户名或密码错误'}), 401
token = user.generate_auth_token()
return jsonify({
'data': {
'token': token,
'expires_in': 3600,
'user': {
'id': user.id,
'username': user.username
}
}
})
@app.route('/api/auth/refresh', methods=['POST'])
@token_auth.login_required
def refresh_token():
"""刷新令牌"""
token = g.current_user.generate_auth_token()
return jsonify({
'data': {
'token': token,
'expires_in': 3600
}
})
@app.route('/api/auth/logout', methods=['POST'])
@token_auth.login_required
def logout():
"""登出(令牌失效)"""
# 在实际应用中,可能需要将令牌加入黑名单
return jsonify({'message': '登出成功'})
# 受保护的API端点
@app.route('/api/protected/books', methods=['GET'])
@multi_auth.login_required
def get_protected_books():
"""需要认证的图书列表"""
return jsonify({
'data': books,
'message': '认证成功'
})
@app.route('/api/admin/books', methods=['POST'])
@multi_auth.login_required
@role_required('admin')
def create_admin_book():
"""需要管理员权限的创建图书接口"""
# 管理员专属逻辑
return jsonify({'message': '管理员操作成功'})
6. 高级特性
6.1 分页和过滤
from flask_sqlalchemy import Pagination
class PaginatedAPI(Resource):
"""支持分页的API基类"""
def get_paginated_response(self, query, schema=None, **kwargs):
"""获取分页响应"""
parser = reqparse.RequestParser()
parser.add_argument('page', type=int, default=1, location='args')
parser.add_argument('per_page', type=int, default=20, location='args')
parser.add_argument('sort', type=str, location='args')
parser.add_argument('order', type=str, choices=['asc', 'desc'], default='asc', location='args')
args = parser.parse_args()
page = args['page']
per_page = args['per_page']
sort_field = args['sort']
order = args['order']
# 排序
if sort_field and hasattr(query, 'order_by'):
if order == 'desc':
query = query.order_by(getattr(query.column_descriptions[0]['type'], sort_field).desc())
else:
query = query.order_by(getattr(query.column_descriptions[0]['type'], sort_field))
# 分页
paginated = query.paginate(page=page, per_page=per_page, error_out=False)
# 序列化数据
if schema:
data = schema.dump(paginated.items, many=True)
else:
data = [item.to_dict() for item in paginated.items]
# 构建响应
response = {
'data': data,
'pagination': {
'page': paginated.page,
'per_page': paginated.per_page,
'total': paginated.total,
'pages': paginated.pages,
'has_next': paginated.has_next,
'has_prev': paginated.has_prev,
'next_num': paginated.next_num,
'prev_num': paginated.prev_num
},
'links': {
'self': request.url,
'next': self.build_url(page=paginated.next_num, per_page=per_page) if paginated.has_next else None,
'prev': self.build_url(page=paginated.prev_num, per_page=per_page) if paginated.has_prev else None,
'first': self.build_url(page=1, per_page=per_page),
'last': self.build_url(page=paginated.pages, per_page=per_page)
}
}
response.update(kwargs)
return response
def build_url(self, **kwargs):
"""构建分页URL"""
url = request.base_url
params = request.args.to_dict()
params.update(kwargs)
if params:
url += '?' + '&'.join(f'{k}={v}' for k, v in params.items() if v is not None)
return url
# 过滤器类
class BookFilter:
"""图书过滤器"""
def __init__(self, query):
self.query = query
def apply_filters(self, filters):
"""应用过滤器"""
parser = reqparse.RequestParser()
parser.add_argument('author', type=str, location='args')
parser.add_argument('min_price', type=float, location='args')
parser.add_argument('max_price', type=float, location='args')
parser.add_argument('in_stock', type=lambda x: x.lower() == 'true', location='args')
args = parser.parse_args()
# 作者过滤
if args.get('author'):
self.query = self.query.filter_by(author=args['author'])
# 价格范围过滤
if args.get('min_price') is not None:
self.query = self.query.filter(Book.price >= args['min_price'])
if args.get('max_price') is not None:
self.query = self.query.filter(Book.price <= args['max_price'])
# 库存过滤
if args.get('in_stock'):
self.query = self.query.filter(Book.stock > 0)
return self.query
6.2 缓存和限流
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_caching import Cache
import time
# 初始化扩展
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
# 限流装饰器
@app.route('/api/books')
@limiter.limit("10 per minute")
def get_books_limited():
"""限流的图书列表接口"""
return jsonify({'data': books})
# 缓存装饰器
def cache_response(timeout=300, key_prefix='view_'):
"""缓存响应装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 构建缓存键
cache_key = key_prefix + request.full_path
# 尝试从缓存获取
cached_response = cache.get(cache_key)
if cached_response is not None:
return cached_response
# 执行函数并缓存结果
response = f(*args, **kwargs)
cache.set(cache_key, response, timeout=timeout)
# 添加缓存头
response.headers['X-Cache'] = 'MISS'
response.headers['Cache-Control'] = f'public, max-age={timeout}'
return response
return decorated_function
return decorator
@app.route('/api/popular/books')
@cache_response(timeout=3600)
def get_popular_books():
"""获取热门图书(带缓存)"""
# 模拟耗时操作
time.sleep(2)
popular_books = sorted(books, key=lambda x: x.get('views', 0), reverse=True)[:10]
return jsonify({
'data': popular_books,
'cached_at': time.time()
})
# 条件缓存
@app.route('/api/books/<int:book_id>')
def get_book_cached(book_id):
"""获取图书(条件缓存)"""
cache_key = f'book_{book_id}'
# ETag缓存
if 'If-None-Match' in request.headers:
etag = request.headers['If-None-Match']
cached_etag = cache.get(f'{cache_key}_etag')
if cached_etag == etag:
return '', 304 # Not Modified
book = find_book(book_id)
if not book:
abort(404)
# 生成ETag
import hashlib
etag = hashlib.md5(str(book).encode()).hexdigest()
# 检查是否修改
if 'If-Modified-Since' in request.headers:
last_modified = cache.get(f'{cache_key}_modified')
if last_modified and last_modified <= request.headers['If-Modified-Since']:
return '', 304
response = jsonify({'data': book})
# 设置缓存头
response.headers['ETag'] = etag
response.headers['Last-Modified'] = time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime())
response.headers['Cache-Control'] = 'public, max-age=300'
# 缓存数据
cache.set(cache_key, response, timeout=300)
cache.set(f'{cache_key}_etag', etag, timeout=300)
cache.set(f'{cache_key}_modified', response.headers['Last-Modified'], timeout=300)
return response
7. API文档
from flask_swagger_ui import get_swaggerui_blueprint
from flask_restx import Api, Resource, fields, Namespace
# Swagger UI配置
SWAGGER_URL = '/api/docs'
API_URL = '/api/swagger.json'
swaggerui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL,
config={
'app_name': "图书API",
'docExpansion': 'none',
'supportedSubmitMethods': ['get', 'post', 'put', 'delete'],
'validatorUrl': None
}
)
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)
# 生成Swagger文档
@app.route('/api/swagger.json')
def swagger_json():
"""生成Swagger文档"""
swagger_doc = {
"openapi": "3.0.0",
"info": {
"title": "图书API",
"description": "图书管理RESTful API",
"version": "1.0.0",
"contact": {
"name": "API支持",
"email": "support@example.com"
}
},
"servers": [
{
"url": "http://localhost:5000/api",
"description": "开发服务器"
}
],
"paths": {
"/books": {
"get": {
"summary": "获取图书列表",
"description": "获取所有图书,支持过滤和分页",
"parameters": [
{
"name": "page",
"in": "query",
"description": "页码",
"required": False,
"schema": {"type": "integer", "default": 1}
},
{
"name": "per_page",
"in": "query",
"description": "每页数量",
"required": False,
"schema": {"type": "integer", "default": 20}
},
{
"name": "author",
"in": "query",
"description": "按作者过滤",
"required": False,
"schema": {"type": "string"}
}
],
"responses": {
"200": {
"description": "成功",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {"$ref": "#/components/schemas/Book"}
},
"pagination": {"$ref": "#/components/schemas/Pagination"}
}
}
}
}
}
}
},
"post": {
"summary": "创建新图书",
"description": "创建一本新图书",
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/BookInput"}
}
}
},
"responses": {
"201": {
"description": "创建成功",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Book"}
}
}
},
"400": {
"description": "请求数据无效",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Error"}
}
}
}
}
}
}
},
"components": {
"schemas": {
"Book": {
"type": "object",
"properties": {
"id": {"type": "integer", "example": 1},
"title": {"type": "string", "example": "Python编程"},
"author": {"type": "string", "example": "Eric Matthes"},
"isbn": {"type": "string", "example": "9787115428028"},
"price": {"type": "number", "format": "float", "example": 89.00},
"stock": {"type": "integer", "example": 10}
}
},
"BookInput": {
"type": "object",
"required": ["title", "author", "isbn"],
"properties": {
"title": {"type": "string", "example": "Python编程"},
"author": {"type": "string", "example": "Eric Matthes"},
"isbn": {"type": "string", "example": "9787115428028"},
"price": {"type": "number", "format": "float", "example": 89.00},
"stock": {"type": "integer", "example": 10}
}
},
"Pagination": {
"type": "object",
"properties": {
"page": {"type": "integer", "example": 1},
"per_page": {"type": "integer", "example": 20},
"total": {"type": "integer", "example": 100},
"pages": {"type": "integer", "example": 5}
}
},
"Error": {
"type": "object",
"properties": {
"error": {
"type": "object",
"properties": {
"code": {"type": "string", "example": "VALIDATION_ERROR"},
"message": {"type": "string", "example": "数据验证失败"},
"details": {
"type": "array",
"items": {
"type": "object",
"properties": {
"field": {"type": "string", "example": "email"},
"message": {"type": "string", "example": "邮箱格式不正确"}
}
}
}
}
}
}
}
},
"securitySchemes": {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
},
"security": [{"BearerAuth": []}]
}
return jsonify(swagger_doc)
8. 测试RESTful API
import unittest
import json
from app import create_app, db
from config import TestingConfig
class RESTfulAPITestCase(unittest.TestCase):
"""RESTful API测试用例"""
def setUp(self):
"""测试前准备"""
self.app = create_app(TestingConfig)
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
self.client = self.app.test_client()
self.auth_headers = None
# 创建测试数据
self.create_test_data()
def tearDown(self):
"""测试后清理"""
db.session.remove()
db.drop_all()
self.app_context.pop()
def create_test_data(self):
"""创建测试数据"""
from app.models import Book, User
# 创建测试图书
book1 = Book(title='测试图书1', author='作者1', isbn='1111111111')
book2 = Book(title='测试图书2', author='作者2', isbn='2222222222')
db.session.add_all([book1, book2])
db.session.commit()
# 创建测试用户
user = User(username='testuser', email='test@example.com')
user.password = 'test123'
db.session.add(user)
db.session.commit()
self.test_user = user
def authenticate(self, username='testuser', password='test123'):
"""认证并获取令牌"""
response = self.client.post('/api/auth/login',
data=json.dumps({
'username': username,
'password': password
}),
content_type='application/json')
data = json.loads(response.data)
token = data['data']['token']
self.auth_headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def test_get_books(self):
"""测试获取图书列表"""
response = self.client.get('/api/books')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('data', data)
self.assertIn('count', data)
def test_get_single_book(self):
"""测试获取单个图书"""
response = self.client.get('/api/books/1')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('data', data)
self.assertEqual(data['data']['id'], 1)
def test_create_book(self):
"""测试创建图书"""
self.authenticate()
new_book = {
'title': '新图书',
'author': '新作者',
'isbn': '3333333333',
'price': 99.00,
'stock': 5
}
response = self.client.post('/api/books',
data=json.dumps(new_book),
headers=self.auth_headers)
self.assertEqual(response.status_code, 201)
data = json.loads(response.data)
self.assertIn('data', data)
self.assertEqual(data['data']['title'], '新图书')
def test_update_book(self):
"""测试更新图书"""
self.authenticate()
updated_data = {
'title': '更新后的标题',
'author': '作者1',
'isbn': '1111111111',
'price': 79.00,
'stock': 8
}
response = self.client.put('/api/books/1',
data=json.dumps(updated_data),
headers=self.auth_headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['data']['title'], '更新后的标题')
self.assertEqual(data['data']['price'], 79.00)
def test_delete_book(self):
"""测试删除图书"""
self.authenticate()
response = self.client.delete('/api/books/1',
headers=self.auth_headers)
self.assertEqual(response.status_code, 204)
def test_authentication_required(self):
"""测试认证要求"""
# 未认证的请求
response = self.client.post('/api/books',
data=json.dumps({
'title': '测试图书',
'author': '测试作者',
'isbn': '4444444444'
}),
content_type='application/json')
self.assertEqual(response.status_code, 401)
def test_validation_error(self):
"""测试数据验证错误"""
self.authenticate()
invalid_book = {
'title': '', # 标题为空
'author': '作者',
'isbn': '5555555555'
}
response = self.client.post('/api/books',
data=json.dumps(invalid_book),
headers=self.auth_headers)
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertIn('error', data)
def test_pagination(self):
"""测试分页"""
response = self.client.get('/api/books?page=1&per_page=5')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('pagination', data)
self.assertEqual(data['pagination']['page'], 1)
def test_filtering(self):
"""测试过滤"""
response = self.client.get('/api/books?author=作者1')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
# 检查返回的数据都符合过滤条件
for book in data['data']:
self.assertEqual(book['author'], '作者1')
if __name__ == '__main__':
unittest.main()
9. 部署与监控
-
性能监控:使用Prometheus + Grafana监控API性能
-
日志记录:结构化日志记录所有API请求
-
异常报警:设置API异常报警机制
-
安全扫描:定期进行API安全扫描
-
负载均衡:使用Nginx或云负载均衡器
-
数据库优化:优化API数据库查询
-
版本管理:平滑的API版本升级策略
-
文档维护:保持API文档的及时更新