@@ -50,17 +50,20 @@ public class ThreadPool {
5050
5151 private int numThreads ;
5252 private Integer maxPoolSize ;
53- private ArrayList <Thread > threads ;
54- private List pool ;
53+ private final ArrayList <Thread > threads ;
54+ private final ArrayList <Thread > activeThreads ;
55+ private final List pool ;
5556 private class Return {}
5657 private ConcurrentHashMap <Long , HashMap <String , Object >> params ;
5758
59+
5860 //**************************************************************************
5961 //** Constructor
6062 //**************************************************************************
6163 public ThreadPool (int numThreads , Integer maxPoolSize ){
6264 this .numThreads = numThreads ;
6365 threads = new ArrayList <>();
66+ activeThreads = new ArrayList <>();
6467 pool = new LinkedList ();
6568 if (maxPoolSize !=null ){
6669 if (maxPoolSize <1 ) maxPoolSize = null ;
@@ -69,6 +72,10 @@ public ThreadPool(int numThreads, Integer maxPoolSize){
6972 params = new ConcurrentHashMap <>();
7073 }
7174
75+
76+ //**************************************************************************
77+ //** Constructor
78+ //**************************************************************************
7279 public ThreadPool (int numThreads ){
7380 this (numThreads , null );
7481 }
@@ -80,12 +87,15 @@ public ThreadPool(int numThreads){
8087 /** Used to initialize threads in the pool
8188 */
8289 public ThreadPool start (){
90+
91+ threads .clear ();
92+
8393 for (int i =0 ; i <numThreads ; i ++){
8494 Thread t = new Thread (){
8595 public void run (){
8696 while (true ) {
8797
88- Object obj = null ;
98+ Object obj ;
8999 synchronized (pool ) {
90100 while (pool .isEmpty ()) {
91101 try {
@@ -100,16 +110,64 @@ public void run(){
100110 }
101111
102112 if ((obj instanceof Return )){
103- add (obj );
113+
114+
115+ //Remove thread from the activeThreads array
116+ synchronized (activeThreads ){
117+ for (int i =0 ; i <activeThreads .size (); i ++){
118+ if (activeThreads .get (i ).getId ()==this .getId ()){
119+ activeThreads .remove (i );
120+ break ;
121+ }
122+ }
123+
124+
125+ //Add the object back to the pool for other threads to process
126+ if (!activeThreads .isEmpty ()) add (obj );
127+ activeThreads .notify ();
128+ }
129+
130+
131+ //Call the exit() callback and return
104132 exit ();
105133 return ;
134+
106135 }
107136 else {
108- process (obj );
137+
138+ try {
139+
140+ //Call the process() callback
141+ process (obj );
142+
143+ }
144+ catch (Exception e ){
145+
146+ //Remove thread from the activeThreads array
147+ synchronized (activeThreads ){
148+ for (int i =0 ; i <activeThreads .size (); i ++){
149+ if (activeThreads .get (i ).getId ()==this .getId ()){
150+ activeThreads .remove (i );
151+ break ;
152+ }
153+ }
154+ activeThreads .notify ();
155+ }
156+
157+ //Throw exception
158+ throw e ;
159+ }
160+
109161 }
110162 }
111163 }
112164 };
165+
166+ synchronized (activeThreads ){
167+ activeThreads .add (t );
168+ activeThreads .notify ();
169+ }
170+
113171 t .start ();
114172 threads .add (t );
115173 }
@@ -267,6 +325,20 @@ public int add(Object object){
267325 }
268326
269327
328+ //**************************************************************************
329+ //** getActiveThreadCount
330+ //**************************************************************************
331+ /** Returns the number of active threads in the pool
332+ */
333+ public int getActiveThreadCount (){
334+ int count ;
335+ synchronized (activeThreads ){
336+ count = activeThreads .size ();
337+ }
338+ return count ;
339+ }
340+
341+
270342 //**************************************************************************
271343 //** getQueue
272344 //**************************************************************************
@@ -295,13 +367,9 @@ public void done(){
295367 //**************************************************************************
296368 /** Used to wait for threads to complete
297369 */
298- public void join () throws InterruptedException {
299- while (true ) {
300- for (Thread thread : threads ){
301- //System.out.println(thread.getId() + " " + thread.isAlive());
302- thread .join ();
303- }
304- break ;
370+ public void join () throws InterruptedException {
371+ for (Thread thread : threads ){
372+ thread .join ();
305373 }
306374 }
307375}
0 commit comments