Skip to content

Commit d8965bc

Browse files
committed
modify position
1 parent e2c5643 commit d8965bc

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -771,11 +771,12 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
771771
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
772772
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
773773
.returns(Row.class);
774-
RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
775-
adaptStream.getTransformation().setOutputType(leftTableOutType);
774+
776775

777776
//join side table before keyby ===> Reducing the size of each dimension table cache of async
778777
if(sideTableInfo.isPartitionedJoin()){
778+
RowTypeInfo leftTableOutType = buildLeftTableOutType(leftTypeInfo);
779+
adaptStream.getTransformation().setOutputType(leftTableOutType);
779780
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
780781
String[] leftJoinColArr = leftJoinColList.toArray(new String[leftJoinColList.size()]);
781782
adaptStream = adaptStream.keyBy(leftJoinColArr);

0 commit comments

Comments
 (0)