3939import com .fasterxml .jackson .databind .JsonMappingException ;
4040import com .fasterxml .jackson .databind .ObjectMapper ;
4141
42- import com .twitter .clientlib .JSON ;
4342import com .twitter .clientlib .model .StreamingTweet ;
4443
4544public class TweetsStreamExecutor {
@@ -49,15 +48,15 @@ public class TweetsStreamExecutor {
4948
5049 private long startTime ;
5150 private int tweetsCount = 0 ;
52- private final int tweetsLimit = 80000 ;
53- private final int threads = 1 ; //TODO parametrize this
51+ private final int tweetsLimit = 250000 ;
52+ private final int deserializationThreads = 1 ; //TODO parametrize this
5453
5554 private ExecutorService rawTweetsQueuerService ;
5655 private ExecutorService deserializationService ;
5756 private ExecutorService listenersService ;
5857
5958 private final List <TweetsStreamListener > listeners = new ArrayList <>();
60- private final InputStream stream ;
59+ private InputStream stream ;
6160
6261 public TweetsStreamExecutor (InputStream stream ) {
6362 this .rawTweets = new LinkedBlockingDeque <>();
@@ -83,9 +82,9 @@ public void start() {
8382 rawTweetsQueuerService = Executors .newSingleThreadExecutor ();
8483 rawTweetsQueuerService .submit (new RawTweetsQueuer ());
8584
86- deserializationService = Executors .newFixedThreadPool (threads );
87- for (int i = 0 ; i < threads ; i ++) {
88- deserializationService .submit (new ParseTweetsTask ());
85+ deserializationService = Executors .newFixedThreadPool (deserializationThreads );
86+ for (int i = 0 ; i < deserializationThreads ; i ++) {
87+ deserializationService .submit (new DeserializeTweetsTask ());
8988 }
9089
9190 listenersService = Executors .newSingleThreadExecutor ();
@@ -136,7 +135,6 @@ public void queueTweets() {
136135 while (isRunning ) {
137136 line = reader .readLine ();
138137 if (line == null || line .isEmpty ()) {
139- Thread .sleep (10 );
140138 continue ;
141139 }
142140 try {
@@ -152,10 +150,10 @@ public void queueTweets() {
152150 }
153151 }
154152
155- private class ParseTweetsTask implements Runnable {
153+ private class DeserializeTweetsTask implements Runnable {
156154 private final ObjectMapper objectMapper ;
157155
158- private ParseTweetsTask () {
156+ private DeserializeTweetsTask () {
159157 this .objectMapper = new ObjectMapper ();
160158 objectMapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
161159 }
@@ -186,28 +184,27 @@ public void run() {
186184
187185 private void processTweets () {
188186 StreamingTweet streamingTweet ;
189- try {
187+
190188 while (isRunning ) {
191- streamingTweet = tweets . poll ();
192- if ( streamingTweet == null ) {
193- Thread . sleep ( 10 );
194- continue ;
195- }
196- for ( TweetsStreamListener listener : listeners ) {
197- listener . actionOnTweetsStream ( streamingTweet );
198- }
199- tweetsCount ++ ;
200- if ( tweetsCount == tweetsLimit ) {
201- long stopTime = System .currentTimeMillis ( );
202- long durationInMillis = stopTime - startTime ;
203- double seconds = durationInMillis / 1000.0 ;
204- System . out . println ( "Total duration in seconds: " + seconds );
205- shutdown ( );
189+ try {
190+ streamingTweet = tweets . take ();
191+ for ( TweetsStreamListener listener : listeners ) {
192+ listener . actionOnTweetsStream ( streamingTweet ) ;
193+ }
194+ tweetsCount ++;
195+ if ( tweetsCount == tweetsLimit ) {
196+ long stopTime = System . currentTimeMillis ();
197+ long durationInMillis = stopTime - startTime ;
198+ double seconds = durationInMillis / 1000.0 ;
199+ System .out . println ( "Total duration in seconds: " + seconds );
200+ shutdown () ;
201+ }
202+ } catch ( InterruptedException e ) {
203+ System . out . println ( "processTweets: Fail 1" );
206204 }
205+
207206 }
208- } catch (Exception e ) {
209- e .printStackTrace ();
210- }
207+
211208 }
212209 }
213210}
0 commit comments