Skip to content

Commit fa12392

Browse files
author
dapeng
committed
Merge branch 'feat_1.8_asyncException_mergeTest' into 1.8_release_3.10.x_mergedTest_new
2 parents 16d4957 + 1487ed9 commit fa12392

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncSideInfo.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import com.dtstack.flink.sql.side.JoinInfo;
55
import com.dtstack.flink.sql.side.BaseSideInfo;
66
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
7+
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
8+
import com.dtstack.flink.sql.util.ParseUtils;
9+
import com.google.common.collect.Lists;
710
import org.apache.calcite.sql.SqlBasicCall;
811
import org.apache.calcite.sql.SqlIdentifier;
912
import org.apache.calcite.sql.SqlKind;
@@ -21,6 +24,22 @@ public KuduAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
2124

2225
@Override
2326
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
27+
KuduSideTableInfo kuduSideTableInfo = (KuduSideTableInfo) sideTableInfo;
28+
29+
String sideTableName = joinInfo.getSideTableName();
30+
31+
SqlNode conditionNode = joinInfo.getCondition();
32+
33+
List<SqlNode> sqlNodeList = Lists.newArrayList();
34+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
35+
36+
for (SqlNode sqlNode : sqlNodeList) {
37+
dealOneEqualCon(sqlNode, sideTableName);
38+
}
39+
40+
sqlCondition = "select ${selectField} from ${tableName} ";
41+
sqlCondition = sqlCondition.replace("${tableName}", kuduSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
42+
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
2443
}
2544

2645
@Override

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,13 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
155155
}
156156
});
157157
try {
158-
latch.wait();
158+
latch.await();
159159
} catch (InterruptedException e) {
160160
LOG.error("", e);
161161
}
162162
if(!finishFlag.get()){
163163
try {
164-
Thread.sleep(100);
164+
Thread.sleep(3000);
165165
} catch (Exception e){
166166
LOG.error("", e);
167167
}

0 commit comments

Comments
 (0)