Skip to content

Commit 818c320

Browse files
author
dapeng
committed
fix mongo and async block bug
1 parent 15c8635 commit 818c320

File tree

2 files changed

+20
-11
lines changed
  • mongo/mongo-side/mongo-side-core/src/main/java/com/dtstack/flink/sql/side/mongo/utils
  • rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async

2 files changed

+20
-11
lines changed

mongo/mongo-side/mongo-side-core/src/main/java/com/dtstack/flink/sql/side/mongo/utils/MongoUtil.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.side.PredicateInfo;
2222
import com.mongodb.BasicDBObject;
2323
import org.apache.commons.lang3.StringUtils;
24+
import org.bson.Document;
2425

2526
import java.util.Arrays;
2627
import java.util.stream.Collectors;
@@ -32,25 +33,27 @@
3233
*/
3334
public class MongoUtil {
3435
public static BasicDBObject buildFilterObject(PredicateInfo info) {
36+
37+
String value = info.getCondition().replaceAll("'", "");
3538
switch (info.getOperatorName()) {
3639
case "=":
37-
return new BasicDBObject("$eq", info.getCondition());
40+
return new BasicDBObject("$eq", value);
3841
case ">":
39-
return new BasicDBObject("$gt", info.getCondition());
42+
return new BasicDBObject("$gt", value);
4043
case ">=":
41-
return new BasicDBObject("$gte", info.getCondition());
44+
return new BasicDBObject("$gte", value);
4245
case "<":
43-
return new BasicDBObject("$lt", info.getCondition());
46+
return new BasicDBObject("$lt", value);
4447
case "<=":
45-
return new BasicDBObject("$lte", info.getCondition());
48+
return new BasicDBObject("$lte", value);
4649
case "<>":
47-
return new BasicDBObject("$ne", info.getCondition());
50+
return new BasicDBObject("$ne", value);
4851
case "IN":
49-
Object[] values = Arrays.stream(StringUtils.split(info.getCondition(), ",")).map(String::trim)
52+
Object[] values = Arrays.stream(StringUtils.split(value, ",")).map(String::trim)
5053
.collect(Collectors.toList()).toArray();
5154
return new BasicDBObject("$in", values);
5255
case "NOT IN":
53-
return new BasicDBObject("$nin", StringUtils.split(info.getCondition(), ","));
56+
return new BasicDBObject("$nin", StringUtils.split(value, ","));
5457
case "IS NOT NULL":
5558
return new BasicDBObject("$exists", true);
5659
case "IS NULL":

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
137137
if(failCounter.getAndIncrement() % 1000 == 0){
138138
LOG.error("getConnection error", conn.cause());
139139
}
140-
if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(3)){
141-
dealFillDataError(input, resultFuture, conn.cause());
140+
if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){
141+
resultFuture.completeExceptionally(conn.cause());
142142
finishFlag.set(true);
143143
}
144144
conn.result().close();
@@ -155,12 +155,18 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
155155
latch.countDown();
156156
}
157157
});
158-
//主线程阻塞
159158
try {
160159
latch.wait();
161160
} catch (InterruptedException e) {
162161
LOG.error("", e);
163162
}
163+
if(!finishFlag.get()){
164+
try {
165+
Thread.sleep(100);
166+
} catch (Exception e){
167+
LOG.error("", e);
168+
}
169+
}
164170
}
165171

166172
}

0 commit comments

Comments
 (0)