Laravel 队列任务分发

Laravel 队列系统允许你将耗时任务(如发送邮件、处理图片、调用 API)推迟到后台异步执行,从而显著提升 Web 请求的响应速度。队列任务(Job)可以分发到多种驱动(Redis、数据库、SQS 等),并支持失败重试、延迟执行、任务链等高级特性。本章将详细讲解如何配置队列、生成任务、分发任务以及运行队列处理器。

🚀 核心优势: 将耗时操作放入队列后,用户请求可以立即返回,提升用户体验;同时队列可以在后台独立处理任务,支持失败重试和监控,保证任务最终完成。

1. 队列配置

队列配置文件位于 config/queue.php。你可以通过 .env 文件设置默认驱动。Laravel 支持以下主流驱动:

驱动说明适用场景
sync同步执行(不实际入队),用于本地测试。开发环境
database使用数据库存储队列任务,需创建 jobs 表。小型项目、无 Redis 环境
redis基于 Redis,性能高,支持阻塞弹出。生产环境(推荐)
sqsAmazon Simple Queue Service,适用于 AWS 生态。云原生应用
beanstalkd轻量级队列服务。传统部署环境

.env 配置示例(Redis)


QUEUE_CONNECTION=redis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379
                    

使用数据库驱动前需执行迁移创建 jobs 表:


php artisan queue:table
php artisan migrate
                    

2. 创建队列任务

使用 Artisan 命令生成任务类,通常存放在 app/Jobs 目录:


php artisan make:job ProcessPodcast
                    

生成的类结构如下:


<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    public function __construct($podcast)
    {
        $this->podcast = $podcast;
    }

    public function handle()
    {
        // 执行实际处理逻辑,例如下载、转码等
    }
}
                    

关键 trait 说明:

  • Dispatchable – 提供 dispatch() 方法。
  • InteractsWithQueue – 允许与队列交互(如释放、删除任务)。
  • Queueable – 支持设置队列连接、名称等。
  • SerializesModels – 优雅序列化 Eloquent 模型,避免完整序列化。

3. 分发任务(入队)

在控制器或其他位置,可以通过任务类的 dispatch 方法将任务推送到队列:


use App\Jobs\ProcessPodcast;

// 基本分发
ProcessPodcast::dispatch($podcast);

// 延迟分发(10分钟后执行)
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));

// 指定队列连接或名称
ProcessPodcast::dispatch($podcast)->onConnection('redis')->onQueue('processing');
                    

也可以使用辅助函数 dispatch()


dispatch(new ProcessPodcast($podcast));
                    

链式调用与任务批处理

Laravel 支持任务链(按顺序执行)和批处理(并行执行并收集结果)。


use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast($podcast),
    new SendPodcastNotification($podcast),
])->dispatch();
                    

4. 运行队列处理器

为了让队列任务真正执行,需要在服务器上启动队列工作器(worker)。开发环境可以使用:


php artisan queue:work
                    

此命令会持续运行,监听并处理新任务。常用选项:

  • --queue=high,low – 指定监听的队列名称(优先级顺序)。
  • --tries=3 – 任务最大重试次数。
  • --timeout=60 – 任务超时时间(秒)。
  • --sleep=3 – 无任务时休眠秒数。

php artisan queue:work redis --queue=high,default --tries=3 --timeout=120
                    
⚠️ 生产环境注意: 应使用进程监控工具(如 Supervisor)确保队列工作器始终运行。同时使用 queue:work 而非 queue:listen(后者性能较差)。

5. 处理失败任务

任务执行过程中如果抛出异常,Laravel 会自动将任务放回队列重试(可配置重试次数)。若超过最大重试次数仍失败,该任务会被写入 failed_jobs 表。

创建失败任务表


php artisan queue:failed-table
php artisan migrate
                    

定义失败处理方法

在任务类中添加 failed 方法:


public function failed(Throwable $exception)
{
    // 发送告警、记录日志等
    Log::error('任务失败:'.$exception->getMessage());
}
                    

常用失败任务命令


php artisan queue:failed      # 列出所有失败任务
php artisan queue:retry all   # 重试所有失败任务
php artisan queue:retry 5     # 重试 ID 为 5 的任务
php artisan queue:forget 5    # 删除失败任务记录
php artisan queue:flush       # 清空所有失败任务
                    

6. 任务中间件

任务中间件可以在任务执行前进行额外的处理,如限流、单例限制等。定义中间件:


namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    public function handle($job, $next)
    {
        Redis::throttle('key')
             ->allow(10)->every(60)
             ->then(function () use ($job, $next) {
                 $next($job);
             }, function () use ($job) {
                 $job->release(10);
             });
    }
}
                    

在任务类的 middleware 方法中返回中间件数组:


public function middleware()
{
    return [new RateLimited];
}
                    

7. 重试与超时配置

你可以在任务类中定义最大重试次数和超时时间:


class ProcessPodcast implements ShouldQueue
{
    public $tries = 5;      // 最大重试次数
    public $timeout = 120;  // 超时秒数
}
                    

也可以动态计算重试次数:


public function retryUntil()
{
    return now()->addMinutes(10);
}
                    

8. 使用 Supervisor 守护进程

生产环境中推荐使用 Supervisor 来管理队列工作器,确保它崩溃后自动重启。示例配置文件 /etc/supervisor/conf.d/laravel-worker.conf


[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/project/artisan queue:work redis --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/path/to/your/project/storage/logs/worker.log
stopwaitsecs=3600
                    

然后运行:


sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
                    

9. 唯一任务(避免重复入队)

使用 ShouldBeUnique 接口可以确保同一时刻队列中只有一个特定任务实例:


use Illuminate\Contracts\Queue\ShouldBeUnique;

class ProcessPodcast implements ShouldQueue, ShouldBeUnique
{
    public $uniqueId;
    public $uniqueFor = 3600; // 锁保留时间(秒)

    public function __construct($podcastId)
    {
        $this->uniqueId = $podcastId;
    }
}
                    

10. 最佳实践与常见陷阱

  • 任务可重入性: 设计任务时确保其幂等(多次执行结果相同),因为可能因重试而重复执行。
  • 模型序列化: 使用 SerializesModels trait 传递 Eloquent 模型,避免序列化完整模型数据。
  • 避免在任务中注入过多依赖: 构造函数参数应尽量轻量,复杂的依赖使用服务容器解析。
  • 监控队列长度: 使用 Horizon(Redis)或第三方工具监控队列积压情况。
  • 失败告警:failed 方法中集成通知(如发送到 Slack 或邮件),及时感知异常。
  • 避免内存泄漏: 长时间运行的工作器建议使用 --max-time 选项定期重启。
💡 提示: 对于 Redis 驱动的队列,推荐使用 Laravel Horizon 来管理,它提供了美观的监控面板、优雅的重试策略和标签功能。

11. 总结

Laravel 队列系统为异步处理提供了强大而灵活的解决方案。通过正确配置驱动、定义任务类、分发任务并运行工作器,你可以轻松将耗时操作移到后台,极大提升应用性能。掌握队列的高级特性(如链式任务、唯一任务、中间件等)将帮助你构建更可靠和可扩展的系统。

📖 官方文档: Laravel QueuesHorizon 提供了更详尽的指南。