2020*/
2121
2222
23- package com .twitter .clientlib ;
23+ package com .twitter .clientlib . stream ;
2424
2525
2626import java .io .BufferedReader ;
2727import java .io .InputStream ;
2828import java .io .InputStreamReader ;
29- import java .lang .reflect .Type ;
3029import java .util .ArrayList ;
31- import java .util .LinkedList ;
3230import java .util .List ;
33- import java .util .Queue ;
31+ import java .util .concurrent .BlockingQueue ;
32+ import java .util .concurrent .ExecutorService ;
33+ import java .util .concurrent .Executors ;
34+ import java .util .concurrent .LinkedBlockingDeque ;
35+ import java .util .stream .IntStream ;
3436
35- import com .google .gson .reflect .TypeToken ;
37+ import com .fasterxml .jackson .core .JsonProcessingException ;
38+ import com .fasterxml .jackson .databind .DeserializationFeature ;
39+ import com .fasterxml .jackson .databind .JsonMappingException ;
40+ import com .fasterxml .jackson .databind .ObjectMapper ;
3641
3742import com .twitter .clientlib .model .StreamingTweet ;
38- import com .twitter .clientlib .model .SingleTweetLookupResponse ;
3943
40- public class TweetsStreamListenersExecutor {
41- private final ITweetsQueue tweetsQueue ;
42- private final List <TweetsStreamListener > listeners = new ArrayList <>();
43- private final InputStream stream ;
44+ public class TweetsStreamExecutor {
45+ private volatile BlockingQueue <String > rawTweets ;
46+ private volatile BlockingQueue <StreamingTweet > tweets ;
4447 private volatile boolean isRunning = true ;
4548
46- public TweetsStreamListenersExecutor (InputStream stream ) {
47- this .tweetsQueue = new LinkedListTweetsQueue ();
48- this .stream = stream ;
49- }
49+ private ExecutorService executorService ;
50+ private final List <TweetsStreamListener > listeners = new ArrayList <>();
51+ private final InputStream stream ;
5052
51- public TweetsStreamListenersExecutor (ITweetsQueue tweetsQueue , InputStream stream ) {
52- this .tweetsQueue = tweetsQueue ;
53+ public TweetsStreamExecutor (InputStream stream ) {
54+ this .rawTweets = new LinkedBlockingDeque <>();
55+ this .tweets = new LinkedBlockingDeque <>();
5356 this .stream = stream ;
5457 }
5558
5659 public void addListener (TweetsStreamListener toAdd ) {
5760 listeners .add (toAdd );
5861 }
5962
60- public void executeListeners () {
63+ public void removeListener (TweetsStreamListener toRemove ) {
64+ listeners .remove (toRemove );
65+ }
66+
67+ public void start () {
6168 if (stream == null ) {
6269 System .out .println ("Error: stream is null." );
6370 return ;
64- } else if (this .tweetsQueue == null ) {
65- System .out .println ("Error: tweetsQueue is null." );
66- return ;
6771 }
6872
69- TweetsQueuer tweetsQueuer = new TweetsQueuer ();
73+ RawTweetsQueuer rawTweetsQueuer = new RawTweetsQueuer ();
7074 TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor ();
75+ rawTweetsQueuer .start ();
76+ int threads = 5 ; //TODO parametrize this
77+ executorService = Executors .newFixedThreadPool (threads );
78+ for (int i = 0 ; i < threads ; i ++) {
79+ executorService .submit (new ParseTweetsTask ());
80+ }
7181 tweetsListenersExecutor .start ();
72- tweetsQueuer .start ();
7382 }
7483
7584 public synchronized void shutdown () {
7685 isRunning = false ;
86+ executorService .shutdown ();
7787 System .out .println ("TweetsStreamListenersExecutor is shutting down." );
7888 }
7989
@@ -87,7 +97,7 @@ private void processTweets() {
8797 StreamingTweet streamingTweet ;
8898 try {
8999 while (isRunning ) {
90- streamingTweet = tweetsQueue .poll ();
100+ streamingTweet = tweets .poll ();
91101 if (streamingTweet == null ) {
92102 Thread .sleep (100 );
93103 continue ;
@@ -102,15 +112,39 @@ private void processTweets() {
102112 }
103113 }
104114
105- private class TweetsQueuer extends Thread {
115+ private class ParseTweetsTask implements Runnable {
116+ private final ObjectMapper objectMapper ;
117+ private ParseTweetsTask () {
118+ this .objectMapper = new ObjectMapper ();
119+ objectMapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
120+ }
121+
122+ @ Override
123+ public void run () {
124+ while (isRunning ) {
125+ try {
126+ String rawTweet = rawTweets .take ();
127+ StreamingTweet tweet = objectMapper .readValue (rawTweet , StreamingTweet .class );
128+ tweets .put (tweet );
129+ } catch (InterruptedException e ) {
130+ System .out .println ("Fail 1" );
131+ } catch (JsonMappingException e ) {
132+ System .out .println ("Fail 2" );
133+ } catch (JsonProcessingException e ) {
134+ System .out .println ("Fail 3" );
135+ }
136+ }
137+ }
138+ }
139+
140+ private class RawTweetsQueuer extends Thread {
141+
106142 @ Override
107143 public void run () {
108144 queueTweets ();
109145 }
110146
111147 public void queueTweets () {
112- JSON json = new JSON ();
113- Type localVarReturnType = new TypeToken <SingleTweetLookupResponse >() {}.getType ();
114148
115149 String line = null ;
116150 try (BufferedReader reader = new BufferedReader (new InputStreamReader (stream ))) {
@@ -121,7 +155,7 @@ public void queueTweets() {
121155 continue ;
122156 }
123157 try {
124- tweetsQueue . add ( StreamingTweet . fromJson ( line ) );
158+ rawTweets . put ( line );
125159 } catch (Exception interExcep ) {
126160 interExcep .printStackTrace ();
127161 }
@@ -134,21 +168,3 @@ public void queueTweets() {
134168 }
135169}
136170
137- interface ITweetsQueue {
138- StreamingTweet poll ();
139- void add (StreamingTweet streamingTweet );
140- }
141-
142- class LinkedListTweetsQueue implements ITweetsQueue {
143- private final Queue <StreamingTweet > tweetsQueue = new LinkedList <>();
144-
145- @ Override
146- public StreamingTweet poll () {
147- return tweetsQueue .poll ();
148- }
149-
150- @ Override
151- public void add (StreamingTweet streamingTweet ) {
152- tweetsQueue .add (streamingTweet );
153- }
154- }
0 commit comments