Skip to content

Commit edfe0f3

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_32844' into 1.10_release_4.0.x
2 parents 6d097ad + 1dbc1b0 commit edfe0f3

File tree

1 file changed

+6
-9
lines changed

1 file changed

+6
-9
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,21 @@
2929
import org.apache.calcite.sql.JoinType;
3030
import org.apache.commons.collections.CollectionUtils;
3131
import org.apache.commons.lang3.StringUtils;
32-
import org.apache.flink.api.common.typeinfo.TypeInformation;
3332
import org.apache.flink.configuration.Configuration;
3433
import org.apache.flink.table.dataformat.BaseRow;
35-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3634
import org.apache.flink.types.Row;
3735
import org.apache.flink.util.Collector;
3836
import org.slf4j.Logger;
3937
import org.slf4j.LoggerFactory;
4038

41-
import java.sql.*;
42-
import java.time.LocalDateTime;
39+
import java.sql.Connection;
40+
import java.sql.ResultSet;
41+
import java.sql.SQLException;
42+
import java.sql.Statement;
4343
import java.util.ArrayList;
4444
import java.util.Calendar;
45-
import java.util.HashMap;
4645
import java.util.List;
4746
import java.util.Map;
48-
import java.util.Objects;
4947
import java.util.concurrent.atomic.AtomicReference;
5048
import java.util.stream.Collectors;
5149

@@ -105,7 +103,6 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
105103
List<Integer> equalValIndex = sideInfo.getEqualValIndex();
106104
ArrayList<Object> inputParams = equalValIndex.stream()
107105
.map(value::getField)
108-
.filter(Objects::nonNull)
109106
.collect(Collectors.toCollection(ArrayList::new));
110107

111108
if (inputParams.size() != equalValIndex.size() && sideInfo.getJoinType() == JoinType.LEFT) {
@@ -115,7 +112,7 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
115112
}
116113

117114
String cacheKey = inputParams.stream()
118-
.map(Object::toString)
115+
.map(e -> String.valueOf(e))
119116
.collect(Collectors.joining("_"));
120117

121118
List<Map<String, Object>> cacheList = cacheRef.get().get(cacheKey);
@@ -185,7 +182,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
185182

186183
String cacheKey = sideInfo.getEqualFieldList().stream()
187184
.map(oneRow::get)
188-
.map(Object::toString)
185+
.map(e -> String.valueOf(e))
189186
.collect(Collectors.joining("_"));
190187

191188
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())

0 commit comments

Comments
 (0)