From 52b3d7bc429c96a14521c6b43d06857ed22c3c5f Mon Sep 17 00:00:00 2001
From: mhenrixon
Date: Sat, 29 Nov 2025 10:12:15 +0100
Subject: [PATCH 1/2] Prevent blocked jobs from being released while job is still executing

When a job runs longer than its concurrency_duration, the semaphore
expires and gets deleted by the dispatcher's concurrency maintenance.
Previously, BlockedExecution.releasable would consider a concurrency key
releasable when its semaphore was missing, causing blocked jobs to be
released and run concurrently with the still-executing job.

This violated the concurrency guarantee: with limits_concurrency set to 1,
two jobs with the same concurrency key could run simultaneously if the
first job exceeded its concurrency_duration.

Fix: Check for claimed executions before marking a concurrency key as
releasable. A key is only releasable when:
- Its semaphore is missing OR has available slots, AND
- No jobs with that key are currently being executed (claimed)

This ensures concurrency limits are respected even when jobs exceed their
configured duration.
---
NOTE(review): executing_keys is subtracted from every candidate key,
including keys whose semaphore still has value > 0. Confirm this is the
intended behaviour for jobs whose concurrency limit is greater than 1.

 app/models/solid_queue/blocked_execution.rb   | 11 ++++-
 test/integration/concurrency_controls_test.rb | 43 +++++++++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb
index 4ad4d239..999d49a4 100644
--- a/app/models/solid_queue/blocked_execution.rb
+++ b/app/models/solid_queue/blocked_execution.rb
@@ -36,8 +36,17 @@ def release_one(concurrency_key)
         def releasable(concurrency_keys)
           semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).to_h
 
+          # Concurrency keys that have jobs currently being executed should not be released,
+          # even if their semaphore expired. This prevents duplicate job execution when
+          # jobs run longer than their concurrency_duration.
+          executing_keys = Job.joins(:claimed_execution)
+                              .where(concurrency_key: concurrency_keys)
+                              .distinct
+                              .pluck(:concurrency_key)
+
           # Concurrency keys without semaphore + concurrency keys with open semaphore
-          (concurrency_keys - semaphores.keys) | semaphores.select { |_key, value| value > 0 }.keys
+          # MINUS keys that have jobs currently executing
+          ((concurrency_keys - semaphores.keys) | semaphores.select { |_key, value| value > 0 }.keys) - executing_keys
         end
     end
 
diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb
index b3be95c5..6c02ba12 100644
--- a/test/integration/concurrency_controls_test.rb
+++ b/test/integration/concurrency_controls_test.rb
@@ -162,6 +162,49 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
     assert_stored_sequence @result, ("A".."K").to_a
   end
 
+  test "don't unblock jobs when semaphore expires but job is still executing" do
+    # This tests a race condition where a job runs longer than concurrency_duration,
+    # causing the semaphore to expire while the job is still running. Without the fix,
+    # blocked jobs would be released, causing duplicate concurrent executions.
+
+    # Start a long-running job
+    NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 3.seconds)
+
+    # Wait for it to be claimed
+    wait_while_with_timeout(2.seconds) { SolidQueue::ClaimedExecution.count == 0 }
+    first_job = SolidQueue::Job.last
+    assert first_job.claimed?, "First job should be claimed"
+
+    # Enqueue more jobs with the same concurrency key - they'll be blocked
+    assert_difference -> { SolidQueue::BlockedExecution.count }, +3 do
+      ("B".."D").each do |name|
+        NonOverlappingUpdateResultJob.perform_later(@result, name: name)
+      end
+    end
+
+    # Simulate the semaphore and blocked executions expiring while job A is still running
+    skip_active_record_query_cache do
+      SolidQueue::Semaphore.where(key: first_job.concurrency_key).update_all(expires_at: 15.minutes.ago)
+      SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago)
+    end
+
+    # Give the dispatcher time to run concurrency maintenance
+    sleep(1.5)
+
+    # The blocked jobs should NOT have been released because job A is still executing
+    skip_active_record_query_cache do
+      assert first_job.reload.claimed?, "First job should still be claimed"
+      assert_equal 3, SolidQueue::BlockedExecution.count, "Blocked jobs should not be released while job is executing"
+    end
+
+    # Now wait for everything to complete normally
+    wait_for_jobs_to_finish_for(10.seconds)
+    assert_no_unfinished_jobs
+
+    # All jobs should have executed in sequence
+    assert_stored_sequence @result, ("A".."D").to_a
+  end
+
   test "don't block claimed executions that get released" do
     NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
     job = SolidQueue::Job.last

From 3ece5f860be00d5eae9786d789544f5caa2b902a Mon Sep 17 00:00:00 2001
From: mhenrixon
Date: Sat, 29 Nov 2025 10:22:30 +0100
Subject: [PATCH 2/2] Respect concurrency limits when releasing claimed executions during shutdown

When a worker gracefully shuts down, claimed executions are released back
to ready state via ClaimedExecution#release. Previously, this always used
dispatch_bypassing_concurrency_limits, which ignored whether another job
with the same concurrency key was already running.

This could cause duplicate concurrent executions in the following
scenario:
1. Job A starts running with concurrency key "X" (semaphore value = 0)
2. Time passes, semaphore expires and is deleted
3. Job B with same key "X" enqueues, creates new semaphore, starts running
4. Worker running Job A receives shutdown signal
5. Job A is released via dispatch_bypassing_concurrency_limits
6. Job A goes to ready state, gets picked up by another worker
7. Both Job A and Job B now running concurrently (violates limit!)

Fix: Before releasing a job, check if any other jobs with the same
concurrency key are currently executing. If so, go through normal dispatch
which respects the concurrency policy (block or discard). If not, continue
to bypass limits for performance.

This ensures that graceful shutdown doesn't violate concurrency
guarantees, even when semaphores have expired during long-running job
execution.
---
NOTE(review): the added test exercises only the bypass branch of
ClaimedExecution#release; the job.dispatch branch taken when another
execution holds the same concurrency key has no coverage in this patch.

 app/models/solid_queue/claimed_execution.rb   | 17 ++++++++++++++++-
 .../solid_queue/claimed_execution_test.rb     | 18 ++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb
index 5d0a4057..651741ca 100644
--- a/app/models/solid_queue/claimed_execution.rb
+++ b/app/models/solid_queue/claimed_execution.rb
@@ -78,7 +78,13 @@ def perform
   def release
     SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
       transaction do
-        job.dispatch_bypassing_concurrency_limits
+        if other_executions_holding_concurrency_lock?
+          # Another job with same concurrency key is already running.
+          # Go through normal dispatch which respects concurrency limits.
+          job.dispatch
+        else
+          job.dispatch_bypassing_concurrency_limits
+        end
         destroy!
       end
     end
@@ -113,4 +119,13 @@ def finished
         destroy!
       end
     end
+
+    def other_executions_holding_concurrency_lock?
+      return false unless job.concurrency_limited?
+
+      SolidQueue::Job.joins(:claimed_execution)
+                     .where(concurrency_key: job.concurrency_key)
+                     .where.not(id: job.id)
+                     .exists?
+    end
 end
diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb
index 98513c94..97108d9d 100644
--- a/test/models/solid_queue/claimed_execution_test.rb
+++ b/test/models/solid_queue/claimed_execution_test.rb
@@ -62,6 +62,24 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
     assert job.reload.ready?
   end
 
+  test "release bypasses concurrency limits when no other job with same key is executing" do
+    job_result = JobResult.create!(queue_name: "default", status: "")
+
+    # Create Job A with concurrency limit and claim it
+    job_a = DiscardableUpdateResultJob.perform_later(job_result, name: "A")
+    solid_queue_job_a = SolidQueue::Job.find_by(active_job_id: job_a.job_id)
+    SolidQueue::ReadyExecution.claim(solid_queue_job_a.queue_name, 1, @process.id)
+    claimed_execution_a = SolidQueue::ClaimedExecution.find_by(job_id: solid_queue_job_a.id)
+    assert claimed_execution_a
+
+    # Release job A - no other job with same key is running, so it should go to ready
+    assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::ReadyExecution.count } => 1 do
+      claimed_execution_a.release
+    end
+
+    assert solid_queue_job_a.reload.ready?
+  end
+
   test "fail with error" do
     claimed_execution = prepare_and_claim_job AddToBufferJob.perform_later(42)
     job = claimed_execution.job