Ruby
Article

Delayed Job Best Practices

By Panayotis Matsinopoulos

Best Practice concept stamp

The need to execute tasks asynchronously, out of the HTTP request handling cycle, will sooner or later appear in all web applications. One gem that can help you handle this need is delayed_job. There are others too, like resque and Sidekiq. I have used many of them and I have to say that I really like delayed_job because it integrates so well with my RDBMS backend (usually, either MySQL or PostgreSQL).

There have been many posts about Delayed Job, with tons of useful information. In this article, I’ll cover some of the practices I apply at work when working with Delayed Job, such as:

  • Enhancements on the table that holds the delayed jobs
  • Good practices when queuing jobs, including custom delayed jobs
  • Managing jobs using the Rails console
  • Managing jobs using a Web interface
  • Testing with delayed jobs
  • Tagged logging

I’ll be using Rails and ActiveRecord in my demo application, so feel free to create a Rails app and follow along. You’ll need to add delayed_job to your Gemfile.

Table to Hold Delayed Jobs

If you run the following command:

rails generate delayed_job:active_record

you will get the following migration:

def self.up
  create_table :delayed_jobs, :force => true do |table|
    table.integer  :priority, :default => 0, :null => false
    table.integer  :attempts, :default => 0, :null => false
    table.text     :handler,                 :null => false
    table.text     :last_error
    table.datetime :run_at
    table.datetime :locked_at
    table.datetime :failed_at
    table.string   :locked_by
    table.string   :queue

    table.timestamps
  end

  add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
end

Suggested Optimizations to the Migration

Add Index on the Queue Column

You will need to query your jobs by queue. If you have a lot of jobs in your database table, querying by queue will do a full table scan and take a lot of time to complete. An index is very useful, in this case.

# optimization #1
#
def self.up
  create_table :delayed_jobs, :force => true do |table|
    # ... #
  end

  # ... #
  add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue'
end

MySQL longtext Optimizations

Some exceptions received by Delayed Job can be quite lengthy. If you are using MySQL, the handler and last_error fields may not be long enough. Change their datatype to longtext to avoid this issue. If you are using PostgreSQL, this will not be a problem.

# optimization #2
#
def self.up
  create_table :delayed_jobs, :force => true do |table|
    # ... #

    # replace the migration for column +handler+ with
    table.column :handler, :longtext, :null => false

    # replace the migration for column +last_error+ with
    table.column :last_error, :longtext

    # ... #
  end

  # ... #
  add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue'
end

Columns for Your Delayed Entity

Usually, a job is created in order to handle a background task that is related to a business entity. For example, to send an email to a User.

I am using two columns to store a reference to this business entity instance. Hence, I am able to quickly query my job entries that are related to specific business entity types or instances. I also add corresponding indexes so these queries finish quickly.

# optimization #3
#
def self.up
  create_table :delayed_jobs, :force => true do |table|
    # ... #

    # replace the migration for column +handler+ with
    table.column :handler, :longtext, :null => false

    # replace the migration for column +last_error+ with
    table.column :last_error, :longtext

    # ... #

    table.integer :delayed_reference_id
    table.string  :delayed_reference_type
  end

  # ... #
  add_index :delayed_jobs, [:queue],                  :name => 'delayed_jobs_queue'
  add_index :delayed_jobs, [:delayed_reference_id],   :name => 'delayed_jobs_delayed_reference_id'
  add_index :delayed_jobs, [:delayed_reference_type], :name => 'delayed_jobs_delayed_reference_type'
end

Later, I’ll show you how I am populating these two columns.

Queuing Jobs

Don’t Delete Failed Jobs

By default, delayed workers delete failed jobs as soon as they reach the maximum number of attempts. This might be annoying when you want to find the root of a problem and troubleshoot. My suggestion is to leave the failed jobs in the database and have a process setup to handle failed jobs.

You can set the value of this configuration attribute with the following statement:

Delayed::Worker.destroy_failed_jobs = false

inside a Delayed Job initializer.

Think about the Maximum Run Time Value

You may want to consider what value the maximum run time configuration attribute should have. The default is 4.hours. However, if you don’t expect to have such long running tasks, it is better to decrease that value. Doing so will kill the delayed worker when this limit is reached and allow the job to fail so another worker can pick it up. Also, you will be notified for tasks that you expected to run in short time but took longer (via email notifications on error and failure hooks).

On the other hand, I have worked with Delayed Job on projects where 4.hours was not enough and I had to increase that value. coughs

Delayed::Worker.max_run_time = 15.minutes

will set this limit to 15 minutes.

Don’t Use One Queue

Don’t use the default or only one queue. This will not scale. Even if you only have a few workers/jobs at the start of your project, start by giving meaningful names to the queues. Distribute jobs with different business contexts to different queues.

For example, if you have registration emails, queue them in the registration_emails queue, whereas email notifications to users of your app might be queued to email_notifications.

If you have different queues, they are easier to manage. If there is an exception that you want to handle manually or with a script, you can stop the workers and delete all the email_notifications queue entries. If you have all your jobs on the same queue, tasks such as these are more complicated to tackle.

Having your jobs distributed to different queues also allow starting different workers to handle different queues. My suggestion is to have at least one worker per queue. Hence, you will execute your jobs in parallel.

Use Custom Delayed Jobs

You can use handle_asynchronously to declare that a call to a method should be handled asynchronously. I rarely use this technique. I prefer to declare custom delayed job objects inside the jobs folder of my project. Using custom delayed jobs allows me to fine tune what I store inside the delayed_jobs table.

Here is an example custom job. Assume I have a job that processes a video (and my application is VideoStreamer).

# app/jobs/video_streamer/process_video_job.rb
#
module VideoStreamer
  class ProcessVideoJob < Struct.new(:video_id)
    # ... #
  end
end

I create the folder video_streamer inside the jobs folder, along with the processvideo_job.rb file to hold the delayed custom job code. The class for the custom job is namespaced with the name of my application. Also, I suffix the class name with the suffix Job. Using Struct, I store the ID of the video (the business entity instance this job is related to). The attribute will have the name video_id.

Implement Enqueue Hook

I register a hook handler for the enqueue hook. The hook handles items that I want to be done whenever a new job of this class is put in the queue.

# app/jobs/video_streamer/process_video_job.rb
#
module VideoStreamer
  class ProcessVideoJob < Struct.new(:video_id)
    def enqueue(job)
      job.delayed_reference_id   = video_id
      job.delayed_reference_type = 'VideoStreamer::Video'
      job.save!
    end
  end
end

As you can see, the video (business entity) is stored when a job is enqueued.

Of course, there are times when I want to do more complicated things in enqueue. In the following example, I accept an enqueue only if the status has the correct value. Then, I update that status to the value processing to indicate that the specific video instance is being processed:

module VideoStreamer
  class ProcessVideoJob < Struct.new(:video_id)
    def enqueue(job)
      check_and_update_status

      job.delayed_reference_id   = video_id
      job.delayed_reference_type = 'VideoStreamer::Video'
      job.save!
    end

    private

    def check_and_update_status
      video = VideoStreamer::Video.find video_id
      raise StandardError.new("Video: #{video.id} is not on status 'new' (status: #{video.status}") unless video.status == 'new'
      video.status = 'processing'
      video.save!
    end
  end
end

Implement Success Hook

To update the status once a job is successfully processed, I implement the success hook:

module VideoStreamer
  class ProcessVideoJob < Struct.new(:video_id)
    # ... #

    def success(job)
      update_status('success')
    end

    private

    def update_status(status)
      video = VideoStreamer::Video.find video_id
      video.status = status
      video.save!
    end

    # ... #
  end
end

Implement Error Hook

The error hook in your custom job can, for example, send an email alert or change the status of the related business entity. The error method has access to the exception that will give you information about the error. Note that the error indicates temporary failure and, if there are attempts left, another worker will try to run your background task again.

module VideoStreamer
  class ProcessVideoJob < Struct.new(:video_id)
    def enqueue(job)
      # ... #
    end

    def success(job)
      # ... #
    end

    def error(job, exception)
      update_status('temp_error')
      # Send email notification / alert / alarm
    end

    private

    # ... #
  end
end

Implement Failure Hook

Use the failure hook, for example, to send an email alert or to change the status of the related business entity when a job fails for good and won’t be retried. If you have configured the failed jobs to remain in the database table, then you could retry the job manually.

module VideoStreamer
  class ProcessVideoJob < Struct.new(:video_id)
    def enqueue(job)
      # ... #
    end

    def success(job)
      # ... #
    end

    def error(job, exception)
      # ... #
    end

    def failure(job)
      update_status('failure')
      # Send email notification / alert / alarm / SMS / call ... whatever
    end

    private

    # ... #
  end
end

Implement Perform Hook – Delegate – Raise

This hook is the most crucial, otherwise the job won’t do anything. Your implementation should be very simple and delegate the actual work to a model or other service object. The real implementation should not be part of the perform implementation. This will make sure that you can execute the logic of the implementation without necessarily having an instance of a delayed job. Also, it can be easier for you to test the logic in a unit test. So, as a bare minimum:

module VideoStreamer
  class ProcessVideoJob < Struct.new(:video_id)
    def enqueue(job)
      # ... #
    end

    def success(job)
      # ... #
    end

    def error(job, exception)
      # ... #
    end

    def failure(job)
      # ... #
    end

    def perform
      video = VideoSteamer::Video.find video_id
      video.process!
    end

    private

    # ... #
  end
end

It is absolutely necessary to raise any exceptions so they can be handled by the worker (which will call error and failure, as appropriate). Do not swallow any exceptions that perform might raise. In the above example, video.process! might raise an exception that I allow to bubble up. Same goes for locating the business entity. I use #find() and give the video_id, which raises an exception if the business entity is not found. Don’t use, for example, find_by_id() which does not raise such an exception.

If the service object performing the task does not raise an error when needed (maybe it returns false), perform should raise an error.

def perform
  video = VideoSteamer::Video.find video_id
  raise StandardError.new("Failed to process video with id: #{video.id}") unless video.process?
end

Managing Jobs via Rails Console

Delayed Job includes script interface to start/stop jobs. But, there are cases in which I want to stop running workers and start/stop specific jobs manually. I usually do that when I am in the development environment, where I rarely have background runners running. I run them manually from the console using specific Delayed::Job API calls. Also, in production environments, I have been in situations in which I had to stop running workers and do fine grained manual operation of the jobs, using the same technique.

Here are some Delayed::Job API commands that I find useful in such situations:

Query delayed_jobs Table with Corresponding Model

I use the Delayed::Job model to query the delayed_jobs table. For example, the following returns jobs that belong to the email_notification queue that have raised an error:

Delayed::Job.where(queue: 'email_notifications').where.not(last_error: nil)

Understand, you can use any of the columns in your delayed_jobs table to build your where clause.

Which Class Will Handle My Job?

When I want to see which class will handle a job (usually one that failed), I query for the handler:

handler = Delayed::Job.last.handler

This is a YAML serialized object instance.

Custom Delayed Jobs

Here is an example output for my VideoStreamer::ProcessVideoJob:

"--- !ruby/struct:VideoStreamer::ProcessVideoJob\nvideo_id: 68\n"

This output tells me that the particular delayed job instance is a task to process the video with ID = 68.

Application Mailers

If you send your emails asynchronously, then the YAML serialization is a bit different:

"--- !ruby/object:Delayed::PerformableMailer\nobject: !ruby/class 'UserNotifierMailer'\nmethod_name: :new_user_registration\nargs:\n- 35\n"

All the serialized mailers are !rubyobject:Delayed::PerformableMailer followed by the mailer class from your app (see !ruby/class 'UserNotifierMailer'). You can see the actual method that is used to send the email (new_user_registration) and its arguments (35).

Devise Mailers

If you send your devise emails asynchronously (I am using the devise-async gem to do that), then the YAML serialization looks something like this:

"--- !ruby/object:Delayed::PerformableMethod\nobject: !ruby/object:Devise::Async::Backend::DelayedJob {}\nmethod_name: :perform\nargs:\n- :confirmation_instructions\n- User\n- '317'\n- daD9bVQ2d_kaR3abyS7X\n- {}\n"

Again, you can see which email might be failing along with its run time arguments. In the example above, the email that fails is the one that sends confirmation instructions to user with ID = 317.

Deserializing Different Jobs on Same Queue

If you have different job types in the same queue, then it will be difficult to manage the jobs grouped by the job type. For example, you may want to count the number of queued jobs per type on a queue that is called solr_indexing that handles various classes for background indexing.

If the only information that you have on your delayed_jobs table is the handler (i.e. the delayed_reference_type does not help), you will have to work with … the handler.

The point is that handler stores different serialized objects according to the class serialized as we discussed above.

The YAML.load(dj.handler) will deserialize your serialized object.

Custom Delayed Jobs

If the queued job is a custom delayed job

dj = Delayed::Job.last
dj.handler                   # Assume: "--- !ruby/struct:VideoStreamer::ProcessVideoJob\nvideo_id: 68\n"
job = YAML.load(dj.handler)  # +job+ will be instance of +VideoStreamer::ProcessVideoJob+ struct with
                             # +video_id+ attribute set to +68+

Application Mailers

If it is a mailer object, the instantiated job is a Delayed::PerformableMailer:

=> #<Delayed::PerformableMailer:0x0000000a050898 @object=UserNotifierMailer, @method_name=:new_user_registration, @args=[35]>

This handler, as you can see, responds to object, which is the actual mailer instance, method_name, which is the mailer instance method that will be used to send the email, and args, which contains the runtime arguments to the method.

Devise Mailers

When the handler is a Devise mailer, there is one more level of abstraction. The job is of type Delayed::PerformableMethod:

=> #<Delayed::PerformableMethod:0x0000000a0bb800 @object=#<Devise::Async::Backend::DelayedJob:0x0000000a0bf090>, @method_name=:perform, @args=[:confirmation_instructions, "User", "317", "daD9bVQ2d_kaR3abyS7X", {}]>

This one, as you can see, responds to object, which is a instance of Devise::Async::Backend::DelayedJob, the method_name, which is the method to call on this instance, and the args, which contains the run-time arguments to this method. This args array contains the actual email method and its real arguments.

The YAML.load(dj.handler) might return different object types and you might need to implement some kind of is_a?(....) logic if you want to write a script that operates on all or part of the jobs that belong to the same queue.

What Was the Exception for an Errored Job?

When I want to see the exception details for a job, I inspect the last_error column.

Run a Job from the Rails Console Without Queuing

Assuming that you have a job and you want to run it manually from the rails console, but, you do not want it to be put in the delayed job queue lifecycle loop:

# This will run your job but will not go through the delayed job lifecycle loop
job = VideoStreamer::ProcessVideoJob.new(68)
job.perform

It will work, but no registered hooks will be executed.

I rarely use this method, but it has come in handy on occasion.

Instantiate a Worker Within Rails Console

This is very handy when you want to run a failed job again, but you want it done through the console and not through background running workers:

dw = Delayed::Worker.new

Easy, eh?

Run a Failed Job from Within Rails Console

Assuming that you have a failed job and you want to run it again manually, from within the console:

dw = Delayed::Worker.new
dj = Delayed::Job.last # assuming that the last job is the failed one, otherwise use a proper query to
                       # locate it
dw.run dj

That’s it. Your worker will run the job call the corresponding error and failure hooks if the job fails again. If it succeeds, it will delete it from the queue.

Managing Jobs Using the Web Interface

Use the delayed_job_web interface to have access to your queued jobs. Add the following line to your routes:

mount DelayedJobWeb => "/delayed_job"

This will allow you to access the management interface using an address like https://www.myapp.com/delayed_job.

I also have a delayed_job_web.rb initializer (in my config/initializers folder):

DelayedJobWeb.use Rack::Auth::Basic do |username, password|
  # authenticate
  user = User.find_by_username(username)
  return false unless user.authenticate(password)

  # authorize. I am using cancancan for authorization. You can use any other authorization gem you see fit.
  ability = Ability.new(user)
  can = ability.can? :manage, Delayed::Job
  raise CanCan::AccessDenied unless can
  true
end

This allows me to authenticate and authorize the request to access /delayed_job.

Testing with Delayed Job

Do Not Queue or Mock When Testing

When testing your application code (with any kind of tests, unit, or integration tests or UI tests), do not queue or mock your delayed job tasks. This might seem a strange practice to you , but to me, it has been proven invaluable. I need to see whether they break or not. If the cases where a task takes too long or has recursion, I might decide to mock that job. But in general, I do not mock my jobs.

To get Delayed Job to simply execute your task and not queue it, do the following in the initializer:

Delayed::Worker.delay_jobs = !%w[ test ].include?(Rails.env)

The above one will not queue jobs in the test environment.

Disable Immediate Execution When Testing

However, there might be tests where you want to test the queuing functionality. I wrap these tests with special tags and ask Delayed Job to queue them up.

RSpec

When using RSpec, I have an around(:each) configuration that allows the use of a :delayed_job tag:

# spec/spec_helper.rb

RSpec.configure do |config|
  # ... other config here ... #

  config.around(:each, :delayed_job) do |example|
    old_value = Delayed::Worker.delay_jobs
    Delayed::Worker.delay_jobs = true
    Delayed::Job.destroy_all

    example.run

    Delayed::Worker.delay_jobs = old_value
  end

  # ... other config here ... #
end

I am enabling the queuing of jobs and deleting any existing jobs before the example runs. After the example run, I set the delayed job queuing back to what it was.

Now, when I want to write a spec that uses delayed job queuing, I do the following:

it 'should queue the job`, delayed_job: true do
  ...
end

Cucumber

# features/support/hooks.rb
Around('@delayed_job') do |scenario, block|
  old_value = Delayed::Worker.delay_jobs
  Delayed::Worker.delay_jobs = true
  Delayed::Job.destroy_all

  block.call

  Delayed::Worker.delay_jobs = old_value
end

Similar to the RSpec configuration, I am using an Around hook with tag @delayed_job. Then, I tag the Scenarios that I want to use real queuing:

@delayed_job
Scenario: As a User when I sign up there is a new user registration email queued

Tagged Logging

Rails supports tagged logging, as you probably know. I have configured Delayed Job to use tagged logging, too.

In order to do that, I use the Delayed::Plugin technique:

module Delayed
  module Plugins
    class TaggedLogging < Delayed::Plugin
      Delayed::Worker.logger = Rails.logger

      callbacks do |lifecycle|
        lifecycle.around(:execute) do |worker, *args, &block|
          Rails.logger.tagged "Worker:#{worker.name_prefix.strip}", "Queues:#{worker.queues.join(',')}" do
            block.call(worker, *args)
          end
        end

        lifecycle.around(:invoke_job) do |job, *args, &block|
          Rails.logger.tagged "Job:#{job.id}" do
            block.call(job, *args)
          end
        end
      end
    end
  end
end

As you can see, I am catching the hooks around(:execute) and around(:invoke_job) and using Rails.logger to implement tagged logging. I log the worker name, queues, and the job id.

Don’t forget to register your Delayed::Plugin subclass in your delayed_job initializer with:

Delayed::Worker.plugins << Delayed::Plugins::TaggedLogging

By the way, if you want to know which events you can hook to, see this line here.

Conclusion

In this article, I have presented some of the practices that I use to tackle my background jobs using Delayed Job gem. I hope that you find some, if not all, of these tips useful.

  • Gert

    A clear overview with useful examples of the feature set of Delayed Job, thanks!

  • Nidhal Jammali

    Es ce que c’est possible que delayed_job traite en 1er les requêtes en ordre de création et que ça ignore l’attribut run_at

    • Panayotis Matsinopoulos

      I guess that you want to change the order the jobs are processed. According to created_at instead of run_at. I do not know whether this is possible. Sorry.

  • Divya Konda

    Hi!

    Thanks a lot for this article!
    I am new to delayed_job, how do I enqueue my job onto my queue? If I call perform directly, it executes the perform method in the foreground. How do I execute perform in the background queue?

    Thanks!

Recommended

Learn Coding Online
Learn Web Development

Start learning web development and design for free with SitePoint Premium!

Get the latest in Ruby, once a week, for free.