Skip to content

Commit bab514d

Browse files
committed
switched from InputStream and BufferedReader to okio BufferedSource
1 parent 09f86fc commit bab514d

File tree

4 files changed

+23
-24
lines changed

4 files changed

+23
-24
lines changed

src/main/java/com/twitter/clientlib/ApiClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import okhttp3.logging.HttpLoggingInterceptor.Level;
3030
import okio.Buffer;
3131
import okio.BufferedSink;
32+
import okio.BufferedSource;
3233
import okio.Okio;
3334
import org.apache.oltu.oauth2.client.request.OAuthClientRequest.TokenRequestBuilder;
3435
import org.apache.oltu.oauth2.common.message.types.GrantType;
@@ -1099,11 +1100,10 @@ public <T> ApiResponse<T> execute(Call call, Type returnType) throws ApiExceptio
10991100
* <p>Execute stream.</p>
11001101
*
11011102
* @param call a {@link okhttp3.Call} object
1102-
* @param returnType a {@link java.lang.reflect.Type} object
11031103
* @return a {@link java.io.InputStream} object
11041104
* @throws com.twitter.clientlib.ApiException if any.
11051105
*/
1106-
public InputStream executeStream(Call call, Type returnType) throws ApiException {
1106+
public BufferedSource executeStream(Call call) throws ApiException {
11071107
try {
11081108
Response response = call.execute();
11091109
if (!response.isSuccessful()) {
@@ -1112,7 +1112,7 @@ public InputStream executeStream(Call call, Type returnType) throws ApiException
11121112
if (response.body() == null) {
11131113
return null;
11141114
}
1115-
return response.body().byteStream();
1115+
return response.body().source();
11161116
} catch (IOException e) {
11171117
throw new ApiException(e);
11181118
}

src/main/java/com/twitter/clientlib/api/TweetsApi.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.io.InputStream;
7979
import javax.ws.rs.core.GenericType;
8080

81+
import okio.BufferedSource;
8182
import org.apache.commons.lang3.StringUtils;
8283

8384
public class TweetsApi extends ApiCommon {
@@ -1692,17 +1693,16 @@ private okhttp3.Call sampleStreamValidateBeforeCall(StreamQueryParameters stream
16921693
<tr><td> 0 </td><td> The request has failed. </td><td> - </td></tr>
16931694
</table>
16941695
*/
1695-
public InputStream sampleStream(StreamQueryParameters streamParameters) throws ApiException {
1696-
InputStream localVarResp = sampleStreamWithHttpInfo(streamParameters);
1697-
return localVarResp;
1696+
public BufferedSource sampleStream(StreamQueryParameters streamParameters) throws ApiException {
1697+
return sampleStreamWithHttpInfo(streamParameters);
16981698
}
16991699

17001700
/**
17011701
* Calls the API using a retry mechanism to handle rate limits errors.
17021702
*
17031703
*/
1704-
public InputStream sampleStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException {
1705-
InputStream localVarResp;
1704+
public BufferedSource sampleStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException {
1705+
BufferedSource localVarResp;
17061706
try{
17071707
localVarResp = sampleStream(streamParameters);
17081708
}
@@ -1730,11 +1730,10 @@ public InputStream sampleStream(Integer retries, StreamQueryParameters streamPar
17301730
<tr><td> 0 </td><td> The request has failed. </td><td> - </td></tr>
17311731
</table>
17321732
*/
1733-
public InputStream sampleStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException {
1733+
public BufferedSource sampleStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException {
17341734
okhttp3.Call localVarCall = sampleStreamValidateBeforeCall(streamParameters, null);
17351735
try {
1736-
Type localVarReturnType = new TypeToken<StreamingTweet>(){}.getType();
1737-
return localVarApiClient.executeStream(localVarCall, localVarReturnType);
1736+
return localVarApiClient.executeStream(localVarCall);
17381737
} catch (ApiException e) {
17391738
e.setErrorObject(localVarApiClient.getJSON().getGson().fromJson(e.getResponseBody(), new TypeToken<com.twitter.clientlib.model.ProblemOrError>(){}.getType()));
17401739
throw e;
@@ -1857,17 +1856,16 @@ private okhttp3.Call searchStreamValidateBeforeCall(StreamQueryParameters stream
18571856
<tr><td> 0 </td><td> The request has failed. </td><td> - </td></tr>
18581857
</table>
18591858
*/
1860-
public InputStream searchStream(StreamQueryParameters streamParameters) throws ApiException {
1861-
InputStream localVarResp = searchStreamWithHttpInfo(streamParameters);
1862-
return localVarResp;
1859+
public BufferedSource searchStream(StreamQueryParameters streamParameters) throws ApiException {
1860+
return searchStreamWithHttpInfo(streamParameters);
18631861
}
18641862

18651863
/**
18661864
* Calls the API using a retry mechanism to handle rate limits errors.
18671865
*
18681866
*/
1869-
public InputStream searchStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException {
1870-
InputStream localVarResp;
1867+
public BufferedSource searchStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException {
1868+
BufferedSource localVarResp;
18711869
try{
18721870
localVarResp = searchStream(streamParameters);
18731871
}
@@ -1895,11 +1893,10 @@ public InputStream searchStream(Integer retries, StreamQueryParameters streamPar
18951893
<tr><td> 0 </td><td> The request has failed. </td><td> - </td></tr>
18961894
</table>
18971895
*/
1898-
public InputStream searchStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException {
1896+
public BufferedSource searchStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException {
18991897
okhttp3.Call localVarCall = searchStreamValidateBeforeCall(streamParameters, null);
19001898
try {
1901-
Type localVarReturnType = new TypeToken<FilteredStreamingTweet>(){}.getType();
1902-
return localVarApiClient.executeStream(localVarCall, localVarReturnType);
1899+
return localVarApiClient.executeStream(localVarCall);
19031900
} catch (ApiException e) {
19041901
e.setErrorObject(localVarApiClient.getJSON().getGson().fromJson(e.getResponseBody(), new TypeToken<com.twitter.clientlib.model.ProblemOrError>(){}.getType()));
19051902
throw e;

src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.fasterxml.jackson.databind.ObjectMapper;
4141

4242
import com.twitter.clientlib.model.StreamingTweet;
43+
import okio.BufferedSource;
4344

4445
public class TweetsStreamExecutor {
4546
private volatile BlockingQueue<String> rawTweets;
@@ -56,9 +57,9 @@ public class TweetsStreamExecutor {
5657
private ExecutorService listenersService;
5758

5859
private final List<TweetsStreamListener> listeners = new ArrayList<>();
59-
private InputStream stream;
60+
private BufferedSource stream;
6061

61-
public TweetsStreamExecutor(InputStream stream) {
62+
public TweetsStreamExecutor(BufferedSource stream) {
6263
this.rawTweets = new LinkedBlockingDeque<>();
6364
this.tweets = new LinkedBlockingDeque<>();
6465
this.stream = stream;
@@ -131,9 +132,9 @@ public void run() {
131132
public void queueTweets() {
132133

133134
String line = null;
134-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
135+
try {
135136
while (isRunning) {
136-
line = reader.readLine();
137+
line = stream.readUtf8Line();
137138
if(line == null || line.isEmpty()) {
138139
continue;
139140
}

src/main/java/com/twitter/clientlib/stream/TwitterStream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.twitter.clientlib.TwitterCredentialsBearer;
66
import com.twitter.clientlib.api.TweetsApi;
77
import com.twitter.clientlib.query.StreamQueryParameters;
8+
import okio.BufferedSource;
89

910
import java.io.InputStream;
1011
import java.util.LinkedList;
@@ -32,7 +33,7 @@ public void addListener(TweetsStreamListener listener) {
3233

3334
public void sampleStream(StreamQueryParameters streamParameters) {
3435
try {
35-
InputStream streamResult = tweets.sampleStream(streamParameters);
36+
BufferedSource streamResult = tweets.sampleStream(streamParameters);
3637
TweetsStreamExecutor executor = new TweetsStreamExecutor(streamResult);
3738
listeners.forEach(executor::addListener);
3839
executor.start();

0 commit comments

Comments
 (0)