Skip to content

Commit 27a25e8

Browse files
committed
Add TimerQueue.stop()
1 parent acf9eb6 commit 27a25e8

File tree

1 file changed

+77
-52
lines changed

1 file changed

+77
-52
lines changed

src/main/java/org/jsl/collider/TimerQueue.java

Lines changed: 77 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
public class TimerQueue extends ThreadPool.Runnable
4242
{
43-
private static final Logger s_logger = Logger.getLogger( TimerQueue.class.getName() );
43+
private static final Logger s_logger = Logger.getLogger(TimerQueue.class.getName());
4444

4545
public interface Task
4646
{
@@ -56,6 +56,7 @@ public interface Task
5656
private final Condition m_cond;
5757
private final TreeMap<Long, TimerInfo> m_sortedTimers;
5858
private final Map<Task, TimerInfo> m_timers;
59+
private boolean m_stop;
5960

6061
private class TimerInfo extends ThreadPool.Runnable
6162
{
@@ -66,23 +67,24 @@ private class TimerInfo extends ThreadPool.Runnable
6667
public long threadID;
6768
public Condition cond;
6869

69-
public TimerInfo( Task task )
70+
public TimerInfo(Task task, long fireTime)
7071
{
7172
this.task = task;
73+
this.fireTime = fireTime;
7274
}
7375

7476
public void runInThreadPool()
7577
{
76-
assert( threadID == -1 );
77-
assert( prev == null );
78-
assert( next == null );
78+
assert(threadID == -1);
79+
assert(prev == null);
80+
assert(next == null);
7981
threadID = Thread.currentThread().getId();
8082
final long interval = task.run();
81-
restateTimer( this, interval );
83+
restateTimer(this, interval);
8284
}
8385
}
8486

85-
private void restateTimer( TimerInfo timerInfo, long interval )
87+
private void restateTimer(TimerInfo timerInfo, long interval)
8688
{
8789
boolean snatchThread = false;
8890

@@ -92,7 +94,7 @@ private void restateTimer( TimerInfo timerInfo, long interval )
9294
if (timerInfo.cond != null)
9395
{
9496
if (s_logger.isLoggable(Level.FINER))
95-
s_logger.log( Level.FINER, System.identityHashCode(timerInfo.task) + ": pending cancel" );
97+
s_logger.log(Level.FINER, System.identityHashCode(timerInfo.task) + ": pending cancel");
9698

9799
timerInfo.threadID = -2;
98100
timerInfo.cond.signalAll();
@@ -106,22 +108,22 @@ else if (interval > 0)
106108
{
107109
if (s_logger.isLoggable(Level.FINER))
108110
{
109-
s_logger.log( Level.FINER, System.identityHashCode(timerInfo.task) +
110-
": interval=" + interval + ", snatch thread" );
111+
s_logger.log(Level.FINER, System.identityHashCode(timerInfo.task) +
112+
": interval=" + interval + ", snatch thread");
111113
}
112-
m_sortedTimers.put( fireTime, timerInfo );
114+
m_sortedTimers.put(fireTime, timerInfo);
113115
snatchThread = true;
114116
}
115117
else
116118
{
117-
final TimerInfo next = m_sortedTimers.get( fireTime );
118-
m_sortedTimers.put( fireTime, timerInfo );
119+
final TimerInfo next = m_sortedTimers.get(fireTime);
120+
m_sortedTimers.put(fireTime, timerInfo);
119121
if (next == null)
120122
{
121123
if (s_logger.isLoggable(Level.FINER))
122124
{
123-
s_logger.log( Level.FINER, System.identityHashCode(timerInfo.task) +
124-
": interval=" + interval + ", wakeup thread" );
125+
s_logger.log(Level.FINER, System.identityHashCode(timerInfo.task) +
126+
": interval=" + interval + ", wakeup thread");
125127
}
126128
if (m_sortedTimers.firstKey() == fireTime)
127129
m_cond.signal();
@@ -130,8 +132,8 @@ else if (interval > 0)
130132
{
131133
if (s_logger.isLoggable(Level.FINER))
132134
{
133-
s_logger.log( Level.FINER, System.identityHashCode(timerInfo.task) +
134-
": interval=" + interval );
135+
s_logger.log(Level.FINER, System.identityHashCode(timerInfo.task) +
136+
": interval=" + interval);
135137
}
136138
timerInfo.next = next;
137139
next.prev = timerInfo;
@@ -141,8 +143,8 @@ else if (interval > 0)
141143
else
142144
{
143145
if (s_logger.isLoggable(Level.FINER))
144-
s_logger.log( Level.FINER, System.identityHashCode(timerInfo.task) + ": done" );
145-
m_timers.remove( timerInfo.task );
146+
s_logger.log(Level.FINER, System.identityHashCode(timerInfo.task) + ": done");
147+
m_timers.remove(timerInfo.task);
146148
}
147149
}
148150
finally
@@ -162,21 +164,21 @@ public void runInThreadPool()
162164
m_lock.lock();
163165
try
164166
{
165-
while (!m_sortedTimers.isEmpty())
167+
while (!m_sortedTimers.isEmpty() && !m_stop)
166168
{
167169
final Map.Entry<Long, TimerInfo> firstEntry = m_sortedTimers.firstEntry();
168-
assert( firstEntry != null );
170+
assert(firstEntry != null);
169171

170172
final long currentTime = System.currentTimeMillis();
171173
if (firstEntry.getKey() <= currentTime)
172174
{
173175
if (s_logger.isLoggable(Level.FINER))
174-
s_logger.log( Level.FINER, "fireTime=" + firstEntry.getKey() + ": execute" );
176+
s_logger.log(Level.FINER, "fireTime=" + firstEntry.getKey() + ": execute");
175177

176178
TimerInfo timerInfo = firstEntry.getValue();
177179
do
178180
{
179-
assert( timerInfo.threadID == 0 );
181+
assert(timerInfo.threadID == 0);
180182
final TimerInfo next = timerInfo.next;
181183
timerInfo.prev = null;
182184
timerInfo.next = null;
@@ -190,17 +192,17 @@ public void runInThreadPool()
190192
}
191193
else
192194
{
193-
final long sleepTime = (firstEntry.getKey() - currentTime);
195+
final long waitTime = (firstEntry.getKey() - currentTime);
194196
if (s_logger.isLoggable(Level.FINER))
195-
s_logger.log( Level.FINER, "firstEntry=" + firstEntry.getKey() + ", sleepTime=" + sleepTime );
197+
s_logger.log(Level.FINER, "firstEntry=" + firstEntry.getKey() + ", waitTime=" + waitTime);
196198

197199
try
198200
{
199-
m_cond.awaitNanos( TimeUnit.MILLISECONDS.toNanos(sleepTime) );
201+
m_cond.awaitNanos(TimeUnit.MILLISECONDS.toNanos(waitTime));
200202
}
201203
catch (final InterruptedException ex)
202204
{
203-
s_logger.warning( ex.toString() );
205+
s_logger.warning(ex.toString());
204206
}
205207
}
206208
}
@@ -211,10 +213,10 @@ public void runInThreadPool()
211213
}
212214

213215
if (s_logger.isLoggable(Level.FINE))
214-
s_logger.log( Level.FINE, "finished" );
216+
s_logger.log(Level.FINE, "finished");
215217
}
216218

217-
private void removeTimerLocked( TimerInfo timerInfo )
219+
private void removeTimerLocked(TimerInfo timerInfo)
218220
{
219221
boolean wakeUpThread = false;
220222

@@ -227,7 +229,7 @@ private void removeTimerLocked( TimerInfo timerInfo )
227229
}
228230
else
229231
{
230-
m_sortedTimers.put( timerInfo.fireTime, timerInfo.next );
232+
m_sortedTimers.put(timerInfo.fireTime, timerInfo.next);
231233
timerInfo.next = null;
232234
}
233235
}
@@ -279,40 +281,45 @@ public int schedule(Task task, long delay, TimeUnit unit)
279281
return -1;
280282
}
281283

282-
final boolean wasEmpty = m_sortedTimers.isEmpty();
284+
final Map.Entry<Long, TimerInfo> firstEntry = m_sortedTimers.firstEntry();
283285
final long fireTime = (System.currentTimeMillis() + unit.toMillis(delay));
284-
final TimerInfo timerInfo = new TimerInfo( task );
285-
timerInfo.fireTime = fireTime;
286+
final TimerInfo timerInfo = new TimerInfo(task, fireTime);
286287

287-
final TimerInfo next = m_sortedTimers.get( fireTime );
288+
final TimerInfo next = m_sortedTimers.get(fireTime);
288289
timerInfo.next = next;
289290
if (next != null)
290291
next.prev = timerInfo;
291292

292-
m_sortedTimers.put( fireTime, timerInfo );
293-
m_timers.put( task, timerInfo );
293+
m_sortedTimers.put(fireTime, timerInfo);
294+
m_timers.put(task, timerInfo);
294295

295-
if (wasEmpty)
296+
if (firstEntry == null)
296297
{
297298
if (s_logger.isLoggable(Level.FINER))
298-
s_logger.log( Level.FINER, System.identityHashCode(task) + ": fireTime=" + fireTime + ", start worker" );
299-
m_threadPool.execute( this );
299+
{
300+
s_logger.log(Level.FINER, System.identityHashCode(task)
301+
+ ": fireTime=" + fireTime + ", start worker");
302+
}
303+
m_threadPool.execute(this);
300304
}
301305
else
302306
{
303-
/* It make sense to wake up worker thread
307+
/* It makes sense to wake up worker thread
304308
* only if new timer is sooner than all previous.
305309
*/
306-
if (fireTime < m_sortedTimers.firstKey())
310+
if (fireTime < firstEntry.getKey())
307311
{
308312
if (s_logger.isLoggable(Level.FINER))
309-
s_logger.log( Level.FINER, System.identityHashCode(task) + ": firerTime=" + fireTime + ", wakeup worker" );
313+
{
314+
s_logger.log(Level.FINER, System.identityHashCode(task)
315+
+ ": firerTime=" + fireTime + ", wakeup worker");
316+
}
310317
m_cond.signal();
311318
}
312319
else
313320
{
314321
if (s_logger.isLoggable(Level.FINER))
315-
s_logger.log( Level.FINER, System.identityHashCode(task) + ": fireTime=" + fireTime );
322+
s_logger.log(Level.FINER, System.identityHashCode(task) + ": fireTime=" + fireTime);
316323
}
317324
}
318325
}
@@ -339,16 +346,16 @@ public int cancel(Task task) throws InterruptedException
339346
{
340347
for (;;)
341348
{
342-
final TimerInfo timerInfo = m_timers.get( task );
349+
final TimerInfo timerInfo = m_timers.get(task);
343350
if (timerInfo == null)
344351
{
345352
/* Timer already canceled or was not scheduled. */
346353
if (s_logger.isLoggable( Level.FINER))
347-
s_logger.log( Level.FINER, System.identityHashCode(task) + ": not registered" );
354+
s_logger.log(Level.FINER, System.identityHashCode(task) + ": not registered");
348355
return -1;
349356
}
350357

351-
assert( timerInfo.task == task );
358+
assert(timerInfo.task == task);
352359

353360
if (timerInfo.threadID == Thread.currentThread().getId())
354361
{
@@ -359,18 +366,18 @@ else if (timerInfo.threadID == 0)
359366
{
360367
/* Timer is not fired yet */
361368
if (s_logger.isLoggable( Level.FINER))
362-
s_logger.log( Level.FINER, System.identityHashCode(task) + ": canceled" );
369+
s_logger.log(Level.FINER, System.identityHashCode(task) + ": canceled");
363370
removeTimerLocked( timerInfo );
364371
return 0;
365372
}
366373
else if (timerInfo.threadID == -2)
367374
{
368375
/* Timer just fired */
369376
if (s_logger.isLoggable(Level.FINER))
370-
s_logger.log( Level.FINER, System.identityHashCode(task) + ": canceled, just fired" );
371-
assert( timerInfo.cond != null );
377+
s_logger.log(Level.FINER, System.identityHashCode(task) + ": canceled, just fired");
378+
assert(timerInfo.cond != null);
372379
timerInfo.cond = null;
373-
m_timers.remove( task );
380+
m_timers.remove(task);
374381
return 0;
375382
}
376383
else
@@ -403,7 +410,7 @@ public int cancelNoWait(Task task)
403410
m_lock.lock();
404411
try
405412
{
406-
final TimerInfo timerInfo = m_timers.get( task );
413+
final TimerInfo timerInfo = m_timers.get(task);
407414
if (timerInfo == null)
408415
{
409416
/* Timer already canceled or was not scheduled. */
@@ -416,12 +423,30 @@ public int cancelNoWait(Task task)
416423
return 1;
417424
}
418425

419-
removeTimerLocked( timerInfo );
426+
removeTimerLocked(timerInfo);
420427
}
421428
finally
422429
{
423430
m_lock.unlock();
424431
}
425432
return 0;
426433
}
434+
435+
/**
436+
* Stop the timer queue.
437+
*/
438+
public void stop()
439+
{
440+
m_lock.lock();
441+
try
442+
{
443+
m_stop = true;
444+
if (!m_timers.isEmpty())
445+
m_cond.signal();
446+
}
447+
finally
448+
{
449+
m_lock.unlock();
450+
}
451+
}
427452
}

0 commit comments

Comments
 (0)