1- require 'thread'
1+ require 'concurrent/errors'
2+ require 'concurrent/ivar'
23require 'concurrent/atomic/event'
34require 'concurrent/collection/priority_queue'
45require 'concurrent/executor/executor'
@@ -15,6 +16,57 @@ module Concurrent
1516 # @!macro monotonic_clock_warning
1617 class TimerSet < RubyExecutorService
1718
19+ # @!visibility private
20+ class Job < Concurrent ::IVar
21+
22+ # @!visibility private
23+ def initialize ( args , task )
24+ super ( )
25+ @args = args
26+ @task = task
27+ ensure_ivar_visibility!
28+ end
29+
30+ # @!visibility private
31+ def cancelled?
32+ state == :cancelled
33+ end
34+
35+ # @!visibility private
36+ def cancel
37+ if compare_and_set_state ( :cancelled , :pending )
38+ complete ( false , nil , CancelledOperationError . new )
39+ true
40+ else
41+ false
42+ end
43+ end
44+
45+ # @!visibility private
46+ def execute
47+ if compare_and_set_state ( :processing , :pending )
48+ success , val , reason = SafeTaskExecutor . new ( @task , rescue_exception : true ) . execute ( *@args )
49+ complete ( success , val , reason )
50+ end
51+ end
52+ end
53+ private_constant :Job
54+
55+ # A struct for encapsulating a task and its intended execution time.
56+ # It facilitates proper prioritization by overriding the comparison
57+ # (spaceship) operator as a comparison of the intended execution
58+ # times.
59+ #
60+ # @!visibility private
61+ Task = Struct . new ( :time , :job ) do
62+ include Comparable
63+
64+ def <=>( other )
65+ self . time <=> other . time
66+ end
67+ end
68+ private_constant :Task
69+
1870 # Create a new set of timed tasks.
1971 #
2072 # @!macro [attach] executor_options
@@ -48,16 +100,19 @@ def post(delay, *args, &task)
48100
49101 synchronize do
50102 return false unless running?
103+
104+ job = Job . new ( args , task )
51105
52106 if ( delay ) <= 0.01
53- @task_executor . post ( * args , & task )
107+ @task_executor . post { job . execute }
54108 else
55- @queue . push ( Task . new ( Concurrent . monotonic_time + delay , args , task ) )
109+ @queue . push ( Task . new ( Concurrent . monotonic_time + delay , job ) )
56110 @timer_executor . post ( &method ( :process_tasks ) )
57111 end
112+
58113 @condition . set
114+ job
59115 end
60- true
61116 end
62117
63118 # Begin an immediate shutdown. In-progress tasks will be allowed to
@@ -94,22 +149,6 @@ def self.calculate_delay!(delay)
94149
95150 protected
96151
97- # A struct for encapsulating a task and its intended execution time.
98- # It facilitates proper prioritization by overriding the comparison
99- # (spaceship) operator as a comparison of the intended execution
100- # times.
101- #
102- # @!visibility private
103- Task = Struct . new ( :time , :args , :op ) do
104- include Comparable
105-
106- def <=>( other )
107- self . time <=> other . time
108- end
109- end
110-
111- private_constant :Task
112-
113152 def ns_initialize ( opts )
114153 @queue = PriorityQueue . new ( order : :min )
115154 @task_executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
@@ -153,7 +192,7 @@ def process_tasks
153192 # queue now must have the same pop time, or a closer one, as
154193 # when we peeked).
155194 task = synchronize { @queue . pop }
156- @task_executor . post ( * task . args , & task . op )
195+ @task_executor . post { task . job . execute }
157196 else
158197 @condition . wait ( [ diff , 60 ] . min )
159198 end
0 commit comments