The need to execute tasks asynchronously, out of the HTTP request handling cycle, will sooner or later appear in all web applications. One gem that can help you handle this need is delayed_job. There are others too, like resque and Sidekiq. I have used many of them and I have to say that I really like delayed_job
because it integrates so well with my RDBMS backend (usually, either MySQL or PostgreSQL).
There have been many posts about Delayed Job, with tons of useful information. In this article, I’ll cover some of the practices I apply at work when working with Delayed Job, such as:
- Enhancements on the table that holds the delayed jobs
- Good practices when queuing jobs, including custom delayed jobs
- Managing jobs using the Rails console
- Managing jobs using a Web interface
- Testing with delayed jobs
- Tagged logging
I’ll be using Rails and ActiveRecord in my demo application, so feel free to create a Rails app and follow along. You’ll need to add delayed_job
to your Gemfile.
Key Takeaways
- Optimize Delayed Jobs Table: Enhance the `delayed_jobs` table by adding necessary indexes on columns like `queue`, `priority`, and `run_at`, and use appropriate data types like `longtext` for `handler` and `last_error` in MySQL to handle longer text efficiently.
- Manage Job Lifecycle: Consider not deleting failed jobs immediately to facilitate debugging. Customize maximum run time settings based on expected task completion times to better manage job processing and system resources.
- Utilize Multiple Queues: Distribute jobs across multiple queues to improve scalability and manageability. Assign meaningful names to queues based on their business context and manage them separately to optimize performance.
- Implement Custom Delayed Jobs: Create custom delayed job classes to handle specific tasks, allowing for better organization and specificity in job handling, including custom hooks like `enqueue`, `success`, `error`, and `failure` for enhanced control over job lifecycle events.
- Interactive Job Management via Rails Console and Web Interface: Use Rails console commands for detailed manipulation and querying of jobs, and employ a web interface for accessible job management, ensuring jobs can be monitored and managed effectively in different environments.
- Testing and Logging Best Practices: Avoid queuing or mocking delayed jobs in tests to directly test job execution; use tagged logging to enhance traceability and debugging capabilities by capturing detailed information about job processing in logs.
Table to Hold Delayed Jobs
If you run the following command:
rails generate delayed_job:active_record
you will get the following migration:
def self.up
create_table :delayed_jobs, :force => true do |table|
table.integer :priority, :default => 0, :null => false
table.integer :attempts, :default => 0, :null => false
table.text :handler, :null => false
table.text :last_error
table.datetime :run_at
table.datetime :locked_at
table.datetime :failed_at
table.string :locked_by
table.string :queue
table.timestamps
end
add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
end
Suggested Optimizations to the Migration
Add Index on the Queue Column
You will need to query your jobs by queue
. If you have a lot of jobs in your database table, querying by queue
will do a full table scan and take a lot of time to complete. An index is very useful, in this case.
# optimization #1
#
def self.up
create_table :delayed_jobs, :force => true do |table|
# ... #
end
# ... #
add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue'
end
MySQL longtext
Optimizations
Some exceptions received by Delayed Job can be quite lengthy. If you are using MySQL, the handler
and last_error
fields may not be long enough. Change their datatype to longtext
to avoid this issue. If you are using PostgreSQL, this will not be a problem.
# optimization #2
#
def self.up
create_table :delayed_jobs, :force => true do |table|
# ... #
# replace the migration for column +handler+ with
table.column :handler, :longtext, :null => false
# replace the migration for column +last_error+ with
table.column :last_error, :longtext
# ... #
end
# ... #
add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue'
end
Columns for Your Delayed Entity
Usually, a job is created in order to handle a background task that is related to a business entity. For example, to send an email to a User.
I am using two columns to store a reference to this business entity instance. Hence, I am able to quickly query my job entries that are related to specific business entity types or instances. I also add corresponding indexes so these queries finish quickly.
# optimization #3
#
def self.up
create_table :delayed_jobs, :force => true do |table|
# ... #
# replace the migration for column +handler+ with
table.column :handler, :longtext, :null => false
# replace the migration for column +last_error+ with
table.column :last_error, :longtext
# ... #
table.integer :delayed_reference_id
table.string :delayed_reference_type
end
# ... #
add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue'
add_index :delayed_jobs, [:delayed_reference_id], :name => 'delayed_jobs_delayed_reference_id'
add_index :delayed_jobs, [:delayed_reference_type], :name => 'delayed_jobs_delayed_reference_type'
end
Later, I’ll show you how I am populating these two columns.
Queuing Jobs
Don’t Delete Failed Jobs
By default, delayed workers delete failed jobs as soon as they reach the maximum number of attempts. This might be annoying when you want to find the root of a problem and troubleshoot. My suggestion is to leave the failed jobs in the database and have a process setup to handle failed jobs.
You can set the value of this configuration attribute with the following statement:
Delayed::Worker.destroy_failed_jobs = false
inside a Delayed Job initializer.
Think about the Maximum Run Time Value
You may want to consider what value the maximum run time configuration attribute should have. The default is 4.hours
. However, if you don’t expect to have such long running tasks, it is better to decrease that value. Doing so will kill the delayed worker when this limit is reached and allow the job to fail so another worker can pick it up. Also, you will be notified for tasks that you expected to run in short time but took longer (via email notifications on error
and failure
hooks).
On the other hand, I have worked with Delayed Job on projects where 4.hours
was not enough and I had to increase that value. coughs
Delayed::Worker.max_run_time = 15.minutes
will set this limit to 15 minutes.
Don’t Use One Queue
Don’t use the default
or only one queue. This will not scale. Even if you only have a few workers/jobs at the start of your project, start by giving meaningful names to the queues. Distribute jobs with different business contexts to different queues.
For example, if you have registration emails, queue them in the registration_emails
queue, whereas email notifications to users of your app might be queued to email_notifications
.
If you have different queues, they are easier to manage. If there is an exception that you want to handle manually or with a script, you can stop the workers and delete all the email_notifications
queue entries. If you have all your jobs on the same queue, tasks such as these are more complicated to tackle.
Having your jobs distributed to different queues also allow starting different workers to handle different queues. My suggestion is to have at least one worker per queue. Hence, you will execute your jobs in parallel.
Use Custom Delayed Jobs
You can use handle_asynchronously
to declare that a call to a method should be handled asynchronously. I rarely use this technique. I prefer to declare custom delayed job objects inside the jobs folder of my project. Using custom delayed jobs allows me to fine tune what I store inside the delayed_jobs
table.
Here is an example custom job. Assume I have a job that processes a video (and my application is VideoStreamer
).
# app/jobs/video_streamer/process_video_job.rb
#
module VideoStreamer
class ProcessVideoJob < Struct.new(:video_id)
# ... #
end
end
I create the folder video_streamer inside the jobs folder, along with the processvideo_job.rb file to hold the delayed custom job code. The class for the custom job is namespaced with the name of my application. Also, I suffix the class name with the suffix Job
. Using Struct
, I store the ID of the video (the business entity instance this job is related to). The attribute will have the name video_id
.
Implement Enqueue Hook
I register a hook handler for the enqueue
hook. The hook handles items that I want to be done whenever a new job of this class is put in the queue.
# app/jobs/video_streamer/process_video_job.rb
#
module VideoStreamer
class ProcessVideoJob < Struct.new(:video_id)
def enqueue(job)
job.delayed_reference_id = video_id
job.delayed_reference_type = 'VideoStreamer::Video'
job.save!
end
end
end
As you can see, the video (business entity) is stored when a job is enqueued.
Of course, there are times when I want to do more complicated things in enqueue
. In the following example, I accept an enqueue only if the status
has the correct value. Then, I update that status
to the value processing
to indicate that the specific video instance is being processed:
module VideoStreamer
class ProcessVideoJob < Struct.new(:video_id)
def enqueue(job)
check_and_update_status
job.delayed_reference_id = video_id
job.delayed_reference_type = 'VideoStreamer::Video'
job.save!
end
private
def check_and_update_status
video = VideoStreamer::Video.find video_id
raise StandardError.new("Video: #{video.id} is not on status 'new' (status: #{video.status}") unless video.status == 'new'
video.status = 'processing'
video.save!
end
end
end
Implement Success Hook
To update the status once a job is successfully processed, I implement the success
hook:
module VideoStreamer
class ProcessVideoJob < Struct.new(:video_id)
# ... #
def success(job)
update_status('success')
end
private
def update_status(status)
video = VideoStreamer::Video.find video_id
video.status = status
video.save!
end
# ... #
end
end
Implement Error Hook
The error
hook in your custom job can, for example, send an email alert or change the status of the related business entity. The error
method has access to the exception
that will give you information about the error. Note that the error indicates temporary failure and, if there are attempts left, another worker will try to run your background task again.
module VideoStreamer
class ProcessVideoJob < Struct.new(:video_id)
def enqueue(job)
# ... #
end
def success(job)
# ... #
end
def error(job, exception)
update_status('temp_error')
# Send email notification / alert / alarm
end
private
# ... #
end
end
Implement Failure Hook
Use the failure hook, for example, to send an email alert or to change the status of the related business entity when a job fails for good and won’t be retried. If you have configured the failed jobs to remain in the database table, then you could retry the job manually.
module VideoStreamer
class ProcessVideoJob < Struct.new(:video_id)
def enqueue(job)
# ... #
end
def success(job)
# ... #
end
def error(job, exception)
# ... #
end
def failure(job)
update_status('failure')
# Send email notification / alert / alarm / SMS / call ... whatever
end
private
# ... #
end
end
Implement Perform Hook – Delegate – Raise
This hook is the most crucial, otherwise the job won’t do anything. Your implementation should be very simple and delegate the actual work to a model or other service object. The real implementation should not be part of the perform
implementation. This will make sure that you can execute the logic of the implementation without necessarily having an instance of a delayed job. Also, it can be easier for you to test the logic in a unit test. So, as a bare minimum:
module VideoStreamer
class ProcessVideoJob < Struct.new(:video_id)
def enqueue(job)
# ... #
end
def success(job)
# ... #
end
def error(job, exception)
# ... #
end
def failure(job)
# ... #
end
def perform
video = VideoSteamer::Video.find video_id
video.process!
end
private
# ... #
end
end
It is absolutely necessary to raise any exceptions so they can be handled by the worker (which will call error
and failure
, as appropriate). Do not swallow any exceptions that perform
might raise. In the above example, video.process!
might raise an exception that I allow to bubble up. Same goes for locating the business entity. I use #find()
and give the video_id
, which raises an exception if the business entity is not found. Don’t use, for example, find_by_id()
which does not raise such an exception.
If the service object performing the task does not raise an error when needed (maybe it returns false
), perform
should raise an error.
def perform
video = VideoSteamer::Video.find video_id
raise StandardError.new("Failed to process video with id: #{video.id}") unless video.process?
end
Managing Jobs via Rails Console
Delayed Job includes script interface to start/stop jobs. But, there are cases in which I want to stop running workers and start/stop specific jobs manually. I usually do that when I am in the development environment, where I rarely have background runners running. I run them manually from the console using specific Delayed::Job
API calls. Also, in production environments, I have been in situations in which I had to stop running workers and do fine grained manual operation of the jobs, using the same technique.
Here are some Delayed::Job
API commands that I find useful in such situations:
Query delayed_jobs Table with Corresponding Model
I use the Delayed::Job
model to query the delayed_jobs
table. For example, the following returns jobs that belong to the email_notification
queue that have raised an error:
Delayed::Job.where(queue: 'email_notifications').where.not(last_error: nil)
Understand, you can use any of the columns in your delayed_jobs
table to build your where
clause.
Which Class Will Handle My Job?
When I want to see which class
will handle a job (usually one that failed), I query for the handler
:
handler = Delayed::Job.last.handler
This is a YAML serialized object instance.
Custom Delayed Jobs
Here is an example output for my VideoStreamer::ProcessVideoJob
:
"--- !ruby/struct:VideoStreamer::ProcessVideoJob\nvideo_id: 68\n"
This output tells me that the particular delayed job instance is a task to process the video with ID = 68.
Application Mailers
If you send your emails asynchronously, then the YAML serialization is a bit different:
"--- !ruby/object:Delayed::PerformableMailer\nobject: !ruby/class 'UserNotifierMailer'\nmethod_name: :new_user_registration\nargs:\n- 35\n"
All the serialized mailers are !rubyobject:Delayed::PerformableMailer
followed by the mailer class from your app (see !ruby/class 'UserNotifierMailer'
). You can see the actual method that is used to send the email (new_user_registration
) and its arguments (35
).
Devise Mailers
If you send your devise emails asynchronously (I am using the devise-async gem to do that), then the YAML serialization looks something like this:
"--- !ruby/object:Delayed::PerformableMethod\nobject: !ruby/object:Devise::Async::Backend::DelayedJob {}\nmethod_name: :perform\nargs:\n- :confirmation_instructions\n- User\n- '317'\n- daD9bVQ2d_kaR3abyS7X\n- {}\n"
Again, you can see which email might be failing along with its run time arguments. In the example above, the email that fails is the one that sends confirmation instructions to user with ID = 317.
Deserializing Different Jobs on Same Queue
If you have different job types in the same queue, then it will be difficult to manage the jobs grouped by the job type. For example, you may want to count the number of queued jobs per type on a queue that is called solr_indexing
that handles various classes for background indexing.
If the only information that you have on your delayed_jobs
table is the handler
(i.e. the delayed_reference_type
does not help), you will have to work with … the handler
.
The point is that handler
stores different serialized objects according to the class serialized as we discussed above.
The YAML.load(dj.handler)
will deserialize your serialized object.
Custom Delayed Jobs
If the queued job is a custom delayed job
dj = Delayed::Job.last
dj.handler # Assume: "--- !ruby/struct:VideoStreamer::ProcessVideoJob\nvideo_id: 68\n"
job = YAML.load(dj.handler) # +job+ will be instance of +VideoStreamer::ProcessVideoJob+ struct with
# +video_id+ attribute set to +68+
Application Mailers
If it is a mailer object, the instantiated job is a Delayed::PerformableMailer
:
=> #<Delayed::PerformableMailer:0x0000000a050898 @object=UserNotifierMailer, @method_name=:new_user_registration, @args=[35]>
This handler, as you can see, responds to object
, which is the actual mailer instance, method_name
, which is the mailer instance method that will be used to send the email, and args
, which contains the runtime arguments to the method.
Devise Mailers
When the handler is a Devise
mailer, there is one more level of abstraction. The job is of type Delayed::PerformableMethod
:
=> #<Delayed::PerformableMethod:0x0000000a0bb800 @object=#<Devise::Async::Backend::DelayedJob:0x0000000a0bf090>, @method_name=:perform, @args=[:confirmation_instructions, "User", "317", "daD9bVQ2d_kaR3abyS7X", {}]>
This one, as you can see, responds to object
, which is a instance of Devise::Async::Backend::DelayedJob
, the method_name
, which is the method to call on this instance, and the args
, which contains the run-time arguments to this method. This args
array contains the actual email method and its real arguments.
The YAML.load(dj.handler)
might return different object types and you might need to implement some kind of is_a?(....)
logic if you want to write a script that operates on all or part of the jobs that belong to the same queue.
What Was the Exception for an Errored Job?
When I want to see the exception details for a job, I inspect the last_error
column.
Run a Job from the Rails Console Without Queuing
Assuming that you have a job and you want to run it manually from the rails console, but, you do not want it to be put in the delayed job queue lifecycle loop:
# This will run your job but will not go through the delayed job lifecycle loop
job = VideoStreamer::ProcessVideoJob.new(68)
job.perform
It will work, but no registered hooks will be executed.
I rarely use this method, but it has come in handy on occasion.
Instantiate a Worker Within Rails Console
This is very handy when you want to run a failed job again, but you want it done through the console and not through background running workers:
dw = Delayed::Worker.new
Easy, eh?
Run a Failed Job from Within Rails Console
Assuming that you have a failed job and you want to run it again manually, from within the console:
dw = Delayed::Worker.new
dj = Delayed::Job.last # assuming that the last job is the failed one, otherwise use a proper query to
# locate it
dw.run dj
That’s it. Your worker will run the job call the corresponding error and failure hooks if the job fails again. If it succeeds, it will delete it from the queue.
Managing Jobs Using the Web Interface
Use the delayed_job_web interface to have access to your queued jobs. Add the following line to your routes:
mount DelayedJobWeb => "/delayed_job"
This will allow you to access the management interface using an address like https://www.myapp.com/delayed_job
.
I also have a delayed_job_web.rb initializer (in my config/initializers folder):
DelayedJobWeb.use Rack::Auth::Basic do |username, password|
# authenticate
user = User.find_by_username(username)
return false unless user.authenticate(password)
# authorize. I am using cancancan for authorization. You can use any other authorization gem you see fit.
ability = Ability.new(user)
can = ability.can? :manage, Delayed::Job
raise CanCan::AccessDenied unless can
true
end
This allows me to authenticate and authorize the request to access /delayed_job
.
Testing with Delayed Job
Do Not Queue or Mock When Testing
When testing your application code (with any kind of tests, unit, or integration tests or UI tests), do not queue or mock your delayed job tasks. This might seem a strange practice to you , but to me, it has been proven invaluable. I need to see whether they break or not. If the cases where a task takes too long or has recursion, I might decide to mock that job. But in general, I do not mock my jobs.
To get Delayed Job to simply execute your task and not queue it, do the following in the initializer:
Delayed::Worker.delay_jobs = !%w[ test ].include?(Rails.env)
The above one will not queue jobs in the test
environment.
Disable Immediate Execution When Testing
However, there might be tests where you want to test the queuing functionality. I wrap these tests with special tags and ask Delayed Job to queue them up.
RSpec
When using RSpec
, I have an around(:each)
configuration that allows the use of a :delayed_job
tag:
# spec/spec_helper.rb
RSpec.configure do |config|
# ... other config here ... #
config.around(:each, :delayed_job) do |example|
old_value = Delayed::Worker.delay_jobs
Delayed::Worker.delay_jobs = true
Delayed::Job.destroy_all
example.run
Delayed::Worker.delay_jobs = old_value
end
# ... other config here ... #
end
I am enabling the queuing of jobs and deleting any existing jobs before the example runs. After the example run, I set the delayed job queuing back to what it was.
Now, when I want to write a spec that uses delayed job queuing, I do the following:
it 'should queue the job`, delayed_job: true do
...
end
Cucumber
# features/support/hooks.rb
Around('@delayed_job') do |scenario, block|
old_value = Delayed::Worker.delay_jobs
Delayed::Worker.delay_jobs = true
Delayed::Job.destroy_all
block.call
Delayed::Worker.delay_jobs = old_value
end
Similar to the RSpec
configuration, I am using an Around
hook with tag @delayed_job
. Then, I tag the Scenarios that I want to use real queuing:
@delayed_job
Scenario: As a User when I sign up there is a new user registration email queued
Tagged Logging
Rails supports tagged logging, as you probably know. I have configured Delayed Job to use tagged logging, too.
In order to do that, I use the Delayed::Plugin
technique:
module Delayed
module Plugins
class TaggedLogging < Delayed::Plugin
Delayed::Worker.logger = Rails.logger
callbacks do |lifecycle|
lifecycle.around(:execute) do |worker, *args, &block|
Rails.logger.tagged "Worker:#{worker.name_prefix.strip}", "Queues:#{worker.queues.join(',')}" do
block.call(worker, *args)
end
end
lifecycle.around(:invoke_job) do |job, *args, &block|
Rails.logger.tagged "Job:#{job.id}" do
block.call(job, *args)
end
end
end
end
end
end
As you can see, I am catching the hooks around(:execute)
and around(:invoke_job)
and using Rails.logger
to implement tagged logging. I log the worker name, queues, and the job id.
Don’t forget to register your Delayed::Plugin
subclass in your delayed_job
initializer with:
Delayed::Worker.plugins << Delayed::Plugins::TaggedLogging
By the way, if you want to know which events you can hook to, see this line here.
Conclusion
In this article, I have presented some of the practices that I use to tackle my background jobs using Delayed Job gem. I hope that you find some, if not all, of these tips useful.
Frequently Asked Questions (FAQs) on Delayed Jobs Best Practices
What are the key differences between Delayed Job and Sidekiq?
Delayed Job and Sidekiq are both background processing tools used in Ruby on Rails applications. However, they differ in several ways. Delayed Job is a simple and straightforward tool that stores jobs in your application’s database. It’s easy to set up and doesn’t require any additional services. On the other hand, Sidekiq is a more complex tool that uses Redis for job storage. It supports advanced features like job prioritization, scheduling, and retrying. While Sidekiq can handle a larger volume of jobs and execute them faster, it requires more setup and maintenance than Delayed Job.
How can I implement Delayed Job in Rails?
Implementing Delayed Job in Rails involves several steps. First, you need to add the ‘delayed_job_active_record’ gem to your Gemfile and run ‘bundle install’. Then, you need to generate the migration for the jobs table by running ‘rails generate delayed_job:active_record’. After running the migration with ‘rake db:migrate’, you can start creating jobs. To do this, you define a method that contains the code you want to run in the background, and then call ‘.delay.method_name’ on the object. Finally, you can start the Delayed Job worker with the ‘rake jobs:work’ command.
How does Delayed Job handle job failures?
Delayed Job has a built-in mechanism for handling job failures. If a job fails, it’s automatically retried after a certain amount of time. The delay before retrying increases exponentially with each failure, to prevent overloading the system with repeated attempts. The job is discarded after a maximum number of attempts, which is configurable. You can also define custom behavior on failure by overriding the ‘error’ method in your job class.
Can I prioritize jobs in Delayed Job?
Yes, Delayed Job supports job prioritization. When you enqueue a job, you can specify a priority value. Lower values have higher priority, so a job with a priority of 0 will be executed before a job with a priority of 10. If no priority is specified, the default value is 0.
How can I monitor the status of my Delayed Job workers?
There are several tools available for monitoring Delayed Job workers. One popular option is the ‘delayed_job_web’ gem, which provides a web interface for viewing and managing jobs. You can see the number of enqueued, working, and failed jobs, and you can also manually start, stop, or restart workers. Other options include using a monitoring service like New Relic or AppSignal, or setting up custom logging and alerting.
How can I improve the performance of Delayed Job?
There are several ways to improve the performance of Delayed Job. One is to optimize your job code to reduce execution time. Another is to increase the number of workers, so more jobs can be processed concurrently. However, this requires more system resources. You can also adjust the job prioritization and retry settings to ensure that important jobs are executed promptly and that failed jobs don’t overload the system.
Can I schedule jobs to run at a specific time with Delayed Job?
Yes, Delayed Job supports job scheduling. You can use the ‘.delay(run_at: time).method_name’ syntax to schedule a job to run at a specific time. The ‘time’ parameter can be any object that responds to ‘.to_time’.
How can I test Delayed Job in my Rails application?
Testing Delayed Job in Rails can be done using the built-in Rails testing tools. You can use ‘assert_difference’ to check that a job has been enqueued, and ‘perform_enqueued_jobs’ to execute the jobs immediately during the test. You can also use mocking and stubbing to isolate the job code and test it independently.
What are some common issues with Delayed Job and how can I troubleshoot them?
Some common issues with Delayed Job include jobs not being executed, jobs failing repeatedly, and high system resource usage. To troubleshoot these issues, you can check the Delayed Job logs, which contain detailed information about job execution. You can also use monitoring tools to track the status of your workers and jobs. If a job is failing, you can inspect the job object in the database to see the error message and stack trace.
Can I use Delayed Job with other databases besides ActiveRecord?
Yes, Delayed Job can be used with other databases besides ActiveRecord. There are separate gems for using Delayed Job with Mongoid and Sequel. The setup and usage are similar to the ActiveRecord version, but there may be some differences in the specific features and options available.
Currently, spending most of the day as CTO for Book&Table. Otherwise, full stack engineer for Fraudpointer and Wyngle. Also, blogging here and teaching computer programming.