# Reviewer summary of this patch. These "#" lines sit before the first "diff --git" header;
# patch tools are documented to skip leading non-diff text, but confirm with your tooling.
#
# Purpose (per the patch's own comments): avoid running two jobs with the same concurrency key
# at once when a job outlives its concurrency_duration and its semaphore expires.
#
# 1. app/models/solid_queue/blocked_execution.rb, releasable(concurrency_keys)
#    - Before: returned the keys with no Semaphore row, plus the keys whose semaphore value is > 0.
#    - After: the same set, minus executing_keys, i.e. the distinct concurrency_key values of Jobs
#      (restricted to concurrency_keys) that join to a claimed_execution.
#    - NOTE(review): the subtraction ignores the semaphore value, so a key with any claimed job is
#      withheld even when value > 0. Presumably fine for a limit of 1; confirm the intended
#      behavior for jobs whose concurrency limit is greater than 1.
#
# 2. app/models/solid_queue/claimed_execution.rb, release
#    - Before: always called job.dispatch_bypassing_concurrency_limits, then destroy!.
#    - After: calls job.dispatch when other_executions_holding_concurrency_lock? is true, otherwise
#      job.dispatch_bypassing_concurrency_limits; destroy! still follows inside the same transaction.
#    - New helper other_executions_holding_concurrency_lock?: returns false unless
#      job.concurrency_limited?; otherwise reports whether another SolidQueue::Job (different id)
#      with the same concurrency_key joins to a claimed_execution.
#    - NOTE(review): the helper is appended after the `finished` method; whether that position is
#      inside a private section is not visible in this hunk - verify.
#    - NOTE(review): the helper's query has the same shape as the executing_keys query in
#      releasable; consider whether the two should share a scope.
#
# 3. test/integration/concurrency_controls_test.rb
#    - New test: enqueues job "A" with a 3-second pause, waits for it to be claimed, enqueues
#      "B".."D" (asserted to add 3 blocked executions), back-dates expires_at on the semaphore and
#      on all blocked executions, sleeps 1.5s, then asserts "A" is still claimed and 3 blocked
#      executions remain, and finally that A..D are stored in sequence.
#    - NOTE(review): relies on sleep(1.5) against a 3-second pause, so it is timing sensitive;
#      presumably the dispatcher's maintenance interval in the test setup is shorter than 1.5s -
#      verify.
#
# 4. test/models/solid_queue/claimed_execution_test.rb
#    - New test covers only the bypass branch of release (no other job with the same key claimed):
#      it expects one fewer ClaimedExecution, one more ReadyExecution, and the job to be ready.
#    - NOTE(review): no test in this patch exercises the job.dispatch branch of release.
#
diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 4ad4d239..999d49a4 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -36,8 +36,17 @@ def release_one(concurrency_key) def releasable(concurrency_keys) semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).to_h + # Concurrency keys that have jobs currently being executed should not be released, + # even if their semaphore expired. This prevents duplicate job execution when + # jobs run longer than their concurrency_duration. + executing_keys = Job.joins(:claimed_execution) + .where(concurrency_key: concurrency_keys) + .distinct + .pluck(:concurrency_key) + # Concurrency keys without semaphore + concurrency keys with open semaphore - (concurrency_keys - semaphores.keys) | semaphores.select { |_key, value| value > 0 }.keys + # MINUS keys that have jobs currently executing + ((concurrency_keys - semaphores.keys) | semaphores.select { |_key, value| value > 0 }.keys) - executing_keys end end diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 5d0a4057..651741ca 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -78,7 +78,13 @@ def perform def release SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do transaction do - job.dispatch_bypassing_concurrency_limits + if other_executions_holding_concurrency_lock? + # Another job with same concurrency key is already running. + # Go through normal dispatch which respects concurrency limits. + job.dispatch + else + job.dispatch_bypassing_concurrency_limits + end destroy! end end @@ -113,4 +119,13 @@ def finished destroy! end end + + def other_executions_holding_concurrency_lock? + return false unless job.concurrency_limited? 
+ + SolidQueue::Job.joins(:claimed_execution) + .where(concurrency_key: job.concurrency_key) + .where.not(id: job.id) + .exists? + end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index b3be95c5..6c02ba12 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -162,6 +162,49 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence @result, ("A".."K").to_a end + test "don't unblock jobs when semaphore expires but job is still executing" do + # This tests a race condition where a job runs longer than concurrency_duration, + # causing the semaphore to expire while the job is still running. Without the fix, + # blocked jobs would be released, causing duplicate concurrent executions. + + # Start a long-running job + NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 3.seconds) + + # Wait for it to be claimed + wait_while_with_timeout(2.seconds) { SolidQueue::ClaimedExecution.count == 0 } + first_job = SolidQueue::Job.last + assert first_job.claimed?, "First job should be claimed" + + # Enqueue more jobs with the same concurrency key - they'll be blocked + assert_difference -> { SolidQueue::BlockedExecution.count }, +3 do + ("B".."D").each do |name| + NonOverlappingUpdateResultJob.perform_later(@result, name: name) + end + end + + # Simulate the semaphore and blocked executions expiring while job A is still running + skip_active_record_query_cache do + SolidQueue::Semaphore.where(key: first_job.concurrency_key).update_all(expires_at: 15.minutes.ago) + SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago) + end + + # Give the dispatcher time to run concurrency maintenance + sleep(1.5) + + # The blocked jobs should NOT have been released because job A is still executing + skip_active_record_query_cache do + assert first_job.reload.claimed?, "First job should still be claimed" + assert_equal 3, 
SolidQueue::BlockedExecution.count, "Blocked jobs should not be released while job is executing" + end + + # Now wait for everything to complete normally + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + + # All jobs should have executed in sequence + assert_stored_sequence @result, ("A".."D").to_a + end + test "don't block claimed executions that get released" do NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds) job = SolidQueue::Job.last diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 98513c94..97108d9d 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -62,6 +62,24 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase assert job.reload.ready? end + test "release bypasses concurrency limits when no other job with same key is executing" do + job_result = JobResult.create!(queue_name: "default", status: "") + + # Create Job A with concurrency limit and claim it + job_a = DiscardableUpdateResultJob.perform_later(job_result, name: "A") + solid_queue_job_a = SolidQueue::Job.find_by(active_job_id: job_a.job_id) + SolidQueue::ReadyExecution.claim(solid_queue_job_a.queue_name, 1, @process.id) + claimed_execution_a = SolidQueue::ClaimedExecution.find_by(job_id: solid_queue_job_a.id) + assert claimed_execution_a + + # Release job A - no other job with same key is running, so it should go to ready + assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::ReadyExecution.count } => 1 do + claimed_execution_a.release + end + + assert solid_queue_job_a.reload.ready? + end + test "fail with error" do claimed_execution = prepare_and_claim_job AddToBufferJob.perform_later(42) job = claimed_execution.job