Skip to content

Commit 73d058a

Browse files
committed
partitioned join bug fix
1 parent 2411ac6 commit 73d058a

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.calcite.sql.parser.SqlParserPos;
4747
import org.apache.commons.collections.CollectionUtils;
4848
import org.apache.flink.api.common.typeinfo.TypeInformation;
49+
import org.apache.flink.api.java.functions.KeySelector;
4950
import org.apache.flink.api.java.tuple.Tuple2;
5051
import org.apache.flink.api.java.typeutils.RowTypeInfo;
5152
import com.google.common.collect.HashBasedTable;
@@ -58,7 +59,9 @@
5859
import org.slf4j.Logger;
5960
import org.slf4j.LoggerFactory;
6061

62+
import java.io.Serializable;
6163
import java.util.*;
64+
import java.util.stream.Collectors;
6265

6366
import static org.apache.calcite.sql.SqlKind.*;
6467

@@ -743,12 +746,12 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
743746
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
744747
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
745748
.returns(Row.class);
749+
adaptStream.getTransformation().setOutputType(leftTypeInfo);
746750

747751
//join side table before keyby ===> Reducing the size of each dimension table cache of async
748752
if(sideTableInfo.isPartitionedJoin()){
749753
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
750-
String[] leftJoinColArr = new String[leftJoinColList.size()];
751-
leftJoinColArr = leftJoinColList.toArray(leftJoinColArr);
754+
String[] leftJoinColArr = leftJoinColList.toArray(new String[leftJoinColList.size()]);
752755
adaptStream = adaptStream.keyBy(leftJoinColArr);
753756
}
754757

0 commit comments

Comments
 (0)