diff --git a/lib/delayed/monitor.rb b/lib/delayed/monitor.rb index 20a3f77..ebe768b 100644 --- a/lib/delayed/monitor.rb +++ b/lib/delayed/monitor.rb @@ -20,9 +20,11 @@ class Monitor def initialize @jobs = Job.group(priority_case_statement).group(:queue) @jobs = @jobs.where(queue: Worker.queues) if Worker.queues.any? + @memo = {} end def run! + @memo = {} ActiveSupport::Notifications.instrument('delayed.monitor.run', default_tags) do METRICS.each { |metric| emit_metric!(metric) } end @@ -67,7 +69,11 @@ def default_tags end def count_grouped - jobs.count + if Job.connection.supports_partial_index? + failed_count_grouped.merge(jobs.live.count) { |_, l, f| l + f } + else + jobs.count + end end def future_count_grouped @@ -75,7 +81,7 @@ def future_count_grouped end def locked_count_grouped - jobs.claimed.count + @memo[:locked_count_grouped] ||= jobs.claimed.count end def erroring_count_grouped @@ -83,7 +89,7 @@ def erroring_count_grouped end def failed_count_grouped - jobs.failed.count + @memo[:failed_count_grouped] ||= jobs.failed.count end def max_lock_age_grouped @@ -109,16 +115,16 @@ def workable_count_grouped jobs.claimable.count end - def working_count_grouped - jobs.claimed.count - end + alias working_count_grouped locked_count_grouped def oldest_locked_job_grouped - jobs.claimed.select("#{priority_case_statement} AS priority, queue, MIN(locked_at) AS locked_at") + jobs.claimed + .select("#{priority_case_statement} AS priority, queue, MIN(locked_at) AS locked_at") end def oldest_workable_job_grouped - jobs.claimable.select("(#{priority_case_statement}) AS priority, queue, MIN(run_at) AS run_at") + @memo[:oldest_workable_job_grouped] ||= jobs.claimable + .select("(#{priority_case_statement}) AS priority, queue, MIN(run_at) AS run_at") end def priority_case_statement diff --git a/spec/delayed/__snapshots__/monitor_spec.rb.snap b/spec/delayed/__snapshots__/monitor_spec.rb.snap index 0514254..8540599 100644 --- a/spec/delayed/__snapshots__/monitor_spec.rb.snap +++ b/spec/delayed/__snapshots__/monitor_spec.rb.snap @@ -4,6 +4,17 @@ SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0 AND priority < 20 THEN 10 WHEN priority >= 20 AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue FROM \"delayed_jobs\" + WHERE \"delayed_jobs\".\"failed_at\" IS NOT NULL + GROUP BY CASE WHEN priority >= 0 + AND priority < 10 THEN 0 WHEN priority >= 10 + AND priority < 20 THEN 10 WHEN priority >= 20 + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"delayed_jobs\".\"queue\" +SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0 + AND priority < 10 THEN 0 WHEN priority >= 10 + AND priority < 20 THEN 10 WHEN priority >= 20 + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue + FROM \"delayed_jobs\" + WHERE \"delayed_jobs\".\"failed_at\" IS NULL GROUP BY CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 @@ -17,7 +28,15 @@ GroupAggregate (cost=...) -> Sort (cost=...) Output: (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue - -> Seq Scan on public.delayed_jobs (cost=...) + -> Index Only Scan using idx_delayed_jobs_failed on public.delayed_jobs (cost=...) + Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue +GroupAggregate (cost=...) + Output: count(*), (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue + Group Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue + -> Sort (cost=...) + Output: (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue + Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue + -> Index Only Scan using idx_delayed_jobs_live on public.delayed_jobs (cost=...) Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue SNAP @@ -30,6 +49,16 @@ GroupAggregate (cost=...) Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue -> Seq Scan on public.delayed_jobs (cost=...) Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue + Filter: (delayed_jobs.failed_at IS NOT NULL) +GroupAggregate (cost=...) + Output: count(*), (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue + Group Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue + -> Sort (cost=...) + Output: (CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END), queue + Sort Key: (CASE WHEN ((delayed_jobs.priority >= 0) AND (delayed_jobs.priority < 10)) THEN 0 WHEN ((delayed_jobs.priority >= 10) AND (delayed_jobs.priority < 20)) THEN 10 WHEN ((delayed_jobs.priority >= 20) AND (delayed_jobs.priority < 30)) THEN 20 WHEN (delayed_jobs.priority >= 30) THEN 30 ELSE NULL::integer END), delayed_jobs.queue + -> Seq Scan on public.delayed_jobs (cost=...) + Output: CASE WHEN ((priority >= 0) AND (priority < 10)) THEN 0 WHEN ((priority >= 10) AND (priority < 20)) THEN 10 WHEN ((priority >= 20) AND (priority < 30)) THEN 20 WHEN (priority >= 30) THEN 30 ELSE NULL::integer END, queue + Filter: (delayed_jobs.failed_at IS NULL) SNAP snapshots["runs the expected postgresql query for future_count 1"] = <<-SNAP @@ -400,6 +429,17 @@ SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0 AND priority < 20 THEN 10 WHEN priority >= 20 AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue FROM \"delayed_jobs\" + WHERE \"delayed_jobs\".\"failed_at\" IS NOT NULL + GROUP BY CASE WHEN priority >= 0 + AND priority < 10 THEN 0 WHEN priority >= 10 + AND priority < 20 THEN 10 WHEN priority >= 20 + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"delayed_jobs\".\"queue\" +SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0 + AND priority < 10 THEN 0 WHEN priority >= 10 + AND priority < 20 THEN 10 WHEN priority >= 20 + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"delayed_jobs\".\"queue\" AS delayed_jobs_queue + FROM \"delayed_jobs\" + WHERE \"delayed_jobs\".\"failed_at\" IS NULL GROUP BY CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 @@ -407,13 +447,17 @@ SELECT COUNT(*) AS count_all, CASE WHEN priority >= 0 SNAP snapshots["produces the expected sqlite3 query plan for count 1"] = <<-SNAP -SCAN delayed_jobs +SCAN delayed_jobs USING INDEX idx_delayed_jobs_failed +USE TEMP B-TREE FOR GROUP BY +SCAN delayed_jobs USING INDEX idx_delayed_jobs_live USE TEMP B-TREE FOR GROUP BY SNAP snapshots["[legacy index] produces the expected sqlite3 query plan for count 1"] = <<-SNAP SCAN delayed_jobs USE TEMP B-TREE FOR GROUP BY +SCAN delayed_jobs +USE TEMP B-TREE FOR GROUP BY SNAP snapshots["runs the expected sqlite3 query for future_count 1"] = <<-SNAP diff --git a/spec/delayed/monitor_spec.rb b/spec/delayed/monitor_spec.rb index b2187eb..5675173 100644 --- a/spec/delayed/monitor_spec.rb +++ b/spec/delayed/monitor_spec.rb @@ -293,24 +293,24 @@ end end - def query_for(metric) + def queries_for(metric) monitor.query_for(metric) - QueryUnderTest.for(queries.first || raise("Expected a query for #{metric}, but none was executed")) + queries.map { |q| QueryUnderTest.for(q) } end described_class::METRICS.each do |metric| context "('#{metric}')" do it "runs the expected #{current_adapter} query for #{metric}" do - expect(query_for(metric).formatted).to match_snapshot + expect(queries_for(metric).map(&:formatted).join("\n")).to match_snapshot end it "produces the expected #{current_adapter} query plan for #{metric}" do - expect(query_for(metric).explain).to match_snapshot + expect(queries_for(metric).map(&:explain).join("\n")).to match_snapshot end context 'when using the legacy index', index: :legacy do it "[legacy index] produces the expected #{current_adapter} query plan for #{metric}" do - expect(query_for(metric).explain).to match_snapshot + expect(queries_for(metric).map(&:explain).join("\n")).to match_snapshot end end end