Skip to content

Commit 97f3568

Browse files
committed
Merge branch 'hotfix_1.10_4.x_30567' into '1.10_release_4.0.x'
fix 给groupId 添加默认值 See merge request dt-insight-engine/flinkStreamSQL!179
2 parents aed2e2d + 0cd7cc7 commit 97f3568

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.dtstack.flink.sql.util.MathUtil;
2727

2828
import java.util.Map;
29+
import java.util.UUID;
2930
import java.util.stream.Collectors;
3031

3132
/**
@@ -46,7 +47,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4647
kafkaSourceTableInfo.setType(MathUtil.getString(props.get(KafkaSourceTableInfo.TYPE_KEY.toLowerCase())));
4748
kafkaSourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
4849
kafkaSourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
49-
kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
50+
kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase(), UUID.randomUUID().toString().replace("-", ""))));
5051
kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
5152
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
5253
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));

0 commit comments

Comments
 (0)