Skip to content

Commit 4e48f31

Browse files
committed
[fix-34344][kafka] remove partition key blank char.
1 parent d95f04c commit 4e48f31

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaSink.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.flink.util.Preconditions;
4040
import org.apache.kafka.clients.consumer.ConsumerConfig;
4141

42+
import java.util.Arrays;
4243
import java.util.HashMap;
4344
import java.util.Optional;
4445
import java.util.Properties;
@@ -113,8 +114,13 @@ protected TableSchema buildTableSchema(String[] fieldNames, TypeInformation<?>[]
113114
}
114115

115116
protected String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
116-
if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
117-
return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
117+
String keysStr = kafkaSinkTableInfo.getPartitionKeys();
118+
if (StringUtils.isNotBlank(keysStr)) {
119+
String[] keys = keysStr.split(",");
120+
String[] cleanedKeys = Arrays.stream(keys)
121+
.map(x -> x.trim())
122+
.toArray(String[]::new);
123+
return cleanedKeys;
118124
}
119125
return null;
120126
}

0 commit comments

Comments
 (0)