Skip to content

Commit 6d2e88d

Browse files
author
Sergey Zubarev
committed
Configurable thread priority for collider worker threads.
1 parent d3823c0 commit 6d2e88d

File tree

3 files changed

+31
-15
lines changed

3 files changed

+31
-15
lines changed

src/main/java/org/jsl/collider/Collider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public abstract class Collider
4444
{
4545
public static class Config
4646
{
47+
public int threadPriority;
4748
public int threadPoolThreads;
4849
public boolean useDirectBuffers;
4950
public ByteOrder byteOrder;
@@ -58,6 +59,7 @@ public static class Config
5859

5960
public Config()
6061
{
62+
threadPriority = Thread.NORM_PRIORITY;
6163
threadPoolThreads = 0; /* by default = number of cores */
6264
useDirectBuffers = true;
6365
byteOrder = ByteOrder.nativeOrder();
@@ -66,7 +68,7 @@ public Config()
6668
socketRecvBufSize = 0; /* Use system default settings by default */
6769

6870
forwardReadMaxSize = (256 * 1024);
69-
inputQueueBlockSize = (32 * 1024);
71+
inputQueueBlockSize = (64 * 1024);
7072
inputQueueCacheMaxSize = 128;
7173
joinMessageMaxSize = 0;
7274
datagramReadMinSize = (2 * 1024);

src/main/java/org/jsl/collider/ColliderImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public ColliderImpl( Config config ) throws IOException
371371
threadPoolThreads = Runtime.getRuntime().availableProcessors();
372372
if (threadPoolThreads < 4)
373373
threadPoolThreads = 4;
374-
m_threadPool = new ThreadPool( "CTP", threadPoolThreads );
374+
m_threadPool = new ThreadPool("CTP", threadPoolThreads, config.threadPriority);
375375

376376
if (config.inputQueueCacheMaxSize == 0)
377377
config.inputQueueCacheMaxSize = (threadPoolThreads * 3);
@@ -390,8 +390,11 @@ public ColliderImpl( Config config ) throws IOException
390390
public void run()
391391
{
392392
if (s_logger.isLoggable(Level.FINE))
393-
s_logger.fine( "start" );
393+
s_logger.fine("start");
394394

395+
final Thread currentThread = Thread.currentThread();
396+
final int threadPriority = currentThread.getPriority();
397+
currentThread.setPriority(getConfig().threadPriority);
395398
m_threadPool.start();
396399

397400
final DummyRunnable dummyRunnable = new DummyRunnable();
@@ -493,14 +496,16 @@ public void run()
493496
}
494497

495498
for (Map.Entry<Integer, RetainableDataBlockCache> me : m_dataBlockCache.entrySet())
496-
me.getValue().clear( s_logger );
499+
me.getValue().clear(s_logger);
497500
m_dataBlockCache.clear();
498501

499502
if (m_joinPool != null)
500-
m_joinPool.release( s_logger );
503+
m_joinPool.release(s_logger);
501504

502505
if (s_logger.isLoggable(Level.FINE))
503-
s_logger.fine( "finish (" + statLoopIt + ", " + statLoopReadersG0 + ")." );
506+
s_logger.fine("finish (" + statLoopIt + ", " + statLoopReadersG0 + ").");
507+
508+
currentThread.setPriority(threadPriority);
504509
}
505510

506511
public void stop()

src/main/java/org/jsl/collider/ThreadPool.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private class Worker extends Thread
3838
{
3939
private final int m_id;
4040

41-
public Worker( int id )
41+
Worker(int id)
4242
{
4343
m_id = id;
4444
}
@@ -47,10 +47,10 @@ public void run()
4747
{
4848
final String name = m_name + "-" + getId();
4949
final int workerId = (1 << m_id);
50-
setName( name );
50+
setName(name);
5151

5252
if (s_logger.isLoggable(Level.FINE))
53-
s_logger.log( Level.FINE, name + ": started." );
53+
s_logger.log(Level.FINE, name + ": started.");
5454

5555
int parks = 0;
5656
int idx = 0;
@@ -148,7 +148,7 @@ private static class DummyRunnable extends Runnable
148148
public void runInThreadPool()
149149
{
150150
/* Should never be called */
151-
assert( false );
151+
assert(false);
152152
}
153153
}
154154

@@ -172,7 +172,7 @@ public void runInThreadPool()
172172
private final AtomicReferenceArray<Runnable> m_tra;
173173
private volatile int m_state;
174174

175-
public ThreadPool( String name, int threads, int contentionFactor )
175+
public ThreadPool(String name, int threads, int threadPriority, int contentionFactor)
176176
{
177177
/* Current implementation supports up to 29 worker threads,
178178
* should be enough for most cases for now.
@@ -181,7 +181,7 @@ public ThreadPool( String name, int threads, int contentionFactor )
181181
if (threads > 29)
182182
threads = 29;
183183

184-
assert( contentionFactor >= 1 );
184+
assert(contentionFactor >= 1);
185185
if (contentionFactor < 1)
186186
contentionFactor = 1;
187187

@@ -190,16 +190,25 @@ public ThreadPool( String name, int threads, int contentionFactor )
190190

191191
m_thread = new Thread[threads];
192192
for (int idx=0; idx<threads; idx++)
193-
m_thread[idx] = new Worker(idx);
193+
{
194+
final Worker worker = new Worker(idx);
195+
worker.setPriority(threadPriority);
196+
m_thread[idx] = worker;
197+
}
194198

195199
m_hra = new AtomicReferenceArray<Runnable>( contentionFactor * FS_PADDING );
196200
m_tra = new AtomicReferenceArray<Runnable>( contentionFactor * FS_PADDING );
197201
m_state = 0;
198202
}
199203

200-
public ThreadPool( String name, int threads )
204+
public ThreadPool(String name, int threads)
205+
{
206+
this(name, threads, Thread.NORM_PRIORITY, 4);
207+
}
208+
209+
public ThreadPool(String name, int threads, int threadPriority)
201210
{
202-
this( name, threads, 4 );
211+
this(name, threads, threadPriority, 4);
203212
}
204213

205214
public void start()

0 commit comments

Comments
 (0)