Skip to content

Commit 2c4af88

Browse files
committed
Merge branch 'v1.8.0_dev' of ssh://gitlab.prod.dtstack.cn:10022/dt-insight-engine/flinkstreamsql into v1.8.0_dev
2 parents ed88255 + 19d27b4 commit 2c4af88

File tree

3 files changed

+2
-5
lines changed

3 files changed

+2
-5
lines changed

docs/kafkaSink.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ CREATE TABLE tableName(
4141
|partitionKeys | 用来分区的字段|||
4242
|updateMode | 回溯流数据下发模式,append,upsert.upsert模式下会将是否为回溯信息以字段形式进行下发。||append|
4343
|sinkdatatype | 写入kafka数据格式,json,avro,csv||json|
44-
|fieldDelimiter | csv数据分隔符|| \| |
44+
|fieldDelimiter | csv数据分隔符|| , |
4545

4646

4747
**kafka相关参数可以自定义,使用kafka.开头即可。**

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,9 @@
2626
import org.apache.commons.lang3.StringUtils;
2727
import org.apache.flink.api.common.serialization.SerializationSchema;
2828
import org.apache.flink.api.common.typeinfo.TypeInformation;
29-
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
30-
import org.apache.flink.formats.csv.CsvRowSerializationSchema;
3129
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
3230
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3331
import org.apache.flink.table.runtime.types.CRow;
34-
import org.apache.flink.types.Row;
3532

3633
import java.util.Optional;
3734
import java.util.Properties;

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4949
}
5050

5151
kafkaSinkTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSinkTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
52-
kafkaSinkTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));
52+
kafkaSinkTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), ",")));
5353
kafkaSinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
5454
kafkaSinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
5555

0 commit comments

Comments
 (0)