Skip to content

Commit 610fdc3

Browse files
author
xuchao
committed
Merge branch 'v1.8.0_dev' into feat_1.8_codereview
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java
2 parents 029db2d + 2353b7f commit 610fdc3

File tree

25 files changed

+587
-361
lines changed

25 files changed

+587
-361
lines changed

.gitlab-ci.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
build:
2+
stage: test
3+
script:
4+
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
5+
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
6+
- sh ci/sonar_notify.sh
7+
only:
8+
- 1.8_dev
9+
tags:
10+
- dt-insight-engine

ci/sonar_notify.sh

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/bin/bash
2+
#参考钉钉文档 https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
3+
sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
4+
curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
5+
-H "Content-Type: application/json" \
6+
-d "{
7+
\"msgtype\": \"markdown\",
8+
\"markdown\": {
9+
\"title\":\"sonar代码质量\",
10+
\"text\": \"## sonar代码质量报告: \n
11+
> [sonar地址](http://172.16.100.198:9000/dashboard?id=dt-insight-engine/flinkStreamSQL) \n
12+
> ${sonarreport} \n\"
13+
}
14+
}"

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ public void open(Configuration parameters) throws Exception {
5151
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5252
.put("user", rdbSideTableInfo.getUserName())
5353
.put("password", rdbSideTableInfo.getPassword())
54-
.put("provider_class", DT_PROVIDER_CLASS);
54+
.put("provider_class", DT_PROVIDER_CLASS)
55+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
56+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
57+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
58+
5559
System.setProperty("vertx.disableFileCPResolving", "true");
5660
VertxOptions vo = new VertxOptions();
5761
vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ private static void sqlTranslation(String localSqlPluginPath,
203203
if (LOG.isInfoEnabled()) {
204204
LOG.info("exe-sql:\n" + result.getExecSql());
205205
}
206-
207206
boolean isSide = false;
208207
for (String tableName : result.getTargetTableList()) {
209208
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -350,6 +349,7 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
350349
return env;
351350
}
352351

352+
353353
public static void setLogLevel(ParamsInfo paramsInfo){
354354
String logLevel = paramsInfo.getConfProp().getProperty(ConfigConstrant.LOG_LEVEL_KEY);
355355
if(org.apache.commons.lang3.StringUtils.isBlank(logLevel)){

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.factory.DTThreadFactory;
24-
import org.apache.calcite.sql.JoinType;
2523
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2624
import org.apache.flink.configuration.Configuration;
2725
import org.apache.flink.table.runtime.types.CRow;
@@ -30,6 +28,9 @@
3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
3230

31+
import com.dtstack.flink.sql.factory.DTThreadFactory;
32+
import org.apache.calcite.sql.JoinType;
33+
3334
import java.sql.SQLException;
3435
import java.util.concurrent.ScheduledExecutorService;
3536
import java.util.concurrent.ScheduledThreadPoolExecutor;

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.typeinfo.TypeInformation;
2424

2525
import java.io.Serializable;
26+
import java.util.Objects;
2627

2728
/**
2829
* Reason:
@@ -64,4 +65,24 @@ public TypeInformation getTypeInformation() {
6465
public void setTypeInformation(TypeInformation typeInformation) {
6566
this.typeInformation = typeInformation;
6667
}
68+
69+
@Override
70+
public boolean equals(Object o) {
71+
if (this == o) {
72+
return true;
73+
}
74+
75+
if (o == null || getClass() != o.getClass()) {
76+
return false;
77+
}
78+
79+
FieldInfo fieldInfo = (FieldInfo) o;
80+
return Objects.equals(table, fieldInfo.table) &&
81+
Objects.equals(fieldName, fieldInfo.fieldName);
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(table, fieldName);
87+
}
6788
}

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet,
191191
tableInfo.setJoinType(joinType);
192192
tableInfo.setCondition(joinNode.getCondition());
193193

194-
if(!needBuildTemp){
195-
return tableInfo;
196-
}
197-
198-
if(tableInfo.getLeftNode().getKind() != AS){
194+
if(tableInfo.getLeftNode().getKind() != AS && needBuildTemp){
199195
extractTemporaryQuery(tableInfo.getLeftNode(), tableInfo.getLeftTableAlias(), (SqlBasicCall) parentWhere,
200196
parentSelectList, queueInfo, joinFieldSet, tableRef);
201197
}else {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -787,9 +787,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
787787
throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
788788
}
789789

790-
if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
791-
throw new RuntimeException("ON condition must contain all equal fields!!!");
792-
}
790+
// if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
791+
// throw new RuntimeException("ON condition must contain all equal fields!!!");
792+
// }
793793

794794
rightScopeChild.setRowTypeInfo(sideTableInfo.getRowTypeInfo());
795795

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfoParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public AbstractTableInfo parseWithTableType(int tableType, CreateTableParser.Sql
8181
if(absTableParser == null){
8282
String cacheType = MathUtil.getString(props.get(AbstractSideTableInfo.CACHE_KEY));
8383
absTableParser = StreamSideFactory.getSqlParser(type, localPluginRoot, cacheType);
84-
sideTableInfoMap.put(type, absTableParser);
84+
sideTableInfoMap.put(type + cacheType, absTableParser);
8585
}
8686
}
8787

0 commit comments

Comments
 (0)