
Commit 31df91a

Author: xuchao
Merge branch 'hotfix_1.8_3.10.x_25240' into 1.8_zy_3.10.x
2 parents: f1e1d28 + 8e3344e

File tree

4 files changed: +464 −6 lines


kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 3 additions & 3 deletions
@@ -25,7 +25,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.avro.AvroRowSerializationSchema;
 import org.apache.flink.formats.csv.CsvRowSerializationSchema;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.formats.json.DTJsonRowSerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
@@ -62,9 +62,9 @@ private SerializationSchema<Row> createSerializationSchema(KafkaSinkTableInfo ka
         if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {

             if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) {
-                serializationSchema = new JsonRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
+                serializationSchema = new DTJsonRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
             } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                serializationSchema = new JsonRowSerializationSchema(typeInformation);
+                serializationSchema = new DTJsonRowSerializationSchema(typeInformation);
             } else {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema) or TypeInformation<Row>");
             }
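
Both constructor paths survive the swap unchanged: a JSON Schema string (converted internally via JsonRowSchemaConverter) or an explicit TypeInformation<Row>. A minimal sketch of the two equivalent ways to describe the same row layout; the field names and schema below are illustrative, not taken from the commit:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.DTJsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class SchemaPathsSketch {
    public static void main(String[] args) {
        // Path 1: a JSON Schema string, converted internally by
        // JsonRowSchemaConverter.convert(...). Field names are hypothetical.
        String jsonSchema =
            "{ \"type\": \"object\", \"properties\": {"
                + " \"id\": { \"type\": \"integer\" },"
                + " \"name\": { \"type\": \"string\" } } }";
        DTJsonRowSerializationSchema fromSchema = new DTJsonRowSerializationSchema(jsonSchema);

        // Path 2: the equivalent explicit type information
        // (JSON Schema "integer" maps to BIG_INT, "string" to STRING).
        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
            new String[]{"id", "name"}, Types.BIG_INT, Types.STRING);
        DTJsonRowSerializationSchema fromTypeInfo = new DTJsonRowSerializationSchema(typeInfo);
    }
}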
DTJsonRowSerializationSchema.java (new file, package org.apache.flink.formats.json)

Lines changed: 225 additions & 0 deletions
@@ -0,0 +1,225 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.formats.json;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Objects;

/**
 * Serialization schema that serializes an object of Flink types into JSON bytes.
 *
 * <p>Serializes the input Flink object into a JSON string and
 * converts it into <code>byte[]</code>.
 *
 * <p>Result <code>byte[]</code> messages can be deserialized using {@link DTJsonRowDeserializationSchema}.
 */
@PublicEvolving
public class DTJsonRowSerializationSchema implements SerializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private final TypeInformation<Row> typeInfo;

    /** Object mapper that is used to create output JSON objects. */
    private final ObjectMapper mapper = new ObjectMapper();

    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
    private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");

    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */
    private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");

    /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
    private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    /** Reusable object node. */
    private transient ObjectNode node;

    /**
     * Creates a JSON serialization schema for the given type information.
     *
     * @param typeInfo The field names of {@link Row} are used to map to JSON properties.
     */
    public DTJsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
        Preconditions.checkNotNull(typeInfo, "Type information");
        this.typeInfo = typeInfo;
    }

    /**
     * Creates a JSON serialization schema for the given JSON schema.
     *
     * @param jsonSchema JSON schema describing the result type
     *
     * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
     */
    public DTJsonRowSerializationSchema(String jsonSchema) {
        this(JsonRowSchemaConverter.convert(jsonSchema));
    }

    @Override
    public byte[] serialize(Row row) {
        if (node == null) {
            node = mapper.createObjectNode();
        }

        try {
            convertRow(node, (RowTypeInfo) typeInfo, row);
            return mapper.writeValueAsBytes(node);
        } catch (Throwable t) {
            throw new RuntimeException("Could not serialize row '" + row + "'. " +
                "Make sure that the schema matches the input.", t);
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final DTJsonRowSerializationSchema that = (DTJsonRowSerializationSchema) o;
        return Objects.equals(typeInfo, that.typeInfo);
    }

    @Override
    public int hashCode() {
        return Objects.hash(typeInfo);
    }

    // --------------------------------------------------------------------------------------------

    private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {
        if (reuse == null) {
            reuse = mapper.createObjectNode();
        }
        final String[] fieldNames = info.getFieldNames();
        final TypeInformation<?>[] fieldTypes = info.getFieldTypes();

        // validate the row
        if (row.getArity() != fieldNames.length) {
            throw new IllegalStateException(String.format(
                "Number of elements in the row '%s' is different from number of field names: %d", row, fieldNames.length));
        }

        for (int i = 0; i < fieldNames.length; i++) {
            final String name = fieldNames[i];

            final JsonNode fieldConverted = convert(reuse, reuse.get(name), fieldTypes[i], row.getField(i));
            reuse.set(name, fieldConverted);
        }

        return reuse;
    }

    private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
        if (info.equals(Types.VOID) || object == null) {
            return container.nullNode();
        } else if (info.equals(Types.BOOLEAN)) {
            return container.booleanNode((Boolean) object);
        } else if (info.equals(Types.STRING)) {
            return container.textNode((String) object);
        } else if (info.equals(Types.BIG_DEC)) {
            // convert decimal if necessary
            if (object instanceof BigDecimal) {
                return container.numberNode((BigDecimal) object);
            }
            return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
        } else if (info.equals(Types.BIG_INT)) {
            // convert integer if necessary
            if (object instanceof BigInteger) {
                return container.numberNode((BigInteger) object);
            }
            return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
        } else if (info.equals(Types.SQL_DATE)) {
            return container.textNode(object.toString());
        } else if (info.equals(Types.SQL_TIME)) {
            final Time time = (Time) object;
            // strip milliseconds if possible
            if (time.getTime() % 1000 > 0) {
                return container.textNode(timeFormatWithMillis.format(time));
            }
            return container.textNode(timeFormat.format(time));
        } else if (info.equals(Types.SQL_TIMESTAMP)) {
            return container.textNode(timestampFormat.format((Timestamp) object));
        } else if (info instanceof RowTypeInfo) {
            if (reuse != null && reuse instanceof ObjectNode) {
                return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);
            } else {
                return convertRow(null, (RowTypeInfo) info, (Row) object);
            }
        } else if (info instanceof ObjectArrayTypeInfo) {
            if (reuse != null && reuse instanceof ArrayNode) {
                return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            } else {
                return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            }
        } else if (info instanceof BasicArrayTypeInfo) {
            if (reuse != null && reuse instanceof ArrayNode) {
                return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            } else {
                return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            }
        } else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType().equals(Types.BYTE)) {
            return container.binaryNode((byte[]) object);
        } else {
            // for types that were specified without JSON schema
            // e.g. POJOs
            try {
                return mapper.valueToTree(object);
            } catch (IllegalArgumentException e) {
                throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e);
            }
        }
    }

    private ArrayNode convertObjectArray(ArrayNode reuse, TypeInformation<?> info, Object[] array) {
        if (reuse == null) {
            reuse = mapper.createArrayNode();
        } else {
            reuse.removeAll();
        }

        for (Object object : array) {
            reuse.add(convert(reuse, null, info, object));
        }
        return reuse;
    }
}
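
A short usage sketch of the class above; the row layout and values are hypothetical:

import java.math.BigDecimal;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.DTJsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class SerializeSketch {
    public static void main(String[] args) {
        // Hypothetical two-field layout: (name STRING, amount BIG_DEC).
        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
            new String[]{"name", "amount"}, Types.STRING, Types.BIG_DEC);
        DTJsonRowSerializationSchema schema = new DTJsonRowSerializationSchema(typeInfo);

        Row row = new Row(2);
        row.setField(0, "alice");
        row.setField(1, new BigDecimal("12.50"));

        // serialize(...) reuses an internal ObjectNode and returns JSON bytes,
        // e.g. {"name":"alice","amount":12.50}
        byte[] json = schema.serialize(row);
        System.out.println(new String(json));
    }
}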

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaConsumerFactory.java

Lines changed: 3 additions & 3 deletions
@@ -27,7 +27,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
 import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
-import org.apache.flink.formats.json.JsonRowDeserializationSchema;
+import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.types.Row;

@@ -61,9 +61,9 @@ private DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableI
         } else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {

             if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
-                deserializationSchema = new JsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
+                deserializationSchema = new DTJsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
             } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                deserializationSchema = new JsonRowDeserializationSchema(typeInformation);
+                deserializationSchema = new DTJsonRowDeserializationSchema(typeInformation);
             } else {
                 throw new IllegalArgumentException("sourceDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema) or TypeInformation<Row>");
             }
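
With source and sink now using the matching DT classes, a round trip should be symmetric. A minimal sketch, assuming DTJsonRowDeserializationSchema (not shown in this view) implements the standard DeserializationSchema<Row> deserialize(byte[]) method, as its use in the factory above implies:

import java.math.BigInteger;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
import org.apache.flink.formats.json.DTJsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class RoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical layout shared by the Kafka sink and source.
        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
            new String[]{"id", "name"}, Types.BIG_INT, Types.STRING);

        DTJsonRowSerializationSchema ser = new DTJsonRowSerializationSchema(typeInfo);
        DTJsonRowDeserializationSchema de = new DTJsonRowDeserializationSchema(typeInfo);

        Row in = new Row(2);
        in.setField(0, BigInteger.valueOf(42));
        in.setField(1, "bob");

        // Row -> JSON bytes -> Row; the result should equal the input.
        Row out = de.deserialize(ser.serialize(in));
        System.out.println(in.equals(out));
    }
}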
