Skip to content

Commit 86d05ee

Browse files
committed
mongo scan
1 parent 22bcea5 commit 86d05ee

File tree

6 files changed

+38
-119
lines changed

6 files changed

+38
-119
lines changed

docs/mongoSide.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
|----|---|---|----|
4141
| type |表明 输出表类型 mongo|||
4242
| address | 连接mongo数据库 jdbcUrl |||
43-
| userName | mongo连接用户名|||
44-
| password | mongo连接密码|||
4543
| tableName | mongo表名称|||
4644
| database | mongo表名称|||
4745
| cache | 维表缓存策略(NONE/LRU)||NONE|
@@ -64,7 +62,8 @@ create table sideTable(
6462
PERIOD FOR SYSTEM_TIME
6563
)WITH(
6664
type ='mongo',
67-
address ='172.21.32.1:27017,172.21.32.1:27017',
65+
//mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/[?options]]
66+
address ='mongodb://172.21.32.1:27017,172.21.32.1:27017',
6867
database ='test',
6968
tableName ='sidetest',
7069
cache ='LRU',

docs/mongoSink.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ CREATE TABLE tableName(
3333
|----|----|----|----|
3434
|type |表明 输出表类型 mongo|||
3535
|address | 连接mongo数据库 jdbcUrl |||
36-
|userName | mongo连接用户名|||
37-
|password | mongo连接密码|||
3836
|tableName | mongo表名称|||
3937
|database | mongo表名称|||
4038
|parallelism | 并行度设置||1|
@@ -46,9 +44,8 @@ CREATE TABLE MyResult(
4644
pv VARCHAR
4745
)WITH(
4846
type ='mongo',
49-
address ='172.21.32.1:27017,172.21.32.1:27017',
50-
userName ='dtstack',
51-
password ='abc123',
47+
//mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/[?options]]
48+
address ='mongodb://172.21.32.1:27017,172.21.32.1:27017',
5249
database ='test',
5350
tableName ='pv',
5451
parallelism ='1'

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,13 @@
2626
import com.dtstack.flink.sql.side.mongo.utils.MongoUtil;
2727
import com.mongodb.BasicDBObject;
2828
import com.mongodb.MongoClient;
29-
import com.mongodb.MongoClientOptions;
30-
import com.mongodb.MongoCredential;
31-
import com.mongodb.ServerAddress;
29+
import com.mongodb.MongoClientURI;
3230
import com.mongodb.client.FindIterable;
3331
import com.mongodb.client.MongoCollection;
3432
import com.mongodb.client.MongoCursor;
3533
import com.mongodb.client.MongoDatabase;
3634
import org.apache.calcite.sql.JoinType;
3735
import org.apache.commons.collections.CollectionUtils;
38-
import org.apache.commons.lang3.StringUtils;
3936
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4037
import com.google.common.collect.Lists;
4138
import com.google.common.collect.Maps;
@@ -49,7 +46,6 @@
4946

5047
import java.sql.SQLException;
5148
import java.sql.Timestamp;
52-
import java.util.ArrayList;
5349
import java.util.Calendar;
5450
import java.util.List;
5551
import java.util.Map;
@@ -180,34 +176,13 @@ private String buildKey(Map<String, Object> val, List<String> equalFieldList) {
180176
return sb.toString();
181177
}
182178

183-
private MongoCollection getConn(String address, String userName, String password, String database, String tableName) {
179+
private MongoCollection getConn(String address, String database, String tableName) {
184180
MongoCollection dbCollection;
185-
try {
186-
MongoCredential credential;
187-
String[] servers = address.split(",");
188-
String host;
189-
Integer port;
190-
String[] hostAndPort;
191-
List<ServerAddress> lists = new ArrayList<>();
192-
for (String server : servers) {
193-
hostAndPort = server.split(":");
194-
host = hostAndPort[0];
195-
port = Integer.parseInt(hostAndPort[1]);
196-
lists.add(new ServerAddress(host, port));
197-
}
198-
if (!StringUtils.isEmpty(userName) || !StringUtils.isEmpty(password)) {
199-
credential = MongoCredential.createCredential(userName, database, password.toCharArray());
200-
// To connect to mongodb server
201-
mongoClient = new MongoClient(lists, credential, new MongoClientOptions.Builder().build());
202-
} else {
203-
mongoClient = new MongoClient(lists);
204-
}
205-
db = mongoClient.getDatabase(database);
206-
dbCollection = db.getCollection(tableName, Document.class);
207-
return dbCollection;
208-
} catch (Exception e) {
209-
throw new RuntimeException("[connMongoDB]:" + e.getMessage());
210-
}
181+
mongoClient = new MongoClient(new MongoClientURI(address));
182+
db = mongoClient.getDatabase(database);
183+
dbCollection = db.getCollection(tableName, Document.class);
184+
return dbCollection;
185+
211186
}
212187

213188
private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
@@ -217,8 +192,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
217192
try {
218193
for (int i = 0; i < CONN_RETRY_NUM; i++) {
219194
try {
220-
dbCollection = getConn(tableInfo.getAddress(), tableInfo.getUserName(), tableInfo.getPassword(),
221-
tableInfo.getDatabase(), tableInfo.getTableName());
195+
dbCollection = getConn(tableInfo.getAddress(), tableInfo.getDatabase(), tableInfo.getTableName());
222196
break;
223197
} catch (Exception e) {
224198
if (i == CONN_RETRY_NUM - 1) {
@@ -237,9 +211,9 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
237211

238212
//load data from table
239213
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
240-
BasicDBObject basicDBObject = new BasicDBObject();
214+
BasicDBObject basicDbObject = new BasicDBObject();
241215
for (String selectField : sideFieldNames) {
242-
basicDBObject.append(selectField, 1);
216+
basicDbObject.append(selectField, 1);
243217
}
244218
BasicDBObject filterObject = new BasicDBObject();
245219
try {
@@ -256,7 +230,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
256230
}
257231

258232

259-
FindIterable<Document> findIterable = dbCollection.find(filterObject).projection(basicDBObject).limit(FETCH_SIZE);
233+
FindIterable<Document> findIterable = dbCollection.find(filterObject).projection(basicDbObject).limit(FETCH_SIZE);
260234
MongoCursor<Document> mongoCursor = findIterable.iterator();
261235
while (mongoCursor.hasNext()) {
262236
Document doc = mongoCursor.next();

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@
2929
import com.dtstack.flink.sql.side.mongo.utils.MongoUtil;
3030
import com.mongodb.BasicDBObject;
3131
import com.mongodb.Block;
32+
import com.mongodb.ConnectionString;
3233
import com.mongodb.MongoCredential;
3334
import com.mongodb.ServerAddress;
3435
import com.mongodb.async.SingleResultCallback;
3536
import com.mongodb.async.client.MongoClient;
36-
import com.mongodb.async.client.MongoClientSettings;
37+
import com.mongodb.MongoClientSettings;
3738
import com.mongodb.async.client.MongoClients;
3839
import com.mongodb.async.client.MongoCollection;
3940
import com.mongodb.async.client.MongoDatabase;
@@ -70,13 +71,11 @@ public class MongoAsyncReqRow extends AsyncReqRow {
7071

7172
private static final Logger LOG = LoggerFactory.getLogger(MongoAsyncReqRow.class);
7273

73-
private final static int DEFAULT_MAX_DB_CONN_POOL_SIZE = 20;
74-
7574
private transient MongoClient mongoClient;
7675

7776
private MongoDatabase db;
7877

79-
private MongoSideTableInfo MongoSideTableInfo;
78+
private MongoSideTableInfo mongoSideTableInfo;
8079

8180
public MongoAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
8281
super(new MongoAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -85,71 +84,48 @@ public MongoAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldIn
8584
@Override
8685
public void open(Configuration parameters) throws Exception {
8786
super.open(parameters);
88-
MongoSideTableInfo = (MongoSideTableInfo) sideInfo.getSideTableInfo();
89-
connMongoDB();
87+
mongoSideTableInfo = (MongoSideTableInfo) sideInfo.getSideTableInfo();
88+
connMongoDb();
9089
}
9190

92-
public void connMongoDB() throws Exception {
93-
MongoCredential mongoCredential;
94-
String[] servers = MongoSideTableInfo.getAddress().split(",");
95-
String host;
96-
Integer port;
97-
String[] hostAndPort;
98-
List<ServerAddress> lists = new ArrayList<>();
99-
for (String server : servers) {
100-
hostAndPort = server.split(":");
101-
host = hostAndPort[0];
102-
port = Integer.parseInt(hostAndPort[1]);
103-
lists.add(new ServerAddress(host, port));
104-
}
105-
ClusterSettings clusterSettings = ClusterSettings.builder().hosts(lists).build();
106-
ConnectionPoolSettings connectionPoolSettings = ConnectionPoolSettings.builder()
107-
.maxSize(DEFAULT_MAX_DB_CONN_POOL_SIZE)
91+
public void connMongoDb() throws Exception {
92+
String address = mongoSideTableInfo.getAddress();
93+
ConnectionString connectionString = new ConnectionString(address);
94+
95+
MongoClientSettings settings = MongoClientSettings.builder()
96+
.applyConnectionString(connectionString)
10897
.build();
109-
if (!StringUtils.isEmpty(MongoSideTableInfo.getUserName()) || !StringUtils.isEmpty(MongoSideTableInfo.getPassword())) {
110-
mongoCredential = MongoCredential.createCredential(MongoSideTableInfo.getUserName(), MongoSideTableInfo.getDatabase(),
111-
MongoSideTableInfo.getPassword().toCharArray());
112-
MongoClientSettings settings = MongoClientSettings.builder().credential(mongoCredential)
113-
.clusterSettings(clusterSettings)
114-
.connectionPoolSettings(connectionPoolSettings)
115-
.build();
116-
mongoClient = MongoClients.create(settings);
117-
} else {
118-
MongoClientSettings settings = MongoClientSettings.builder().clusterSettings(clusterSettings)
119-
.connectionPoolSettings(connectionPoolSettings)
120-
.build();
121-
mongoClient = MongoClients.create(settings);
122-
}
123-
db = mongoClient.getDatabase(MongoSideTableInfo.getDatabase());
98+
mongoClient = MongoClients.create(settings);
99+
db = mongoClient.getDatabase(mongoSideTableInfo.getDatabase());
124100
}
125101

126102
@Override
127103
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
128104
CRow inputCopy = new CRow(input.row(), input.change());
129-
BasicDBObject basicDBObject = new BasicDBObject();
105+
BasicDBObject basicDbObject = new BasicDBObject();
130106
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
131107
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
132108
Object equalObj = inputCopy.row().getField(conValIndex);
133109
if (equalObj == null) {
134110
dealMissKey(inputCopy, resultFuture);
135111
return;
136112
}
137-
basicDBObject.put(sideInfo.getEqualFieldList().get(i), equalObj);
113+
basicDbObject.put(sideInfo.getEqualFieldList().get(i), equalObj);
138114
}
139115
try {
140116
// 填充谓词
141117
sideInfo.getSideTableInfo().getPredicateInfoes().stream().map(info -> {
142118
BasicDBObject filterCondition = MongoUtil.buildFilterObject(info);
143119
if (null != filterCondition) {
144-
basicDBObject.append(info.getFieldName(), filterCondition);
120+
basicDbObject.append(info.getFieldName(), filterCondition);
145121
}
146122
return info;
147123
}).count();
148124
} catch (Exception e) {
149125
LOG.info("add predicate infoes error ", e);
150126
}
151127

152-
String key = buildCacheKey(basicDBObject.values());
128+
String key = buildCacheKey(basicDbObject.values());
153129
if (openCache()) {
154130
CacheObj val = getFromCache(key);
155131
if (val != null) {
@@ -171,7 +147,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
171147
}
172148
}
173149
AtomicInteger atomicInteger = new AtomicInteger(0);
174-
MongoCollection dbCollection = db.getCollection(MongoSideTableInfo.getTableName(), Document.class);
150+
MongoCollection dbCollection = db.getCollection(mongoSideTableInfo.getTableName(), Document.class);
175151
List<Document> cacheContent = Lists.newArrayList();
176152
Block<Document> printDocumentBlock = new Block<Document>() {
177153
@Override
@@ -197,7 +173,7 @@ public void onResult(final Void result, final Throwable t) {
197173
}
198174
}
199175
};
200-
dbCollection.find(basicDBObject).forEach(printDocumentBlock, callbackWhenFinished);
176+
dbCollection.find(basicDbObject).forEach(printDocumentBlock, callbackWhenFinished);
201177
}
202178

203179
@Override

mongo/mongo-side/mongo-side-core/src/main/java/com/dtstack/flink/sql/side/mongo/utils/MongoUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public static BasicDBObject buildFilterObject(PredicateInfo info) {
5555
return new BasicDBObject("$exists", true);
5656
case "IS NULL":
5757
return new BasicDBObject("$exists", false);
58+
default:
5859
}
5960
return null;
6061
}

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,10 @@
2121

2222
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
2323
import com.mongodb.MongoClient;
24-
import com.mongodb.MongoClientOptions;
25-
import com.mongodb.MongoCredential;
26-
import com.mongodb.ServerAddress;
24+
import com.mongodb.MongoClientURI;
2725
import com.mongodb.client.MongoCollection;
2826
import com.mongodb.client.MongoDatabase;
2927
import com.mongodb.client.result.UpdateResult;
30-
import org.apache.commons.lang3.StringUtils;
3128
import org.apache.flink.api.common.typeinfo.TypeInformation;
3229
import org.apache.flink.api.java.tuple.Tuple2;
3330
import org.apache.flink.configuration.Configuration;
@@ -37,9 +34,6 @@
3734
import org.slf4j.Logger;
3835
import org.slf4j.LoggerFactory;
3936
import java.io.IOException;
40-
import java.text.SimpleDateFormat;
41-
import java.util.ArrayList;
42-
import java.util.List;
4337

4438
/**
4539
* Reason:
@@ -124,30 +118,8 @@ public void close() {
124118
}
125119

126120
private void establishConnection() {
127-
try {
128-
MongoCredential credential;
129-
String[] servers = address.split(",");
130-
String host;
131-
Integer port;
132-
String[] hostAndPort;
133-
List<ServerAddress> lists = new ArrayList<>();
134-
for (String server : servers) {
135-
hostAndPort = server.split(":");
136-
host = hostAndPort[0];
137-
port = Integer.parseInt(hostAndPort[1]);
138-
lists.add(new ServerAddress(host, port));
139-
}
140-
if (!StringUtils.isEmpty(userName) || !StringUtils.isEmpty(password)) {
141-
credential = MongoCredential.createCredential(userName, database, password.toCharArray());
142-
// To connect to mongodb server
143-
mongoClient = new MongoClient(lists, credential, new MongoClientOptions.Builder().build());
144-
} else {
145-
mongoClient = new MongoClient(lists);
146-
}
147-
db = mongoClient.getDatabase(database);
148-
} catch (Exception e) {
149-
throw new IllegalArgumentException("[connMongoDB]:" + e.getMessage());
150-
}
121+
mongoClient = new MongoClient(new MongoClientURI(address));
122+
db = mongoClient.getDatabase(database);
151123
}
152124

153125
private MongoOutputFormat() {

0 commit comments

Comments
 (0)