Skip to content

Commit 5b1eef2

Browse files
committed
[flinksql][kafka source解析json数据为null][21182]
1 parent 30d99a6 commit 5b1eef2

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/source/JsonDataParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2828
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
2929
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
30+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
3031
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
3132
import org.apache.flink.types.Row;
3233

@@ -151,7 +152,13 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
151152
if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
152153
return node.asBoolean();
153154
} else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
154-
return node.asText();
155+
if (node instanceof ObjectNode) {
156+
return node.toString();
157+
} else if (node instanceof NullNode) {
158+
return null;
159+
} else {
160+
return node.asText();
161+
}
155162
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
156163
return Date.valueOf(node.asText());
157164
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {

0 commit comments

Comments
 (0)