Laravel 的队列系统介绍
简介
Laravel 现在为你的 Redis 队列 提供了 Horizon,一个漂亮的仪表盘和配置系统。查看完整的 Horizon 文档 了解更多信息。
Laravel 队列为不同的后台队列服务提供统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列。队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和相应的时间。
队列配置文件存放在 config/queue.php
。每一种队列驱动的配置都可以在该文件中找到,包括数据库,Beanstalkd ,Amazon SQS,Redis,以及同步(本地使用)驱动。其中还包含了一个 null
队列驱动用于那些放弃队列的任务。
连接 Vs. 队列
在开始使用 Laravel 队列前,弄明白 「连接」 和 「队列」 的区别是很重要的。在你的 config/queue.php
配置文件里,有一个 connections
配置选项。这个选项给 Amazon SQS,Beanstalk,或者 Redis 这样的后端服务定义了一个特有的连接。不管是哪一种,一个给定的连接可能会有多个 「队列」,而 「队列」 可以被认为是不同的栈或者大量的队列任务。
要注意的是,queue
配置文件中每个连接的配置示例中都包含一个 queue
属性。这是默认队列任务被发给指定连接的时候会被分发到这个队列中。换句话说,如果你分发任务的时候没有显式定义队列,那么它就会被放到连接配置中 queue
属性所定义的队列中:
// 这个任务将被分发到默认队列...
Job::dispatch();
// 这个任务将被发送到「emails」队列...
Job::dispatch()->onQueue('emails');
有些应用可能不需要把任务发到不同的队列,而只发到一个简单的队列中就行了。但是把任务推到不同的队列仍然是非常有用的,因为 Laravel 队列处理器允许你定义队列的优先级,所以你能给不同的队列划分不同的优先级或者区分不同任务的不同处理方式了。比如说,如果你把任务推到 high
队列中,你就能让队列处理器优先处理这些任务了:
php artisan queue:work --queue=high,default
驱动的必要设置
数据库
要使用 database
这个队列驱动的话,你需要创建一个数据表来存储任务。你可以用 queue:table
这个 Artisan 命令来创建这个数据表的迁移。当迁移创建好以后,就可以用 migrate
这条命令来创建数据表:
php artisan queue:table
php artisan migrate
Redis
为了使用 redis
队列驱动,你需要在你的配置文件 config/database.php
中配置Redis的数据库连接。
如果你的 Redis 队列连接使用的是 Redis 集群,你的队列名称必须包含 key hash tag。这是为了确保所有的 Redis 键对于一个给定的队列都置于同一哈希中:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
其它队列驱动的依赖扩展包
在使用列表里的队列服务前,必须安装以下依赖扩展包:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~3.0
- Redis:
predis/predis ~1.0
创建任务
生成任务类
在你的应用程序中,队列的任务类都默认放在 app/Jobs
目录下 。如果这个目录不存在,那当你运行 make:job
Artisan 命令时目录就会被自动创建。你可以用以下的 Artisan 命令来生成一个新的队列任务:
php artisan make:job SendReminderEmail
生成的类实现了 Illuminate\Contracts\Queue\ShouldQueue
接口,这意味着这个任务将会被推送到队列中,而不是同步执行。
任务类结构
任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle
方法。我们来看一个示例的任务类。这个示例里,假设我们管理着一个播客发布服务,在发布之前需要处理上传播客文件:
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的任务实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 运行任务。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}
}
注意,在这个例子中,我们在任务类的构造器中直接传递了一个 Eloquent 模型。因为我们在任务类里引用了 SerializesModels
这个 trait,使得 Eloquent 模型在处理任务时可以被优雅地序列化和反序列化。如果你的队列任务类在构造器中接收了一个 Eloquent 模型,那么只有可识别出该模型的属性会被序列化到队列里。当任务被实际运行时,队列系统便会自动从数据库中重新取回完整的模型。这整个过程对你的应用程序来说是完全透明的,这样可以避免在序列化完整的 Eloquent 模式实例时所带来的一些问题。
在队列处理任务时,会调用 handle
方法 ,而这里我们也可以通过 handle
方法的参数类型提示,让 Laravel 的 服务容器 自动注入依赖对象。
像图片内容这种二进制数据,在放入队列任务之前必须使用 base64_encode
方法转换一下。否则,当这项任务放置到队列中时,可能无法正确序列化为 JSON。
分发任务
你写好任务类后,就能通过 dispatch
辅助函数来分发它了。唯一需要传递给 dispatch
的参数是这个任务类的实例:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast);
}
}
延迟分发
如果你想延迟执行一个队列中的任务,你可以用任务实例的 delay
方法。例如,我们指定一个任务在分配后 10 分钟内不可被处理:
<?php
namespace App\Http\Controllers;
use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)
->delay(Carbon::now()->addMinutes(10));
}
}
Amazon SQS 队列服务最大延迟 15 分钟 。
工作链
工作链允许你指定应该按顺序运行的队列列表。如果一个任务失败了,则其余任务将不会运行。你可以在分发任务的时候使用 withChain
方法来执行具有工作链的队列任务。
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();
自定义队列 & 连接
分发任务到指定队列
通过推送任务到不同的队列,你可以给队列任务分类,甚至可以控制给不同的队列分配多少任务。记住,这个并不是要推送任务到队列配置文件中不同的 「connections」 里,而是推送到一个连接中不同的队列里。要指定队列的话,就调 用任务实例的 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
分发任务到指定连接
如果你使用了多个队列连接,你可以将任务推到指定连接。要指定连接的话,你可以在分发任务的时候使用 onConnection
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 保存一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
当然,你可以链式调用 onConnection
和 onQueue
方法 来同时指定任务的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
指定任务最大尝试次数 / 超时值
最大 尝试次数
在一项任务中指定最大的尝试次数可以尝试通过 Artisan 命令行 --tries
来设置:
php artisan queue:work --tries=3
但是,你可以采取更为精致的方法来完成这项工作比如说在任务类中定义最大尝试次数。如果在类和命令行中都定义了最大尝试次数,Laravel 会优先执行任务类中的值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务最大尝试次数。
*
* @var int
*/
public $tries = 5;
}
超时
timeout
功能针对 PHP 7.1+ 和 pcntl
PHP 扩展进行了优化。
同样的,任务可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout
开关指定:
php artisan queue:work --timeout=30
然而,你也可以在任务类中定义一个变量来设置可运行的最大描述,如果在类和命令行中都定义了最大尝试次数,Laravel 会优先执行任务类中的值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务运行的超时时间。
*
* @var int
*/
public $timeout = 120;
}
错误处理
如果任务运行的时候抛出异常,这个任务就自动被释放回队列,这样它就能被再重新运行了。如果继续抛出异常,这个任务会继续被释放回队列,直到重试次数达到你应用允许的最多次数。这个最多次数是在调用 queue:work
Artisan 命令时通过 --tries
参数或在类中定义变量来指定的。更多队列处理器的信息可以 在下面看到。
运行队列处理器
Laravel 包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你可以通过 queue:work
命令来运行处理器。要注意,一旦 queue:work
命令开始,它将一直运行,直到你手动停止或者你关闭控制台:
php artisan queue:work
要让 queue:work
进程永久在后台运行,你应该使用进程监控工具,比如 Supervisor 来保证队列处理器没有停止运行。
一定要记得,队列处理器是长时间运行的进程,并在内存里保存着已经启动的应用状态。这样的结果就是,处 理器运行后如果你修改代码那这些改变是不会应用到处理器中的。所以在你重新部署过程中,一定要 重启队列处理器 。
处理单一任务
你可以使用 --once
选项来指定仅对队列中的单一任务进行处理:
php artisan queue:work --once