Commit dab11f9

Merge branch '1.10_test_4.0.x_merged_joinConstant' into '1.10_test_4.0.x'
1.10 test 4.0.x merged join constant
See merge request dt-insight-engine/flinkStreamSQL!87
2 parents b7c4124 + e408d12; commit dab11f9

File tree

20 files changed: +203 additions, -483 deletions


cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 14 additions & 22 deletions
@@ -18,7 +18,20 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
-import com.datastax.driver.core.*;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -72,27 +85,6 @@ public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
         super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
-            obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-        }
-
-        return row;
-    }
-
     @Override
     protected void initCache() throws SQLException {
         Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 0 additions & 53 deletions
@@ -72,57 +72,4 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
         LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
     }
 
-
-    @Override
-    public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
-        if (sqlNode.getKind() != SqlKind.EQUALS) {
-            throw new RuntimeException("not equal operator.");
-        }
-
-        SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
-        SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
-
-        String leftTableName = left.getComponent(0).getSimple();
-        String leftField = left.getComponent(1).getSimple();
-
-        String rightTableName = right.getComponent(0).getSimple();
-        String rightField = right.getComponent(1).getSimple();
-
-        if (leftTableName.equalsIgnoreCase(sideTableName)) {
-            equalFieldList.add(leftField);
-            int equalFieldIndex = -1;
-            for (int i = 0; i < getFieldNames().length; i++) {
-                String fieldName = getFieldNames()[i];
-                if (fieldName.equalsIgnoreCase(rightField)) {
-                    equalFieldIndex = i;
-                }
-            }
-            if (equalFieldIndex == -1) {
-                throw new RuntimeException("can't deal equal field: " + sqlNode);
-            }
-
-            equalValIndex.add(equalFieldIndex);
-
-        } else if (rightTableName.equalsIgnoreCase(sideTableName)) {
-
-            equalFieldList.add(rightField);
-            int equalFieldIndex = -1;
-            for (int i = 0; i < getFieldNames().length; i++) {
-                String fieldName = getFieldNames()[i];
-                if (fieldName.equalsIgnoreCase(leftField)) {
-                    equalFieldIndex = i;
-                }
-            }
-            if (equalFieldIndex == -1) {
-                throw new RuntimeException("can't deal equal field: " + sqlNode.toString());
-            }
-
-            equalValIndex.add(equalFieldIndex);
-
-        } else {
-            throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
-        }
-
-    }
-
 }

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 44 additions & 1 deletion
@@ -23,6 +23,7 @@
 import com.dtstack.flink.sql.util.RowDataComplete;
 import org.apache.calcite.sql.JoinType;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -34,6 +35,8 @@
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.TimeZone;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -42,7 +45,6 @@
  * Reason:
  * Date: 2018/9/18
  * Company: www.dtstack.com
- *
  * @author xuchao
 */
 
@@ -52,6 +54,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<Row, BaseRow> im
 
     public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
 
+    public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
     protected BaseSideInfo sideInfo;
 
     private ScheduledExecutorService es;
@@ -95,6 +99,45 @@ protected void sendOutputRow(Row value, Object sideInput, Collector<BaseRow> out
         RowDataComplete.collectRow(out, row);
     }
 
+    @Override
+    public Row fillData(Row input, Object sideInput) {
+        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
+        Row row = new Row(sideInfo.getOutFieldInfoList().size());
+
+        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
+            // origin value
+            Object obj = input.getField(entry.getValue());
+            obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
+            row.setField(entry.getKey(), obj);
+        }
+
+        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
+            if (cacheInfo == null) {
+                row.setField(entry.getKey(), null);
+            } else {
+                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
+            }
+        }
+        return row;
+    }
+
+    /**
+     * Convert a Flink time attribute. Its type information marks event or processing time;
+     * such a field behaves like a regular SQL timestamp but is serialized as a Long.
+     *
+     * @param entry
+     * @param obj
+     * @return
+     */
+    protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
+        boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
+        if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
+            // undo the time-zone shift introduced by the upstream OutputRowtimeProcessFunction call
+            obj = ((Timestamp) obj).getTime() + (long) LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
+        }
+        return obj;
+    }
+
     @Override
     public void close() throws Exception {
         if (null != es && !es.isShutdown()) {
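
For context, a minimal standalone sketch of the offset arithmetic that the new dealTimeAttributeType method applies to time-indicator fields; the class name and sample timestamp below are illustrative assumptions, not part of this commit.

import java.sql.Timestamp;
import java.util.TimeZone;

public class TimeAttributeOffsetSketch {

    // same constant as in the commit: the JVM's default time zone
    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    public static void main(String[] args) {
        // a rowtime value that arrived as a java.sql.Timestamp (hypothetical sample)
        Timestamp rowtime = Timestamp.valueOf("2020-05-01 12:00:00");

        // the commit's correction: shift the epoch millis by the local UTC offset
        // to undo the time-zone adjustment applied by the upstream operator
        long corrected = rowtime.getTime() + (long) LOCAL_TZ.getOffset(rowtime.getTime());

        System.out.println("original millis  = " + rowtime.getTime());
        System.out.println("corrected millis = " + corrected);
    }
}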

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 71 additions & 37 deletions
@@ -21,15 +21,16 @@
 package com.dtstack.flink.sql.side;
 
 import com.dtstack.flink.sql.side.cache.AbstractSideCache;
+import com.google.common.base.Preconditions;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
 
 import java.io.Serializable;
@@ -123,55 +124,88 @@ public String getTargetFieldType(String fieldName){
         return sideTableInfo.getFieldTypes()[fieldIndex];
     }
 
-
-    public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
-        if(!SqlKind.COMPARISON.contains(sqlNode.getKind())){
+    public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
+        if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
             throw new RuntimeException("not compare operator.");
         }
 
-        SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];
-        SqlIdentifier right = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[1];
+        SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
+        SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
+        if (leftNode.getKind() == SqlKind.LITERAL) {
+            evalConstantEquation(
+                    (SqlLiteral) leftNode,
+                    (SqlIdentifier) rightNode
+            );
+        } else if (rightNode.getKind() == SqlKind.LITERAL) {
+            evalConstantEquation(
+                    (SqlLiteral) rightNode,
+                    (SqlIdentifier) leftNode
+            );
+        } else {
+            SqlIdentifier left = (SqlIdentifier) leftNode;
+            SqlIdentifier right = (SqlIdentifier) rightNode;
+            evalEquation(left, right, sideTableName, sqlNode);
+        }
+    }
 
+    /**
+     * Deal with a normal equation, e.g. foo.id = bar.id
+     * @param left
+     * @param right
+     * @param sideTableName
+     * @param sqlNode
+     */
+    private void evalEquation(SqlIdentifier left, SqlIdentifier right, String sideTableName, SqlNode sqlNode) {
         String leftTableName = left.getComponent(0).getSimple();
         String leftField = left.getComponent(1).getSimple();
 
         String rightTableName = right.getComponent(0).getSimple();
         String rightField = right.getComponent(1).getSimple();
 
-        if(leftTableName.equalsIgnoreCase(sideTableName)){
-            equalFieldList.add(leftField);
-            int equalFieldIndex = -1;
-            for(int i=0; i<getFieldNames().length; i++){
-                String fieldName = getFieldNames()[i];
-                if(fieldName.equalsIgnoreCase(rightField)){
-                    equalFieldIndex = i;
-                }
-            }
-            if(equalFieldIndex == -1){
-                throw new RuntimeException("can't find equal field " + rightField);
-            }
-
-            equalValIndex.add(equalFieldIndex);
-
-        }else if(rightTableName.equalsIgnoreCase(sideTableName)){
+        if (leftTableName.equalsIgnoreCase(sideTableName)) {
+            associateField(rightField, leftField, sqlNode);
+        } else if (rightTableName.equalsIgnoreCase(sideTableName)) {
+            associateField(leftField, rightField, sqlNode);
+        } else {
+            throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
+        }
+    }
 
-            equalFieldList.add(rightField);
-            int equalFieldIndex = -1;
-            for(int i=0; i<getFieldNames().length; i++){
-                String fieldName = getFieldNames()[i];
-                if(fieldName.equalsIgnoreCase(leftField)){
-                    equalFieldIndex = i;
-                }
-            }
-            if(equalFieldIndex == -1){
-                throw new RuntimeException("can't find equal field " + leftField);
+    /**
+     * Deal with an equation against a constant, e.g. foo.id = 1
+     * @param literal
+     * @param identifier
+     */
+    private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier) {
+        String tableName = identifier.getComponent(0).getSimple();
+        String sideTableName = sideTableInfo.getName();
+        String errorMsg = "only support set side table constant field, error field " + identifier;
+        Preconditions.checkState(tableName.equals(sideTableName), errorMsg);
+        String fieldName = identifier.getComponent(1).getSimple();
+        Object constant = literal.getValue();
+        List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
+        PredicateInfo predicate = PredicateInfo.builder()
+                .setOperatorName("=")
+                .setOperatorKind("EQUALS")
+                .setOwnerTable(tableName)
+                .setFieldName(fieldName)
+                .setCondition(constant.toString())
+                .build();
+        predicateInfos.add(predicate);
+    }
+
+    private void associateField(String sourceTableField, String sideTableField, SqlNode sqlNode) {
+        String errorMsg = "can't deal equal field: " + sqlNode;
+        equalFieldList.add(sideTableField);
+        int equalFieldIndex = -1;
+        for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
+            String fieldName = rowTypeInfo.getFieldNames()[i];
+            if (fieldName.equalsIgnoreCase(sourceTableField)) {
+                equalFieldIndex = i;
             }
-
-            equalValIndex.add(equalFieldIndex);
-
-        }else{
-            throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
         }
+        Preconditions.checkState(equalFieldIndex != -1, errorMsg);
+        equalValIndex.add(equalFieldIndex);
     }
 
     public abstract void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo);
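
To illustrate what the reworked dealOneEqualCon accepts, the sketch below shows a join condition that mixes a field-to-field equation with a field-to-constant equation; all table and column names are made-up assumptions for illustration only. The identifier-to-identifier part feeds equalFieldList/equalValIndex via associateField, while the identifier-to-literal part becomes a PredicateInfo on the side table.

public class ConstantJoinSqlExample {
    public static void main(String[] args) {
        // hypothetical source table "orders" and side (dimension) table "dim_side"
        String joinSql =
                "SELECT o.id, s.name "
              + "FROM orders o "
              + "JOIN dim_side s "
              + "  ON o.side_id = s.id "   // identifier = identifier -> evalEquation / associateField
              + " AND s.status = 1";       // identifier = literal    -> evalConstantEquation -> PredicateInfo("=", status, 1)
        System.out.println(joinSql);
    }
}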

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 11 additions & 1 deletion
@@ -74,7 +74,17 @@ public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList) {
 
     public static void parseJoinCompareOperate(SqlNode condition, List<String> sqlJoinCompareOperate) {
         SqlBasicCall joinCondition = (SqlBasicCall) condition;
-        if (joinCondition.getKind() == SqlKind.AND) {
+
+        // skip equality conditions in the JOIN that compare against a constant
+        if (joinCondition.getKind() == SqlKind.EQUALS) {
+            SqlNode left = joinCondition.getOperands()[0];
+            SqlNode right = joinCondition.getOperands()[1];
+            if (left.getKind() != SqlKind.LITERAL &&
+                    right.getKind() != SqlKind.LITERAL) {
+                String operator = transformNotEqualsOperator(joinCondition.getKind());
+                sqlJoinCompareOperate.add(operator);
+            }
+        } else if (joinCondition.getKind() == SqlKind.AND) {
             List<SqlNode> operandList = joinCondition.getOperandList();
             for (SqlNode sqlNode : operandList) {
                 parseJoinCompareOperate(sqlNode, sqlJoinCompareOperate);
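
A minimal sketch of the guard this hunk adds: an EQUALS node whose left or right operand is a literal contributes no compare operator to sqlJoinCompareOperate. The string pairs below stand in for Calcite SqlNode operands and are purely an illustrative assumption, not the project's API.

import java.util.ArrayList;
import java.util.List;

public class SkipConstantEquationSketch {

    // stand-in for sqlNode.getKind() == SqlKind.LITERAL
    private static boolean isLiteral(String operand) {
        return operand.matches("\\d+|'.*'");
    }

    public static void main(String[] args) {
        List<String[]> equations = new ArrayList<>();
        equations.add(new String[]{"o.side_id", "s.id"});   // identifier = identifier
        equations.add(new String[]{"s.status", "1"});       // identifier = literal, skipped

        List<String> sqlJoinCompareOperate = new ArrayList<>();
        for (String[] eq : equations) {
            if (!isLiteral(eq[0]) && !isLiteral(eq[1])) {
                sqlJoinCompareOperate.add("=");
            }
        }
        System.out.println(sqlJoinCompareOperate); // prints [=]
    }
}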
