2323
2424import com .dtstack .flink .sql .source .AbsDeserialization ;
2525import com .dtstack .flink .sql .source .kafka .metric .KafkaTopicPartitionLagMetric ;
26+ import com .dtstack .flink .sql .table .TableInfo ;
2627import org .apache .flink .api .common .typeinfo .TypeInformation ;
28+ import org .apache .flink .api .common .typeinfo .Types ;
2729import org .apache .flink .api .java .typeutils .RowTypeInfo ;
2830import org .apache .flink .calcite .shaded .com .google .common .base .Strings ;
2931import org .apache .flink .metrics .MetricGroup ;
3032import org .apache .flink .shaded .guava18 .com .google .common .collect .Maps ;
33+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonProcessingException ;
3134import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3235import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .ObjectMapper ;
3336import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .JsonNodeType ;
4245import org .slf4j .LoggerFactory ;
4346
4447import java .io .IOException ;
48+ import java .lang .reflect .Array ;
4549import java .lang .reflect .Field ;
50+ import java .sql .Date ;
51+ import java .sql .Time ;
52+ import java .sql .Timestamp ;
4653import java .util .Iterator ;
54+ import java .util .List ;
4755import java .util .Map ;
4856import java .util .Set ;
4957
@@ -64,7 +72,7 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
6472
6573 private static final long serialVersionUID = 2385115520960444192L ;
6674
67- private static int rowLenth = 1000 ;
75+ private static int dirtyDataFrequency = 1000 ;
6876
6977 private final ObjectMapper objectMapper = new ObjectMapper ();
7078
@@ -115,7 +123,7 @@ public Row deserialize(byte[] message) throws IOException {
115123 try {
116124 JsonNode root = objectMapper .readTree (message );
117125
118- if (numInRecord .getCount ()% rowLenth == 0 ){
126+ if (numInRecord .getCount () % dirtyDataFrequency == 0 ) {
119127 LOG .info (root .toString ());
120128 }
121129
@@ -137,17 +145,19 @@ public Row deserialize(byte[] message) throws IOException {
137145 }
138146 } else {
139147 // Read the value as specified type
140- Object value = objectMapper .treeToValue (node , fieldTypes [i ].getTypeClass ());
148+
149+ Object value = convert (node , fieldTypes [i ]);
141150 row .setField (i , value );
142151 }
143152 }
144153
145154 numInResolveRecord .inc ();
146155 return row ;
147- } catch (Throwable t ) {
156+ } catch (Exception e ) {
148157 //add metric of dirty data
149- if (dirtyDataCounter .getCount ()% rowLenth == 0 ){
158+ if (dirtyDataCounter .getCount () % dirtyDataFrequency == 0 ) {
150159 LOG .info ("dirtyData: " + new String (message ));
160+ LOG .error ("" , e );
151161 }
152162 dirtyDataCounter .inc ();
153163 return null ;
@@ -245,4 +255,29 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exceptio
245255 private static String partitionLagMetricName (TopicPartition tp ) {
246256 return tp + ".records-lag" ;
247257 }
258+
259+ private Object convert (JsonNode node , TypeInformation <?> info ) {
260+ if (info .getTypeClass ().equals (Types .BOOLEAN .getTypeClass ())) {
261+ return node .asBoolean ();
262+ } else if (info .getTypeClass ().equals (Types .STRING .getTypeClass ())) {
263+ return node .asText ();
264+ } else if (info .getTypeClass ().equals (Types .SQL_DATE .getTypeClass ())) {
265+ return Date .valueOf (node .asText ());
266+ } else if (info .getTypeClass ().equals (Types .SQL_TIME .getTypeClass ())) {
267+ // local zone
268+ return Time .valueOf (node .asText ());
269+ } else if (info .getTypeClass ().equals (Types .SQL_TIMESTAMP .getTypeClass ())) {
270+ // local zone
271+ return Timestamp .valueOf (node .asText ());
272+ } else {
273+ // for types that were specified without JSON schema
274+ // e.g. POJOs
275+ try {
276+ return objectMapper .treeToValue (node , info .getTypeClass ());
277+ } catch (JsonProcessingException e ) {
278+ throw new IllegalStateException ("Unsupported type information '" + info + "' for node: " + node );
279+ }
280+ }
281+ }
282+
248283}
0 commit comments