Queues
Introduction
Laravel now offers Horizon, a beautiful dashboard and configuration system for your Redis-powered queues. Check out the full Horizon documentation for more information.
Laravel queues provide a unified API across a variety of queue backends, such as Beanstalkd, Amazon SQS, Redis, or even a relational database. Queues allow you to defer the processing of a time-consuming task, such as sending an email, until a later time. Deferring these time-consuming tasks drastically speeds up web requests to your application.
The queue configuration file is stored in config/queue.php. In this file you will find connection configurations for each of the queue drivers included with the framework: a database, Beanstalkd, Amazon SQS, Redis, and a synchronous driver that executes jobs immediately (for local use). A null queue driver is also included, which discards queued jobs.
Connections Vs. Queues
Before getting started with Laravel queues, it is important to understand the distinction between "connections" and "queues". In your config/queue.php configuration file, there is a connections configuration option. This option defines a particular connection to a backend service such as Amazon SQS, Beanstalkd, or Redis. However, any given queue connection may have multiple "queues", which may be thought of as different stacks or piles of queued jobs.
Note that each connection configuration example in the queue configuration file contains a queue attribute. This is the default queue that jobs will be dispatched to when they are sent to a given connection. In other words, if you dispatch a job without explicitly defining which queue it should be dispatched to, the job will be placed on the queue that is defined in the queue attribute of the connection configuration:
// This job is sent to the default queue...
Job::dispatch();
// This job is sent to the "emails" queue...
Job::dispatch()->onQueue('emails');
Some applications may never need to push jobs onto multiple queues, instead preferring to have one simple queue. However, pushing jobs to multiple queues can be especially useful for applications that wish to prioritize or segment how jobs are processed, since the Laravel queue worker allows you to specify which queues it should process by priority. For example, if you push jobs to a high queue, you may run a worker that gives them higher processing priority:
php artisan queue:work --queue=high,default
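The ordering idea behind --queue=high,default can be illustrated with a minimal plain-PHP sketch. It is not Laravel's worker implementation: the in-memory $queues array, the nextJob helper, and the job names are all hypothetical stand-ins used only to show that earlier queues in the list are drained before later ones.

```php
<?php

// Hypothetical in-memory stand-in for one connection: queue name => pending jobs.
$queues = [
    'default' => ['SendNewsletter', 'ResizeAvatar'],
    'high'    => ['ChargeCustomer'],
];

/**
 * Pop the next job, checking the queues in the order they are listed.
 * Returns null once every listed queue is empty.
 */
function nextJob(array &$queues, string $priority): ?string
{
    foreach (explode(',', $priority) as $name) {
        if (!empty($queues[$name])) {
            return array_shift($queues[$name]);
        }
    }

    return null;
}

// Drain the queues the way a worker started with "high,default" would order them.
$processed = [];
while (($job = nextJob($queues, 'high,default')) !== null) {
    $processed[] = $job;
}
```

In this sketch the job on the high queue is picked first even though the default queue was filled earlier, because the priority list is re-checked from the start before every job.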
Driver Notes & Prerequisites
Database
In order to use the database queue driver, you will need a database table to hold the jobs. To generate a migration that creates this table, run the queue:table Artisan command. Once the migration has been created, you may migrate your database using the migrate command:
php artisan queue:table
php artisan migrate
Redis
In order to use the redis queue driver, you should configure a Redis database connection in your config/database.php configuration file.
Redis Cluster
If your Redis queue connection uses a Redis Cluster, your queue names must contain a key hash tag. This is required in order to ensure all of the Redis keys for a given queue are placed into the same hash slot:
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],
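To see why the braces matter, here is a minimal sketch of the Redis Cluster hash tag rule: when a key contains a non-empty {...} section, only the text between the first { and the next } is used to choose the hash slot. The hashTag helper is hypothetical, and the key names below are an assumed layout for illustration rather than a guaranteed description of the keys Laravel creates.

```php
<?php

/**
 * Return the part of a key that Redis Cluster would hash:
 * the text inside the first non-empty {...}, or the whole key otherwise.
 */
function hashTag(string $key): string
{
    $open = strpos($key, '{');
    if ($open === false) {
        return $key;
    }

    $close = strpos($key, '}', $open + 1);
    if ($close === false || $close === $open + 1) {
        return $key; // No closing brace, or an empty "{}" tag.
    }

    return substr($key, $open + 1, $close - $open - 1);
}

// Assumed key layout for a single queue (illustrative names only).
$tagged   = ['queues:{default}', 'queues:{default}:delayed', 'queues:{default}:reserved'];
$untagged = ['queues:default', 'queues:default:delayed', 'queues:default:reserved'];

$taggedParts   = array_unique(array_map('hashTag', $tagged));
$untaggedParts = array_unique(array_map('hashTag', $untagged));
```

With the tag, every key hashes the same substring (default), so all of them land in one slot. Without it, each full key is hashed separately and the keys may be spread across different nodes.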
Blocking
When using the Redis queue, you may use the block_for configuration option to specify how long the driver should wait for a job to become available before iterating through the worker loop and re-polling the Redis database.
Adjusting this value based on your queue load can be more efficient than continually polling the Redis database for new jobs. For instance, you may set the value to 5 to indicate that the driver should block for five seconds while waiting for a job to become available:
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],
Setting block_for to 0 will cause queue workers to block indefinitely until a job is available. This will also prevent signals such as SIGTERM from being handled until the next job has been processed.
Other Driver Prerequisites
The following dependencies are needed for the listed queue drivers:
- Amazon SQS: aws/aws-sdk-php ~3.0
- Beanstalkd: pda/pheanstalk ~4.0
- Redis: predis/predis ~1.0 or phpredis PHP extension
Creating Jobs
Generating Job Classes
By default, all of the queueable jobs for your application are stored in the app/Jobs directory. If the app/Jobs directory doesn't exist, it will be created when you run the make:job Artisan command. You may generate a new queued job using the Artisan CLI:
php artisan make:job ProcessPodcast
The generated class will implement the Illuminate\Contracts\Queue\ShouldQueue interface, indicating to Laravel that the job should be pushed onto the queue to run asynchronously.
Class Structure
Job classes are very simple, normally containing only a handle method which is called when the job is processed by the queue. To get started, let's take a look at an example job class. In this example, we'll pretend we manage a podcast publishing service and need to process the uploaded podcast files before they are published:
<?php
namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new job instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }
}
In this example, note that we were able to pass an Eloquent model directly into the queued job's constructor. Because of the SerializesModels trait that the job is using, Eloquent models and their loaded relationships will be gracefully serialized and unserialized when the job is processed. If your queued job accepts an Eloquent model in its constructor, only the identifier for the model will be serialized onto the queue. When the job is actually handled, the queue system will automatically re-retrieve the full model instance and its loaded relationships from the database. It's all totally transparent to your application and prevents issues that can arise from serializing full Eloquent model instances.
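The "store only the identifier, re-retrieve on handling" idea can be sketched in plain PHP. This is not how the SerializesModels trait is implemented: FakePodcast, its in-memory $table, and ProcessFakePodcast are hypothetical classes made up for illustration, and the sketch assumes PHP 7.4 or later for the __serialize and __unserialize magic methods.

```php
<?php

// Hypothetical model backed by an in-memory "table", for illustration only.
class FakePodcast
{
    public static $table = [1 => 'Laravel News Podcast'];

    public $id;
    public $title;

    public function __construct(int $id, string $title)
    {
        $this->id = $id;
        $this->title = $title;
    }

    // Look the record up again, the way a database query would.
    public static function find(int $id): self
    {
        return new self($id, self::$table[$id]);
    }
}

// Hypothetical job that serializes only the model's identifier.
class ProcessFakePodcast
{
    public $podcast;

    public function __construct(FakePodcast $podcast)
    {
        $this->podcast = $podcast;
    }

    public function __serialize(): array
    {
        return ['podcast_id' => $this->podcast->id];
    }

    public function __unserialize(array $data): void
    {
        // Re-retrieve a fresh copy when the job is restored.
        $this->podcast = FakePodcast::find($data['podcast_id']);
    }
}

$payload = serialize(new ProcessFakePodcast(FakePodcast::find(1)));

// The record changes while the job is waiting on the queue.
FakePodcast::$table[1] = 'Renamed Podcast';

$restored = unserialize($payload);
```

In this sketch the payload never contains the title, and the restored job sees the renamed record. One consequence of this design is that a job works with the model's state at the time it is handled, not at the time it was dispatched.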
The handle method is called when the job is processed by the queue. Note that we are able to type-hint dependencies on the handle method of the job. The Laravel service container automatically injects these dependencies.
If you would like to take total control over how the container injects dependencies into the handle method, you may use the container's bindMethod method. The bindMethod method accepts a callback which receives the job and the container. Within the callback, you are free to invoke the handle method however you wish. Typically, you should call this method from a service provider:
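use App\AudioProcessor;
use App\Jobs\ProcessPodcast;

// Resolve the processor from the container and pass it to handle() explicitly.
$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
    return $job->handle($app->make(AudioProcessor::class));
});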
Binary data, such as raw image contents, should be passed through the base64_encode function before being passed to a queued job. Otherwise, the job may not properly serialize to JSON when being placed on the queue.
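A minimal plain-PHP sketch of the problem and the workaround, independent of Laravel. The four bytes in $raw are an arbitrary stand-in for image contents, chosen only because they are not valid UTF-8.

```php
<?php

// Raw bytes that are not valid UTF-8, standing in for image contents.
$raw = "\xFF\xD8\xFF\xE0";

// Encoding the raw bytes directly fails: json_encode() returns false.
$direct = json_encode(['image' => $raw]);

// Base64 output is plain ASCII, so the payload encodes cleanly.
$payload = json_encode(['image' => base64_encode($raw)]);

// On the receiving side, decode to recover the original bytes.
$restored = base64_decode(json_decode($payload, true)['image']);
```

In a job, this would mean passing base64_encode($contents) to the constructor and calling base64_decode inside the handle method before using the data.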
Handling Relationships
Because loaded relationships also get serialized, the serialized job string can become quite large. To prevent relations from being serialized, you can call the withoutRelations method on the model when setting a property value. This method will return an instance of the model with no loaded relationships:
/**
 * Create a new job instance.
 *
 * @param  \App\Podcast  $podcast
 * @return void
 */
public function __construct(Podcast $podcast)
{
    $this->podcast = $podcast->withoutRelations();
}
Job Middleware
Job middleware allow you to wrap custom logic around the execution of queued jobs, reducing boilerplate in the jobs themselves. For example, consider the following handle method, which leverages Laravel's Redis rate limiting features to allow only one job to process every five seconds:
/**
 * Execute the job.
 *
 * @return void
 */
public function handle()
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('Lock obtained...');

        // Handle job...
    }, function () {
        // Could not obtain lock...

        return $this->release(5);
    });
}
While this code is valid, the structure of the handle method becomes noisy since it is cluttered with Redis rate limiting logic. In addition, this rate limiting logic must be duplicated for any other jobs that we want to rate limit.
Instead of rate limiting in the handle method, we could define a job middleware that handles rate limiting. Laravel does not have a default location for job middleware, so you are welcome to place job middleware anywhere in your application. In this example, we will place the middleware in an app/Jobs/Middleware directory:
<?php
namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * Process the queued job.
     *
     * @param  mixed  $job
     * @param  callable  $next
     * @return mixed
     */
    public function handle($job, $next)
    {
        Redis::throttle('key')
            ->block(0)->allow(1)->every(5)
            ->then(function () use ($job, $next) {
                // Lock obtained...

                $next($job);
            }, function () use ($job) {
                // Could not obtain lock...

                $job->release(5);
            });
    }
}
As you can see, like route middleware, job middleware receive the job being processed and a callback that should be invoked to continue processing the job.
After creating job middleware, they may be attached to a job by returning them from the job's middleware method. This method does not exist on jobs scaffolded by the make:job Artisan command, so you will need to add it to your own job class definition:
use App\Jobs\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * @return array
 */
public function middleware()
{
    return [new RateLimited];
}
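To make the "receive the job and a callback" contract concrete, here is a minimal plain-PHP sketch of how a list of middleware could be wrapped around a job's own logic. It is an illustration of the general pattern, not Laravel's internal pipeline: the Recorder class and the runThrough function are hypothetical names, and the job is represented by a plain string.

```php
<?php

// Shared log so the order of execution can be inspected afterwards.
$log = new ArrayObject();

// Hypothetical middleware: records an entry before and after the wrapped job.
class Recorder
{
    private $label;
    private $log;

    public function __construct(string $label, ArrayObject $log)
    {
        $this->label = $label;
        $this->log = $log;
    }

    public function handle($job, $next)
    {
        $this->log[] = "{$this->label}: before";
        $next($job); // Continue to the next layer, or to the job itself.
        $this->log[] = "{$this->label}: after";
    }
}

// Wrap the destination callback in each middleware, outermost first.
function runThrough(array $middleware, $job, callable $destination): void
{
    $pipeline = array_reduce(
        array_reverse($middleware),
        function (callable $next, $layer) {
            return function ($job) use ($layer, $next) {
                $layer->handle($job, $next);
            };
        },
        $destination
    );

    $pipeline($job);
}

runThrough(
    [new Recorder('first', $log), new Recorder('second', $log)],
    'ProcessPodcast',
    function ($job) use ($log) {
        $log[] = "handling {$job}";
    }
);
```

In this sketch a middleware that never calls $next, like the rate limiter when it cannot obtain a lock, stops the job's own logic from running at all.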
Dispatching Jobs
Once you have written your job class, you may dispatch it using the dispatch method on the job itself. The arguments passed to the dispatch method will be given to the job's constructor:
<?php
namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast);
    }
}
Delayed Dispatching
If you would like to delay the execution of a queued job, you may use the delay method when dispatching a job. For example, let's specify that a job should not be available for processing until 10 minutes after it has been dispatched:
<?php
namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)
            ->delay(now()->addMinutes(10));
    }
}
The Amazon SQS queue service has a maximum delay time of 15 minutes.
Synchronous Dispatching
If you would like to dispatch a job immediately (synchronously), you may use the dispatchNow method. When using this method, the job will not be queued and will be run immediately within the current process:
<?php
namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatchNow($podcast);
    }
}
Job Chaining
Job chaining allows you to specify a list of queued jobs that should be run in sequence after the primary job has executed successfully. If one job in the sequence fails, the rest of the jobs will not be run. To execute a queued job chain, you may use the withChain method on any of your dispatchable jobs:
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();
Deleting jobs using the $this->delete() method will not prevent chained jobs from being processed. The chain will only stop executing if a job in the chain fails.
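The "stop at the first failure" behavior of a chain can be sketched in plain PHP. This is a simplification under stated assumptions: the runChain function is hypothetical, jobs are represented by closures, and a thrown exception is treated as an immediate failure, whereas a real queued job may be retried before it is considered failed.

```php
<?php

/**
 * Run the given callables in order and return the names of those that completed.
 * The first job that throws halts everything after it.
 */
function runChain(array $jobs): array
{
    $completed = [];

    foreach ($jobs as $name => $job) {
        try {
            $job();
        } catch (Throwable $e) {
            break; // A failed job stops the rest of the chain.
        }

        $completed[] = $name;
    }

    return $completed;
}

$ran = runChain([
    'ProcessPodcast' => function () {
        // Succeeds.
    },
    'OptimizePodcast' => function () {
        throw new RuntimeException('Optimization failed');
    },
    'ReleasePodcast' => function () {
        // Never reached in this run.
    },
]);
```

Here only ProcessPodcast completes: OptimizePodcast fails, so ReleasePodcast is never started.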
Chain Connection & Queue
If you would like to specify the default connection and queue that should be used for the chained jobs, you may use the allOnConnection and allOnQueue methods. These methods specify the queue connection and queue name that should be used unless the queued job is explicitly assigned a different connection / queue:
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
Customizing The Queue & Connection
Dispatching To A Particular Queue
By pushing jobs to different queues, you may "categorize" your queued jobs and even prioritize how many workers you assign to various queues. Keep in mind, this does not push jobs to different queue "connections" as defined by your queue configuration file, but only to specific queues within a single connection. To specify the queue, use the onQueue method when dispatching the job:
<?php
namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}
Dispatching To A Particular Connection
If you are working with multiple queue connections, you may specify which connection to push a job to. To specify the connection, use the onConnection method when dispatching the job:
<?php
namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}
You may chain the onConnection and onQueue methods to specify the connection and the queue for a job:
ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');
Alternatively, you may specify the connection as a property on the job class:
<?php
namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The queue connection that should handle the job.
     *
     * @var string
     */
    public $connection = 'sqs';
}
Specifying Max Job Attempts / Timeout Values
Max Attempts
One approach to specifying the maximum number of times a job may be attempted is via the --tries switch on the Artisan command line:
php artisan queue:work --tries=3
However, you may take a more granular approach by defining the maximum number of attempts on the job class itself. If the maximum number of attempts is specified on the job, it will take precedence over the value provided on the command line:
<?php
namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 5;
}
Time Based Attempts
As an alternative to defining how many times a job may be attempted before it fails, you may define a time at which the job should time out. This allows a job to be attempted any number of times within a given time frame. To define the time at which a job should time out, add a retryUntil method to your job class:
/**
 * Determine the time at which the job should time out.
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}
You may also define a retryUntil method on your queued event listeners.
Timeout
The timeout feature is optimized for PHP 7.1+ and the pcntl PHP extension.
Likewise, the maximum number of seconds that jobs can run may be specified using the --timeout switch on the Artisan command line:
php artisan queue:work --timeout=30
However, you may also define the maximum number of seconds a job should be allowed to run on the job class itself. If the timeout is specified on the job, it will take precedence over any timeout specified on the command line:
<?php
namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 120;
}
Rate Limiting
This feature requires that your application can interact with a Redis server.
If your application interacts with Redis, you may throttle your queued jobs by time or concurrency. This feature can be of assistance when your queued jobs are interacting with APIs that are also rate limited.
For example, using the throttle method, you may throttle a given type of job to only run 10 times every 60 seconds. If a lock cannot be obtained, you should typically release the job back onto the queue so it can be retried later:
Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});
In the example above, the key may be any string that uniquely identifies the type of job you would like to rate limit. For example, you may wish to construct the key based on the class name of the job and the IDs of the Eloquent models it operates on.
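One possible way to build such a key is sketched below in plain PHP. The throttleKey helper and its key format are hypothetical, not part of Laravel; any scheme works as long as the same job type and models always produce the same string.

```php
<?php

/**
 * Hypothetical helper: build a rate limiting key from a job class name
 * and the IDs of the models the job operates on.
 */
function throttleKey(string $jobClass, array $modelIds): string
{
    // Sort so the same set of models always yields the same key.
    sort($modelIds);

    return $jobClass.':'.implode(',', $modelIds);
}

$key = throttleKey('App\Jobs\ProcessPodcast', [42, 7]);
```

Inside a job class, the first argument would typically be static::class and the IDs would come from the models held by the job, so that jobs touching different models are throttled independently.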
Releasing a throttled job back onto the queue will still increment the job's total number of attempts.
Alternatively, you may specify the maximum number of workers that may simultaneously process a given job. This can be helpful when a queued job is modifying a resource that should only be modified by one job at a time. For example, using the funnel method, you may limit jobs of a given type to only be processed by one worker at a time:
Redis::funnel('key')->limit(1)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});