|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, software |
| 13 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | + * See the License for the specific language governing permissions and |
| 16 | + * limitations under the License. |
| 17 | + */ |
| 18 | + |
1 | 19 | package com.dtstack.flink.sql.util; |
2 | 20 |
|
3 | 21 | import org.apache.flink.table.dataformat.BaseRow; |
4 | 22 | import org.apache.flink.table.dataformat.BinaryString; |
| 23 | +import org.apache.flink.table.dataformat.DataFormatConverters; |
5 | 24 | import org.apache.flink.table.dataformat.GenericRow; |
6 | 25 | import org.apache.flink.table.dataformat.SqlTimestamp; |
7 | 26 | import org.apache.flink.types.Row; |
8 | 27 |
|
| 28 | +import java.sql.Date; |
| 29 | +import java.sql.Time; |
9 | 30 | import java.sql.Timestamp; |
10 | | - |
| 31 | +import java.time.LocalDate; |
11 | 32 |
|
12 | 33 | /** |
13 | 34 | * Company: www.dtstack.com |
|
17 | 38 | */ |
18 | 39 | public class RowDataConvert { |
19 | 40 |
|
20 | | - public static BaseRow convertToBaseRow(Row row){ |
| 41 | + public static BaseRow convertToBaseRow(Row row) { |
21 | 42 | int length = row.getArity(); |
22 | 43 | GenericRow genericRow = new GenericRow(length); |
23 | | - for(int i=0; i<length; i++){ |
24 | | - if(row.getField(i) instanceof String){ |
25 | | - genericRow.setField(i, BinaryString.fromString((String)row.getField(i))); |
26 | | - } else if(row.getField(i) instanceof Timestamp){ |
27 | | - SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp)row.getField(i))); |
| 44 | + for (int i = 0; i < length; i++) { |
| 45 | + if (row.getField(i) instanceof String) { |
| 46 | + genericRow.setField(i, BinaryString.fromString((String) row.getField(i))); |
| 47 | + } else if (row.getField(i) instanceof Timestamp) { |
| 48 | + SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp) row.getField(i))); |
28 | 49 | genericRow.setField(i, newTimestamp); |
29 | | - }else{ |
| 50 | + } else if (row.getField(i) instanceof Time) { |
| 51 | + genericRow.setField(i, DataFormatConverters.TimeConverter.INSTANCE.toInternal((Time) row.getField(i))); |
| 52 | + } else if (row.getField(i) instanceof Double) { |
| 53 | + genericRow.setField(i, DataFormatConverters.DoubleConverter.INSTANCE.toInternal((Double) row.getField(i))); |
| 54 | + } else if (row.getField(i) instanceof Float) { |
| 55 | + genericRow.setField(i, DataFormatConverters.FloatConverter.INSTANCE.toInternal((Float) row.getField(i))); |
| 56 | + } else if (row.getField(i) instanceof Long) { |
| 57 | + genericRow.setField(i, DataFormatConverters.LongConverter.INSTANCE.toInternal((Long) row.getField(i))); |
| 58 | + } else if (row.getField(i) instanceof Date) { |
| 59 | + genericRow.setField(i, DataFormatConverters.DateConverter.INSTANCE.toInternal((Date) row.getField(i))); |
| 60 | + } else if (row.getField(i) instanceof LocalDate) { |
| 61 | + genericRow.setField(i, DataFormatConverters.LocalDateConverter.INSTANCE.toInternal((LocalDate) row.getField(i))); |
| 62 | + } else { |
30 | 63 | genericRow.setField(i, row.getField(i)); |
31 | 64 | } |
32 | 65 | } |
|
0 commit comments