Commit 74708fc: Optimize code structure

1 parent 66d0e34

11 files changed: +40 additions, -180 deletions


cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 0 additions & 28 deletions
@@ -78,34 +78,6 @@ public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
         super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
-            boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
-
-            //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
-            if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-                //remove the timezone offset introduced by the upstream OutputRowtimeProcessFunction call
-                obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
-            }
-
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-        }
-
-        return row;
-    }
-
     @Override
     protected void initCache() throws SQLException {
         Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 8 additions & 0 deletions
@@ -112,6 +112,14 @@ public Row fillData(Row input, Object sideInput)
         return row;
     }
 
+    /**
+     * covert flink time attribute.Type information for indicating event or processing time.
+     * However, it behaves like a regular SQL timestamp but is serialized as Long.
+     *
+     * @param entry
+     * @param obj
+     * @return
+     */
     protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
         boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
         if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
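
For reference, the fillData overrides deleted elsewhere in this commit all had the same shape, so each *AllReqRow subclass now inherits the single implementation kept in BaseAllReqRow together with the dealTimeAttributeType helper documented above. The following is only a sketch of that shared logic, reconstructed from the removed subclass copies; the sideInfo and LOCAL_TZ members and the surrounding imports are assumed to be present on BaseAllReqRow exactly as they were in the subclasses, and it has not been checked against the final file.

    // Sketch only: reconstructed from the duplicated fillData bodies removed in this commit.
    // Assumes sideInfo, LOCAL_TZ, Row, Timestamp, TypeInformation and TimeIndicatorTypeInfo
    // are available on BaseAllReqRow just as they were in the subclasses.
    public Row fillData(Row input, Object sideInput) {
        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
        Row row = new Row(sideInfo.getOutFieldInfoList().size());

        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
            Object obj = input.getField(entry.getValue());
            // normalize event-time / processing-time attributes, which are serialized as Long
            obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
            row.setField(entry.getKey(), obj);
        }

        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
            row.setField(entry.getKey(), cacheInfo == null ? null : cacheInfo.get(entry.getValue()));
        }
        return row;
    }

    protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
        // TimeIndicatorTypeInfo behaves like a SQL timestamp but is serialized as Long;
        // undo the timezone shift applied upstream by OutputRowtimeProcessFunction.
        if (obj instanceof Timestamp && TimeIndicatorTypeInfo.class.isAssignableFrom(entry)) {
            obj = ((Timestamp) obj).getTime() + (long) LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
        }
        return obj;
    }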

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 0 additions & 4 deletions
@@ -194,10 +194,6 @@ private Map<String, Object> parseInputParam(CRow input){
         return inputParams;
     }
 
-    private void constantField() {
-
-    }
-
     protected boolean isUseCache(Map<String, Object> inputParams){
         return openCache() && getFromCache(buildCacheKey(inputParams)) != null;
     }

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 16 additions & 10 deletions
@@ -22,16 +22,19 @@
 
 import com.dtstack.flink.sql.side.cache.AbstractSideCache;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Reason:
@@ -42,6 +45,7 @@
 
 public abstract class BaseSideInfo implements Serializable{
 
+    // Source Table RowTypeInfo
     protected RowTypeInfo rowTypeInfo;
 
     protected List<FieldInfo> outFieldInfoList;
@@ -128,13 +132,15 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
         SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
         SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
         if (leftNode.getKind() == SqlKind.LITERAL) {
-            SqlLiteral literal = (SqlLiteral) leftNode;
-            SqlIdentifier identifier = (SqlIdentifier) rightNode;
-            evalConstantEquation(literal, identifier);
-        } else if(rightNode.getKind() == SqlKind.LITERAL) {
-            SqlLiteral literal = (SqlLiteral) rightNode;
-            SqlIdentifier identifier = (SqlIdentifier) leftNode;
-            evalConstantEquation(literal, identifier);
+            evalConstantEquation(
+                    (SqlLiteral) leftNode,
+                    (SqlIdentifier) rightNode
+            );
+        } else if (rightNode.getKind() == SqlKind.LITERAL) {
+            evalConstantEquation(
+                    (SqlLiteral) rightNode,
+                    (SqlIdentifier) leftNode
+            );
         } else {
             SqlIdentifier left = (SqlIdentifier) leftNode;
             SqlIdentifier right = (SqlIdentifier) rightNode;
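
The dealOneEqualCon rewrite above only changes how the constant branch is expressed: whichever side of '=' carries the literal, the literal and the identifier still reach evalConstantEquation in that order, and the identifier-on-both-sides branch is untouched. A tiny standalone illustration of that dispatch, using plain strings in place of Calcite's SqlNode types (the class and method bodies here are hypothetical stand-ins, not the project's API):

    // Toy stand-in for the literal/identifier dispatch in dealOneEqualCon.
    public class ConstantEquationDispatchDemo {

        // stands in for evalConstantEquation(SqlLiteral, SqlIdentifier)
        static void evalConstantEquation(String literal, String identifier) {
            System.out.println("constant filter: " + identifier + " = " + literal);
        }

        // toy replacement for node.getKind() == SqlKind.LITERAL
        static boolean isLiteral(String node) {
            return node.startsWith("'") && node.endsWith("'");
        }

        static void dealOneEqualCon(String leftNode, String rightNode) {
            if (isLiteral(leftNode)) {
                evalConstantEquation(leftNode, rightNode);
            } else if (isLiteral(rightNode)) {
                // swap so the literal is always passed first, the identifier second
                evalConstantEquation(rightNode, leftNode);
            } else {
                System.out.println("join key pair: " + leftNode + " = " + rightNode);
            }
        }

        public static void main(String[] args) {
            dealOneEqualCon("s.type", "'foo'");   // constant filter: s.type = 'foo'
            dealOneEqualCon("'foo'", "s.type");   // constant filter: s.type = 'foo'
            dealOneEqualCon("t.id", "s.id");      // join key pair: t.id = s.id
        }
    }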

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 10 additions & 5 deletions
@@ -50,6 +50,8 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.calcite.sql.SqlKind.*;
 import static org.apache.calcite.sql.SqlKind.CASE;
@@ -65,7 +67,7 @@
 public class TableUtils {
 
     public static final char SPLIT = '_';
-
+    public static final Pattern stringPattern = Pattern.compile("\".*?\"|\'.*?\'");
     /**
      * Get the fields of the select statement
     * @param sqlSelect
@@ -720,11 +722,14 @@ public static void addConstant(Map<String, Object> keyMap, AbstractSideTableInfo
         List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
         final String name = sideTableInfo.getName();
         for (PredicateInfo info : predicateInfos) {
-            if (info.getOwnerTable().equals(name) &&
-                    info.getOperatorName().equals("=")) {
+            if (info.getOwnerTable().equals(name)
+                    && info.getOperatorName().equals("=")) {
                 String condition = info.getCondition();
-                String conditionWithoutQuota = condition.replaceAll("['\"]", "");
-                keyMap.put(info.getFieldName(), conditionWithoutQuota);
+                Matcher matcher = stringPattern.matcher(condition);
+                if (matcher.matches()) {
+                    condition = condition.substring(1, condition.length() - 1);
+                }
+                keyMap.put(info.getFieldName(), condition);
             }
         }
     }
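
The addConstant change above is behavioral, not just stylistic: the old replaceAll("['\"]", "") stripped every quote character anywhere in the predicate value, while the new stringPattern only unwraps the outer quotes when the whole condition is a single quoted string. A minimal, self-contained sketch of the new behavior (hypothetical demo class, not part of the project):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class QuoteStripDemo {

        // same pattern as the commit: one whole single- or double-quoted string
        static final Pattern STRING_PATTERN = Pattern.compile("\".*?\"|'.*?'");

        static String stripOuterQuotes(String condition) {
            Matcher matcher = STRING_PATTERN.matcher(condition);
            // unwrap only when the entire condition is one quoted literal
            if (matcher.matches()) {
                return condition.substring(1, condition.length() - 1);
            }
            return condition;
        }

        public static void main(String[] args) {
            System.out.println(stripOuterQuotes("'beijing'"));   // beijing
            System.out.println(stripOuterQuotes("\"it's ok\"")); // it's ok (the old replaceAll would also drop the inner quote)
            System.out.println(stripOuterQuotes("123"));         // 123, left unchanged
        }
    }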

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 0 additions & 29 deletions
@@ -96,35 +96,6 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
         }
     }
 
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
-            boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
-
-            //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
-            if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-                //remove the timezone offset introduced by the upstream OutputRowtimeProcessFunction call
-                obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
-            }
-
-
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-        }
-
-        return row;
-    }
-
     private String buildKey(List<Object> equalValList) {
         StringBuilder sb = new StringBuilder("");
         for (Object equalVal : equalValList) {

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

Lines changed: 0 additions & 2 deletions
@@ -21,9 +21,7 @@
 package com.dtstack.flink.sql.side.hbase;
 
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
-import com.dtstack.flink.sql.side.PredicateInfo;
 import com.dtstack.flink.sql.side.hbase.enums.EReplaceType;
-import com.dtstack.flink.sql.util.DtStringUtil;
 import com.dtstack.flink.sql.util.MD5Utils;
 import com.dtstack.flink.sql.util.TableUtils;
 import com.google.common.collect.Lists;

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 0 additions & 29 deletions
@@ -65,35 +65,6 @@ public KuduAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo>
         super(new KuduAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
-
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
-            boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
-
-            //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
-            if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-                //remove the timezone offset introduced by the upstream OutputRowtimeProcessFunction call
-                obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
-            }
-
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-        }
-
-        return row;
-    }
-
     @Override
     protected void initCache() throws SQLException {
         Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 0 additions & 28 deletions
@@ -78,34 +78,6 @@ public MongoAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
         super(new MongoAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
-            boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
-
-            //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
-            if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-                //remove the timezone offset introduced by the upstream OutputRowtimeProcessFunction call
-                obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
-            }
-
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-        }
-
-        return row;
-    }
-
     @Override
     protected void initCache() throws SQLException {
         Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 0 additions & 40 deletions
@@ -132,46 +132,6 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
         cacheList.forEach(one -> out.collect(new CRow(fillData(value.row(), one), value.change())));
     }
 
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            // origin value
-            Object obj = input.getField(entry.getValue());
-            obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-
-        }
-        return row;
-    }
-
-    /**
-     * covert flink time attribute.Type information for indicating event or processing time.
-     * However, it behaves like a regular SQL timestamp but is serialized as Long.
-     *
-     * @param entry
-     * @param obj
-     * @return
-     */
-    protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
-        boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
-        if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
-            //remove the timezone offset introduced by the upstream OutputRowtimeProcessFunction call
-            obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
-        }
-        return obj;
-    }
-
     private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
         RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
         Connection connection = null;
