Handling Job Queue Worker Concurrency in Rails

Delayed Job and Resque are two very popular gems for handling long-running tasks asynchronously in Rails. Regardless of their specific implementation details, the pattern remains the same: operations are placed in a shared work queue, and multiple workers consume items from the queue and execute them.
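As a minimal in-process sketch of that pattern, the following uses Ruby's thread-safe Queue and threads in place of the database- or Redis-backed queue and worker processes the real gems use; the names here are illustrative only:

```ruby
# Producers push job descriptions onto a shared queue; several workers
# pop and execute them. Delayed Job and Resque implement the same idea
# with the database or Redis as the queue and OS processes as workers.
jobs    = Queue.new
results = Queue.new

workers = 3.times.map do
  Thread.new do
    # Each worker pops jobs until it sees the shutdown sentinel
    while (job = jobs.pop) != :shutdown
      results << "processed #{job}"
    end
  end
end

5.times { |i| jobs << "job-#{i}" }        # enqueue some work
workers.size.times { jobs << :shutdown }  # tell each worker to stop
workers.each(&:join)

processed = results.size
```

Queue#pop blocks when the queue is empty, so each job is handed to exactly one worker, which is the same delivery guarantee the gems provide.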

Because multiple worker processes are involved, these systems handle concurrency carefully, ensuring that each job gets processed by one and only one worker.

So, no concurrency problems to worry about, right? Well, it depends on the nature of the jobs placed in the queue and on what triggers them to be enqueued in the first place.

Let's take a hypothetical service where users can purchase training courses in the form of PDFs. They set up their credit card once, and every time they purchase a course online, their card is charged accordingly and the PDF is delivered via email. Let's assume that charging the credit card and personalizing the PDF are lengthy operations, so they are handled asynchronously in order not to block the web UI.

The service has low tolerance for freeloaders: if a charge is declined, it immediately suspends the account and informs the user via email.

In this very contrived example, the synchronous code requesting the course might look like this:

# Make sure the user's account is in good standing
unless account.suspended?
  Fullfilment.send_later(:charge_and_send, account, course)
end

A worker performing the task might look like:

def charge_and_send(account, course)
  if !account.suspended?
    if CreditCardProcessor.charge(account, course)
      course.personalize_for(account)
      Notification.deliver_course(account, course)
    else
      account.suspended = true
      account.suspended_at = Time.now
      account.save
      Notification.deliver_account_suspended_notification(account)
    end
  end
end

Everything seems to be working fine. Eventually, the code gets deployed to a staging environment and QA starts hammering on it. Suddenly, there are reports of account suspension emails being sent multiple times. What's going on?

Well, it turns out that a concurrency issue exists, and unless you run multiple workers in your development environment, you won't experience it. The problem is that there can be multiple jobs enqueued for the same account, for instance when a user requests several courses at once.

Because there are many workers, two or more of them can be processing their respective jobs simultaneously. There is a race condition on account.suspended: under the right circumstances, each worker passes the account.suspended? check, gets a declined charge, and ends up in the else clause, resulting in the suspension email being sent multiple times.

Guard Against Concurrency

Concurrency issues are easily solved by identifying the critical section in the code and protecting it with some synchronization primitive.

A simple approach is to use the database's native row locking, in this case applied to the accounts table. Active Record's pessimistic locking wraps the native "SELECT ... FOR UPDATE" SQL clause, which blocks other transactions trying to lock the same row until the operation completes.

A transaction block is necessary to determine when the lock is released. Because lock contention or a deadlock could occur, the database may raise an exception after a lock wait timeout. Make sure to handle this properly, perhaps by rescheduling the work item for a later time.

def charge_and_send(account, course)
  begin
    Account.transaction do

      # Acquire an exclusive row lock and reload the account so we
      # operate on its latest state, not a stale in-memory copy
      account = Account.find(account.id, :lock => true)

      if !account.suspended?
        if CreditCardProcessor.charge(account, course)
          course.personalize_for(account)
          Notification.deliver_course(account, course)
        else
          account.suspended = true
          account.suspended_at = Time.now
          account.save
          Notification.deliver_account_suspended_notification(account)
        end
      end
    end
  rescue Mysql::Error => exc
    # Handle "Mysql::Error: Lock wait timeout exceeded; try restarting
    # transaction", e.g. by re-enqueueing the job
  end
end

Simulate Workers for Testing

Now that we've added appropriate measures to guard against this situation, how do we verify that it's fixed properly? Race conditions are very difficult to track down because they are intermittent. We need a deterministic way to cause the problem to happen and validate the fix.

We can simulate multiple workers by simply using forked processes to execute the method the workers invoke. Each forked process needs its own Active Record database connection; otherwise, the lock won't work.

The following helper method encapsulates all of this for ease of use in a test.

# Helper method to fork a process with its own database connection.
# This only works on *nix OSes, as it relies on a pipe to serialize
# any exception and re-raise it in the parent process.
def fork_with_new_connection(config, klass = ActiveRecord::Base)
  readme, writeme = IO.pipe

  pid = Process.fork do
    readme.close # child only writes

    value = nil
    begin
      klass.establish_connection(config)
      yield
    rescue => e
      value = e
    ensure
      klass.remove_connection
    end

    writeme.write Marshal.dump(value)
    writeme.close

    # Prevent rspec from autorunning the suite again in the child process
    at_exit { exit! }
  end

  writeme.close # parent only reads
  Process.waitpid(pid)

  if exception = Marshal.load(readme.read)
    raise exception
  end

  readme.close
end

It can then be used in a spec:

describe "Multiple job workers" do
  it "should not create concurrency problems" do
    config = ActiveRecord::Base.remove_connection

    (1..5).each do
      fork_with_new_connection(config) {
        # Invoke worker entry point method
      }
    end

    ActiveRecord::Base.establish_connection(config)

    # Assert expected result
  end
end

A big gotcha with this technique is that it needs to work around RSpec's way of running the specs. By default, RSpec hooks into the at_exit kernel method to run the entire suite. Unless you remove that hook, each child process will cause the entire suite to run again. Simply overriding the hook, as the at_exit call in the helper does, works for RSpec 1.x. RSpec 2.x may use a different mechanism, so your mileage may vary with that specific approach.

Limitations

Workers are now executed in forked processes, which is the intended behavior. However, in that context, mock expectations will not work, and you can't easily share variables to validate results.

One way to handle this is to extend the IO pipe communication to return richer data for validating the test, or alternatively to use a shared key-value store like Redis to stash results that can be asserted afterwards.
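The first option can be sketched as follows. This is a simplified variant of the helper above that returns a result hash from the child instead of only an exception; run_in_fork and the hash keys are illustrative names, not from the original helper:

```ruby
# Fork a child, run the given block in it, and ship a structured result
# back to the parent through a pipe via Marshal.
def run_in_fork
  readme, writeme = IO.pipe

  pid = Process.fork do
    readme.close # child only writes
    result =
      begin
        { :value => yield, :error => nil }
      rescue => e
        { :value => nil, :error => e.message }
      end
    writeme.write(Marshal.dump(result))
    writeme.close
    exit!(0) # skip at_exit hooks (e.g. RSpec's autorun)
  end

  writeme.close # parent only reads
  Process.waitpid(pid)
  result = Marshal.load(readme.read)
  readme.close
  result
end

outcome = run_in_fork { 2 + 2 }
# outcome[:value] => 4, outcome[:error] => nil
```

Any Marshal-able data can travel through the pipe this way, so the spec can assert on whatever the worker method produced rather than just on raised exceptions.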

There have been some attempts to allow mocks in child processes. Check out rspec process mocks.

Conclusion

Asynchronous work queues are essential to any scalable Rails application. Because multiple processes run concurrently, carefully review your business logic to ensure there are no unprotected critical sections that could result in duplicate operations. This is mostly an issue when interfacing with external systems, such as email or third-party APIs, that are non-idempotent and where calling them multiple times produces undesirable side effects.

09/26/12: Updated examples to correct erroneous logic.
