|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | + * contributor license agreements. See the NOTICE file distributed with |
| 4 | + * this work for additional information regarding copyright ownership. |
| 5 | + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | + * (the "License"); you may not use this file except in compliance with |
| 7 | + * the License. You may obtain a copy of the License at |
| 8 | + * |
| 9 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | + * |
| 11 | + * Unless required by applicable law or agreed to in writing, software |
| 12 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | + * See the License for the specific language governing permissions and |
| 15 | + * limitations under the License. |
| 16 | + */ |
| 17 | + |
| 18 | +package org.apache.flink.formats.json; |
| 19 | + |
| 20 | +import org.apache.flink.annotation.PublicEvolving; |
| 21 | +import org.apache.flink.api.common.serialization.SerializationSchema; |
| 22 | +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; |
| 23 | +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; |
| 24 | +import org.apache.flink.api.common.typeinfo.TypeInformation; |
| 25 | +import org.apache.flink.api.common.typeinfo.Types; |
| 26 | +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; |
| 27 | +import org.apache.flink.api.java.typeutils.RowTypeInfo; |
| 28 | +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; |
| 29 | +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; |
| 30 | +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; |
| 31 | +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode; |
| 32 | +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; |
| 33 | +import org.apache.flink.types.Row; |
| 34 | +import org.apache.flink.util.Preconditions; |
| 35 | + |
| 36 | +import java.math.BigDecimal; |
| 37 | +import java.math.BigInteger; |
| 38 | +import java.sql.Time; |
| 39 | +import java.sql.Timestamp; |
| 40 | +import java.text.SimpleDateFormat; |
| 41 | +import java.util.Objects; |
| 42 | + |
| 43 | +/** |
| 44 | + * Serialization schema that serializes an object of Flink types into a JSON bytes. |
| 45 | + * |
| 46 | + * <p>Serializes the input Flink object into a JSON string and |
| 47 | + * converts it into <code>byte[]</code>. |
| 48 | + * |
| 49 | + * <p>Result <code>byte[]</code> messages can be deserialized using {@link DTJsonRowDeserializationSchema}. |
| 50 | + */ |
| 51 | +@PublicEvolving |
| 52 | +public class DTJsonRowSerializationSchema implements SerializationSchema<Row> { |
| 53 | + |
| 54 | + private static final long serialVersionUID = -2885556750743978636L; |
| 55 | + |
| 56 | + /** Type information describing the input type. */ |
| 57 | + private final TypeInformation<Row> typeInfo; |
| 58 | + |
| 59 | + /** Object mapper that is used to create output JSON objects. */ |
| 60 | + private final ObjectMapper mapper = new ObjectMapper(); |
| 61 | + |
| 62 | + /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */ |
| 63 | + private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'"); |
| 64 | + |
| 65 | + /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */ |
| 66 | + private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'"); |
| 67 | + |
| 68 | + /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */ |
| 69 | + private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); |
| 70 | + |
| 71 | + /** Reusable object node. */ |
| 72 | + private transient ObjectNode node; |
| 73 | + |
| 74 | + /** |
| 75 | + * Creates a JSON serialization schema for the given type information. |
| 76 | + * |
| 77 | + * @param typeInfo The field names of {@link Row} are used to map to JSON properties. |
| 78 | + */ |
| 79 | + public DTJsonRowSerializationSchema(TypeInformation<Row> typeInfo) { |
| 80 | + Preconditions.checkNotNull(typeInfo, "Type information"); |
| 81 | + this.typeInfo = typeInfo; |
| 82 | + } |
| 83 | + |
| 84 | + /** |
| 85 | + * Creates a JSON serialization schema for the given JSON schema. |
| 86 | + * |
| 87 | + * @param jsonSchema JSON schema describing the result type |
| 88 | + * |
| 89 | + * @see <a href="http://json-schema.org/">http://json-schema.org/</a> |
| 90 | + */ |
| 91 | + public DTJsonRowSerializationSchema(String jsonSchema) { |
| 92 | + this(JsonRowSchemaConverter.convert(jsonSchema)); |
| 93 | + } |
| 94 | + |
| 95 | + @Override |
| 96 | + public byte[] serialize(Row row) { |
| 97 | + if (node == null) { |
| 98 | + node = mapper.createObjectNode(); |
| 99 | + } |
| 100 | + |
| 101 | + try { |
| 102 | + convertRow(node, (RowTypeInfo) typeInfo, row); |
| 103 | + return mapper.writeValueAsBytes(node); |
| 104 | + } catch (Throwable t) { |
| 105 | + throw new RuntimeException("Could not serialize row '" + row + "'. " + |
| 106 | + "Make sure that the schema matches the input.", t); |
| 107 | + } |
| 108 | + } |
| 109 | + |
| 110 | + @Override |
| 111 | + public boolean equals(Object o) { |
| 112 | + if (this == o) { |
| 113 | + return true; |
| 114 | + } |
| 115 | + if (o == null || getClass() != o.getClass()) { |
| 116 | + return false; |
| 117 | + } |
| 118 | + final DTJsonRowSerializationSchema that = (DTJsonRowSerializationSchema) o; |
| 119 | + return Objects.equals(typeInfo, that.typeInfo); |
| 120 | + } |
| 121 | + |
| 122 | + @Override |
| 123 | + public int hashCode() { |
| 124 | + return Objects.hash(typeInfo); |
| 125 | + } |
| 126 | + |
| 127 | + // -------------------------------------------------------------------------------------------- |
| 128 | + |
| 129 | + private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) { |
| 130 | + if (reuse == null) { |
| 131 | + reuse = mapper.createObjectNode(); |
| 132 | + } |
| 133 | + final String[] fieldNames = info.getFieldNames(); |
| 134 | + final TypeInformation<?>[] fieldTypes = info.getFieldTypes(); |
| 135 | + |
| 136 | + // validate the row |
| 137 | + if (row.getArity() != fieldNames.length) { |
| 138 | + throw new IllegalStateException(String.format( |
| 139 | + "Number of elements in the row '%s' is different from number of field names: %d", row, fieldNames.length)); |
| 140 | + } |
| 141 | + |
| 142 | + for (int i = 0; i < fieldNames.length; i++) { |
| 143 | + final String name = fieldNames[i]; |
| 144 | + |
| 145 | + final JsonNode fieldConverted = convert(reuse, reuse.get(name), fieldTypes[i], row.getField(i)); |
| 146 | + reuse.set(name, fieldConverted); |
| 147 | + } |
| 148 | + |
| 149 | + return reuse; |
| 150 | + } |
| 151 | + |
| 152 | + private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) { |
| 153 | + if (info.equals(Types.VOID) || object == null) { |
| 154 | + return container.nullNode(); |
| 155 | + } else if (info.equals(Types.BOOLEAN)) { |
| 156 | + return container.booleanNode((Boolean) object); |
| 157 | + } else if (info.equals(Types.STRING)) { |
| 158 | + return container.textNode((String) object); |
| 159 | + } else if (info.equals(Types.BIG_DEC)) { |
| 160 | + // convert decimal if necessary |
| 161 | + if (object instanceof BigDecimal) { |
| 162 | + return container.numberNode((BigDecimal) object); |
| 163 | + } |
| 164 | + return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue())); |
| 165 | + } else if (info.equals(Types.BIG_INT)) { |
| 166 | + // convert integer if necessary |
| 167 | + if (object instanceof BigInteger) { |
| 168 | + return container.numberNode((BigInteger) object); |
| 169 | + } |
| 170 | + return container.numberNode(BigInteger.valueOf(((Number) object).longValue())); |
| 171 | + } else if (info.equals(Types.SQL_DATE)) { |
| 172 | + return container.textNode(object.toString()); |
| 173 | + } else if (info.equals(Types.SQL_TIME)) { |
| 174 | + final Time time = (Time) object; |
| 175 | + // strip milliseconds if possible |
| 176 | + if (time.getTime() % 1000 > 0) { |
| 177 | + return container.textNode(timeFormatWithMillis.format(time)); |
| 178 | + } |
| 179 | + return container.textNode(timeFormat.format(time)); |
| 180 | + } else if (info.equals(Types.SQL_TIMESTAMP)) { |
| 181 | + return container.textNode(timestampFormat.format((Timestamp) object)); |
| 182 | + } else if (info instanceof RowTypeInfo) { |
| 183 | + if (reuse != null && reuse instanceof ObjectNode) { |
| 184 | + return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object); |
| 185 | + } else { |
| 186 | + return convertRow(null, (RowTypeInfo) info, (Row) object); |
| 187 | + } |
| 188 | + } else if (info instanceof ObjectArrayTypeInfo) { |
| 189 | + if (reuse != null && reuse instanceof ArrayNode) { |
| 190 | + return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object); |
| 191 | + } else { |
| 192 | + return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object); |
| 193 | + } |
| 194 | + } else if (info instanceof BasicArrayTypeInfo) { |
| 195 | + if (reuse != null && reuse instanceof ArrayNode) { |
| 196 | + return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object); |
| 197 | + } else { |
| 198 | + return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object); |
| 199 | + } |
| 200 | + } else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType().equals(Types.BYTE)) { |
| 201 | + return container.binaryNode((byte[]) object); |
| 202 | + } else { |
| 203 | + // for types that were specified without JSON schema |
| 204 | + // e.g. POJOs |
| 205 | + try { |
| 206 | + return mapper.valueToTree(object); |
| 207 | + } catch (IllegalArgumentException e) { |
| 208 | + throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e); |
| 209 | + } |
| 210 | + } |
| 211 | + } |
| 212 | + |
| 213 | + private ArrayNode convertObjectArray(ArrayNode reuse, TypeInformation<?> info, Object[] array) { |
| 214 | + if (reuse == null) { |
| 215 | + reuse = mapper.createArrayNode(); |
| 216 | + } else { |
| 217 | + reuse.removeAll(); |
| 218 | + } |
| 219 | + |
| 220 | + for (Object object : array) { |
| 221 | + reuse.add(convert(reuse, null, info, object)); |
| 222 | + } |
| 223 | + return reuse; |
| 224 | + } |
| 225 | +} |
0 commit comments