
What You Don’t Get With ActiveJob

By David Copeland


Rails 4.2 includes ActiveJob, which is a unified API for queuing and writing background jobs. ActiveJob provides simple solutions to two problems faced by Rails developers when writing background jobs: queueing jobs (via the ActiveJob API) and serialization of ActiveRecord objects (via GlobalID).

It doesn’t, however, help with the third problem: writing resilient jobs that can survive in a production environment. That’s where this post comes in.

First, let’s briefly review what ActiveJob is and what it provides.

What ActiveJob Gives You

ActiveJob is an adapter to various job queues such as Delayed Job, Resque, or Sidekiq. Although this allows you to “swap out” job queuing systems, the true power of ActiveJob is that it’s a single API that you can develop additional libraries, tooling, and practices around.

For example, you could develop a gem that queues jobs, safe in the knowledge that it would work in any Rails app.

Let’s see an example.

Suppose we have a job to charge a customer some money, called PurchaseJob. Queueing the job can be done via perform_later.

PurchaseJob.perform_later(customer,amount)

To implement the job, simply subclass ActiveJob::Base and provide a perform method.

class PurchaseJob < ActiveJob::Base
  def perform(customer,amount)
    # job logic here
  end
end
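
If the split between queueing and performing feels abstract, the toy model below may help. It is plain Ruby, not ActiveJob's implementation: ToyJob, ToyPurchaseJob, QUEUE, and work_off are hypothetical names. perform_later only records which job to run and with what arguments. A worker process later creates an instance and calls perform.

```ruby
# Toy, in-memory stand-in for a job queue (NOT how ActiveJob is implemented)
QUEUE = []

class ToyJob
  # Enqueue: remember the job class and its arguments; do no work yet
  def self.perform_later(*args)
    QUEUE << [self, args]
  end
end

# What a worker process does: take each queued job and run it
def work_off
  until QUEUE.empty?
    job_class, args = QUEUE.shift
    job_class.new.perform(*args)
  end
end

class ToyPurchaseJob < ToyJob
  CHARGES = [] # records what perform was called with

  def perform(customer_id, amount)
    CHARGES << [customer_id, amount]
  end
end

ToyPurchaseJob.perform_later(42, 100)
ToyPurchaseJob::CHARGES.empty? # still true: nothing has run yet
work_off
ToyPurchaseJob::CHARGES        # now [[42, 100]]
```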

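Usually, you’d have to queue the ID of the record to operate on (in this case, a Customer) and look the record up again as the first step of your job. ActiveJob’s integration with GlobalID handles this for you: ActiveRecord arguments are serialized as global IDs when the job is queued and loaded again when the job runs.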
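
The idea behind GlobalID is simple enough to sketch. The plain-Ruby example below imitates it with a hypothetical in-memory Customer class and hypothetical to_gid/locate helpers. It is not the globalid gem's API, which you reach through methods such as to_global_id and GlobalID::Locator.locate. The record becomes a gid:// string that can sit safely in a queue and is looked up again when the job runs.

```ruby
# Hypothetical in-memory model standing in for an ActiveRecord class
class Customer
  RECORDS = {}
  attr_reader :id, :name

  def initialize(id, name)
    @id, @name = id, name
    RECORDS[id] = self
  end

  def self.find(id)
    RECORDS.fetch(id)
  end
end

APP = "myapp" # hypothetical application name

# Serialize: turn a record into a string that is safe to put in a queue
def to_gid(record)
  "gid://#{APP}/#{record.class.name}/#{record.id}"
end

# Deserialize: parse the string and look the record up again at perform time
def locate(gid)
  match = %r{\Agid://#{APP}/(\w+)/(\d+)\z}.match(gid) or raise ArgumentError, "bad gid: #{gid}"
  Object.const_get(match[1]).find(Integer(match[2]))
end

alice = Customer.new(7, "Alice")
gid = to_gid(alice)        # this string is what actually gets queued
locate(gid).equal?(alice)  # the job gets the record back by looking it up
```

Because the record is looked up when the job runs, the job sees the row as it is at that moment rather than a snapshot taken when the job was queued.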

ActiveJob provides a great foundation for background processing and helps you avoid some common mistakes. But you are still on your own when it comes to the hardest part of background processing: managing failure.

Failure is Not an Option

Even at small scale, your background jobs will fail. Code usually ends up in a background job because it is long-running, and long-running code tends to “attract” failure: network flakiness, system reboots (Heroku dynos, anyone?), and timeouts all affect background jobs significantly.

Let’s see how by imagining an implementation of our job from the previous section, PurchaseJob. The implementation will find a customer’s credit card, ask our third-party payment processor (a networked call over the Internet) to charge some amount of money on the card, and finally create a purchase record with the results of the charge.

class PurchaseJob < ActiveJob::Base
  def perform(customer,amount)
    credit_card = customer.credit_card

    result = PaymentProcessor.charge(credit_card.token,amount)

    if result.success?
      customer.purchases.create!(amount: amount,
                                success: true)
    else
      customer.purchases.create!(amount: amount,
                                success: false,
                                 reason: result.error_message)
    end
  end
end

Suppose that during the call to PaymentProcessor.charge we experience a network timeout, and an exception is raised. Since we don’t know whether the charge ultimately went through, we can’t just execute this job again, or the customer could be double-charged. But we still need to complete the purchasing process somehow.

Given the way our job is designed, someone will have to manually intervene to complete the purchase. This is obviously not scalable. The problem is that our job wasn’t designed with this sort of failure in mind, yet this failure is likely to happen on a regular basis.
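
The plain-Ruby sketch below reproduces this failure. FlakyProcessor is hypothetical: its charge always goes through on the processor's side, but the response to the first call is lost, so the caller sees a timeout. Blindly retrying, which is what a generic retry policy would do, charges the customer twice.

```ruby
require "timeout"

# Hypothetical processor: the charge always goes through on its side,
# but the response to the first call is lost and the caller sees a timeout
class FlakyProcessor
  attr_reader :charges

  def initialize
    @charges = []
    @lose_next_response = true
  end

  def charge(token, amount)
    @charges << [token, amount] # the money has moved...
    if @lose_next_response
      @lose_next_response = false
      raise Timeout::Error, "no response from processor" # ...but we never find out
    end
    :ok
  end
end

# A naive job body plus a generic "retry on error" policy
def naive_purchase(processor, token, amount)
  processor.charge(token, amount)
rescue Timeout::Error
  retry # re-run the whole method from the top
end

processor = FlakyProcessor.new
naive_purchase(processor, "tok_123", 100)
processor.charges.size # => 2: the customer was charged twice
```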

ActiveJob provides no default failure-handling strategy; it simply defers to whatever the underlying job queue does. Even if it did provide one, such as automatically retrying failed jobs, that wouldn’t help us here: blindly retrying PurchaseJob is exactly what could double-charge the customer.

We still need our jobs to be designed to handle failure.

There are three common techniques we can apply to the design of our jobs to deal with failure: idempotent jobs, decomposing jobs, and jobs where failure doesn’t matter.

Idempotent Jobs

Idempotence is:

the property of certain operations in mathematics and computer science, that can be applied multiple times without changing the result beyond the initial application.

This maps almost exactly to the problem we’re facing. If a job could be executed multiple times without changing the result beyond the initial execution, we could simply retry it endlessly until it completed.
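
A minimal sketch of such a retry loop, assuming the block it wraps is idempotent. retry_until_success is a hypothetical helper. Rather than retrying literally forever, it caps the number of attempts and backs off exponentially between them, a common practical variation.

```ruby
# Retry a block until it succeeds, with a cap and exponential backoff.
# Only safe when the block is idempotent: a failed attempt may already
# have done some, or all, of its work.
def retry_until_success(max_attempts: 5, base_delay: 0.1)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue StandardError
    raise if attempts >= max_attempts     # give up rather than loop forever
    sleep(base_delay * 2**(attempts - 1)) # 0.1s, 0.2s, 0.4s, ...
    retry
  end
end

store = {}
calls = 0
retry_until_success(base_delay: 0) do
  calls += 1
  store[:status] = "paid" # idempotent: setting the same value again changes nothing
  raise "simulated network blip" if calls < 3
end
```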

Here’s a simple example of an idempotent job that generates a receipt as a PDF.

class ReceiptPdfJob < ActiveJob::Base
  def perform(purchase_id,customer_name,customer_email,amount,amount_tax)
    # The same inputs always produce the same file at the same path
    File.open("/receipts/#{purchase_id}.pdf","wb") do |pdf|
      ReceiptPdf.new(for: "#{customer_name} (#{customer_email})",
                subtotal: amount,
                     tax: amount_tax,
             grand_total: amount + amount_tax).write!(pdf)
    end
  end
end

No matter how many times we execute this code, it will write the same contents to the same file. If we were to experience a failure at any point in this job, we could safely just re-execute it.
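
You can check this property directly. The sketch below swaps ReceiptPdf for a hypothetical render_receipt that produces plain text, writes the receipt twice into a temporary directory, and compares the results.

```ruby
require "tmpdir"

# Hypothetical stand-in for ReceiptPdf: the output depends only on the inputs
def render_receipt(name, email, amount, tax)
  "Receipt for #{name} (#{email})\n" \
  "Subtotal: #{amount}\nTax: #{tax}\nTotal: #{amount + tax}\n"
end

# Always writes to the same path; File.write truncates any existing file first
def write_receipt(dir, purchase_id, name, email, amount, tax)
  path = File.join(dir, "#{purchase_id}.txt")
  File.write(path, render_receipt(name, email, amount, tax))
  path
end

same_contents, file_count = Dir.mktmpdir do |dir|
  first = File.read(write_receipt(dir, 1, "Alice", "alice@example.com", 100, 8))
  second = File.read(write_receipt(dir, 1, "Alice", "alice@example.com", 100, 8))
  [first == second, Dir.children(dir).size]
end
# same_contents is true and file_count is 1: running it again changed nothing
```

Even a crash halfway through the write is harmless: the next run opens the file in write mode, which truncates it, and writes the complete receipt.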

Our PurchaseJob, however, isn’t designed to allow this.

To make that job idempotent, we’d need to fundamentally redesign how we charge our customers. We’d need to “save our work” at each step so we could pick up where we left off.

To do that, we’ll need some more bookkeeping on our end that stores where we are in the process. If we get interrupted, we can look at our record of the transaction in progress to find out where to pick back up.

Our job will then work like so:

  1. Create a record, called a Transaction, in our database of the charge we’re about to make.
  2. Attempt the charge with our payment processor, including the id of our Transaction.
  3. When the charge completes, update our Transaction with the results.
  4. If we get interrupted, we can find the incomplete Transaction and ask the payment processor if it completed on their end.
  5. If so, we fetch the results and update our database. If not, we re-attempt the charge.
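
Here is that five-step protocol sketched in plain Ruby. Txn, FakeProcessor, and PurchaseFlow are hypothetical in-memory stand-ins for the database and the payment processor, and the sketch simplifies slightly by always asking the processor first. The crash_after_charge flag simulates the worst case: the charge goes through, but the job dies before recording it.

```ruby
# Hypothetical in-memory fakes for the database and the payment processor
Txn = Struct.new(:id, :customer_id, :amount, :complete, :payment_id)

class FakeProcessor
  attr_reader :charges

  def initialize
    @charges = {} # our transaction id => processor's payment id
  end

  # Step 4: ask whether a charge tagged with our transaction id exists
  def payment_for(transaction_id)
    @charges[transaction_id]
  end

  # Step 2: charge, tagging the charge with our transaction id
  def charge(amount, transaction_id:)
    @charges[transaction_id] = "pay_#{@charges.size + 1}"
  end
end

class PurchaseFlow
  attr_reader :txns

  def initialize(processor)
    @processor = processor
    @txns = [] # stands in for the transactions table
  end

  def run(customer_id, amount, crash_after_charge: false)
    # Step 1 (or resume): reuse our incomplete record, or create one
    txn = @txns.find { |t| t.customer_id == customer_id && t.amount == amount && !t.complete }
    unless txn
      txn = Txn.new(@txns.size + 1, customer_id, amount, false, nil)
      @txns << txn
    end

    # Steps 4-5: charge only if the processor has no charge for this transaction
    payment = @processor.payment_for(txn.id) || @processor.charge(amount, transaction_id: txn.id)
    raise "simulated crash" if crash_after_charge

    # Step 3: record the result
    txn.payment_id = payment
    txn.complete = true
  end
end

processor = FakeProcessor.new
flow = PurchaseFlow.new(processor)
begin
  flow.run(7, 100, crash_after_charge: true) # charged, but never recorded
rescue RuntimeError
  # the job died; a retry is coming
end
flow.run(7, 100) # the retry finds the existing charge instead of charging again
```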

It’s a bit tricky.

class PurchaseJob < ActiveJob::Base
  def perform(customer,amount)
    existing_payment = nil

    # See if there is an incomplete transaction for
    # this customer and amount
    transaction = Transaction.where(customer: customer,
                                      amount: amount,
                                    complete: false).first

    if transaction.present?
      # if there is, find the analogous transaction
      # in our payment processor's system
      existing_payment = PaymentProcessor.payment(
        custom: { transaction_id: transaction.id }
      )
    else
      # if there isn't, create an in-progress one that we can find later
      transaction = Transaction.create!(customer: customer,
                                          amount: amount,
                                        complete: false)
    end

    unless existing_payment.present?
      # if there was no existing payment, actually charge the customer,
      # passing along our transaction ID as a custom data element
      existing_payment = PaymentProcessor.charge(
          customer.credit_card.token,
          amount,
          custom: { transaction_id: transaction.id })
    end

    # complete the transaction and create the purchase record atomically;
    # otherwise a crash between the two would leave no incomplete
    # transaction to find, and a retry would charge the customer again
    ActiveRecord::Base.transaction do
      transaction.complete!(existing_payment)
      customer.purchases.create_from_transaction!(transaction)
    end
  end
end

Holy cow! Our simple charging process just ballooned into a giant mess of code. But it’s now bullet-proof.

You can mentally play through each line of code and, no matter what happens, restarting will correctly pick up where the failed job left off. This mental exercise is a great technique for checking whether your job can be replayed: assume that any line of code can blow up and that execution will resume at the top of the routine, as if every line were followed by rescue retry.

If the code works in every one of those cases, you’re good to go. If not, you’ll need to redesign it until it does.
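
The mental exercise can also be automated with fault injection. In the hypothetical harness below, a job is a list of steps that only touch durable state (a hash standing in for the database and the payment processor). For every possible crash point, it runs the job until the crash, restarts it from the top, and checks that the result matches a run that never failed. The idempotent steps pass; a variant that charges unconditionally does not.

```ruby
# Fault-injection sketch of the "any line can blow up" exercise
class Crash < StandardError; end

# Each step touches only durable state: db[:txn] stands in for our Transaction
# row, db[:charges] for the processor's record of charges tagged with our id
IDEMPOTENT_STEPS = [
  ->(db) { db[:txn] ||= { complete: false } },                        # find or create
  ->(db) { db[:charges] << :txn unless db[:charges].include?(:txn) }, # charge unless already charged
  ->(db) { db[:txn][:complete] = true },                              # record the result
].freeze

NAIVE_STEPS = [
  ->(db) { db[:txn] ||= { complete: false } },
  ->(db) { db[:charges] << :txn }, # always charges
  ->(db) { db[:txn][:complete] = true },
].freeze

# Run every step from the top, raising Crash just before step `crash_at`
def run_job(steps, db, crash_at: nil)
  steps.each_with_index do |step, i|
    raise Crash if i == crash_at
    step.call(db)
  end
end

# Crash before each step in turn (crashing before step i+1 also models a
# crash just after step i's side effect), restart from the top, and check
# the end state matches a clean run: exactly one charge, transaction complete
def survives_every_crash?(steps)
  (0...steps.size).all? do |crash_at|
    db = { charges: [] }
    begin
      run_job(steps, db, crash_at: crash_at)
    rescue Crash
      run_job(steps, db) # the retry
    end
    db[:charges].size == 1 && db[:txn][:complete]
  end
end

survives_every_crash?(IDEMPOTENT_STEPS) # true
survives_every_crash?(NAIVE_STEPS)      # false: crashing before the third step double-charges
```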

This may seem painful, but for jobs that are crucial, such as charging people money, you have to take extra care.

One thing you’ll notice about this change is that we took a simple operation (charging a credit card) and broke it up into several smaller steps. We can use this technique in another way to make our jobs resilient to failure.

Decomposing Jobs into Smaller Jobs

When faced with complex code, a common technique is to decompose that code into reusable functions that make the overall routine easier to follow. We can do that with our background jobs as well.

Our newly-resilient purchase-charging code’s “happy path” is as follows:

  1. Create an in-progress transaction
  2. Charge the customer at our payment processor
  3. Update the in-progress transaction to show the completed charge

Instead of creating one complex job, what if we created three distinct jobs, each queuing the next, and each designed to be retried on its own?
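
A toy version of such a pipeline, in plain Ruby with a hypothetical in-memory queue, shows the benefit: when the charging step fails, the worker re-enqueues only that step, and the earlier step does not run again. StartPurchase, ChargeTransaction, and CompletePurchase are hypothetical stand-ins for the three jobs described next, not ActiveJob classes.

```ruby
# Toy in-memory queue (hypothetical; a real app would use its ActiveJob adapter)
QUEUE = []
LOG = []

def enqueue(job, *args)
  QUEUE << [job, args]
end

# Run queued jobs; a job that raises is put back on the queue by itself
def work_off(max_runs: 100)
  runs = 0
  until QUEUE.empty? || runs >= max_runs
    runs += 1
    job, args = QUEUE.shift
    begin
      job.perform(*args)
    rescue StandardError
      QUEUE << [job, args] # retry just this step later
    end
  end
end

module StartPurchase
  def self.perform(customer_id, amount)
    LOG << :start
    enqueue(ChargeTransaction, "txn-#{customer_id}-#{amount}")
  end
end

module ChargeTransaction
  @attempts = 0

  def self.perform(transaction_id)
    @attempts += 1
    raise "simulated timeout" if @attempts == 1 # fail on the first attempt only
    LOG << :charge
    enqueue(CompletePurchase, transaction_id)
  end
end

module CompletePurchase
  def self.perform(transaction_id)
    LOG << :complete
  end
end

enqueue(StartPurchase, 7, 100)
work_off
LOG # [:start, :charge, :complete]: the failure did not re-run StartPurchase
```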

First, we’d have a job that starts the process off. The purpose of this job is to find or create an in-progress transaction for the customer and amount.

class StartPurchaseTransactionJob < ActiveJob::Base
  def perform(customer,amount)
    transaction = Transaction.where(customer: customer,
                                      amount: amount,
                                    complete: false).first

    unless transaction.present?
      transaction = Transaction.create!(customer: customer,
                                          amount: amount,
                                        complete: false)
    end

    ChargeTransactionJob.perform_later(transaction)
  end
end

Ultimately, this job queues a ChargeTransactionJob. The purpose of that job is to get a completed charge from the payment processor, either by querying for an existing one, or by actually charging the customer.

class ChargeTransactionJob < ActiveJob::Base
  def perform(transaction)
    payment = PaymentProcessor.payment(
      custom: { transaction_id: transaction.id }
    )

    unless payment.present?
      payment = PaymentProcessor.charge(
          transaction.customer.credit_card.token,
          transaction.amount,
          custom: { transaction_id: transaction.id })
    end

    transaction.complete!(payment)

    CompletePurchaseJob.perform_later(transaction)
  end
end

Finally, CompletePurchaseJob handles the “business logic” of what to do once a purchase has gone through.

class CompletePurchaseJob < ActiveJob::Base
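  def perform(transaction)
    # For this job to be safely replayable, create_from_transaction! must
    # not create a second purchase for a transaction that already has one
    transaction.customer.purchases.create_from_transaction!(transaction)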
  end
end

You’ll notice that each job is idempotent and can be safely replayed if it were to fail. The logic around “picking up where we left off” is still there, but now we have three simple jobs, each easy to understand. The jobs are also reusable. If we were to require charging customers a monthly fee for a subscription service, the ChargeTransactionJob could be easily reused, since its only input is a transaction.
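
CompletePurchaseJob is only idempotent if create_from_transaction! never creates a second purchase for the same transaction. One way to get that is find-or-create keyed by transaction id, sketched below with a hypothetical in-memory Purchases class (its signature differs from the article's create_from_transaction!). In a Rails app you would typically also back this with a unique index on the purchases table's transaction id column.

```ruby
# Hypothetical in-memory purchases table, keyed by transaction id
class Purchases
  def initialize
    @by_transaction_id = {}
  end

  # Find-or-create: a replayed job gets back the purchase it already made
  def create_from_transaction!(transaction_id, amount)
    @by_transaction_id[transaction_id] ||= { transaction_id: transaction_id, amount: amount }
  end

  def count
    @by_transaction_id.size
  end
end

purchases = Purchases.new
first = purchases.create_from_transaction!(42, 100)
replay = purchases.create_from_transaction!(42, 100) # the job ran a second time
```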

There’s a third way of dealing with failure, which is to design our jobs so failure can be simply ignored.

Jobs that Ignore Failure

Ignoring the failure of a job seems like a bad idea. These jobs have been queued because they need to execute and need to complete. How can we simply ignore their failures?

If a job can

a) figure out what work needs to be done on its own and
b) be run on a regular schedule

then individual job failures can be ignored. This is because the next time the job is run, it will “catch up” on the lost work of the failed job.
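
A stripped-down version of the pattern: the hypothetical sweep method below finds its own work (every record still marked :pending) instead of being told what to do. One run fails partway through and is simply ignored; the next scheduled run finishes what was left.

```ruby
# Hypothetical durable state: the job discovers its work by querying this
RECORDS = { 1 => :pending, 2 => :pending, 3 => :pending }

# Process everything still pending; fail_on simulates a crash on one record
def sweep(fail_on: nil)
  RECORDS.select { |_id, status| status == :pending }.each_key do |id|
    raise "simulated failure on record #{id}" if id == fail_on
    RECORDS[id] = :done
  end
end

begin
  sweep(fail_on: 2) # first scheduled run: finishes record 1, then dies
rescue RuntimeError
  # ignored: nothing is lost, because unfinished records are still pending
end
sweep # next scheduled run: picks up records 2 and 3
```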

Consider how we broke up our purchasing process in the previous section. The first job, StartPurchaseTransactionJob, creates an “in-progress” transaction, while the second job, ChargeTransactionJob, completes such transactions.

We can modify ChargeTransactionJob so it processes all incomplete transactions, not just a particular one. Designed that way, we can simply schedule the job to run periodically and ignore the failures of any one job.

class ChargeIncompleteTransactionsJob < ActiveJob::Base
  def perform
    Transaction.incomplete.find_each do |transaction|
      payment = PaymentProcessor.payment(
        custom: { transaction_id: transaction.id }
      )

      unless payment.present?
        payment = PaymentProcessor.charge(
            transaction.customer.credit_card.token,
            transaction.amount,
            custom: { transaction_id: transaction.id })
      end
      transaction.complete!(payment)

      CompletePurchaseJob.perform_later(transaction)
    end
  end
end

Assuming ChargeIncompleteTransactionsJob is queued periodically, it will process all incomplete transactions, including those left unprocessed by a previous job’s failure. As long as the job doesn’t fail frequently, all the incomplete transactions will eventually be processed.

Not every task can be implemented this way, but it’s a great pattern to use when a database query can determine what work needs to be done and that work doesn’t need to be completed right away. For example, caches can be refreshed by querying for all items updated since the job last ran.
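
A minimal sketch of that cache refresh, assuming each item carries an updated_at value (integers here for simplicity; Item, ITEMS, CACHE, and refresh_cache are hypothetical). The job remembers when its last successful run started and re-reads only items updated since then. The mark is advanced only after every item is cached, so a failed run leaves it untouched and the next run covers the same window again.

```ruby
# Hypothetical items with an integer updated_at timestamp
Item = Struct.new(:id, :value, :updated_at)

ITEMS = [Item.new(1, "a", 10), Item.new(2, "b", 20)]
CACHE = {}

# run_started_at: when this run began. Using >= means an item stamped exactly
# at the previous mark is re-read, which is harmless because writes are idempotent.
def refresh_cache(state, run_started_at)
  ITEMS.select { |item| item.updated_at >= state[:last_run_started_at] }.each do |item|
    CACHE[item.id] = item.value
  end
  # Advance the mark only after every item is cached; if anything above
  # raised, the mark stays put and the next run covers the same window
  state[:last_run_started_at] = run_started_at
end

state = { last_run_started_at: 0 }
refresh_cache(state, 30) # first run: caches both items
ITEMS[0].value = "a2"
ITEMS[0].updated_at = 40
refresh_cache(state, 50) # only item 1 changed since 30, so only it is re-read
```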

In Conclusion

ActiveJob removes some of the friction involved in writing background jobs by providing a single API for queuing and writing them. However, the hard work of dealing with failure is still a problem developers must solve themselves.

Keep all this in mind when designing code for background processing, and pay careful attention when extracting inline code into background jobs. By using the techniques discussed here, your background jobs can run safely and quietly, withstanding the rigors of production use.
