Skip to content

Commit 8050cae

Browse files
committed
Merge branch 'feat_1.8_kafkaUpdateMode' into feat_1.8_kafkaUpdateMode_mergedDev
2 parents b5c6b4a + ba3680f commit 8050cae

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

docs/kafkaSink.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ CREATE TABLE tableName(
4141
|partitionKeys | 用来分区的字段|||
4242
|updateMode | 回溯流数据下发模式,append,upsert.upsert模式下会将是否为回溯信息以字段形式进行下发。||append|
4343
|sinkdatatype | 写入kafka数据格式,json,avro,csv||json|
44-
|fieldDelimiter | csv数据分隔符|| \ |
44+
|fieldDelimiter | csv数据分隔符|| , |
4545

4646

4747
**kafka相关参数可以自定义,使用kafka.开头即可。**

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4949
}
5050

5151
kafkaSinkTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSinkTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
52-
kafkaSinkTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));
52+
kafkaSinkTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), ",")));
5353
kafkaSinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
5454
kafkaSinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
5555

0 commit comments

Comments
 (0)