Skip to content

Commit 5a8cd28

Browse files
committed
Merge branch 'feat_1.8_3.10.x_mergedDev' into 'v1.8.0_dev'
3.10.x merged dev See merge request dt-insight-engine/flinkStreamSQL!4
2 parents d124305 + 7bcb8ac commit 5a8cd28

File tree

23 files changed

+564
-363
lines changed

23 files changed

+564
-363
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ public void open(Configuration parameters) throws Exception {
5151
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5252
.put("user", rdbSideTableInfo.getUserName())
5353
.put("password", rdbSideTableInfo.getPassword())
54-
.put("provider_class", DT_PROVIDER_CLASS);
54+
.put("provider_class", DT_PROVIDER_CLASS)
55+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
56+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
57+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
58+
5559
System.setProperty("vertx.disableFileCPResolving", "true");
5660
VertxOptions vo = new VertxOptions();
5761
vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ private static void sqlTranslation(String localSqlPluginPath,
206206
if (LOG.isInfoEnabled()) {
207207
LOG.info("exe-sql:\n" + result.getExecSql());
208208
}
209-
210209
boolean isSide = false;
211210
for (String tableName : result.getTargetTableList()) {
212211
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -233,7 +232,6 @@ private static void sqlTranslation(String localSqlPluginPath,
233232
System.out.println(result.getExecSql());
234233
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
235234
if (LOG.isInfoEnabled()) {
236-
System.out.println();
237235
LOG.info("exec sql: " + result.getExecSql());
238236
}
239237
}
@@ -355,6 +353,7 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
355353
return env;
356354
}
357355

356+
358357
public static void setLogLevel(ParamsInfo paramsInfo){
359358
String logLevel = paramsInfo.getConfProp().getProperty(ConfigConstrant.LOG_LEVEL_KEY);
360359
if(org.apache.commons.lang3.StringUtils.isBlank(logLevel)){

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.factory.DTThreadFactory;
24-
import org.apache.calcite.sql.JoinType;
2523
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2624
import org.apache.flink.configuration.Configuration;
2725
import org.apache.flink.table.runtime.types.CRow;
2826
import org.apache.flink.types.Row;
2927
import org.apache.flink.util.Collector;
3028

29+
import com.dtstack.flink.sql.factory.DTThreadFactory;
30+
import org.apache.calcite.sql.JoinType;
31+
3132
import java.sql.SQLException;
3233
import java.util.concurrent.ScheduledExecutorService;
3334
import java.util.concurrent.ScheduledThreadPoolExecutor;

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.typeinfo.TypeInformation;
2424

2525
import java.io.Serializable;
26+
import java.util.Objects;
2627

2728
/**
2829
* Reason:
@@ -64,4 +65,24 @@ public TypeInformation getTypeInformation() {
6465
public void setTypeInformation(TypeInformation typeInformation) {
6566
this.typeInformation = typeInformation;
6667
}
68+
69+
@Override
70+
public boolean equals(Object o) {
71+
if (this == o) {
72+
return true;
73+
}
74+
75+
if (o == null || getClass() != o.getClass()) {
76+
return false;
77+
}
78+
79+
FieldInfo fieldInfo = (FieldInfo) o;
80+
return Objects.equals(table, fieldInfo.table) &&
81+
Objects.equals(fieldName, fieldInfo.fieldName);
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(table, fieldName);
87+
}
6788
}

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
190190
tableInfo.setJoinType(joinType);
191191
tableInfo.setCondition(joinNode.getCondition());
192192

193-
if(!needBuildTemp){
194-
return tableInfo;
195-
}
196-
197-
if(tableInfo.getLeftNode().getKind() != AS){
193+
if(tableInfo.getLeftNode().getKind() != AS && needBuildTemp){
198194
extractTemporaryQuery(tableInfo.getLeftNode(), tableInfo.getLeftTableAlias(), (SqlBasicCall) parentWhere,
199195
parentSelectList, queueInfo, joinFieldSet, tableRef);
200196
}else {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -793,9 +793,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
793793
throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
794794
}
795795

796-
if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
797-
throw new RuntimeException("ON condition must contain all equal fields!!!");
798-
}
796+
// if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
797+
// throw new RuntimeException("ON condition must contain all equal fields!!!");
798+
// }
799799

800800
rightScopeChild.setRowTypeInfo(sideTableInfo.getRowTypeInfo());
801801

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfoParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public AbstractTableInfo parseWithTableType(int tableType, CreateTableParser.Sql
8181
if(absTableParser == null){
8282
String cacheType = MathUtil.getString(props.get(AbstractSideTableInfo.CACHE_KEY));
8383
absTableParser = StreamSideFactory.getSqlParser(type, localPluginRoot, cacheType);
84-
sideTableInfoMap.put(type, absTableParser);
84+
sideTableInfoMap.put(type + cacheType, absTableParser);
8585
}
8686
}
8787

0 commit comments

Comments
 (0)