Skip to content

Commit e2c5643

Browse files
committed
partionjoin type convert
1 parent 73d058a commit e2c5643

File tree

1 file changed

+29
-1
lines changed

1 file changed

+29
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.calcite.sql.parser.SqlParseException;
4646
import org.apache.calcite.sql.parser.SqlParserPos;
4747
import org.apache.commons.collections.CollectionUtils;
48+
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
4849
import org.apache.flink.api.common.typeinfo.TypeInformation;
4950
import org.apache.flink.api.java.functions.KeySelector;
5051
import org.apache.flink.api.java.tuple.Tuple2;
@@ -55,11 +56,13 @@
5556
import org.apache.flink.streaming.api.datastream.DataStream;
5657
import org.apache.flink.table.api.Table;
5758
import org.apache.flink.table.api.java.StreamTableEnvironment;
59+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5860
import org.apache.flink.types.Row;
5961
import org.slf4j.Logger;
6062
import org.slf4j.LoggerFactory;
6163

6264
import java.io.Serializable;
65+
import java.sql.Timestamp;
6366
import java.util.*;
6467
import java.util.stream.Collectors;
6568

@@ -241,6 +244,28 @@ public RowTypeInfo buildOutRowTypeInfo(List<FieldInfo> sideJoinFieldInfo, HashBa
241244
return new RowTypeInfo(sideOutTypes, sideOutNames);
242245
}
243246

247+
/**
248+
* 对时间类型进行类型转换
249+
* @param leftTypeInfo
250+
* @return
251+
*/
252+
private RowTypeInfo buildLeftTableOutType(RowTypeInfo leftTypeInfo) {
253+
TypeInformation[] sideOutTypes = new TypeInformation[leftTypeInfo.getFieldNames().length];
254+
TypeInformation<?>[] fieldTypes = leftTypeInfo.getFieldTypes();
255+
for (int i = 0; i < sideOutTypes.length; i++) {
256+
sideOutTypes[i] = convertTimeAttributeType(fieldTypes[i]);
257+
}
258+
RowTypeInfo rowTypeInfo = new RowTypeInfo(sideOutTypes, leftTypeInfo.getFieldNames());
259+
return rowTypeInfo;
260+
}
261+
262+
private TypeInformation convertTimeAttributeType(TypeInformation typeInformation) {
263+
if (typeInformation instanceof TimeIndicatorTypeInfo) {
264+
return TypeInformation.of(Timestamp.class);
265+
}
266+
return typeInformation;
267+
}
268+
244269
//需要考虑更多的情况
245270
private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias) {
246271
SqlKind sqlKind = sqlNode.getKind();
@@ -746,7 +771,8 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
746771
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
747772
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
748773
.returns(Row.class);
749-
adaptStream.getTransformation().setOutputType(leftTypeInfo);
774+
RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
775+
adaptStream.getTransformation().setOutputType(leftTableOutType);
750776

751777
//join side table before keyby ===> Reducing the size of each dimension table cache of async
752778
if(sideTableInfo.isPartitionedJoin()){
@@ -780,6 +806,8 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
780806
}
781807
}
782808

809+
810+
783811
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
784812
List<String> fieldNames = new LinkedList<>();
785813
String fieldsInfo = result.getFieldsInfoStr();

0 commit comments

Comments
 (0)