Skip to content

Commit 2a58e7b

Browse files
committed
extract method
1 parent 1470167 commit 2a58e7b

File tree

4 files changed

+26
-31
lines changed

4 files changed

+26
-31
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
111111
}
112112
}
113113

114+
protected void dealCacheData(String key, CacheObj missKeyObj) {
115+
if (openCache()) {
116+
putCache(key, missKeyObj);
117+
}
118+
}
119+
114120
@Override
115121
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
116122
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
132132
dealMissKey(inputRow, resultFuture);
133133
return;
134134
}
135-
136135
refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
137136
}
138137

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
9696
return;
9797
} else if (ECacheContentType.MultiLine == val.getType()) {
9898
try {
99-
List<Row> rowList = Lists.newArrayList();
100-
for (Object jsonArray : (List) val.getContent()) {
101-
Row row = fillData(inputRow, jsonArray);
102-
rowList.add(row);
103-
}
99+
List<Row> rowList = getRows(inputRow, null, (List) val.getContent());
104100
resultFuture.complete(rowList);
105101
} catch (Exception e) {
106102
dealFillDataError(resultFuture, e, inputRow);
@@ -127,33 +123,19 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
127123
resultFuture.completeExceptionally(rs.cause());
128124
return;
129125
}
130-
131126
List<JsonArray> cacheContent = Lists.newArrayList();
132-
133-
int resultSize = rs.result().getResults().size();
134-
if (resultSize > 0) {
127+
List<JsonArray> results = rs.result().getResults();
128+
if (results.size() > 0) {
135129
try {
136-
List<Row> rowList = Lists.newArrayList();
137-
for (JsonArray line : rs.result().getResults()) {
138-
Row row = fillData(inputRow, line);
139-
if (openCache()) {
140-
cacheContent.add(line);
141-
}
142-
rowList.add(row);
143-
}
144-
if (openCache()) {
145-
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
146-
}
147-
130+
List<Row> rowList = getRows(inputRow, cacheContent, results);
131+
dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
148132
resultFuture.complete(rowList);
149133
} catch (Exception e){
150134
dealFillDataError(resultFuture, e, inputRow);
151135
}
152136
} else {
153137
dealMissKey(inputRow, resultFuture);
154-
if (openCache()) {
155-
putCache(key, CacheMissVal.getMissKeyObj());
156-
}
138+
dealCacheData(key, CacheMissVal.getMissKeyObj());
157139
}
158140

159141
// and close the connection
@@ -166,6 +148,18 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
166148
});
167149
}
168150

151+
protected List<Row> getRows(Row inputRow, List<JsonArray> cacheContent, List<JsonArray> results) {
152+
List<Row> rowList = Lists.newArrayList();
153+
for (JsonArray line : results) {
154+
Row row = fillData(inputRow, line);
155+
if (null != cacheContent && openCache()) {
156+
cacheContent.add(line);
157+
}
158+
rowList.add(row);
159+
}
160+
return rowList;
161+
}
162+
169163
@Override
170164
public Row fillData(Row input, Object line) {
171165
JsonArray jsonArray = (JsonArray) line;

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,14 @@ public void accept(List<KeyValue<String, String>> keyValues) {
172172
}
173173
try {
174174
Row row = fillData(inputRow, keyValue);
175-
if (openCache()) {
176-
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
177-
}
175+
dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
178176
resultFuture.complete(Collections.singleton(row));
179177
} catch (Exception e) {
180178
dealFillDataError(resultFuture, e, inputRow);
181179
}
182180
} else {
183181
dealMissKey(inputRow, resultFuture);
184-
if (openCache()) {
185-
putCache(key, CacheMissVal.getMissKeyObj());
186-
}
182+
dealCacheData(key,CacheMissVal.getMissKeyObj());
187183
}
188184
}
189185
});

0 commit comments

Comments
 (0)