文件上传是Web应用中常见的功能,Flask通过request.files对象和Werkzeug库提供了强大的文件上传支持。
首先创建一个HTML表单,必须设置enctype="multipart/form-data":
<!-- templates/upload.html -->
{% raw %}{% extends "base.html" %}{% endraw %}
{% raw %}{% block content %}{% endraw %}
<div class="row justify-content-center">
<div class="col-md-8">
<div class="card">
<div class="card-header">
<h3 class="text-center">文件上传</h3>
</div>
<div class="card-body">
<form method="POST" action="{{ url_for('upload_file') }}" enctype="multipart/form-data">
<div class="mb-3">
<label for="file" class="form-label">选择文件</label>
<input type="file" class="form-control" id="file" name="file" required>
<div class="form-text">支持的文件类型: JPG, PNG, PDF, DOC (最大10MB)</div>
</div>
<div class="mb-3">
<label for="description" class="form-label">文件描述 (可选)</label>
<textarea class="form-control" id="description" name="description" rows="3"></textarea>
</div>
<div class="d-grid">
<button type="submit" class="btn btn-primary">上传文件</button>
</div>
</form>
</div>
</div>
</div>
</div>
{% raw %}{% endblock %}{% endraw %}
# app.py - 基础文件上传
import os
from datetime import datetime

from flask import Flask, render_template, request, flash, redirect, url_for, jsonify
from werkzeug.utils import secure_filename
# Upload settings shared by the views below.
UPLOAD_FOLDER = 'uploads'  # directory that receives uploaded files
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx'}

app = Flask(__name__)
app.config.update(
    SECRET_KEY='your-secret-key-here',
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    MAX_CONTENT_LENGTH=10 * 1024 * 1024,  # cap the whole request body at 10MB
)
def allowed_file(filename, allowed_extensions=None):
    """Return True when *filename* ends in a whitelisted extension.

    Args:
        filename: Client-supplied file name; may be empty or None.
        allowed_extensions: Optional collection of lower-case extensions
            (without the dot). Defaults to the module-level
            ``ALLOWED_EXTENSIONS``.

    Returns:
        bool: True only if the name has a non-empty base name, a dot, and an
        extension contained in the whitelist (compared case-insensitively).
    """
    if not filename:
        return False
    if allowed_extensions is None:
        allowed_extensions = ALLOWED_EXTENSIONS

    stem, dot, ext = filename.rpartition('.')
    # A name such as ".png" has no base name; sanitising it later would strip
    # the leading dot and leave an extension-less file, so reject it here.
    if not dot or not stem:
        return False
    return ext.lower() in allowed_extensions
@app.route('/')
def index():
    """Serve the landing page."""
    landing_template = 'index.html'
    return render_template(landing_template)
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    """Show the upload form (GET) or store a single uploaded file (POST).

    On POST the "file" part is checked against the extension whitelist,
    sanitised with ``secure_filename`` and written into ``UPLOAD_FOLDER``.
    Every outcome is reported through ``flash`` followed by a redirect.

    Returns:
        The rendered ``upload.html`` template for GET, otherwise a redirect
        response.
    """
    if request.method != 'POST':
        return render_template('upload.html')

    # The multipart body must contain a "file" part at all.
    if 'file' not in request.files:
        flash('没有选择文件', 'error')
        return redirect(request.url)

    file = request.files['file']
    description = request.form.get('description', '')

    # Browsers submit an empty part when the user picked nothing.
    if file.filename == '':
        flash('没有选择文件', 'error')
        return redirect(request.url)

    if not allowed_file(file.filename):
        flash('不支持的文件类型', 'error')
        return redirect(request.url)

    filename = secure_filename(file.filename)
    # secure_filename() drops non-ASCII characters, so a name like "照片.jpg"
    # collapses to "jpg". Re-check the sanitised name so we never save a file
    # with an empty name or without its extension.
    if not allowed_file(filename):
        flash('文件名无效', 'error')
        return redirect(request.url)

    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)

    # Metadata that a real application would persist to a database.
    file_info = {
        'filename': filename,
        'original_name': file.filename,
        'file_size': os.path.getsize(file_path),
        'description': description,
        'upload_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
    app.logger.info('Stored upload: %s', file_info)

    flash(f'文件 "{filename}" 上传成功!', 'success')
    return redirect(url_for('upload_file'))
if __name__ == '__main__':
    # Development entry point; debug mode enables the reloader and debugger.
    run_options = {'debug': True}
    app.run(**run_options)
- secure_filename() 防止路径遍历攻击
- MAX_CONTENT_LENGTH 限制文件大小

# 增强版文件验证
import imghdr # 用于检查图片类型
import magic # python-magic库,需要安装
from werkzeug.utils import secure_filename
import os
class FileValidator:
    """Validate an uploaded file's name, size, extension and content.

    Attributes:
        allowed_extensions: Set of accepted lower-case extensions (no dot).
        max_size: Maximum accepted size in bytes.
    """

    # Leading bytes that identify each supported image format.
    _IMAGE_SIGNATURES = {
        'jpg': (b'\xff\xd8\xff',),
        'jpeg': (b'\xff\xd8\xff',),
        'png': (b'\x89PNG\r\n\x1a\n',),
        'gif': (b'GIF87a', b'GIF89a'),
    }

    # MIME type python-magic is expected to report for each extension.
    _EXPECTED_MIMES = {
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'png': 'image/png',
        'gif': 'image/gif',
        'pdf': 'application/pdf',
        'txt': 'text/plain',
    }

    def __init__(self, allowed_extensions=None, max_size=10*1024*1024):
        self.allowed_extensions = allowed_extensions or {'jpg', 'jpeg', 'png', 'gif', 'pdf'}
        self.max_size = max_size

    def validate_file(self, file_stream, filename):
        """Run every check and return a list of error messages.

        Args:
            file_stream: Seekable binary stream holding the upload.
            filename: Client-supplied file name.

        Returns:
            list[str]: One message per failed check; empty when the file
            passes. The stream position is restored after each check.
        """
        errors = []
        if not self._is_safe_filename(filename):
            errors.append("文件名不安全")
        if not self._validate_size(file_stream):
            errors.append(f"文件大小超过限制 ({self.max_size/1024/1024}MB)")
        if not self._validate_extension(filename):
            # Sorted so the message is stable regardless of set ordering.
            supported = ', '.join(sorted(self.allowed_extensions))
            errors.append(f"不支持的文件扩展名,支持: {supported}")
        if not self._validate_content_type(file_stream, filename):
            errors.append("文件内容类型与扩展名不匹配")
        return errors

    @staticmethod
    def _get_extension(filename):
        """Return the lower-cased extension of *filename*, or '' if none."""
        if not filename or '.' not in filename:
            return ''
        return filename.rsplit('.', 1)[1].lower()

    @staticmethod
    def _peek(file_stream, size):
        """Read up to *size* bytes and restore the stream position."""
        position = file_stream.tell()
        data = file_stream.read(size)
        file_stream.seek(position)
        return data

    def _is_safe_filename(self, filename):
        """Return True if sanitising *filename* would leave it unchanged."""
        safe_name = secure_filename(filename)
        return safe_name == filename and '..' not in filename

    def _validate_size(self, file_stream):
        """Return True if the stream is no larger than ``max_size`` bytes."""
        position = file_stream.tell()
        file_stream.seek(0, 2)  # whence=2: jump to the end to measure size
        size = file_stream.tell()
        file_stream.seek(position)
        return size <= self.max_size

    def _validate_extension(self, filename):
        """Return True if the extension is in ``allowed_extensions``."""
        ext = self._get_extension(filename)
        return bool(ext) and ext in self.allowed_extensions

    def _validate_content_type(self, file_stream, filename):
        """Return True if the file content is consistent with its extension.

        Image extensions are verified against their leading signature bytes.
        When python-magic is installed, the detected MIME type must also match
        the expected one for known extensions. Names without an extension, or
        with an extension that has no known signature, are not rejected here;
        the extension check reports those.
        """
        ext = self._get_extension(filename)

        signatures = self._IMAGE_SIGNATURES.get(ext)
        if signatures is not None:
            if not self._peek(file_stream, 16).startswith(signatures):
                return False

        try:
            import magic
        except ImportError:
            # python-magic is optional; fall back to the signature check only.
            return True

        mime = magic.from_buffer(self._peek(file_stream, 2048), mime=True)
        expected = self._EXPECTED_MIMES.get(ext)
        return expected is None or mime == expected
# 在Flask应用中使用
@app.route('/upload-secure', methods=['POST'])
def upload_file_secure():
    """Validate an upload with ``FileValidator`` and store it.

    Returns:
        A JSON response: 400 with an ``error`` message when the "file" part is
        missing, empty or fails validation; 200 with the stored ``filename``
        on success.
    """
    if 'file' not in request.files:
        return jsonify({'error': '没有文件'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': '没有选择文件'}), 400

    validator = FileValidator(
        allowed_extensions={'jpg', 'jpeg', 'png', 'gif', 'pdf', 'txt'},
        max_size=5 * 1024 * 1024,  # 5MB
    )
    errors = validator.validate_file(file.stream, file.filename)
    if errors:
        return jsonify({'error': '; '.join(errors)}), 400

    filename = secure_filename(file.filename)
    # file.save() does not create missing directories, so make sure the
    # target exists, and rewind so the whole payload is written.
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    file.stream.seek(0)
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return jsonify({'message': '文件上传成功', 'filename': filename}), 200
<!-- templates/multi_upload.html -->
<form method="POST" action="{{ url_for('upload_multiple_files') }}" enctype="multipart/form-data">
<div class="mb-3">
<label for="files" class="form-label">选择多个文件</label>
<input type="file" class="form-control" id="files" name="files" multiple required>
<div class="form-text">按住Ctrl键可以选择多个文件</div>
</div>
<div class="mb-3">
<label for="category" class="form-label">分类</label>
<select class="form-control" id="category" name="category">
<option value="images">图片</option>
<option value="documents">文档</option>
<option value="other">其他</option>
</select>
</div>
<div class="d-grid">
<button type="submit" class="btn btn-primary">上传多个文件</button>
</div>
</form>
<!-- 显示已选择文件的预览 -->
<div id="file-preview" class="mt-3"></div>
<script>
// Render a preview list of the files currently selected in #files.
// File names and MIME types come from the user's machine, so they are
// inserted with textContent rather than innerHTML to avoid markup injection.
document.getElementById('files').addEventListener('change', function (e) {
    const preview = document.getElementById('file-preview');
    preview.innerHTML = '';

    const files = e.target.files;
    if (files.length === 0) return;

    const list = document.createElement('div');
    list.className = 'list-group';

    for (let i = 0; i < files.length; i++) {
        const file = files[i];

        const icon = document.createElement('i');
        icon.className = 'fas fa-file me-2';

        const size = document.createElement('small');
        size.className = 'text-muted ms-2';
        size.textContent = `(${(file.size / 1024).toFixed(2)} KB)`;

        const info = document.createElement('div');
        info.appendChild(icon);
        info.appendChild(document.createTextNode(file.name));
        info.appendChild(size);

        const badge = document.createElement('span');
        badge.className = 'badge bg-secondary';
        badge.textContent = file.type || '未知类型';

        const item = document.createElement('div');
        item.className = 'list-group-item d-flex justify-content-between align-items-center';
        item.appendChild(info);
        item.appendChild(badge);
        list.appendChild(item);
    }

    const header = document.createElement('h6');
    header.className = 'mt-3';
    header.textContent = `已选择 ${files.length} 个文件`;
    preview.appendChild(header);
    preview.appendChild(list);
});
</script>
# 处理多个文件上传
import os
import uuid
from datetime import datetime
from werkzeug.utils import secure_filename
@app.route('/upload-multiple', methods=['GET', 'POST'])
def upload_multiple_files():
    """Show the multi-upload form (GET) or store up to ten files (POST).

    Each accepted file is saved under
    ``UPLOAD_FOLDER/<category>/<YYYY>/<MM>/<DD>/<uuid>.<ext>``. Results are
    summarised through ``flash`` and the client is redirected back.

    Returns:
        The rendered ``multi_upload.html`` template for GET, otherwise a
        redirect response.
    """
    if request.method != 'POST':
        return render_template('multi_upload.html')

    if 'files' not in request.files:
        flash('没有选择文件', 'error')
        return redirect(request.url)

    # Browsers send an empty part when nothing was chosen; ignore those.
    files = [f for f in request.files.getlist('files') if f.filename]
    if not files:
        flash('没有选择文件', 'error')
        return redirect(request.url)

    if len(files) > 10:
        flash('一次最多上传10个文件', 'error')
        return redirect(request.url)

    # "category" becomes a path component, so only accept the values the form
    # offers; anything else (e.g. "../..") falls back to "other".
    category = request.form.get('category', 'other')
    if category not in {'images', 'documents', 'other'}:
        category = 'other'

    date_folder = datetime.now().strftime('%Y/%m/%d')
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, date_folder)

    uploaded_files = []
    failed_files = []
    for file in files:
        if not allowed_file(file.filename):
            failed_files.append({
                'name': file.filename,
                'error': '不支持的文件类型'
            })
            continue
        try:
            original_name = secure_filename(file.filename)
            # Take the extension from the raw name: it was just checked
            # against the whitelist, whereas secure_filename() may drop the
            # base name of a non-ASCII file and lose the dot with it.
            ext = file.filename.rsplit('.', 1)[1].lower()
            # A UUID avoids collisions between uploads.
            unique_filename = f"{uuid.uuid4().hex}.{ext}"

            os.makedirs(upload_dir, exist_ok=True)
            file_path = os.path.join(upload_dir, unique_filename)
            file.save(file_path)

            uploaded_files.append({
                'original_name': original_name,
                'saved_name': unique_filename,
                'size': os.path.getsize(file_path),
                'path': file_path,
                'category': category
            })
        except Exception as e:
            # Best effort: one failing file must not abort the whole batch.
            app.logger.exception('Failed to store upload %r', file.filename)
            failed_files.append({
                'name': file.filename,
                'error': str(e)
            })

    if uploaded_files:
        flash(f"成功上传 {len(uploaded_files)} 个文件", 'success')
    if failed_files:
        flash(f"{len(failed_files)} 个文件上传失败", 'warning')

    # A real application would persist uploaded_files to a database here.
    return redirect(url_for('upload_multiple_files'))
# 图片处理(需要安装Pillow库)
from PIL import Image
import io
def process_image(file_stream, filename):
    """Create a JPEG thumbnail (at most 200x200) for an uploaded image.

    Args:
        file_stream: Binary stream positioned at the start of the image data.
        filename: Client-supplied file name, used to name the thumbnail.

    Returns:
        tuple: ``(img_info, thumb_path)`` where ``img_info`` describes the
        original image (format, size, mode, filename) and ``thumb_path`` is
        the path of the written thumbnail.
    """
    img = Image.open(file_stream)

    img_info = {
        'format': img.format,
        'size': img.size,
        'mode': img.mode,
        'filename': filename
    }

    thumb_size = (200, 200)
    img.thumbnail(thumb_size, Image.Resampling.LANCZOS)

    # The thumbnail is always JPEG, so give it a matching extension instead
    # of reusing the original one.
    stem = os.path.splitext(secure_filename(filename))[0] or 'image'
    thumb_filename = f"thumb_{stem}.jpg"
    thumb_path = os.path.join(app.config['UPLOAD_FOLDER'], 'thumbnails', thumb_filename)
    os.makedirs(os.path.dirname(thumb_path), exist_ok=True)

    # JPEG cannot store alpha or palette data: flatten transparency onto a
    # white background and convert every other mode (P, CMYK, ...) to RGB.
    has_alpha = img.mode in ('RGBA', 'LA') or (
        img.mode == 'P' and 'transparency' in img.info
    )
    if has_alpha:
        rgba = img.convert('RGBA')
        background = Image.new('RGB', rgba.size, (255, 255, 255))
        background.paste(rgba, mask=rgba.split()[-1])
        img = background
    elif img.mode != 'RGB':
        img = img.convert('RGB')

    img.save(thumb_path, 'JPEG', quality=85)
    return img_info, thumb_path
@app.route('/upload-image', methods=['POST'])
def upload_image():
    """Accept one image, store the original and create a thumbnail.

    Returns:
        A JSON response: 400 with an ``error`` message when the "image" part
        is missing, unnamed or not a valid image; 200 with the original
        image's info and the thumbnail path on success.
    """
    if 'image' not in request.files:
        return jsonify({'error': '没有图片文件'}), 400

    file = request.files['image']
    if not file.filename:
        return jsonify({'error': '没有图片文件'}), 400

    safe_name = secure_filename(file.filename)
    if not safe_name:
        # e.g. a name made only of characters secure_filename() removes
        return jsonify({'error': '文件名无效'}), 400

    # verify() checks integrity but leaves the Image object unusable, which is
    # why process_image() reopens the stream afterwards.
    try:
        Image.open(file.stream).verify()
    except Exception:
        return jsonify({'error': '无效的图片文件'}), 400
    file.stream.seek(0)

    img_info, thumb_path = process_image(file.stream, file.filename)

    original_path = os.path.join(app.config['UPLOAD_FOLDER'], 'images', safe_name)
    os.makedirs(os.path.dirname(original_path), exist_ok=True)
    # Pillow consumed the stream while decoding; rewind or the saved original
    # would be truncated.
    file.stream.seek(0)
    file.save(original_path)

    return jsonify({
        'message': '图片上传成功',
        'original': img_info,
        'thumbnail': thumb_path
    }), 200
<!-- 图片上传带预览 -->
<div class="row">
<div class="col-md-6">
<form id="imageUploadForm" enctype="multipart/form-data">
<div class="mb-3">
<label for="imageInput" class="form-label">选择图片</label>
<input type="file" class="form-control" id="imageInput"
accept="image/*" required>
</div>
<div class="mb-3">
<label for="imageName" class="form-label">图片名称</label>
<input type="text" class="form-control" id="imageName"
placeholder="可选,默认为文件名">
</div>
<button type="submit" class="btn btn-primary">上传图片</button>
</form>
<!-- 上传进度条 -->
<div class="progress mt-3 d-none" id="progressBar">
<div class="progress-bar progress-bar-striped progress-bar-animated"
role="progressbar" style="width: 0%">0%</div>
</div>
</div>
<div class="col-md-6">
<div class="image-preview-container">
<h6>图片预览</h6>
<div class="image-wrapper" id="imagePreview">
<div class="text-center text-muted py-5">
<i class="fas fa-image fa-3x mb-3"></i>
<p>选择图片后显示预览</p>
</div>
</div>
</div>
</div>
</div>
<script>
// 图片预览
// Show a preview of the chosen image. The caption is built with textContent
// because the file name is user-controlled and must not be parsed as HTML.
document.getElementById('imageInput').addEventListener('change', function (e) {
    const file = e.target.files[0];
    if (!file || !file.type.startsWith('image/')) return;

    const reader = new FileReader();
    reader.onload = function (loadEvent) {
        const img = document.createElement('img');
        img.src = loadEvent.target.result;
        img.className = 'img-fluid rounded';
        img.alt = '预览';
        img.style.maxHeight = '300px';

        const label = document.createElement('small');
        label.className = 'text-muted';
        label.textContent = `${file.name} (${(file.size / 1024).toFixed(2)} KB)`;

        const caption = document.createElement('div');
        caption.className = 'mt-2 text-center';
        caption.appendChild(label);

        const preview = document.getElementById('imagePreview');
        preview.innerHTML = '';
        preview.appendChild(img);
        preview.appendChild(caption);
    };
    reader.readAsDataURL(file);
});
// AJAX上传
// Upload the selected image with XMLHttpRequest and report progress.
document.getElementById('imageUploadForm').addEventListener('submit', function (e) {
    e.preventDefault();

    const fileInput = document.getElementById('imageInput');
    const nameInput = document.getElementById('imageName');
    if (!fileInput.files || fileInput.files.length === 0) return;

    const formData = new FormData();
    formData.append('image', fileInput.files[0]);
    if (nameInput.value) {
        formData.append('name', nameInput.value);
    }

    const progressBar = document.getElementById('progressBar');
    const bar = progressBar.querySelector('.progress-bar');
    bar.style.width = '0%';
    bar.textContent = '0%';
    progressBar.classList.remove('d-none');

    // Error responses such as 413 or 500 may be HTML pages, not JSON.
    function parseBody(text) {
        try {
            return JSON.parse(text);
        } catch (err) {
            return null;
        }
    }

    const xhr = new XMLHttpRequest();

    xhr.upload.addEventListener('progress', function (progressEvent) {
        if (progressEvent.lengthComputable) {
            const percent = Math.round((progressEvent.loaded / progressEvent.total) * 100);
            bar.style.width = percent + '%';
            bar.textContent = percent + '%';
        }
    });

    xhr.onload = function () {
        const body = parseBody(xhr.responseText);
        if (xhr.status === 200 && body) {
            alert('上传成功: ' + body.message);
            document.getElementById('imageUploadForm').reset();
            document.getElementById('imagePreview').innerHTML = `
                <div class="text-center text-muted py-5">
                    <i class="fas fa-check-circle fa-3x mb-3 text-success"></i>
                    <p>上传成功!</p>
                </div>
            `;
        } else {
            const reason = body && body.error ? body.error : 'HTTP ' + xhr.status;
            alert('上传失败: ' + reason);
        }
        progressBar.classList.add('d-none');
    };

    // Network-level failures never reach onload; hide the bar here as well.
    xhr.onerror = function () {
        alert('上传失败: 网络错误');
        progressBar.classList.add('d-none');
    };

    xhr.open('POST', '/upload-image');
    xhr.send(formData);
});
</script>
| 配置项 | 说明 | 示例 |
|---|---|---|
| MAX_CONTENT_LENGTH | 请求最大字节数(包括文件和其他表单数据) | 16 * 1024 * 1024 # 16MB |
| UPLOAD_FOLDER | 上传文件存储目录 | 'static/uploads' |
| ALLOWED_EXTENSIONS | 允许的文件扩展名集合 | {'jpg', 'png', 'pdf', 'doc'} |
| MAX_FILE_SIZE | 单个文件最大大小(需要自定义验证) | 10 * 1024 * 1024 # 10MB |
# file_manager.py
import os
import json
import shutil
from datetime import datetime
from werkzeug.utils import secure_filename
from flask import Flask, request, jsonify, send_from_directory
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

# Application settings: session signing key, SQLite database, upload
# directory and a 50MB cap on the request body.
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///files.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB

db = SQLAlchemy(app)
# 文件模型
class File(db.Model):
    """Database record describing one stored upload."""

    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)  # name on disk
    original_name = db.Column(db.String(255), nullable=False)  # sanitised client name
    file_size = db.Column(db.Integer, nullable=False)
    file_type = db.Column(db.String(50), nullable=False)  # MIME type
    upload_date = db.Column(db.DateTime, default=datetime.utcnow)
    description = db.Column(db.Text)
    user_id = db.Column(db.Integer)  # owning user
    category = db.Column(db.String(50))
    is_public = db.Column(db.Boolean, default=False)

    def to_dict(self):
        """Return a JSON-serialisable dict; ``view_url`` is set only for images."""
        if self.file_type.startswith('image/'):
            view_url = f"/view/{self.id}"
        else:
            view_url = None

        payload = {
            'id': self.id,
            'filename': self.filename,
            'original_name': self.original_name,
            'file_size': self.file_size,
            'file_type': self.file_type,
            'upload_date': self.upload_date.isoformat(),
            'description': self.description,
            'category': self.category,
            'is_public': self.is_public,
        }
        payload['download_url'] = f"/download/{self.id}"
        payload['view_url'] = view_url
        return payload
# Make sure the upload directory exists, then create the database tables.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
with app.app_context():
    db.create_all()
class FileManager:
    """Store uploads on disk and keep matching ``File`` records."""

    @staticmethod
    def _extension_of(filename):
        """Return the lower-cased extension of *filename*, or '' if none."""
        if not filename or '.' not in filename:
            return ''
        return filename.rsplit('.', 1)[1].lower()

    @staticmethod
    def _coerce_user_id(user_id):
        """Return *user_id* as int, or None when it is None or empty.

        Request parameters arrive as strings while ``File.user_id`` is an
        integer column; comparing the two directly never matches.

        Raises:
            ValueError: If *user_id* is not a valid integer.
        """
        if user_id is None or user_id == '':
            return None
        try:
            return int(user_id)
        except (TypeError, ValueError):
            raise ValueError("无效的用户ID") from None

    @staticmethod
    def _storage_path(upload_date, filename):
        """Return the on-disk path of *filename* stored on *upload_date*."""
        return os.path.join(
            app.config['UPLOAD_FOLDER'],
            upload_date.strftime('%Y/%m/%d'),
            filename,
        )

    @staticmethod
    def save_file(file, user_id=None, category='general'):
        """Save an uploaded file and record it in the database.

        Args:
            file: Uploaded file object exposing ``filename`` and ``save()``.
            user_id: Optional owner id (int or numeric string).
            category: Free-form category label stored with the record.

        Returns:
            File: The committed database record.

        Raises:
            ValueError: If the file is missing, has a disallowed extension,
                or *user_id* is not numeric.
        """
        if not file or file.filename == '':
            raise ValueError("无效的文件")
        if not FileManager.is_allowed_file(file.filename):
            raise ValueError("不支持的文件类型")
        owner_id = FileManager._coerce_user_id(user_id)

        # Take the extension from the raw name (already whitelisted above):
        # secure_filename() can drop a non-ASCII base name and the dot with it.
        file_ext = FileManager._extension_of(file.filename)

        # One timestamp drives both the directory and the stored upload_date,
        # so delete/download can rebuild the exact same path later.
        now = datetime.utcnow()
        unique_name = f"{now.strftime('%Y%m%d_%H%M%S')}_{os.urandom(4).hex()}"
        if file_ext:
            unique_name = f"{unique_name}.{file_ext}"
        original_name = secure_filename(file.filename) or unique_name

        save_path = FileManager._storage_path(now, unique_name)
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        file.save(save_path)

        file_record = File(
            filename=unique_name,
            original_name=original_name,
            file_size=os.path.getsize(save_path),
            file_type=file.mimetype if hasattr(file, 'mimetype') else FileManager.get_mime_type(file_ext),
            upload_date=now,
            description=request.form.get('description', ''),
            user_id=owner_id,
            category=category,
            is_public=request.form.get('is_public', 'false').lower() == 'true'
        )
        db.session.add(file_record)
        try:
            db.session.commit()
        except Exception:
            # Do not leave an orphan file behind when the record is not stored.
            db.session.rollback()
            if os.path.exists(save_path):
                os.remove(save_path)
            raise
        return file_record

    @staticmethod
    def is_allowed_file(filename):
        """Return True if *filename* has one of the accepted extensions."""
        allowed_extensions = {
            'jpg', 'jpeg', 'png', 'gif', 'bmp',  # images
            'pdf', 'doc', 'docx', 'xls', 'xlsx',  # documents
            'txt', 'csv', 'json', 'xml',  # text
            'zip', 'rar', '7z'  # archives
        }
        ext = FileManager._extension_of(filename)
        return bool(ext) and ext in allowed_extensions

    @staticmethod
    def get_mime_type(extension):
        """Return the MIME type for *extension*, defaulting to octet-stream."""
        mime_types = {
            'jpg': 'image/jpeg',
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
            'pdf': 'application/pdf',
            'doc': 'application/msword',
            'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            'zip': 'application/zip',
            'txt': 'text/plain'
        }
        return mime_types.get(extension.lower(), 'application/octet-stream')

    @staticmethod
    def delete_file(file_id, user_id=None):
        """Delete a stored file and its database record.

        Args:
            file_id: Primary key of the ``File`` record.
            user_id: When given, must match the record's owner.

        Returns:
            bool: True once the record has been removed.

        Raises:
            PermissionError: If *user_id* is given and is not the owner.
            ValueError: If *user_id* is not numeric.
        """
        file_record = File.query.get_or_404(file_id)

        requester_id = FileManager._coerce_user_id(user_id)
        if requester_id is not None and file_record.user_id != requester_id:
            raise PermissionError("没有删除权限")

        file_path = FileManager._storage_path(file_record.upload_date, file_record.filename)
        if os.path.exists(file_path):
            os.remove(file_path)

        db.session.delete(file_record)
        db.session.commit()
        return True
# API路由
@app.route('/api/files/upload', methods=['POST'])
def upload_file_api():
    """Upload one file through the JSON API.

    Returns:
        A JSON response: 201 with the stored record on success, 400 when the
        file is missing or rejected, 500 on any other failure.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': '没有文件'}), 400

        file = request.files['file']
        # type=int converts the form value to match the integer user_id
        # column; a missing or non-numeric value becomes None.
        user_id = request.form.get('user_id', type=int)
        category = request.form.get('category', 'general')

        file_record = FileManager.save_file(file, user_id, category)
        return jsonify({
            'success': True,
            'message': '文件上传成功',
            'file': file_record.to_dict()
        }), 201
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': '服务器错误', 'details': str(e)}), 500
@app.route('/api/files', methods=['GET'])
def list_files_api():
    """Return a paginated, newest-first list of files as JSON.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: Page size (default 20, clamped to 1..100).
        category: Optional category filter.
        user_id: Optional integer owner filter.

    Returns:
        A JSON response: 200 with ``files`` and ``pagination``, or 500 with an
        ``error`` message on failure.
    """
    try:
        # type=int falls back to the default on non-numeric input instead of
        # raising ValueError (which used to surface as a 500).
        page = max(1, request.args.get('page', 1, type=int))
        per_page = request.args.get('per_page', 20, type=int)
        per_page = min(max(1, per_page), 100)  # bound the page size
        category = request.args.get('category')
        user_id = request.args.get('user_id', type=int)

        query = File.query
        if category:
            query = query.filter_by(category=category)
        if user_id is not None:
            query = query.filter_by(user_id=user_id)

        pagination = query.order_by(File.upload_date.desc()).paginate(
            page=page, per_page=per_page, error_out=False
        )
        files = [file.to_dict() for file in pagination.items]

        return jsonify({
            'success': True,
            'files': files,
            'pagination': {
                'page': pagination.page,
                'per_page': pagination.per_page,
                'total': pagination.total,
                'pages': pagination.pages
            }
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/download/<int:file_id>')
def download_file(file_id):
    """Send the stored file for *file_id* as an attachment.

    Responds with 404 when no record with that id exists.
    """
    record = File.query.get_or_404(file_id)

    # Files are stored under UPLOAD_FOLDER/<YYYY>/<MM>/<DD>, derived here from
    # the record's upload_date.
    dated_dir = os.path.join(
        app.config['UPLOAD_FOLDER'],
        record.upload_date.strftime('%Y/%m/%d'),
    )
    return send_from_directory(
        dated_dir,
        record.filename,
        as_attachment=True,
        download_name=record.original_name,
    )
@app.route('/api/files/<int:file_id>', methods=['DELETE'])
def delete_file_api(file_id):
    """Delete a file through the JSON API.

    Returns:
        A JSON response: 200 on success, 403 when the caller is not the
        owner, 500 on unexpected errors. An unknown *file_id* yields the
        regular 404 response.
    """
    from werkzeug.exceptions import HTTPException

    try:
        # type=int so the value compares equal to the integer user_id column.
        user_id = request.args.get('user_id', type=int)
        FileManager.delete_file(file_id, user_id)
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except HTTPException:
        # Let get_or_404()'s 404 through instead of turning it into a 500.
        raise
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    return jsonify({
        'success': True,
        'message': '文件删除成功'
    }), 200
if __name__ == '__main__':
    # Development entry point; debug mode enables the reloader and debugger.
    run_options = {'debug': True}
    app.run(**run_options)
这是因为文件大小超过了MAX_CONTENT_LENGTH限制。解决方案:
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
<input type="file" accept=".jpg,.png" data-max-size="10485760">
推荐的方法:
import uuid
import os
from datetime import datetime
def generate_unique_filename(original_filename):
    """Return a collision-resistant file name that keeps the extension.

    The result has the form ``<YYYYmmdd_HHMMSS>_<8 hex chars>[.<ext>]``.

    Args:
        original_filename: Client-supplied file name; may be empty or None.

    Returns:
        str: The generated name. The extension is lower-cased and reduced to
        ASCII letters and digits, so an untrusted name cannot inject path
        separators or extra dots.
    """
    ext = ''
    if original_filename and '.' in original_filename:
        raw_ext = original_filename.rsplit('.', 1)[1].lower()
        ext = ''.join(ch for ch in raw_ext if ch.isascii() and ch.isalnum())

    unique_id = uuid.uuid4().hex
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    base = f"{timestamp}_{unique_id[:8]}"
    return f"{base}.{ext}" if ext else base
# 或者按日期组织目录
def organize_by_date(filename, when=None):
    """Return ``<YYYY>/<MM>/<DD>/<filename>`` as a relative path.

    Args:
        filename: File name to place in the dated folder. Only its final
            path component is used, so an absolute or nested name cannot
            escape the folder.
        when: Optional ``datetime`` that selects the folder; defaults to the
            current local time.

    Returns:
        str: The dated relative path.
    """
    moment = datetime.now() if when is None else when
    date_folder = moment.strftime('%Y/%m/%d')
    # os.path.join() discards earlier parts when a later one is absolute.
    return os.path.join(date_folder, os.path.basename(filename))
断点续传实现方案:
可以使用以下库:
推荐使用云存储(AWS S3、阿里云OSS等):
# 使用boto3上传到AWS S3
import boto3
from flask import current_app
def upload_to_s3(file_stream, filename, bucket_name,
                 content_type='application/octet-stream'):
    """Upload a stream to S3 as a private object and return a download URL.

    Args:
        file_stream: Binary file-like object to upload.
        filename: Object key to store the data under.
        bucket_name: Target bucket.
        content_type: ``ContentType`` stored with the object; defaults to
            ``application/octet-stream``.

    Returns:
        str: A presigned GET URL that is valid for one hour.

    Raises:
        Exception: If the upload or URL generation fails; the original error
            is chained as ``__cause__``.
    """
    s3_client = boto3.client(
        's3',
        aws_access_key_id=current_app.config['AWS_ACCESS_KEY'],
        aws_secret_access_key=current_app.config['AWS_SECRET_KEY']
    )
    try:
        s3_client.upload_fileobj(
            file_stream,
            bucket_name,
            filename,
            ExtraArgs={
                'ContentType': content_type,
                'ACL': 'private'  # or 'public-read'
            }
        )
        # Presigned URLs expire, so the private object can be shared briefly.
        return s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': filename},
            ExpiresIn=3600  # valid for one hour
        )
    except Exception as e:
        # Chain the cause so the underlying error stays in the traceback.
        raise Exception(f"S3上传失败: {str(e)}") from e
# 或者使用Flask扩展
from flask_uploads import UploadSet, configure_uploads, IMAGES
from flask_aws_s3 import FlaskS3
photos = UploadSet('photos', IMAGES)
def configure_upload(app):
    """Register the ``photos`` upload set and the S3 extension on *app*."""
    configure_uploads(app, photos)
    # The extension instance is not kept; constructing it with the app is the
    # only effect used here.
    FlaskS3(app)