File tree Expand file tree Collapse file tree 3 files changed +16
-3
lines changed
hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase
mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql
mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql Expand file tree Collapse file tree 3 files changed +16
-3
lines changed Original file line number Diff line number Diff line change 3636import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3737import org .apache .flink .configuration .Configuration ;
3838import org .apache .flink .streaming .api .functions .async .ResultFuture ;
39+ import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
3940import org .apache .flink .types .Row ;
4041import org .hbase .async .HBaseClient ;
4142import org .slf4j .Logger ;
@@ -159,9 +160,12 @@ protected Row fillData(Row input, Object sideInput){
159160 Row row = new Row (sideInfo .getOutFieldInfoList ().size ());
160161 for (Map .Entry <Integer , Integer > entry : sideInfo .getInFieldIndex ().entrySet ()){
161162 Object obj = input .getField (entry .getValue ());
162- if (obj instanceof Timestamp ){
163+ boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo .class .isAssignableFrom (sideInfo .getRowTypeInfo ().getTypeAt (entry .getValue ()).getClass ());
164+
165+ if (obj instanceof Timestamp && isTimeIndicatorTypeInfo ){
163166 obj = ((Timestamp )obj ).getTime ();
164167 }
168+
165169 row .setField (entry .getKey (), obj );
166170 }
167171
Original file line number Diff line number Diff line change 1414import org .apache .flink .calcite .shaded .com .google .common .collect .Lists ;
1515import org .apache .flink .calcite .shaded .com .google .common .collect .Maps ;
1616import org .apache .flink .configuration .Configuration ;
17+ import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
1718import org .apache .flink .types .Row ;
1819import org .apache .flink .util .Collector ;
1920import org .slf4j .Logger ;
@@ -66,7 +67,10 @@ protected Row fillData(Row input, Object sideInput) {
6667 Row row = new Row (sideInfo .getOutFieldInfoList ().size ());
6768 for (Map .Entry <Integer , Integer > entry : sideInfo .getInFieldIndex ().entrySet ()){
6869 Object obj = input .getField (entry .getValue ());
69- if (obj instanceof Timestamp ){
70+ boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo .class .isAssignableFrom (sideInfo .getRowTypeInfo ().getTypeAt (entry .getValue ()).getClass ());
71+
72+ //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
73+ if (obj instanceof Timestamp && isTimeIndicatorTypeInfo ){
7074 obj = ((Timestamp )obj ).getTime ();
7175 }
7276 row .setField (entry .getKey (), obj );
Original file line number Diff line number Diff line change 3535import io .vertx .ext .jdbc .JDBCClient ;
3636import io .vertx .ext .sql .SQLClient ;
3737import io .vertx .ext .sql .SQLConnection ;
38+ import org .apache .flink .api .common .typeinfo .SqlTimeTypeInfo ;
3839import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3940import org .apache .flink .calcite .shaded .com .google .common .collect .Lists ;
4041import org .apache .flink .configuration .Configuration ;
4142import org .apache .flink .streaming .api .functions .async .ResultFuture ;
43+ import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
4244import org .apache .flink .types .Row ;
4345import org .slf4j .Logger ;
4446import org .slf4j .LoggerFactory ;
@@ -185,9 +187,12 @@ public Row fillData(Row input, Object line){
185187 Row row = new Row (sideInfo .getOutFieldInfoList ().size ());
186188 for (Map .Entry <Integer , Integer > entry : sideInfo .getInFieldIndex ().entrySet ()){
187189 Object obj = input .getField (entry .getValue ());
188- if (obj instanceof Timestamp ){
190+ boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo .class .isAssignableFrom (sideInfo .getRowTypeInfo ().getTypeAt (entry .getValue ()).getClass ());
191+
192+ if (obj instanceof Timestamp && isTimeIndicatorTypeInfo ){
189193 obj = ((Timestamp )obj ).getTime ();
190194 }
195+
191196 row .setField (entry .getKey (), obj );
192197 }
193198
You can’t perform that action at this time.
0 commit comments