Ruby: Resque Jobs and Jitter with `resque-scheduler`

Resque is a background job processor for Ruby. Sometimes you need to do something that’ll take a long time and you don’t want it happening as part of the HTTP request lifecycle. Resque lets you push that work onto a queue and run it in a separate worker process.

But what happens when you want to do LOTS of things at once? You want to avoid the thundering herd problem by spreading that work out rather than starting it all at the same moment.

Jitter is one way to do this. Take an example using Resque: say you want to recalculate something for products and you have 10,000 products. We can define a Resque job that does the recalculation for a given product ID, then queue one job per product. But we don’t want them all to start at the same time because of the load this would put on the DB. To fix that we add jitter: start each job somewhere between now and 120 seconds from now, picking a random delay per job.
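To make the idea concrete, here is a minimal sketch in plain Ruby with no Resque involved. The helper name `jittered_delays` and the seeded `Random` are assumptions made for illustration; it only shows how a random per-job delay spreads 10,000 start times across a 120 second window.

```ruby
# Hypothetical helper: map each job ID to a random start delay in whole seconds.
def jittered_delays(ids, max_seconds, random: Random.new)
  ids.to_h { |id| [id, random.rand(0..max_seconds)] }
end

# Seeded only so that repeated runs of this sketch pick the same delays.
delays = jittered_delays((1..10_000).to_a, 120, random: Random.new(42))

# Count how many jobs would start in each second of the window.
per_second = delays.values.tally
puts "Busiest second starts #{per_second.values.max} jobs instead of all #{delays.size}"
```

Without jitter every job would start in the same second; with it, each second only sees a small share of the total.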

To do this for Resque we can use `resque-scheduler`, which lets us queue work for the future, along with a `before_enqueue` Resque hook that lets a job declare its jitter with, for example, `@jitter_milliseconds = 10` on the job class.
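The control flow of that hook can be illustrated with a dependency-free simulation. This is a sketch under stated assumptions, not Resque's API: `FakeQueue`, `RecalculateJob`, `MAX_JITTER_SECONDS` and the hook signature (which receives the queue as its first argument) are all hypothetical stand-ins. What it does mirror is the contract the approach relies on: a hook that returns `false` cancels the enqueue, and the job is enqueued a second time once its delay has passed.

```ruby
require "set"

# Hypothetical in-memory stand-in for Resque plus resque-scheduler.
class FakeQueue
  attr_reader :ready, :delayed

  def initialize
    @ready = []   # jobs a worker could pick up right now
    @delayed = [] # [delay_seconds, job_class, args] waiting for the scheduler
  end

  # Mirrors the hook contract: a false return value cancels the enqueue.
  def enqueue(job_class, *args)
    return false if job_class.before_enqueue(self, *args) == false

    @ready << [job_class, args]
    true
  end

  def enqueue_in(delay_seconds, job_class, *args)
    @delayed << [delay_seconds, job_class, args]
  end

  # Plays the scheduler's role: push every delayed job through `enqueue` again.
  def release_delayed
    due = @delayed
    @delayed = []
    due.each { |_delay, job_class, args| enqueue(job_class, *args) }
  end
end

class RecalculateJob
  MAX_JITTER_SECONDS = 120
  @jittered = Set.new # argument lists that have already been delayed once

  def self.before_enqueue(queue, *args)
    # Second pass: the delay has been served, so let the job through.
    return true if @jittered.include?(args)

    # First pass: remember the job, schedule it for later, cancel this enqueue.
    @jittered << args
    queue.enqueue_in(rand(1..MAX_JITTER_SECONDS), self, *args)
    false
  end
end

queue = FakeQueue.new
(1..5).each { |id| queue.enqueue(RecalculateJob, id) }
puts "ready: #{queue.ready.size}, delayed: #{queue.delayed.size}"
```

Calling `queue.release_delayed` afterwards moves all five jobs into `ready`, because the second pass through the hook finds each job already marked as jittered.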

Here is an example of this all wired up (note: we ended up using slightly different code for the prod implementation due to requirements on our side, so be sure to test that the below works for you before adopting it).

    # typed: strict
    require "digest"
    require "resque"
    require "resque-scheduler"
    require "sorbet-runtime"

    class ApplicationJob
      extend T::Sig

      # Resque runs `before_enqueue_*` hooks before queueing a job;
      # returning false cancels the enqueue.
      sig { params(args: T.untyped).returns(T::Boolean) }
      def self.before_enqueue_application_job(*args)
        delay_ms = jitter_milliseconds
        # Is jitter enabled for this job?
        return true unless delay_ms > 0

        # Unique key for this job (class name plus arguments)
        job_key = Digest::MD5.hexdigest(Resque.encode(class: to_s, args: args))
        redis_key = "resque:jitter:#{job_key}"

        # Has the jitter wait already been done? If so, enqueue normally.
        return true if Resque.redis.get(redis_key)

        # Track that we've done the jitter. `setex` takes a TTL in whole seconds.
        # The 60 second floor is an assumption, meant to keep the flag alive
        # until the scheduler has re-enqueued the job.
        ttl_seconds = [(delay_ms * 3 / 1000.0).ceil, 60].max
        Resque.redis.setex(redis_key, ttl_seconds, 1)

        # `enqueue_in` takes seconds, so convert from milliseconds.
        # Requires `T.unsafe` because of the `*args` splat, which Sorbet doesn't support.
        T.unsafe(Resque).enqueue_in(delay_ms / 1000.0, self, *args)

        # Cancel this enqueue; the job is enqueued again once the delay has passed.
        false
      end

      sig { returns(Integer) }
      def self.jitter_milliseconds
        return 0 unless instance_variable_defined?(:@jitter_milliseconds)

        max_jitter = instance_variable_get(:@jitter_milliseconds)
        rand(max_jitter)
      end
    end

    class TestJitterJob < ApplicationJob
      @queue = :dummy
      @jitter_milliseconds = 10

      def self.perform(id)
        # noop
      end
    end
