
Commit b44a86d

Merge remote-tracking branch 'origin/1.8_release_3.10.x' into feat_1.8_merge3.10.x
# Conflicts:
#	README.md
#	core/pom.xml
#	core/src/main/java/com/dtstack/flink/sql/Main.java
#	core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java
#	core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java
#	core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java
#	core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java
#	elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java
#	kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java
#	rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java
2 parents b78a005 + 210b9c3 commit b44a86d

File tree: 6 files changed, +9 -15 lines


core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 0 additions & 6 deletions
@@ -18,12 +18,6 @@
 
 package com.dtstack.flink.sql.environment;
 
-import com.dtstack.flink.sql.constrant.ConfigConstrant;
-import com.dtstack.flink.sql.enums.EStateBackend;
-import com.dtstack.flink.sql.util.MathUtil;
-import com.dtstack.flink.sql.util.PropertiesUtils;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 2 deletions
@@ -206,7 +206,6 @@ private static void sqlTranslation(String localSqlPluginPath,
         if (LOG.isInfoEnabled()) {
             LOG.info("exe-sql:\n" + result.getExecSql());
         }
-
         boolean isSide = false;
         for (String tableName : result.getTargetTableList()) {
             if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -233,7 +232,6 @@ private static void sqlTranslation(String localSqlPluginPath,
             System.out.println(result.getExecSql());
             FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
             if (LOG.isInfoEnabled()) {
-                System.out.println();
                 LOG.info("exec sql: " + result.getExecSql());
             }
         }
@@ -355,6 +353,7 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
         return env;
     }
 
+
     public static void setLogLevel(ParamsInfo paramsInfo){
         String logLevel = paramsInfo.getConfProp().getProperty(ConfigConstrant.LOG_LEVEL_KEY);
         if(org.apache.commons.lang3.StringUtils.isBlank(logLevel)){

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 3 additions & 2 deletions
@@ -20,14 +20,15 @@
 
 package com.dtstack.flink.sql.side;
 
-import com.dtstack.flink.sql.factory.DTThreadFactory;
-import org.apache.calcite.sql.JoinType;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 
+import com.dtstack.flink.sql.factory.DTThreadFactory;
+import org.apache.calcite.sql.JoinType;
+
 import java.sql.SQLException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 3 deletions
@@ -793,9 +793,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
             throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
         }
 
-        if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
-            throw new RuntimeException("ON condition must contain all equal fields!!!");
-        }
+//        if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
+//            throw new RuntimeException("ON condition must contain all equal fields!!!");
+//        }
 
         rightScopeChild.setRowTypeInfo(sideTableInfo.getRowTypeInfo());
 

docs/elasticsearch6Sink.md

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ CREATE TABLE tableName(
 |cluster | ES 集群名称 |||
 |index | 选择的ES上的index名称|||
 |esType | 选择ES上的type名称|||
-|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
+|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
 | |若id为空字符串或索引都超出范围,则随机生成id值)|||
 |authMesh | 是否进行用户名密码认证 || false|
 |userName | 用户名 | 否,authMesh='true'时为必填 ||
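The id rule documented in the table above (fetch the values at the configured field positions, concatenate them into the document id, and fall back to a random id when the result is empty or every position is out of range) can be illustrated with a small sketch. This is a hypothetical helper, not the project's ElasticsearchSink code; EsIdRuleSketch, buildId, and idFieldIndexes are made-up names.

import java.util.List;
import java.util.UUID;

import org.apache.flink.types.Row;

// Hypothetical illustration of the documented id rule; the real sink may differ.
public class EsIdRuleSketch {

    static String buildId(Row row, List<Integer> idFieldIndexes) {
        StringBuilder id = new StringBuilder();
        for (Integer pos : idFieldIndexes) {
            // positions outside the row's arity are simply skipped
            if (pos == null || pos < 0 || pos >= row.getArity()) {
                continue;
            }
            id.append(row.getField(pos));
        }
        // empty result (no usable position configured) -> random id
        return id.length() == 0 ? UUID.randomUUID().toString() : id.toString();
    }
}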

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
         kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
         kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
         kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
-        kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
+        kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
         kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
         for (String key : props.keySet()) {
             if (!key.isEmpty() && key.startsWith("kafka.")) {
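The kafka-base change above switches to an overload of MathUtil.getBoolean that takes an explicit default, so a missing topicIsPattern property is parsed to false rather than relying on whatever the single-argument overload returns for null input. A minimal sketch of how such a default-value overload is commonly written follows; this is an assumed shape, not the project's actual MathUtil.

// Sketch of a getBoolean(Object, boolean) overload with a default value;
// the real MathUtil in flinkStreamSQL's core module may differ.
public class MathUtilSketch {

    public static Boolean getBoolean(Object obj, boolean defaultVal) {
        if (obj == null) {
            // property not set -> fall back to the caller-supplied default
            return defaultVal;
        }
        if (obj instanceof Boolean) {
            return (Boolean) obj;
        }
        return Boolean.parseBoolean(obj.toString());
    }
}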
