Skip to content

Commit 9d28fe1

Browse files
committed
reduced duplication for closing executor service and switched to single thread deserialization layer
1 parent bab514d commit 9d28fe1

File tree

3 files changed

+56
-37
lines changed

3 files changed

+56
-37
lines changed

examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,21 @@ public static void main(String[] args) {
4343
* Check the 'security' tag of the required APIs in https://api.twitter.com/2/openapi.json in order
4444
* to use the right credential object.
4545
*/
46-
TwitterCredentialsBearer credentials = new TwitterCredentialsBearer("AAAAAAAAAAAAAAAAAAAAAJotdgEAAAAAmXZex2w6l1D6TcwC8KfripI3ADY%3DkKTjcNPxea1aAL8yYXJTdbAX1NkgEJii7SpXetxuP9GXXhSnOa");
46+
TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"));
4747
TwitterStream twitterStream = new TwitterStream();
4848
twitterStream.setTwitterCredentials(credentials);
4949
twitterStream.addListener(new Responder());
50+
5051
// twitterStream.sampleStream(new StreamQueryParameters.Builder()
5152
// .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT)
5253
// .build());
5354

5455
twitterStream.sampleStream(new StreamQueryParameters.Builder()
55-
.withTweetFields(TweetField.all().toArray(new TweetField[0]))
56-
.withMediaFields(MediaField.all().toArray(new MediaField[0]))
57-
.withUserFields(UserField.all().toArray(new UserField[0]))
58-
.withPollFields(PollField.all().toArray(new PollField[0]))
59-
.withPlaceFields(PlaceField.all().toArray(new PlaceField[0]))
56+
.withTweetFields(TweetField.all())
57+
.withMediaFields(MediaField.all())
58+
.withUserFields(UserField.all())
59+
.withPollFields(PollField.all())
60+
.withPlaceFields(PlaceField.all())
6061
.build());
6162

6263
}

src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,26 +106,51 @@ public Builder withTweetFields(TweetField... tweetFields) {
106106
return this;
107107
}
108108

109+
public Builder withTweetFields(List<TweetField> tweetFields) {
110+
this.tweetFields.addAll(tweetFields);
111+
return this;
112+
}
113+
109114
public Builder withMediaFields(MediaField... mediaFields) {
110115
this.mediaFields.addAll(Arrays.asList(mediaFields));
111116
return this;
112117
}
113118

119+
public Builder withMediaFields(List<MediaField> mediaFields) {
120+
this.mediaFields.addAll(mediaFields);
121+
return this;
122+
}
123+
114124
public Builder withPollFields(PollField... pollFields) {
115125
this.pollFields.addAll(Arrays.asList(pollFields));
116126
return this;
117127
}
118128

129+
public Builder withPollFields(List<PollField> pollFields) {
130+
this.pollFields.addAll(pollFields);
131+
return this;
132+
}
133+
119134
public Builder withUserFields(UserField... userFields) {
120135
this.userFields.addAll(Arrays.asList(userFields));
121136
return this;
122137
}
123138

139+
public Builder withUserFields(List<UserField> userFields) {
140+
this.userFields.addAll(userFields);
141+
return this;
142+
}
143+
124144
public Builder withPlaceFields(PlaceField... placeFields) {
125145
this.placeFields.addAll(Arrays.asList(placeFields));
126146
return this;
127147
}
128148

149+
public Builder withPlaceFields(List<PlaceField> placeFields) {
150+
this.placeFields.addAll(placeFields);
151+
return this;
152+
}
153+
129154
public Builder withBackfillMinutes(Integer backFillMinutes) {
130155
this.backFillMinutes = backFillMinutes;
131156
return this;

src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323
package com.twitter.clientlib.stream;
2424

2525

26-
import java.io.BufferedReader;
27-
import java.io.InputStream;
28-
import java.io.InputStreamReader;
2926
import java.util.ArrayList;
3027
import java.util.List;
3128
import java.util.concurrent.BlockingQueue;
@@ -50,8 +47,6 @@ public class TweetsStreamExecutor {
5047
private long startTime;
5148
private int tweetsCount = 0;
5249
private final int tweetsLimit = 250000;
53-
private final int deserializationThreads = 1; //TODO parametrize this
54-
5550
private ExecutorService rawTweetsQueuerService;
5651
private ExecutorService deserializationService;
5752
private ExecutorService listenersService;
@@ -83,45 +78,44 @@ public void start() {
8378
rawTweetsQueuerService = Executors.newSingleThreadExecutor();
8479
rawTweetsQueuerService.submit(new RawTweetsQueuer());
8580

86-
deserializationService = Executors.newFixedThreadPool(deserializationThreads);
87-
for (int i = 0; i < deserializationThreads; i++) {
88-
deserializationService.submit(new DeserializeTweetsTask());
89-
}
81+
deserializationService = Executors.newSingleThreadExecutor();
82+
deserializationService.submit(new DeserializeTweetsTask());
9083

9184
listenersService = Executors.newSingleThreadExecutor();
9285
listenersService.submit(new TweetsListenersTask());
9386
}
9487

9588
public synchronized void shutdown() {
9689
isRunning = false;
97-
rawTweetsQueuerService.shutdown();
98-
deserializationService.shutdown();
99-
listenersService.shutdown();
90+
shutDownServices();
10091
try {
101-
if (!rawTweetsQueuerService.awaitTermination(3, TimeUnit.SECONDS)) {
102-
rawTweetsQueuerService.shutdownNow();
103-
if (!rawTweetsQueuerService.awaitTermination(3, TimeUnit.SECONDS))
104-
System.err.println("Pool did not terminate");
105-
}
106-
if (!deserializationService.awaitTermination(3, TimeUnit.SECONDS)) {
107-
deserializationService.shutdownNow();
108-
if (!deserializationService.awaitTermination(3, TimeUnit.SECONDS))
109-
System.err.println("Pool did not terminate");
110-
}
111-
if (!listenersService.awaitTermination(3, TimeUnit.SECONDS)) {
112-
listenersService.shutdownNow();
113-
if (!listenersService.awaitTermination(3, TimeUnit.SECONDS))
114-
System.err.println("Pool did not terminate");
115-
}
92+
terminateServices();
11693
} catch (InterruptedException ie) {
117-
rawTweetsQueuerService.shutdown();
118-
deserializationService.shutdown();
119-
listenersService.shutdown();
94+
shutDownServices();
12095
Thread.currentThread().interrupt();
12196
}
12297
System.out.println("TweetsStreamListenersExecutor is shutting down.");
12398
}
12499

100+
private void shutDownServices() {
101+
rawTweetsQueuerService.shutdown();
102+
deserializationService.shutdown();
103+
listenersService.shutdown();
104+
}
105+
106+
private void terminateServices() throws InterruptedException {
107+
terminateService(rawTweetsQueuerService);
108+
terminateService(deserializationService);
109+
terminateService(listenersService);
110+
}
111+
private void terminateService(ExecutorService executorService) throws InterruptedException {
112+
if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
113+
executorService.shutdownNow();
114+
if (!executorService.awaitTermination(3, TimeUnit.SECONDS))
115+
System.err.println("Pool did not terminate");
116+
}
117+
}
118+
125119
private class RawTweetsQueuer implements Runnable {
126120

127121
@Override
@@ -130,7 +124,6 @@ public void run() {
130124
}
131125

132126
public void queueTweets() {
133-
134127
String line = null;
135128
try {
136129
while (isRunning) {

0 commit comments

Comments
 (0)