Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions lib/delayed/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ class Monitor
def initialize
@jobs = Job.group(priority_case_statement).group(:queue)
@jobs = @jobs.where(queue: Worker.queues) if Worker.queues.any?
@memo = {}
end

def run!
@memo = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a bit of a stretch, but this makes me think that a separate class that can calc the metrics might be helpful, and this class is more in charge of the looping/sleeping mechanics. then we don't need this overwritable internal state, we just new up a new metric-calc-class during each call to run!. I see that Runnable handles some of this, but i guess organizing it that way means we have this notion of resettable state which is a bit smelly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agreed -- I was also starting to think about how we could reimagine Runnable to take on less of the whole lifecycle of Delayed::Monitor and Delayed::Worker kinds of classes.

If I keep pulling on that thread, I imagine that this smelliness will go away, but I didn't want to refactor too much within this one PR.

ActiveSupport::Notifications.instrument('delayed.monitor.run', default_tags) do
METRICS.each { |metric| emit_metric!(metric) }
end
Expand Down Expand Up @@ -67,23 +69,27 @@ def default_tags
end

def count_grouped
jobs.count
if Job.connection.supports_partial_index?
failed_count_grouped.merge(jobs.live.count) { |_, l, f| l + f }
else
jobs.count
end
end

def future_count_grouped
jobs.future.count
end

def locked_count_grouped
jobs.claimed.count
@memo[:locked_count_grouped] ||= jobs.claimed.count
end

def erroring_count_grouped
jobs.erroring.count
end

def failed_count_grouped
jobs.failed.count
@memo[:failed_count_grouped] ||= jobs.failed.count
end

def max_lock_age_grouped
Expand All @@ -109,16 +115,16 @@ def workable_count_grouped
jobs.claimable.count
end

def working_count_grouped
jobs.claimed.count
end
alias working_count_grouped locked_count_grouped

def oldest_locked_job_grouped
jobs.claimed.select("#{priority_case_statement} AS priority, queue, MIN(locked_at) AS locked_at")
jobs.claimed
.select("#{priority_case_statement} AS priority, queue, MIN(locked_at) AS locked_at")
end

def oldest_workable_job_grouped
jobs.claimable.select("(#{priority_case_statement}) AS priority, queue, MIN(run_at) AS run_at")
@memo[:oldest_workable_job_grouped] ||= jobs.claimable
.select("(#{priority_case_statement}) AS priority, queue, MIN(run_at) AS run_at")
end

def priority_case_statement
Expand Down
48 changes: 46 additions & 2 deletions spec/delayed/__snapshots__/monitor_spec.rb.snap
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0
AND priority < 20 THEN 10 WHEN priority >= 20
AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue
FROM \"delayed_jobs\"
WHERE \"delayed_jobs\".\"failed_at\" IS NOT NULL
GROUP BY CASE WHEN priority >= 0
AND priority < 10 THEN 0 WHEN priority >= 10
AND priority < 20 THEN 10 WHEN priority >= 20
AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"delayed_jobs\".\"queue\"
SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0
AND priority < 10 THEN 0 WHEN priority >= 10
AND priority < 20 THEN 10 WHEN priority >= 20
AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue
FROM \"delayed_jobs\"
WHERE \"delayed_jobs\".\"failed_at\" IS NULL
GROUP BY CASE WHEN priority >= 0
AND priority < 10 THEN 0 WHEN priority >= 10
AND priority < 20 THEN 10 WHEN priority >= 20
Expand All @@ -17,7 +28,15 @@ GroupAggregate (cost=...)
-> Sort (cost=...)
Output: (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue
Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue
-> Seq Scan on public.delayed_jobs (cost=...)
-> Index Only Scan using idx_delayed_jobs_failed on public.delayed_jobs (cost=...)
Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue
GroupAggregate (cost=...)
Output: count(*), (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue
Group Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue
-> Sort (cost=...)
Output: (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue
Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue
-> Index Only Scan using idx_delayed_jobs_live on public.delayed_jobs (cost=...)
Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue
SNAP

Expand All @@ -30,6 +49,16 @@ GroupAggregate (cost=...)
Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue
-> Seq Scan on public.delayed_jobs (cost=...)
Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue
Filter: (delayed_jobs.failed_at IS NOT NULL)
GroupAggregate (cost=...)
Output: count(*), (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue
Group Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue
-> Sort (cost=...)
Output: (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue
Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue
-> Seq Scan on public.delayed_jobs (cost=...)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The [legacy index] case gets two Seq Scans instead of one. However, in practice, the @memo means that the monitor is still doing less work, as one of these Seq Scans is identical to a seq scan it already had to do to count failed rows.

Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue
Filter: (delayed_jobs.failed_at IS NULL)
SNAP

snapshots["runs the expected postgresql query for future_count 1"] = <<-SNAP
Expand Down Expand Up @@ -400,20 +429,35 @@ SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0
AND priority < 20 THEN 10 WHEN priority >= 20
AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue
FROM \"delayed_jobs\"
WHERE \"delayed_jobs\".\"failed_at\" IS NOT NULL
GROUP BY CASE WHEN priority >= 0
AND priority < 10 THEN 0 WHEN priority >= 10
AND priority < 20 THEN 10 WHEN priority >= 20
AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"delayed_jobs\".\"queue\"
SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0
AND priority < 10 THEN 0 WHEN priority >= 10
AND priority < 20 THEN 10 WHEN priority >= 20
AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue
FROM \"delayed_jobs\"
WHERE \"delayed_jobs\".\"failed_at\" IS NULL
GROUP BY CASE WHEN priority >= 0
AND priority < 10 THEN 0 WHEN priority >= 10
AND priority < 20 THEN 10 WHEN priority >= 20
AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"delayed_jobs\".\"queue\"
SNAP

snapshots["produces the expected sqlite3 query plan for count 1"] = <<-SNAP
SCAN delayed_jobs
SCAN delayed_jobs USING INDEX idx_delayed_jobs_failed
USE TEMP B-TREE FOR GROUP BY
SCAN delayed_jobs USING INDEX idx_delayed_jobs_live
USE TEMP B-TREE FOR GROUP BY
SNAP

snapshots["[legacy index] produces the expected sqlite3 query plan for count 1"] = <<-SNAP
SCAN delayed_jobs
USE TEMP B-TREE FOR GROUP BY
SCAN delayed_jobs
USE TEMP B-TREE FOR GROUP BY
SNAP

snapshots["runs the expected sqlite3 query for future_count 1"] = <<-SNAP
Expand Down
10 changes: 5 additions & 5 deletions spec/delayed/monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -293,24 +293,24 @@
end
end

def query_for(metric)
def queries_for(metric)
monitor.query_for(metric)
QueryUnderTest.for(queries.first || raise("Expected a query for #{metric}, but none was executed"))
queries.map { |q| QueryUnderTest.for(q) }
end

described_class::METRICS.each do |metric|
context "('#{metric}')" do
it "runs the expected #{current_adapter} query for #{metric}" do
expect(query_for(metric).formatted).to match_snapshot
expect(queries_for(metric).map(&:formatted).join("\n")).to match_snapshot
end

it "produces the expected #{current_adapter} query plan for #{metric}" do
expect(query_for(metric).explain).to match_snapshot
expect(queries_for(metric).map(&:explain).join("\n")).to match_snapshot
end

context 'when using the legacy index', index: :legacy do
it "[legacy index] produces the expected #{current_adapter} query plan for #{metric}" do
expect(query_for(metric).explain).to match_snapshot
expect(queries_for(metric).map(&:explain).join("\n")).to match_snapshot
end
end
end
Expand Down