2121import org .apache .flink .table .dataformat .BaseRow ;
2222import org .apache .flink .table .dataformat .BinaryString ;
2323import org .apache .flink .table .dataformat .DataFormatConverters ;
24+ import org .apache .flink .table .dataformat .Decimal ;
2425import org .apache .flink .table .dataformat .GenericRow ;
2526import org .apache .flink .table .dataformat .SqlTimestamp ;
2627import org .apache .flink .types .Row ;
2728
29+ import java .math .BigDecimal ;
2830import java .sql .Date ;
2931import java .sql .Time ;
3032import java .sql .Timestamp ;
3133import java .time .LocalDate ;
34+ import java .time .LocalTime ;
3235
3336/**
3437 * Company: www.dtstack.com
@@ -49,21 +52,37 @@ public static BaseRow convertToBaseRow(Row row) {
4952 genericRow .setField (i , newTimestamp );
5053 } else if (row .getField (i ) instanceof Time ) {
5154 genericRow .setField (i , DataFormatConverters .TimeConverter .INSTANCE .toInternal ((Time ) row .getField (i )));
52- } else if (row .getField (i ) instanceof Double ) {
55+ } else if (row .getField (i ) instanceof Double || row . getField ( i ). getClass (). equals ( double . class ) ) {
5356 genericRow .setField (i , DataFormatConverters .DoubleConverter .INSTANCE .toInternal ((Double ) row .getField (i )));
54- } else if (row .getField (i ) instanceof Float ) {
57+ } else if (row .getField (i ) instanceof Float || row . getField ( i ). getClass (). equals ( float . class ) ) {
5558 genericRow .setField (i , DataFormatConverters .FloatConverter .INSTANCE .toInternal ((Float ) row .getField (i )));
56- } else if (row .getField (i ) instanceof Long ) {
59+ } else if (row .getField (i ) instanceof Long || row . getField ( i ). getClass (). equals ( long . class ) ) {
5760 genericRow .setField (i , DataFormatConverters .LongConverter .INSTANCE .toInternal ((Long ) row .getField (i )));
61+ } else if (row .getField (i ) instanceof Boolean || row .getField (i ).getClass ().equals (boolean .class )) {
62+ genericRow .setField (i , DataFormatConverters .BooleanConverter .INSTANCE .toInternal ((Boolean ) row .getField (i )));
63+ } else if (row .getField (i ) instanceof Integer || row .getField (i ).getClass ().equals (int .class )) {
64+ genericRow .setField (i , DataFormatConverters .IntConverter .INSTANCE .toInternal ((Integer ) row .getField (i )));
65+ } else if (row .getField (i ) instanceof Short || row .getField (i ).getClass ().equals (short .class )) {
66+ genericRow .setField (i , DataFormatConverters .ShortConverter .INSTANCE .toInternal ((Short ) row .getField (i )));
67+ } else if (row .getField (i ) instanceof Byte || row .getField (i ).getClass ().equals (byte .class )) {
68+ genericRow .setField (i , DataFormatConverters .ByteConverter .INSTANCE .toInternal ((Byte ) row .getField (i )));
5869 } else if (row .getField (i ) instanceof Date ) {
5970 genericRow .setField (i , DataFormatConverters .DateConverter .INSTANCE .toInternal ((Date ) row .getField (i )));
6071 } else if (row .getField (i ) instanceof LocalDate ) {
6172 genericRow .setField (i , DataFormatConverters .LocalDateConverter .INSTANCE .toInternal ((LocalDate ) row .getField (i )));
73+ } else if (row .getField (i ) instanceof LocalTime ) {
74+ genericRow .setField (i , DataFormatConverters .LocalTimeConverter .INSTANCE .toInternal ((LocalTime ) row .getField (i )));
75+ } else if (row .getField (i ) instanceof BigDecimal ) {
76+ BigDecimal tempDecimal = (BigDecimal ) row .getField (i );
77+ int precision = ((BigDecimal ) row .getField (i )).precision ();
78+ int scale = ((BigDecimal ) row .getField (i )).scale ();
79+ DataFormatConverters .DecimalConverter decimalConverter = new DataFormatConverters .DecimalConverter (precision , scale );
80+ genericRow .setField (i , decimalConverter .toExternal (Decimal .fromBigDecimal (tempDecimal , precision , scale )));
6281 } else {
6382 genericRow .setField (i , row .getField (i ));
6483 }
6584 }
6685
6786 return genericRow ;
6887 }
69- }
88+ }
0 commit comments