2424import org .apache .flink .api .common .serialization .AbstractDeserializationSchema ;
2525import org .apache .flink .api .common .typeinfo .TypeInformation ;
2626import org .apache .flink .api .common .typeinfo .Types ;
27+ import org .apache .flink .api .java .typeutils .ObjectArrayTypeInfo ;
2728import org .apache .flink .api .java .typeutils .RowTypeInfo ;
2829import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonProcessingException ;
2930import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3435import org .apache .flink .types .Row ;
3536
3637import java .io .IOException ;
38+ import java .lang .reflect .Array ;
3739import java .sql .Date ;
3840import java .sql .Time ;
3941import java .sql .Timestamp ;
4345
4446/**
4547 * source data parse to json format
46- *
48+ * <p>
4749 * Date: 2019/12/12
4850 * Company: www.dtstack.com
4951 *
@@ -53,51 +55,33 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5355
5456 private final ObjectMapper objectMapper = new ObjectMapper ();
5557
56- private Map <String , String > rowAndFieldMapping ;
57- private Map <String , JsonNode > nodeAndJsonNodeMapping = Maps .newHashMap ();
58+ private final Map <String , String > rowAndFieldMapping ;
59+ private final Map <String , JsonNode > nodeAndJsonNodeMapping = Maps .newHashMap ();
5860
5961 private final String [] fieldNames ;
6062 private final TypeInformation <?>[] fieldTypes ;
61- private List <AbstractTableInfo .FieldExtraInfo > fieldExtraInfos ;
63+ private final List <AbstractTableInfo .FieldExtraInfo > fieldExtraInfos ;
64+ private final String charsetName ;
6265
63- public DtNestRowDeserializationSchema (TypeInformation <Row > typeInfo , Map <String , String > rowAndFieldMapping , List <AbstractTableInfo .FieldExtraInfo > fieldExtraInfos ) {
66+ public DtNestRowDeserializationSchema (TypeInformation <Row > typeInfo , Map <String , String > rowAndFieldMapping ,
67+ List <AbstractTableInfo .FieldExtraInfo > fieldExtraInfos ,
68+ String charsetName ) {
6469 this .fieldNames = ((RowTypeInfo ) typeInfo ).getFieldNames ();
6570 this .fieldTypes = ((RowTypeInfo ) typeInfo ).getFieldTypes ();
6671 this .rowAndFieldMapping = rowAndFieldMapping ;
6772 this .fieldExtraInfos = fieldExtraInfos ;
73+ this .charsetName = charsetName ;
6874 }
6975
7076 @ Override
7177 public Row deserialize (byte [] message ) throws IOException {
72- JsonNode root = objectMapper .readTree (message );
78+ String decoderStr = new String (message , charsetName );
79+ JsonNode root = objectMapper .readTree (decoderStr );
7380 this .parseTree (root , null );
74- Row row = new Row (fieldNames .length );
75-
76- try {
77- for (int i = 0 ; i < fieldNames .length ; i ++) {
78- JsonNode node = getIgnoreCase (fieldNames [i ]);
79- AbstractTableInfo .FieldExtraInfo fieldExtraInfo = fieldExtraInfos .get (i );
80-
81- if (node == null ) {
82- if (fieldExtraInfo != null && fieldExtraInfo .getNotNull ()) {
83- throw new IllegalStateException ("Failed to find field with name '"
84- + fieldNames [i ] + "'." );
85- } else {
86- row .setField (i , null );
87- }
88- } else {
89- // Read the value as specified type
90- Object value = convert (node , fieldTypes [i ]);
91- row .setField (i , value );
92- }
93- }
94- return row ;
95- } finally {
96- nodeAndJsonNodeMapping .clear ();
97- }
81+ return convertTopRow ();
9882 }
9983
100- private void parseTree (JsonNode jsonNode , String prefix ){
84+ private void parseTree (JsonNode jsonNode , String prefix ) {
10185 if (jsonNode .isArray ()) {
10286 ArrayNode array = (ArrayNode ) jsonNode ;
10387 for (int i = 0 ; i < array .size (); i ++) {
@@ -116,15 +100,15 @@ private void parseTree(JsonNode jsonNode, String prefix){
116100 return ;
117101 }
118102 Iterator <String > iterator = jsonNode .fieldNames ();
119- while (iterator .hasNext ()){
103+ while (iterator .hasNext ()) {
120104 String next = iterator .next ();
121105 JsonNode child = jsonNode .get (next );
122106 String nodeKey = getNodeKey (prefix , next );
123107
124108 nodeAndJsonNodeMapping .put (nodeKey , child );
125- if (child .isArray ()){
109+ if (child .isArray ()) {
126110 parseTree (child , nodeKey );
127- }else {
111+ } else {
128112 parseTree (child , nodeKey );
129113 }
130114 }
@@ -135,8 +119,8 @@ private JsonNode getIgnoreCase(String key) {
135119 return nodeAndJsonNodeMapping .get (nodeMappingKey );
136120 }
137121
138- private String getNodeKey (String prefix , String nodeName ){
139- if (Strings .isNullOrEmpty (prefix )){
122+ private String getNodeKey (String prefix , String nodeName ) {
123+ if (Strings .isNullOrEmpty (prefix )) {
140124 return nodeName ;
141125 }
142126 return prefix + "." + nodeName ;
@@ -160,15 +144,19 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
160144 } else {
161145 return node .asText ();
162146 }
163- } else if (info .getTypeClass ().equals (Types .SQL_DATE .getTypeClass ())) {
147+ } else if (info .getTypeClass ().equals (Types .SQL_DATE .getTypeClass ())) {
164148 return Date .valueOf (node .asText ());
165149 } else if (info .getTypeClass ().equals (Types .SQL_TIME .getTypeClass ())) {
166150 // local zone
167151 return Time .valueOf (node .asText ());
168152 } else if (info .getTypeClass ().equals (Types .SQL_TIMESTAMP .getTypeClass ())) {
169153 // local zone
170154 return Timestamp .valueOf (node .asText ());
171- } else {
155+ } else if (info instanceof RowTypeInfo ) {
156+ return convertRow (node , (RowTypeInfo ) info );
157+ } else if (info instanceof ObjectArrayTypeInfo ) {
158+ return convertObjectArray (node , ((ObjectArrayTypeInfo ) info ).getComponentInfo ());
159+ } else {
172160 // for types that were specified without JSON schema
173161 // e.g. POJOs
174162 try {
@@ -179,5 +167,55 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
179167 }
180168 }
181169
170+ private Row convertTopRow () {
171+ Row row = new Row (fieldNames .length );
172+ try {
173+ for (int i = 0 ; i < fieldNames .length ; i ++) {
174+ JsonNode node = getIgnoreCase (fieldNames [i ]);
175+ AbstractTableInfo .FieldExtraInfo fieldExtraInfo = fieldExtraInfos .get (i );
176+
177+ if (node == null ) {
178+ if (fieldExtraInfo != null && fieldExtraInfo .getNotNull ()) {
179+ throw new IllegalStateException ("Failed to find field with name '"
180+ + fieldNames [i ] + "'." );
181+ } else {
182+ row .setField (i , null );
183+ }
184+ } else {
185+ // Read the value as specified type
186+ Object value = convert (node , fieldTypes [i ]);
187+ row .setField (i , value );
188+ }
189+ }
190+ return row ;
191+ } finally {
192+ nodeAndJsonNodeMapping .clear ();
193+ }
194+ }
195+
196+ private Row convertRow (JsonNode node , RowTypeInfo info ) {
197+ final String [] names = info .getFieldNames ();
198+ final TypeInformation <?>[] types = info .getFieldTypes ();
199+
200+ final Row row = new Row (names .length );
201+ for (int i = 0 ; i < names .length ; i ++) {
202+ final String name = names [i ];
203+ final JsonNode subNode = node .get (name );
204+ if (subNode == null ) {
205+ row .setField (i , null );
206+ } else {
207+ row .setField (i , convert (subNode , types [i ]));
208+ }
209+ }
182210
211+ return row ;
212+ }
213+
214+ private Object convertObjectArray (JsonNode node , TypeInformation <?> elementType ) {
215+ final Object [] array = (Object []) Array .newInstance (elementType .getTypeClass (), node .size ());
216+ for (int i = 0 ; i < node .size (); i ++) {
217+ array [i ] = convert (node .get (i ), elementType );
218+ }
219+ return array ;
220+ }
183221}
0 commit comments