1- require 'thread '
1+ require 'concurrent/scheduled_task '
22require 'concurrent/atomic/event'
33require 'concurrent/collection/priority_queue'
4- require 'concurrent/executor/executor'
54require 'concurrent/executor/executor_service'
65require 'concurrent/executor/single_thread_executor'
7- require 'concurrent/utility/monotonic_time'
86
97module Concurrent
108
119 # Executes a collection of tasks, each after a given delay. A master task
1210 # monitors the set and schedules each task for execution at the appropriate
13- # time. Tasks are run on the global task pool or on the supplied executor.
11+ # time. Tasks are run on the global thread pool or on the supplied executor.
12+ # Each task is represented as a `ScheduledTask`.
13+ #
14+ # @see Concurrent::ScheduledTask
1415 #
1516 # @!macro monotonic_clock_warning
1617 class TimerSet < RubyExecutorService
@@ -32,32 +33,28 @@ def initialize(opts = {})
3233 # delay is less than 1/100th of a second the task will be immediately post
3334 # to the executor.
3435 #
35- # @param [Float] delay the number of seconds to wait for before executing the task
36+ # @param [Float] delay the number of seconds to wait for before executing the task.
37+ # @param [Array<Object>] args the arguments passed to the task on execution.
3638 #
37- # @yield the task to be performed
39+ # @yield the task to be performed.
3840 #
39- # @return [Boolean] true if the message is post, false after shutdown
41+ # @return [Concurrent::ScheduledTask, false] IVar representing the task if the post
42+ # is successful; false after shutdown.
4043 #
41- # @raise [ArgumentError] if the intended execution time is not in the future
42- # @raise [ArgumentError] if no block is given
44+ # @raise [ArgumentError] if the intended execution time is not in the future.
45+ # @raise [ArgumentError] if no block is given.
4346 #
4447 # @!macro deprecated_scheduling_by_clock_time
4548 def post ( delay , *args , &task )
4649 raise ArgumentError . new ( 'no block given' ) unless block_given?
47- delay = TimerSet . calculate_delay! ( delay ) # raises exceptions
48-
49- synchronize do
50- return false unless running?
51-
52- if ( delay ) <= 0.01
53- @task_executor . post ( *args , &task )
54- else
55- @queue . push ( Task . new ( Concurrent . monotonic_time + delay , args , task ) )
56- @timer_executor . post ( &method ( :process_tasks ) )
57- end
58- @condition . set
59- end
60- true
50+ return false unless running?
51+ opts = {
52+ executor : @task_executor ,
53+ args : args ,
54+ timer_set : self
55+ }
56+ task = ScheduledTask . execute ( delay , opts , &task ) # may raise exception
57+ task . unscheduled? ? false : task
6158 end
6259
6360 # Begin an immediate shutdown. In-progress tasks will be allowed to
@@ -68,48 +65,14 @@ def kill
6865 shutdown
6966 end
7067
71- # Schedule a task to be executed after a given delay (in seconds).
72- #
73- # @param [Float] delay the number of seconds to wait for before executing the task
74- #
75- # @return [Float] the number of seconds to delay
76- #
77- # @raise [ArgumentError] if the intended execution time is not in the future
78- # @raise [ArgumentError] if no block is given
79- #
80- # @!macro deprecated_scheduling_by_clock_time
81- #
82- # @!visibility private
83- def self . calculate_delay! ( delay )
84- if delay . is_a? ( Time )
85- warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock'
86- now = Time . now
87- raise ArgumentError . new ( 'schedule time must be in the future' ) if delay <= now
88- delay . to_f - now . to_f
89- else
90- raise ArgumentError . new ( 'seconds must be greater than zero' ) if delay . to_f < 0.0
91- delay . to_f
92- end
93- end
68+ private :<<
9469
9570 protected
9671
97- # A struct for encapsulating a task and its intended execution time.
98- # It facilitates proper prioritization by overriding the comparison
99- # (spaceship) operator as a comparison of the intended execution
100- # times.
72+ # Initialize the object.
10173 #
74+ # @param [Hash] opts the options to create the object with.
10275 # @!visibility private
103- Task = Struct . new ( :time , :args , :op ) do
104- include Comparable
105-
106- def <=>( other )
107- self . time <=> other . time
108- end
109- end
110-
111- private_constant :Task
112-
11376 def ns_initialize ( opts )
11477 @queue = PriorityQueue . new ( order : :min )
11578 @task_executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
@@ -118,6 +81,44 @@ def ns_initialize(opts)
11881 self . auto_terminate = opts . fetch ( :auto_terminate , true )
11982 end
12083
84+ # Post the task to the internal queue.
85+ #
86+ # @note This is intended as a callback method from ScheduledTask
87+ # only. It is not intended to be used directly. Post a task
88+ # by using the `SchedulesTask#execute` method.
89+ #
90+ # @!visibility private
91+ def post_task ( task )
92+ synchronize { ns_post_task ( task ) }
93+ end
94+
95+ # @!visibility private
96+ def ns_post_task ( task )
97+ return false unless ns_running?
98+ if ( task . initial_delay ) <= 0.01
99+ task . executor . post { task . process_task }
100+ else
101+ @queue . push ( task )
102+ # only post the process method when the queue is empty
103+ @timer_executor . post ( &method ( :process_tasks ) ) if @queue . length == 1
104+ @condition . set
105+ end
106+ true
107+ end
108+
109+ # Remove the given task from the queue.
110+ #
111+ # @note This is intended as a callback method from `ScheduledTask`
112+ # only. It is not intended to be used directly. Cancel a task
113+ # by using the `ScheduledTask#cancel` method.
114+ #
115+ # @!visibility private
116+ def remove_task ( task )
117+ synchronize { @queue . delete ( task ) }
118+ end
119+
120+ # `ExecutorServic` callback called during shutdown.
121+ #
121122 # @!visibility private
122123 def shutdown_execution
123124 @queue . clear
@@ -137,7 +138,7 @@ def process_tasks
137138 break unless task
138139
139140 now = Concurrent . monotonic_time
140- diff = task . time - now
141+ diff = task . schedule_time - now
141142
142143 if diff <= 0
143144 # We need to remove the task from the queue before passing
@@ -153,18 +154,11 @@ def process_tasks
153154 # queue now must have the same pop time, or a closer one, as
154155 # when we peeked).
155156 task = synchronize { @queue . pop }
156- @task_executor . post ( * task . args , & task . op )
157+ task . executor . post { task . process_task }
157158 else
158159 @condition . wait ( [ diff , 60 ] . min )
159160 end
160161 end
161162 end
162-
163- private
164-
165- def <<( task )
166- post ( 0.0 , &task )
167- self
168- end
169163 end
170164end
0 commit comments