Skip to content

Commit 9484dcc

Browse files
committed
resolve conflict
2 parents 4112f78 + 6749990 commit 9484dcc

File tree

6 files changed

+45
-25
lines changed

6 files changed

+45
-25
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.flink.types.Row;
3939

4040
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
41-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
4241
import com.dtstack.flink.sql.enums.ClusterMode;
4342
import com.dtstack.flink.sql.enums.ECacheType;
4443
import com.dtstack.flink.sql.enums.EPluginLoadMode;

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545

4646
/**
4747
* source data parse to json format
48-
*
48+
* <p>
4949
* Date: 2019/12/12
5050
* Company: www.dtstack.com
5151
*
@@ -55,28 +55,33 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5555

5656
private final ObjectMapper objectMapper = new ObjectMapper();
5757

58-
private Map<String, String> rowAndFieldMapping;
59-
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
58+
private final Map<String, String> rowAndFieldMapping;
59+
private final Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
6060

6161
private final String[] fieldNames;
6262
private final TypeInformation<?>[] fieldTypes;
63-
private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
63+
private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
64+
private final String charsetName;
6465

65-
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos) {
66+
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
67+
List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
68+
String charsetName) {
6669
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
6770
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
6871
this.rowAndFieldMapping = rowAndFieldMapping;
6972
this.fieldExtraInfos = fieldExtraInfos;
73+
this.charsetName = charsetName;
7074
}
7175

7276
@Override
7377
public Row deserialize(byte[] message) throws IOException {
74-
JsonNode root = objectMapper.readTree(message);
78+
String decoderStr = new String(message, charsetName);
79+
JsonNode root = objectMapper.readTree(decoderStr);
7580
this.parseTree(root, null);
7681
return convertTopRow();
7782
}
7883

79-
private void parseTree(JsonNode jsonNode, String prefix){
84+
private void parseTree(JsonNode jsonNode, String prefix) {
8085
if (jsonNode.isArray()) {
8186
ArrayNode array = (ArrayNode) jsonNode;
8287
for (int i = 0; i < array.size(); i++) {
@@ -95,15 +100,15 @@ private void parseTree(JsonNode jsonNode, String prefix){
95100
return;
96101
}
97102
Iterator<String> iterator = jsonNode.fieldNames();
98-
while (iterator.hasNext()){
103+
while (iterator.hasNext()) {
99104
String next = iterator.next();
100105
JsonNode child = jsonNode.get(next);
101106
String nodeKey = getNodeKey(prefix, next);
102107

103108
nodeAndJsonNodeMapping.put(nodeKey, child);
104-
if(child.isArray()){
109+
if (child.isArray()) {
105110
parseTree(child, nodeKey);
106-
}else {
111+
} else {
107112
parseTree(child, nodeKey);
108113
}
109114
}
@@ -114,8 +119,8 @@ private JsonNode getIgnoreCase(String key) {
114119
return nodeAndJsonNodeMapping.get(nodeMappingKey);
115120
}
116121

117-
private String getNodeKey(String prefix, String nodeName){
118-
if(Strings.isNullOrEmpty(prefix)){
122+
private String getNodeKey(String prefix, String nodeName) {
123+
if (Strings.isNullOrEmpty(prefix)) {
119124
return nodeName;
120125
}
121126
return prefix + "." + nodeName;
@@ -139,7 +144,7 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
139144
} else {
140145
return node.asText();
141146
}
142-
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
147+
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
143148
return Date.valueOf(node.asText());
144149
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
145150
// local zone

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaConsumerFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaS
5656
private DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
5757
DeserializationSchema<Row> deserializationSchema = null;
5858
if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
59-
deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(), kafkaSourceTableInfo.getFieldExtraInfoList());
60-
59+
deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(),
60+
kafkaSourceTableInfo.getFieldExtraInfoList(),kafkaSourceTableInfo.getCharsetName());
6161
} else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
6262

6363
if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5050
kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
5151
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
5252
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
53-
53+
kafkaSourceTableInfo.setCharsetName(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CHARSET_NAME_KEY.toLowerCase(),"UTF-8")));
5454
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5555
kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
5656
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.source.kafka.table;
2121

22+
import com.dtstack.flink.sql.format.FormatType;
2223
import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
2324
import com.google.common.base.Preconditions;
2425

@@ -53,6 +54,8 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
5354

5455
public static final String SOURCE_DATA_TYPE_KEY = "sourceDataType";
5556

57+
public static final String CHARSET_NAME_KEY = "charsetName";
58+
5659
private String bootstrapServers;
5760

5861
private String topic;
@@ -71,6 +74,7 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
7174

7275
public Map<String, String> kafkaParams = new HashMap<>();
7376

77+
public String charsetName;
7478

7579
public String getBootstrapServers() {
7680
return bootstrapServers;
@@ -148,11 +152,19 @@ public void setFieldDelimiter(String fieldDelimiter) {
148152
this.fieldDelimiter = fieldDelimiter;
149153
}
150154

151-
@Override
152-
public boolean check() {
153-
Preconditions.checkNotNull(getType(), "kafka of type is required");
154-
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
155-
Preconditions.checkNotNull(topic, "kafka of topic is required");
156-
return false;
157-
}
155+
public String getCharsetName() {
156+
return charsetName;
157+
}
158+
159+
public void setCharsetName(String charsetName) {
160+
this.charsetName = charsetName;
161+
}
162+
163+
@Override
164+
public boolean check() {
165+
Preconditions.checkNotNull(getType(), "kafka of type is required");
166+
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
167+
Preconditions.checkNotNull(topic, "kafka of topic is required");
168+
return false;
169+
}
158170
}

serversocket/serversocket-source/src/main/java/com/dtstack/flink/sql/source/serversocket/CustomerSocketTextStreamFunction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,16 @@ public class CustomerSocketTextStreamFunction implements SourceFunction<Row> {
6464

6565
private transient Socket currentSocket;
6666

67+
private String CHARSET_NAME = "UTF-8";
68+
6769
ServersocketSourceTableInfo tableInfo;
6870

71+
72+
6973
public CustomerSocketTextStreamFunction(ServersocketSourceTableInfo tableInfo, TypeInformation<Row> typeInfo,
7074
Map<String, String> rowAndFieldMapping, List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos) {
7175
this.tableInfo = tableInfo;
72-
this.deserializationSchema = new DtNestRowDeserializationSchema(typeInfo, rowAndFieldMapping, fieldExtraInfos);
76+
this.deserializationSchema = new DtNestRowDeserializationSchema(typeInfo, rowAndFieldMapping, fieldExtraInfos, CHARSET_NAME);
7377
this.deserializationMetricWrapper = new DeserializationMetricWrapper(typeInfo, deserializationSchema);
7478
}
7579

0 commit comments

Comments (0)