@@ -16,19 +16,14 @@ module Concurrent
1616 # @!macro monotonic_clock_warning
1717 class TimerSet < RubyExecutorService
1818
19- # A class for encapsulating a task and its intended execution time.
20- # It facilitates proper prioritization by overriding the comparison
21- # (spaceship) operator as a comparison of the intended execution
22- # times.
23- #
24- # @!visibility private
19+ # An `IVar` representing a tasked queued for execution in a `TimerSet`.
2520 class Task < Concurrent ::IVar
2621 include Comparable
2722
28- # @!visibility private
29- def initialize ( time , args , task )
23+ def initialize ( parent , time , args , task )
3024 super ( )
3125 synchronize do
26+ @parent = parent
3227 @time = time
3328 @args = args
3429 @task = task
@@ -40,20 +35,20 @@ def time
4035 synchronize { @time }
4136 end
4237
43- # @!visibility private
4438 def <=>( other )
4539 self . time <=> other . time
4640 end
4741
48- # @!visibility private
4942 def cancelled?
5043 state == :cancelled
5144 end
5245
53- # @!visibility private
5446 def cancel
5547 if compare_and_set_state ( :cancelled , :pending )
5648 complete ( false , nil , CancelledOperationError . new )
49+ # To avoid deadlocks this call must occur outside of #synchronize
50+ # Changing the state above should prevent redundant calls
51+ @parent . send ( :remove_task , self )
5752 true
5853 else
5954 false
@@ -64,8 +59,9 @@ def cancel
6459 def execute
6560 safe_execute ( @task , @args )
6661 end
62+
63+ protected :set , :try_set
6764 end
68- private_constant :Task
6965
7066 # Create a new set of timed tasks.
7167 #
@@ -88,7 +84,8 @@ def initialize(opts = {})
8884 #
8985 # @yield the task to be performed
9086 #
91- # @return [Boolean] true if the message is post, false after shutdown
87+ # @return [Concurrent::TimerSet::Task, false] IVar representing the task if the post
88+ # is successful; false after shutdown
9289 #
9390 # @raise [ArgumentError] if the intended execution time is not in the future
9491 # @raise [ArgumentError] if no block is given
@@ -102,7 +99,7 @@ def post(delay, *args, &task)
10299 return false unless running?
103100
104101 time = Concurrent . monotonic_time + delay
105- task = Task . new ( time , args , task )
102+ task = Task . new ( self , time , args , task )
106103
107104 if ( delay ) <= 0.01
108105 @task_executor . post { task . execute }
@@ -152,6 +149,18 @@ def self.calculate_delay!(delay)
152149
153150 protected
154151
152+ # Remove the given task from the queue.
153+ #
154+ # @note This is intended as a callback method from Task only.
155+ # It is not intended to be used directly. Cancel a task by
156+ # using the `Task#cancel` method.
157+ #
158+ # @!visibility private
159+ def remove_task ( task )
160+ synchronize { @queue . delete ( task ) }
161+ end
162+
163+ # @!visibility private
155164 def ns_initialize ( opts )
156165 @queue = PriorityQueue . new ( order : :min )
157166 @task_executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
@@ -204,9 +213,9 @@ def process_tasks
204213
205214 private
206215
216+ # @!visibility private
207217 def <<( task )
208- post ( 0.0 , &task )
209- self
218+ raise NotImplementedError . new
210219 end
211220 end
212221end
0 commit comments