Skip to content

Commit abfb083

Browse files
author
dapeng
committed
fix mongo url bug
1 parent 3ba019d commit abfb083

File tree

4 files changed

+42
-10
lines changed

4 files changed

+42
-10
lines changed

docs/mongoSide.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
|----|---|
3131
| tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)|
3232
| colName | 列名称|
33-
| colType | 列类型 [colType支持的类型](colType.md)|
33+
| colType | 列类型 [colType支持的类型](docs/colType.md)|
3434
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
3535
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
3636

@@ -62,8 +62,7 @@ create table sideTable(
6262
PERIOD FOR SYSTEM_TIME
6363
)WITH(
6464
type ='mongo',
65-
//mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/[?options]]
66-
address ='mongodb://172.21.32.1:27017,172.21.32.1:27017',
65+
address ='172.21.32.1:27017,172.21.32.1:27017',
6766
database ='test',
6867
tableName ='sidetest',
6968
cache ='LRU',

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,10 @@ private String buildKey(Map<String, Object> val, List<String> equalFieldList) {
177177
return sb.toString();
178178
}
179179

180-
private MongoCollection getConn(String address, String database, String tableName) {
180+
private MongoCollection getConn(String host, String userName, String password, String database, String tableName) {
181+
181182
MongoCollection dbCollection;
182-
mongoClient = new MongoClient(new MongoClientURI(address));
183+
mongoClient = new MongoClient(new MongoClientURI(getConnectionUrl(host, userName, password)));
183184
db = mongoClient.getDatabase(database);
184185
dbCollection = db.getCollection(tableName, Document.class);
185186
return dbCollection;
@@ -193,7 +194,8 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
193194
try {
194195
for (int i = 0; i < CONN_RETRY_NUM; i++) {
195196
try {
196-
dbCollection = getConn(tableInfo.getAddress(), tableInfo.getDatabase(), tableInfo.getTableName());
197+
dbCollection = getConn(tableInfo.getAddress(), tableInfo.getUserName(), tableInfo.getPassword(),
198+
tableInfo.getDatabase(), tableInfo.getTableName());
197199
break;
198200
} catch (Exception e) {
199201
if (i == CONN_RETRY_NUM - 1) {
@@ -251,4 +253,14 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
251253
}
252254
}
253255
}
256+
private String getConnectionUrl(String address, String userName, String password){
257+
if(address.startsWith("mongodb://") || address.startsWith("mongodb+srv://")){
258+
return address;
259+
}
260+
if (StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)) {
261+
return String.format("mongodb://%s:%s@%s", userName, password, address);
262+
}
263+
return String.format("mongodb://%s", address);
264+
}
265+
254266
}

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.mongo;
2121

22+
import org.apache.commons.lang3.StringUtils;
2223
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2324
import org.apache.flink.configuration.Configuration;
2425
import org.apache.flink.streaming.api.functions.async.ResultFuture;
@@ -84,9 +85,8 @@ public void open(Configuration parameters) throws Exception {
8485
}
8586

8687
public void connMongoDb() throws Exception {
87-
String address = mongoSideTableInfo.getAddress();
88-
ConnectionString connectionString = new ConnectionString(address);
89-
88+
ConnectionString connectionString = new ConnectionString(getConnectionUrl(mongoSideTableInfo.getAddress(),
89+
mongoSideTableInfo.getUserName(), mongoSideTableInfo.getPassword()));
9090
MongoClientSettings settings = MongoClientSettings.builder()
9191
.applyConnectionString(connectionString)
9292
.build();
@@ -193,4 +193,14 @@ public void close() throws Exception {
193193
}
194194
}
195195

196+
private String getConnectionUrl(String address, String userName, String password){
197+
if(address.startsWith("mongodb://") || address.startsWith("mongodb+srv://")){
198+
return address;
199+
}
200+
if (StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)) {
201+
return String.format("mongodb://%s:%s@%s", userName, password, address);
202+
}
203+
return String.format("mongodb://%s", address);
204+
}
205+
196206
}

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.mongodb.client.MongoCollection;
2626
import com.mongodb.client.MongoDatabase;
2727
import com.mongodb.client.result.UpdateResult;
28+
import org.apache.commons.lang3.StringUtils;
2829
import org.apache.flink.api.common.typeinfo.TypeInformation;
2930
import org.apache.flink.api.java.tuple.Tuple2;
3031
import org.apache.flink.configuration.Configuration;
@@ -119,7 +120,8 @@ public void close() {
119120
}
120121

121122
private void establishConnection() {
122-
mongoClient = new MongoClient(new MongoClientURI(address));
123+
124+
mongoClient = new MongoClient(new MongoClientURI(getConnectionUrl()));
123125
db = mongoClient.getDatabase(database);
124126
}
125127

@@ -197,5 +199,14 @@ public MongoOutputFormat finish() {
197199
}
198200
}
199201

202+
private String getConnectionUrl(){
203+
if(address.startsWith("mongodb://") || address.startsWith("mongodb+srv://")){
204+
return address;
205+
}
206+
if (StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)) {
207+
return String.format("mongodb://%s:%s@%s", userName, password, address);
208+
}
209+
return String.format("mongodb://%s", address);
210+
}
200211

201212
}

0 commit comments

Comments
 (0)