Skip to content

Commit f46e6a0

Browse files
committed
Merge remote-tracking branch 'origin/1.8.0_dev_partitioned_join' into v1.8.0_dev
2 parents 8d847d0 + ababf5c commit f46e6a0

File tree

1 file changed

+30
-1
lines changed

1 file changed

+30
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.calcite.sql.parser.SqlParseException;
4646
import org.apache.calcite.sql.parser.SqlParserPos;
4747
import org.apache.commons.collections.CollectionUtils;
48+
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
4849
import org.apache.flink.api.common.typeinfo.TypeInformation;
4950
import org.apache.flink.api.java.functions.KeySelector;
5051
import org.apache.flink.api.java.tuple.Tuple2;
@@ -56,11 +57,13 @@
5657
import org.apache.flink.table.api.StreamQueryConfig;
5758
import org.apache.flink.table.api.Table;
5859
import org.apache.flink.table.api.java.StreamTableEnvironment;
60+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5961
import org.apache.flink.types.Row;
6062
import org.slf4j.Logger;
6163
import org.slf4j.LoggerFactory;
6264

6365
import java.io.Serializable;
66+
import java.sql.Timestamp;
6467
import java.util.*;
6568
import java.util.stream.Collectors;
6669

@@ -242,6 +245,28 @@ public RowTypeInfo buildOutRowTypeInfo(List<FieldInfo> sideJoinFieldInfo, HashBa
242245
return new RowTypeInfo(sideOutTypes, sideOutNames);
243246
}
244247

248+
/**
249+
* 对时间类型进行类型转换
250+
* @param leftTypeInfo
251+
* @return
252+
*/
253+
private RowTypeInfo buildLeftTableOutType(RowTypeInfo leftTypeInfo) {
254+
TypeInformation[] sideOutTypes = new TypeInformation[leftTypeInfo.getFieldNames().length];
255+
TypeInformation<?>[] fieldTypes = leftTypeInfo.getFieldTypes();
256+
for (int i = 0; i < sideOutTypes.length; i++) {
257+
sideOutTypes[i] = convertTimeAttributeType(fieldTypes[i]);
258+
}
259+
RowTypeInfo rowTypeInfo = new RowTypeInfo(sideOutTypes, leftTypeInfo.getFieldNames());
260+
return rowTypeInfo;
261+
}
262+
263+
private TypeInformation convertTimeAttributeType(TypeInformation typeInformation) {
264+
if (typeInformation instanceof TimeIndicatorTypeInfo) {
265+
return TypeInformation.of(Timestamp.class);
266+
}
267+
return typeInformation;
268+
}
269+
245270
//需要考虑更多的情况
246271
private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias) {
247272
SqlKind sqlKind = sqlNode.getKind();
@@ -747,10 +772,12 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
747772
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
748773
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
749774
.returns(Row.class);
750-
//adaptStream.getTransformation().setOutputType(leftTypeInfo);
775+
751776

752777
//join side table before keyby ===> Reducing the size of each dimension table cache of async
753778
if(sideTableInfo.isPartitionedJoin()){
779+
RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
780+
adaptStream.getTransformation().setOutputType(leftTableOutType);
754781
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
755782
String[] leftJoinColArr = leftJoinColList.toArray(new String[leftJoinColList.size()]);
756783
adaptStream = adaptStream.keyBy(leftJoinColArr);
@@ -781,6 +808,8 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
781808
}
782809
}
783810

811+
812+
784813
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
785814
List<String> fieldNames = new LinkedList<>();
786815
String fieldsInfo = result.getFieldsInfoStr();

0 commit comments

Comments
 (0)