1+ require 'concurrent/errors'
2+ require 'concurrent/ivar'
13require 'concurrent/configuration'
4+ require 'concurrent/executor/executor'
25require 'concurrent/executor/timer_set'
6+ require 'concurrent/utility/monotonic_time'
37
48module Concurrent
59
@@ -131,7 +135,10 @@ module Concurrent
131135 # #>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?'
132136 #
133137 # @!macro monotonic_clock_warning
134- class ScheduledTask < TimerSet ::Task
138+ class ScheduledTask < IVar
139+ include Comparable
140+
141+ attr_reader :executor
135142
136143 # Schedule a task for execution at a specified future time.
137144 #
@@ -147,26 +154,108 @@ class ScheduledTask < TimerSet::Task
147154 # more accurate, but only when scheduling based on a delay interval.
148155 # Scheduling a task based on a clock time is deprecated. It will still work
149156 # but will not be supported in the 1.0 release.
150- def initialize ( delay , opts = { } , &block )
157+ def initialize ( delay , opts = { } , &task )
151158 raise ArgumentError . new ( 'no block given' ) unless block_given?
152- timer_set = opts . fetch ( :timer_set , Concurrent . global_timer_set )
153- args = get_arguments_from ( opts )
154- super ( timer_set , delay , args , block , opts , &nil )
159+ super ( IVar ::NO_VALUE , opts , &nil )
155160 synchronize do
161+ @original_delay = delay
162+ ns_set_delay_and_time! ( delay ) # may raise exception
156163 ns_set_state ( :unscheduled )
157- @__original_delay__ = delay
164+ @parent = opts . fetch ( :timer_set , Concurrent . global_timer_set )
165+ @args = get_arguments_from ( opts )
166+ @task = task
158167 @time = nil
168+ @executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
169+ self . observers = CopyOnNotifyObserverSet . new
159170 end
160171 end
161172
173+ def original_delay
174+ synchronize { @delay }
175+ end
176+
177+ # @deprecated
178+ def delay
179+ warn '[DEPRECATED] use #original_delay instead'
180+ original_delay
181+ end
182+
183+ def schedule_time
184+ synchronize { @time }
185+ end
186+
187+ def <=>( other )
188+ self . schedule_time <=> other . schedule_time
189+ end
190+
191+ # Has the task been cancelled?
192+ #
193+ # @return [Boolean] true if the task is in the given state else false
194+ def cancelled?
195+ synchronize { ns_check_state? ( :cancelled ) }
196+ end
197+
198+ # In the task execution in progress?
199+ #
200+ # @return [Boolean] true if the task is in the given state else false
201+ def processing?
202+ synchronize { ns_check_state? ( :processing ) }
203+ end
204+
205+ # In the task execution in progress?
206+ #
207+ # @return [Boolean] true if the task is in the given state else false
208+ #
209+ # @deprecated
210+ def in_progress?
211+ warn '[DEPRECATED] use #processing? instead'
212+ processing?
213+ end
214+
215+ # Cancel this task and prevent it from executing. A task can only be
216+ # cancelled if it is pending or unscheduled.
217+ #
218+ # @return [Boolean] true if task execution is successfully cancelled
219+ # else false
220+ def cancel
221+ if compare_and_set_state ( :cancelled , :pending , :unscheduled )
222+ complete ( false , nil , CancelledOperationError . new )
223+ # To avoid deadlocks this call must occur outside of #synchronize
224+ # Changing the state above should prevent redundant calls
225+ @parent . send ( :remove_task , self )
226+ else
227+ false
228+ end
229+ end
230+
231+ # Cancel this task and prevent it from executing. A task can only be
232+ # cancelled if it is pending or unscheduled.
233+ #
234+ # @return [Boolean] true if task execution is successfully cancelled
235+ # else false
236+ #
237+ # @deprecated
238+ def stop
239+ warn '[DEPRECATED] use #cancel instead'
240+ cancel
241+ end
242+
243+ def reset
244+ synchronize { ns_reschedule ( @delay ) }
245+ end
246+
247+ def reschedule ( delay )
248+ synchronize { ns_reschedule ( delay ) }
249+ end
250+
162251 # Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending`
163252 # and starts counting down toward execution. Does nothing if the `ScheduledTask` is
164253 # in any state other than `:unscheduled`.
165254 #
166255 # @return [ScheduledTask] a reference to `self`
167256 def execute
168257 if compare_and_set_state ( :pending , :unscheduled )
169- synchronize { ns_reschedule ( @__original_delay__ , false ) }
258+ synchronize { ns_reschedule ( @original_delay , false ) }
170259 end
171260 self
172261 end
@@ -183,36 +272,54 @@ def execute
183272 # @raise [ArgumentError] if no block is given
184273 #
185274 # @!macro deprecated_scheduling_by_clock_time
186- def self . execute ( delay , opts = { } , &block )
187- new ( delay , opts , &block ) . execute
275+ def self . execute ( delay , opts = { } , &task )
276+ new ( delay , opts , &task ) . execute
188277 end
189278
190- # In the task execution in progress?
191- #
192- # @return [Boolean] true if the task is in the given state else false
193- #
194- # @deprecated
195- def in_progress?
196- warn '[DEPRECATED] use #processing? instead'
197- processing?
279+ # @!visibility private
280+ def process_task
281+ safe_execute ( @task , @args )
198282 end
199283
200- # Cancel this task and prevent it from executing. A task can only be
201- # cancelled if it is pending or unscheduled.
202- #
203- # @return [Boolean] true if task execution is successfully cancelled
204- # else false
205- #
206- # @deprecated
207- def stop
208- warn '[DEPRECATED] use #processing? instead'
209- cancel
284+ protected :set , :try_set , :fail , :complete
285+
286+ protected
287+
288+ def ns_set_delay_and_time! ( delay )
289+ @delay = calculate_delay! ( delay )
290+ @time = Concurrent . monotonic_time + @delay
210291 end
211292
212- # @deprecated
213- def delay
214- warn '[DEPRECATED] use #original_delay instead'
215- original_delay
293+ def ns_reschedule ( delay , fail_if_cannot_remove = true )
294+ return false unless ns_check_state? ( :pending )
295+ ns_set_delay_and_time! ( delay )
296+ removed = @parent . send ( :remove_task , self )
297+ return false if fail_if_cannot_remove && !removed
298+ @parent . send ( :post_task , self )
299+ end
300+
301+ # Schedule a task to be executed after a given delay (in seconds).
302+ #
303+ # @param [Float] delay the number of seconds to wait for before executing the task
304+ #
305+ # @return [Float] the number of seconds to delay
306+ #
307+ # @raise [ArgumentError] if the intended execution time is not in the future
308+ # @raise [ArgumentError] if no block is given
309+ #
310+ # @!macro deprecated_scheduling_by_clock_time
311+ #
312+ # @!visibility private
313+ def calculate_delay! ( delay )
314+ if delay . is_a? ( Time )
315+ warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock'
316+ now = Time . now
317+ raise ArgumentError . new ( 'schedule time must be in the future' ) if delay <= now
318+ delay . to_f - now . to_f
319+ else
320+ raise ArgumentError . new ( 'seconds must be greater than zero' ) if delay . to_f < 0.0
321+ delay . to_f
322+ end
216323 end
217324 end
218325end
0 commit comments