3232import java .util .concurrent .ExecutorService ;
3333import java .util .concurrent .Executors ;
3434import java .util .concurrent .LinkedBlockingDeque ;
35- import java .util .stream . IntStream ;
35+ import java .util .concurrent . TimeUnit ;
3636
3737import com .fasterxml .jackson .core .JsonProcessingException ;
3838import com .fasterxml .jackson .databind .DeserializationFeature ;
3939import com .fasterxml .jackson .databind .JsonMappingException ;
4040import com .fasterxml .jackson .databind .ObjectMapper ;
4141
42+ import com .twitter .clientlib .JSON ;
4243import com .twitter .clientlib .model .StreamingTweet ;
4344
4445public class TweetsStreamExecutor {
4546 private volatile BlockingQueue <String > rawTweets ;
4647 private volatile BlockingQueue <StreamingTweet > tweets ;
4748 private volatile boolean isRunning = true ;
4849
49- private ExecutorService executorService ;
50+ private long startTime ;
51+ private int tweetsCount = 0 ;
52+ private final int tweetsLimit = 80000 ;
53+ private final int threads = 1 ; //TODO parametrize this
54+
55+ private ExecutorService rawTweetsQueuerService ;
56+ private ExecutorService deserializationService ;
57+ private ExecutorService listenersService ;
58+
5059 private final List <TweetsStreamListener > listeners = new ArrayList <>();
5160 private final InputStream stream ;
5261
@@ -69,51 +78,83 @@ public void start() {
6978 System .out .println ("Error: stream is null." );
7079 return ;
7180 }
81+ startTime = System .currentTimeMillis ();
7282
73- RawTweetsQueuer rawTweetsQueuer = new RawTweetsQueuer ();
74- TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor ();
75- rawTweetsQueuer .start ();
76- int threads = 5 ; //TODO parametrize this
77- executorService = Executors .newFixedThreadPool (threads );
83+ rawTweetsQueuerService = Executors .newSingleThreadExecutor ();
84+ rawTweetsQueuerService .submit (new RawTweetsQueuer ());
85+
86+ deserializationService = Executors .newFixedThreadPool (threads );
7887 for (int i = 0 ; i < threads ; i ++) {
79- executorService .submit (new ParseTweetsTask ());
88+ deserializationService .submit (new ParseTweetsTask ());
8089 }
81- tweetsListenersExecutor .start ();
90+
91+ listenersService = Executors .newSingleThreadExecutor ();
92+ listenersService .submit (new TweetsListenersTask ());
8293 }
8394
8495 public synchronized void shutdown () {
8596 isRunning = false ;
86- executorService .shutdown ();
97+ rawTweetsQueuerService .shutdown ();
98+ deserializationService .shutdown ();
99+ listenersService .shutdown ();
100+ try {
101+ if (!rawTweetsQueuerService .awaitTermination (3 , TimeUnit .SECONDS )) {
102+ rawTweetsQueuerService .shutdownNow ();
103+ if (!rawTweetsQueuerService .awaitTermination (3 , TimeUnit .SECONDS ))
104+ System .err .println ("Pool did not terminate" );
105+ }
106+ if (!deserializationService .awaitTermination (3 , TimeUnit .SECONDS )) {
107+ deserializationService .shutdownNow ();
108+ if (!deserializationService .awaitTermination (3 , TimeUnit .SECONDS ))
109+ System .err .println ("Pool did not terminate" );
110+ }
111+ if (!listenersService .awaitTermination (3 , TimeUnit .SECONDS )) {
112+ listenersService .shutdownNow ();
113+ if (!listenersService .awaitTermination (3 , TimeUnit .SECONDS ))
114+ System .err .println ("Pool did not terminate" );
115+ }
116+ } catch (InterruptedException ie ) {
117+ rawTweetsQueuerService .shutdown ();
118+ deserializationService .shutdown ();
119+ listenersService .shutdown ();
120+ Thread .currentThread ().interrupt ();
121+ }
87122 System .out .println ("TweetsStreamListenersExecutor is shutting down." );
88123 }
89124
90- private class TweetsListenersExecutor extends Thread {
125+ private class RawTweetsQueuer implements Runnable {
126+
91127 @ Override
92128 public void run () {
93- processTweets ();
129+ queueTweets ();
94130 }
95131
96- private void processTweets () {
97- StreamingTweet streamingTweet ;
98- try {
132+ public void queueTweets () {
133+
134+ String line = null ;
135+ try (BufferedReader reader = new BufferedReader (new InputStreamReader (stream ))) {
99136 while (isRunning ) {
100- streamingTweet = tweets . poll ();
101- if ( streamingTweet == null ) {
102- Thread .sleep (100 );
137+ line = reader . readLine ();
138+ if ( line == null || line . isEmpty () ) {
139+ Thread .sleep (10 );
103140 continue ;
104141 }
105- for (TweetsStreamListener listener : listeners ) {
106- listener .actionOnTweetsStream (streamingTweet );
142+ try {
143+ rawTweets .put (line );
144+ } catch (Exception interExcep ) {
145+ interExcep .printStackTrace ();
107146 }
108147 }
109148 } catch (Exception e ) {
110149 e .printStackTrace ();
150+ shutdown ();
111151 }
112152 }
113153 }
114154
115155 private class ParseTweetsTask implements Runnable {
116156 private final ObjectMapper objectMapper ;
157+
117158 private ParseTweetsTask () {
118159 this .objectMapper = new ObjectMapper ();
119160 objectMapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
@@ -137,32 +178,35 @@ public void run() {
137178 }
138179 }
139180
140- private class RawTweetsQueuer extends Thread {
141-
181+ private class TweetsListenersTask implements Runnable {
142182 @ Override
143183 public void run () {
144- queueTweets ();
184+ processTweets ();
145185 }
146186
147- public void queueTweets () {
148-
149- String line = null ;
150- try (BufferedReader reader = new BufferedReader (new InputStreamReader (stream ))) {
187+ private void processTweets () {
188+ StreamingTweet streamingTweet ;
189+ try {
151190 while (isRunning ) {
152- line = reader . readLine ();
153- if ( line == null || line . isEmpty () ) {
154- Thread .sleep (100 );
191+ streamingTweet = tweets . poll ();
192+ if ( streamingTweet == null ) {
193+ Thread .sleep (10 );
155194 continue ;
156195 }
157- try {
158- rawTweets .put (line );
159- } catch (Exception interExcep ) {
160- interExcep .printStackTrace ();
196+ for (TweetsStreamListener listener : listeners ) {
197+ listener .actionOnTweetsStream (streamingTweet );
198+ }
199+ tweetsCount ++;
200+ if (tweetsCount == tweetsLimit ) {
201+ long stopTime = System .currentTimeMillis ();
202+ long durationInMillis = stopTime - startTime ;
203+ double seconds = durationInMillis / 1000.0 ;
204+ System .out .println ("Total duration in seconds: " + seconds );
205+ shutdown ();
161206 }
162207 }
163208 } catch (Exception e ) {
164209 e .printStackTrace ();
165- shutdown ();
166210 }
167211 }
168212 }
0 commit comments