Flask文件上传

前置条件:在学习本章前,请确保已了解Flask基础、表单处理和闪现消息。

文件上传概述

文件上传是Web应用中常见的功能,Flask通过request.files对象和Werkzeug库提供了强大的文件上传支持。

基础文件上传

1. 创建文件上传表单

首先创建一个HTML表单,必须设置enctype="multipart/form-data"

<!-- templates/upload.html -->
{% raw %}{% extends "base.html" %}{% endraw %}

{% raw %}{% block content %}{% endraw %}
<div class="row justify-content-center">
    <div class="col-md-8">
        <div class="card">
            <div class="card-header">
                <h3 class="text-center">文件上传</h3>
            </div>
            <div class="card-body">
                <form method="POST" action="{{ url_for('upload_file') }}" enctype="multipart/form-data">
                    <div class="mb-3">
                        <label for="file" class="form-label">选择文件</label>
                        <input type="file" class="form-control" id="file" name="file" required>
                        <div class="form-text">支持的文件类型: JPG, PNG, PDF, DOC (最大10MB)</div>
                    </div>
                    <div class="mb-3">
                        <label for="description" class="form-label">文件描述 (可选)</label>
                        <textarea class="form-control" id="description" name="description" rows="3"></textarea>
                    </div>
                    <div class="d-grid">
                        <button type="submit" class="btn btn-primary">上传文件</button>
                    </div>
                </form>
            </div>
        </div>
    </div>
</div>
{% raw %}{% endblock %}{% endraw %}

2. Flask后端处理

# app.py - 基础文件上传
import os
from flask import Flask, render_template, request, flash, redirect, url_for
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'

# 配置上传参数
UPLOAD_FOLDER = 'uploads'  # 上传目录
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx'}

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024  # 10MB限制

def allowed_file(filename):
    """检查文件扩展名是否允许"""
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        # 检查是否有文件部分
        if 'file' not in request.files:
            flash('没有选择文件', 'error')
            return redirect(request.url)

        file = request.files['file']
        description = request.form.get('description', '')

        # 如果用户没有选择文件,浏览器会提交一个空文件
        if file.filename == '':
            flash('没有选择文件', 'error')
            return redirect(request.url)

        # 检查文件类型
        if file and allowed_file(file.filename):
            # 安全化文件名
            filename = secure_filename(file.filename)

            # 确保上传目录存在
            os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

            # 保存文件
            file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(file_path)

            # 保存文件信息(这里可以保存到数据库)
            file_info = {
                'filename': filename,
                'original_name': file.filename,
                'file_size': os.path.getsize(file_path),
                'description': description,
                'upload_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

            flash(f'文件 "{filename}" 上传成功!', 'success')
            return redirect(url_for('upload_file'))
        else:
            flash('不支持的文件类型', 'error')
            return redirect(request.url)

    return render_template('upload.html')

if __name__ == '__main__':
    app.run(debug=True)

文件验证与安全

安全注意事项

  • 文件名安全:使用secure_filename()防止路径遍历攻击
  • 文件类型验证:不要仅依赖扩展名,实际检查文件内容
  • 大小限制:使用MAX_CONTENT_LENGTH限制文件大小
  • 存储位置:不要将上传文件存储在Web可访问的目录

增强版文件验证

# 增强版文件验证
import imghdr  # 用于检查图片类型
import magic  # python-magic库,需要安装
from werkzeug.utils import secure_filename
import os

class FileValidator:
    """文件验证器类"""

    def __init__(self, allowed_extensions=None, max_size=10*1024*1024):
        self.allowed_extensions = allowed_extensions or {'jpg', 'jpeg', 'png', 'gif', 'pdf'}
        self.max_size = max_size

    def validate_file(self, file_stream, filename):
        """综合验证文件"""
        errors = []

        # 1. 检查文件名
        if not self._is_safe_filename(filename):
            errors.append("文件名不安全")

        # 2. 检查文件大小
        if not self._validate_size(file_stream):
            errors.append(f"文件大小超过限制 ({self.max_size/1024/1024}MB)")

        # 3. 检查扩展名
        if not self._validate_extension(filename):
            errors.append(f"不支持的文件扩展名,支持: {', '.join(self.allowed_extensions)}")

        # 4. 检查文件内容类型(更安全)
        if not self._validate_content_type(file_stream, filename):
            errors.append("文件内容类型与扩展名不匹配")

        return errors

    def _is_safe_filename(self, filename):
        """检查文件名是否安全"""
        safe_name = secure_filename(filename)
        return safe_name == filename and '..' not in filename

    def _validate_size(self, file_stream):
        """验证文件大小"""
        # 保存当前位置
        current_pos = file_stream.tell()
        # 移动到文件末尾
        file_stream.seek(0, 2)
        size = file_stream.tell()
        # 回到原始位置
        file_stream.seek(current_pos)
        return size <= self.max_size

    def _validate_extension(self, filename):
        """验证文件扩展名"""
        if '.' not in filename:
            return False
        ext = filename.rsplit('.', 1)[1].lower()
        return ext in self.allowed_extensions

    def _validate_content_type(self, file_stream, filename):
        """验证文件实际内容类型"""
        # 对于图片,使用imghdr检查
        if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
            current_pos = file_stream.tell()
            file_type = imghdr.what(file_stream)
            file_stream.seek(current_pos)
            if not file_type:
                return False

        # 使用python-magic进行更严格的验证
        try:
            import magic
            current_pos = file_stream.tell()
            mime = magic.from_buffer(file_stream.read(2048), mime=True)
            file_stream.seek(current_pos)

            # 检查MIME类型是否与扩展名匹配
            expected_mimes = {
                'jpg': 'image/jpeg',
                'jpeg': 'image/jpeg',
                'png': 'image/png',
                'gif': 'image/gif',
                'pdf': 'application/pdf',
                'txt': 'text/plain',
            }
            ext = filename.rsplit('.', 1)[1].lower()
            if ext in expected_mimes and mime != expected_mimes.get(ext):
                return False
        except ImportError:
            # 如果没有安装python-magic,跳过这部分检查
            pass

        return True

# 在Flask应用中使用
@app.route('/upload-secure', methods=['POST'])
def upload_file_secure():
    if 'file' not in request.files:
        return jsonify({'error': '没有文件'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': '没有选择文件'}), 400

    # 创建验证器
    validator = FileValidator(
        allowed_extensions={'jpg', 'jpeg', 'png', 'gif', 'pdf', 'txt'},
        max_size=5*1024*1024  # 5MB
    )

    # 验证文件
    errors = validator.validate_file(file.stream, file.filename)

    if errors:
        return jsonify({'error': '; '.join(errors)}), 400

    # 保存文件
    filename = secure_filename(file.filename)
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))

    return jsonify({'message': '文件上传成功', 'filename': filename}), 200

多文件上传

1. 多文件上传表单

<!-- templates/multi_upload.html -->
<form method="POST" action="{{ url_for('upload_multiple_files') }}" enctype="multipart/form-data">
    <div class="mb-3">
        <label for="files" class="form-label">选择多个文件</label>
        <input type="file" class="form-control" id="files" name="files" multiple required>
        <div class="form-text">按住Ctrl键可以选择多个文件</div>
    </div>
    <div class="mb-3">
        <label for="category" class="form-label">分类</label>
        <select class="form-control" id="category" name="category">
            <option value="images">图片</option>
            <option value="documents">文档</option>
            <option value="other">其他</option>
        </select>
    </div>
    <div class="d-grid">
        <button type="submit" class="btn btn-primary">上传多个文件</button>
    </div>
</form>

<!-- 显示已选择文件的预览 -->
<div id="file-preview" class="mt-3"></div>

<script>
document.getElementById('files').addEventListener('change', function(e) {
    const preview = document.getElementById('file-preview');
    preview.innerHTML = '';

    const files = e.target.files;
    if (files.length === 0) return;

    const list = document.createElement('div');
    list.className = 'list-group';

    for (let i = 0; i < files.length; i++) {
        const file = files[i];
        const item = document.createElement('div');
        item.className = 'list-group-item d-flex justify-content-between align-items-center';
        item.innerHTML = `
            <div>
                <i class="fas fa-file me-2"></i>
                ${file.name}
                <small class="text-muted ms-2">(${(file.size / 1024).toFixed(2)} KB)</small>
            </div>
            <span class="badge bg-secondary">${file.type || '未知类型'}</span>
        `;
        list.appendChild(item);
    }

    const header = document.createElement('h6');
    header.className = 'mt-3';
    header.textContent = `已选择 ${files.length} 个文件`;

    preview.appendChild(header);
    preview.appendChild(list);
});
</script>

2. 处理多个文件

# 处理多个文件上传
import os
import uuid
from datetime import datetime
from werkzeug.utils import secure_filename

@app.route('/upload-multiple', methods=['GET', 'POST'])
def upload_multiple_files():
    if request.method == 'POST':
        # 检查是否有文件
        if 'files' not in request.files:
            flash('没有选择文件', 'error')
            return redirect(request.url)

        files = request.files.getlist('files')
        category = request.form.get('category', 'other')

        if len(files) == 0 or files[0].filename == '':
            flash('没有选择文件', 'error')
            return redirect(request.url)

        # 限制最多10个文件
        if len(files) > 10:
            flash('一次最多上传10个文件', 'error')
            return redirect(request.url)

        uploaded_files = []
        failed_files = []

        for file in files:
            # 验证每个文件
            if file and allowed_file(file.filename):
                try:
                    # 生成唯一文件名
                    original_name = secure_filename(file.filename)
                    ext = original_name.rsplit('.', 1)[1].lower() if '.' in original_name else ''

                    # 使用UUID防止文件名冲突
                    unique_filename = f"{uuid.uuid4().hex}{f'.{ext}' if ext else ''}"

                    # 按日期组织目录
                    date_folder = datetime.now().strftime('%Y/%m/%d')
                    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, date_folder)
                    os.makedirs(upload_dir, exist_ok=True)

                    # 保存文件
                    file_path = os.path.join(upload_dir, unique_filename)
                    file.save(file_path)

                    # 记录文件信息
                    uploaded_files.append({
                        'original_name': original_name,
                        'saved_name': unique_filename,
                        'size': os.path.getsize(file_path),
                        'path': file_path,
                        'category': category
                    })

                except Exception as e:
                    failed_files.append({
                        'name': file.filename,
                        'error': str(e)
                    })
            else:
                failed_files.append({
                    'name': file.filename,
                    'error': '不支持的文件类型'
                })

        # 显示结果
        if uploaded_files:
            flash(f"成功上传 {len(uploaded_files)} 个文件", 'success')
        if failed_files:
            flash(f"{len(failed_files)} 个文件上传失败", 'warning')

        # 这里可以将文件信息保存到数据库
        return redirect(url_for('upload_multiple_files'))

    return render_template('multi_upload.html')

图片上传与处理

1. 图片上传和缩略图生成

# 图片处理(需要安装Pillow库)
from PIL import Image
import io

def process_image(file_stream, filename):
    """处理上传的图片,生成缩略图"""

    # 打开图片
    img = Image.open(file_stream)

    # 获取图片信息
    img_info = {
        'format': img.format,
        'size': img.size,
        'mode': img.mode,
        'filename': filename
    }

    # 生成缩略图(固定尺寸)
    thumb_size = (200, 200)
    img.thumbnail(thumb_size, Image.Resampling.LANCZOS)

    # 保存缩略图
    thumb_filename = f"thumb_{secure_filename(filename)}"
    thumb_path = os.path.join(app.config['UPLOAD_FOLDER'], 'thumbnails', thumb_filename)
    os.makedirs(os.path.dirname(thumb_path), exist_ok=True)

    # 转换为RGB模式(如果是RGBA)
    if img.mode in ('RGBA', 'LA'):
        background = Image.new('RGB', img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else img)
        img = background

    img.save(thumb_path, 'JPEG', quality=85)

    return img_info, thumb_path

@app.route('/upload-image', methods=['POST'])
def upload_image():
    if 'image' not in request.files:
        return jsonify({'error': '没有图片文件'}), 400

    file = request.files['image']

    # 验证必须是图片
    try:
        img = Image.open(file.stream)
        img.verify()  # 验证图片完整性
        file.stream.seek(0)  # 重置流位置
    except:
        return jsonify({'error': '无效的图片文件'}), 400

    # 处理图片
    img_info, thumb_path = process_image(file.stream, file.filename)

    # 保存原图
    original_path = os.path.join(app.config['UPLOAD_FOLDER'], 'images', secure_filename(file.filename))
    os.makedirs(os.path.dirname(original_path), exist_ok=True)
    file.save(original_path)

    return jsonify({
        'message': '图片上传成功',
        'original': img_info,
        'thumbnail': thumb_path
    }), 200

2. 前端图片预览

<!-- 图片上传带预览 -->
<div class="row">
    <div class="col-md-6">
        <form id="imageUploadForm" enctype="multipart/form-data">
            <div class="mb-3">
                <label for="imageInput" class="form-label">选择图片</label>
                <input type="file" class="form-control" id="imageInput"
                       accept="image/*" required>
            </div>
            <div class="mb-3">
                <label for="imageName" class="form-label">图片名称</label>
                <input type="text" class="form-control" id="imageName"
                       placeholder="可选,默认为文件名">
            </div>
            <button type="submit" class="btn btn-primary">上传图片</button>
        </form>

        <!-- 上传进度条 -->
        <div class="progress mt-3 d-none" id="progressBar">
            <div class="progress-bar progress-bar-striped progress-bar-animated"
                 role="progressbar" style="width: 0%">0%</div>
        </div>
    </div>

    <div class="col-md-6">
        <div class="image-preview-container">
            <h6>图片预览</h6>
            <div class="image-wrapper" id="imagePreview">
                <div class="text-center text-muted py-5">
                    <i class="fas fa-image fa-3x mb-3"></i>
                    <p>选择图片后显示预览</p>
                </div>
            </div>
        </div>
    </div>
</div>

<script>
// 图片预览
document.getElementById('imageInput').addEventListener('change', function(e) {
    const file = e.target.files[0];
    if (!file || !file.type.startsWith('image/')) return;

    const reader = new FileReader();
    reader.onload = function(e) {
        const preview = document.getElementById('imagePreview');
        preview.innerHTML = `
            <img src="${e.target.result}"
                 class="img-fluid rounded"
                 alt="预览"
                 style="max-height: 300px;">
            <div class="mt-2 text-center">
                <small class="text-muted">
                    ${file.name} (${(file.size / 1024).toFixed(2)} KB)
                </small>
            </div>
        `;
    };
    reader.readAsDataURL(file);
});

// AJAX上传
document.getElementById('imageUploadForm').addEventListener('submit', function(e) {
    e.preventDefault();

    const formData = new FormData();
    const fileInput = document.getElementById('imageInput');
    const nameInput = document.getElementById('imageName');

    formData.append('image', fileInput.files[0]);
    if (nameInput.value) {
        formData.append('name', nameInput.value);
    }

    const progressBar = document.getElementById('progressBar');
    progressBar.classList.remove('d-none');

    const xhr = new XMLHttpRequest();

    // 上传进度
    xhr.upload.addEventListener('progress', function(e) {
        if (e.lengthComputable) {
            const percent = Math.round((e.loaded / e.total) * 100);
            progressBar.querySelector('.progress-bar').style.width = percent + '%';
            progressBar.querySelector('.progress-bar').textContent = percent + '%';
        }
    });

    // 完成处理
    xhr.onload = function() {
        if (xhr.status === 200) {
            const response = JSON.parse(xhr.responseText);
            alert('上传成功: ' + response.message);
            // 重置表单
            document.getElementById('imageUploadForm').reset();
            document.getElementById('imagePreview').innerHTML = `
                <div class="text-center text-muted py-5">
                    <i class="fas fa-check-circle fa-3x mb-3 text-success"></i>
                    <p>上传成功!</p>
                </div>
            `;
        } else {
            const error = JSON.parse(xhr.responseText);
            alert('上传失败: ' + error.error);
        }
        progressBar.classList.add('d-none');
    };

    xhr.open('POST', '/upload-image');
    xhr.send(formData);
});
</script>

文件上传配置

配置项 说明 示例
MAX_CONTENT_LENGTH 请求最大字节数(包括文件和其他表单数据) 16 * 1024 * 1024 # 16MB
UPLOAD_FOLDER 上传文件存储目录 'static/uploads'
ALLOWED_EXTENSIONS 允许的文件扩展名集合 {'jpg', 'png', 'pdf', 'doc'}
MAX_FILE_SIZE 单个文件最大大小(需要自定义验证) 10 * 1024 * 1024 # 10MB

完整示例:文件管理系统

完整文件管理API

# file_manager.py
import os
import json
import shutil
from datetime import datetime
from werkzeug.utils import secure_filename
from flask import Flask, request, jsonify, send_from_directory
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config.update(
    SECRET_KEY='your-secret-key',
    SQLALCHEMY_DATABASE_URI='sqlite:///files.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    UPLOAD_FOLDER='uploads',
    MAX_CONTENT_LENGTH=50 * 1024 * 1024  # 50MB
)

db = SQLAlchemy(app)

# 文件模型
class File(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)
    original_name = db.Column(db.String(255), nullable=False)
    file_size = db.Column(db.Integer, nullable=False)
    file_type = db.Column(db.String(50), nullable=False)
    upload_date = db.Column(db.DateTime, default=datetime.utcnow)
    description = db.Column(db.Text)
    user_id = db.Column(db.Integer)  # 关联用户
    category = db.Column(db.String(50))
    is_public = db.Column(db.Boolean, default=False)

    def to_dict(self):
        return {
            'id': self.id,
            'filename': self.filename,
            'original_name': self.original_name,
            'file_size': self.file_size,
            'file_type': self.file_type,
            'upload_date': self.upload_date.isoformat(),
            'description': self.description,
            'category': self.category,
            'is_public': self.is_public,
            'download_url': f"/download/{self.id}",
            'view_url': f"/view/{self.id}" if self.file_type.startswith('image/') else None
        }

# 创建数据库表
with app.app_context():
    db.create_all()
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

class FileManager:
    """文件管理器类"""

    @staticmethod
    def save_file(file, user_id=None, category='general'):
        """保存文件并记录到数据库"""

        if not file or file.filename == '':
            raise ValueError("无效的文件")

        # 验证文件
        if not FileManager.is_allowed_file(file.filename):
            raise ValueError("不支持的文件类型")

        # 安全文件名
        original_name = secure_filename(file.filename)
        file_ext = original_name.rsplit('.', 1)[1].lower() if '.' in original_name else ''

        # 生成唯一文件名
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        unique_name = f"{timestamp}_{os.urandom(4).hex()}.{file_ext}"

        # 按日期组织目录
        date_path = datetime.now().strftime('%Y/%m/%d')
        save_dir = os.path.join(app.config['UPLOAD_FOLDER'], date_path)
        os.makedirs(save_dir, exist_ok=True)

        # 保存文件
        save_path = os.path.join(save_dir, unique_name)
        file.save(save_path)

        # 获取文件信息
        file_size = os.path.getsize(save_path)

        # 保存到数据库
        file_record = File(
            filename=unique_name,
            original_name=original_name,
            file_size=file_size,
            file_type=file.mimetype if hasattr(file, 'mimetype') else FileManager.get_mime_type(file_ext),
            description=request.form.get('description', ''),
            user_id=user_id,
            category=category,
            is_public=request.form.get('is_public', 'false').lower() == 'true'
        )

        db.session.add(file_record)
        db.session.commit()

        return file_record

    @staticmethod
    def is_allowed_file(filename):
        """检查文件类型是否允许"""
        allowed_extensions = {
            'jpg', 'jpeg', 'png', 'gif', 'bmp',  # 图片
            'pdf', 'doc', 'docx', 'xls', 'xlsx',  # 文档
            'txt', 'csv', 'json', 'xml',          # 文本
            'zip', 'rar', '7z'                    # 压缩文件
        }

        if '.' not in filename:
            return False

        ext = filename.rsplit('.', 1)[1].lower()
        return ext in allowed_extensions

    @staticmethod
    def get_mime_type(extension):
        """根据扩展名获取MIME类型"""
        mime_types = {
            'jpg': 'image/jpeg',
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
            'pdf': 'application/pdf',
            'doc': 'application/msword',
            'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            'zip': 'application/zip',
            'txt': 'text/plain'
        }
        return mime_types.get(extension.lower(), 'application/octet-stream')

    @staticmethod
    def delete_file(file_id, user_id=None):
        """删除文件"""
        file_record = File.query.get_or_404(file_id)

        # 检查权限
        if user_id and file_record.user_id != user_id:
            raise PermissionError("没有删除权限")

        # 构建文件路径
        upload_date = file_record.upload_date
        date_path = upload_date.strftime('%Y/%m/%d')
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], date_path, file_record.filename)

        # 删除物理文件
        if os.path.exists(file_path):
            os.remove(file_path)

        # 删除数据库记录
        db.session.delete(file_record)
        db.session.commit()

        return True

# API路由
@app.route('/api/files/upload', methods=['POST'])
def upload_file_api():
    """上传文件API"""
    try:
        if 'file' not in request.files:
            return jsonify({'error': '没有文件'}), 400

        file = request.files['file']
        user_id = request.form.get('user_id')
        category = request.form.get('category', 'general')

        file_record = FileManager.save_file(file, user_id, category)

        return jsonify({
            'success': True,
            'message': '文件上传成功',
            'file': file_record.to_dict()
        }), 201

    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': '服务器错误', 'details': str(e)}), 500

@app.route('/api/files', methods=['GET'])
def list_files_api():
    """获取文件列表"""
    try:
        # 查询参数
        page = int(request.args.get('page', 1))
        per_page = int(request.args.get('per_page', 20))
        category = request.args.get('category')
        user_id = request.args.get('user_id')

        # 构建查询
        query = File.query

        if category:
            query = query.filter_by(category=category)
        if user_id:
            query = query.filter_by(user_id=user_id)

        # 分页
        pagination = query.order_by(File.upload_date.desc()).paginate(
            page=page, per_page=per_page, error_out=False
        )

        files = [file.to_dict() for file in pagination.items]

        return jsonify({
            'success': True,
            'files': files,
            'pagination': {
                'page': pagination.page,
                'per_page': pagination.per_page,
                'total': pagination.total,
                'pages': pagination.pages
            }
        }), 200

    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/download/<int:file_id>')
def download_file(file_id):
    """下载文件"""
    file_record = File.query.get_or_404(file_id)

    # 构建文件路径
    upload_date = file_record.upload_date
    date_path = upload_date.strftime('%Y/%m/%d')
    directory = os.path.join(app.config['UPLOAD_FOLDER'], date_path)

    return send_from_directory(
        directory,
        file_record.filename,
        as_attachment=True,
        download_name=file_record.original_name
    )

@app.route('/api/files/<int:file_id>', methods=['DELETE'])
def delete_file_api(file_id):
    """删除文件API"""
    try:
        user_id = request.args.get('user_id')
        FileManager.delete_file(file_id, user_id)

        return jsonify({
            'success': True,
            'message': '文件删除成功'
        }), 200

    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

性能优化建议

前端优化
  • 使用分片上传大文件
  • 前端压缩图片(使用canvas)
  • 显示上传进度条
  • 限制同时上传文件数量
  • 提供取消上传功能
后端优化
  • 使用异步处理(Celery)
  • CDN存储静态文件
  • 使用云存储(S3、OSS等)
  • 数据库索引优化
  • 定期清理临时文件

常见问题

这是因为文件大小超过了MAX_CONTENT_LENGTH限制。解决方案:

  1. 增加限制:app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
  2. 前端限制:<input type="file" accept=".jpg,.png" data-max-size="10485760">
  3. 使用分片上传大文件

推荐的方法:

import uuid
import os
from datetime import datetime

def generate_unique_filename(original_filename):
    # 获取扩展名
    ext = original_filename.rsplit('.', 1)[1].lower() if '.' in original_filename else ''

    # 生成唯一文件名
    unique_id = uuid.uuid4().hex
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    if ext:
        return f"{timestamp}_{unique_id[:8]}.{ext}"
    else:
        return f"{timestamp}_{unique_id[:8]}"

# 或者按日期组织目录
def organize_by_date(filename):
    date_folder = datetime.now().strftime('%Y/%m/%d')
    return os.path.join(date_folder, filename)

断点续传实现方案:

  1. 前端将大文件分片(如1MB每片)
  2. 每个分片单独上传,记录上传进度
  3. 后端接收分片并临时保存
  4. 所有分片上传完成后合并
  5. 提供API查询上传进度

可以使用以下库:

  • 前端:Resumable.js、Plupload
  • 后端:自定义分片处理逻辑

推荐使用云存储(AWS S3、阿里云OSS等):

# 使用boto3上传到AWS S3
import boto3
from flask import current_app

def upload_to_s3(file_stream, filename, bucket_name):
    s3_client = boto3.client(
        's3',
        aws_access_key_id=current_app.config['AWS_ACCESS_KEY'],
        aws_secret_access_key=current_app.config['AWS_SECRET_KEY']
    )

    try:
        s3_client.upload_fileobj(
            file_stream,
            bucket_name,
            filename,
            ExtraArgs={
                'ContentType': 'application/octet-stream',
                'ACL': 'private'  # 或 'public-read'
            }
        )

        # 生成预签名URL(有时间限制)
        url = s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': filename},
            ExpiresIn=3600  # 1小时有效
        )

        return url

    except Exception as e:
        raise Exception(f"S3上传失败: {str(e)}")

# 或者使用Flask扩展
from flask_uploads import UploadSet, configure_uploads, IMAGES
from flask_aws_s3 import FlaskS3

photos = UploadSet('photos', IMAGES)

def configure_upload(app):
    configure_uploads(app, photos)
    s3 = FlaskS3(app)

最佳实践总结

  1. 安全第一:始终验证文件类型和大小
  2. 错误处理:提供清晰的错误反馈
  3. 用户体验:显示上传进度和预览
  4. 性能优化:大文件使用分片上传
  5. 存储策略:考虑使用云存储服务
  6. 备份恢复:定期备份重要文件