Skip to content

Commit 5693662

Browse files
string.split改用StringUtils.split()
1 parent 9ec2f5e commit 5693662

File tree

35 files changed

+369
-406
lines changed

35 files changed

+369
-406
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,25 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21-
import com.datastax.driver.core.Cluster;
22-
import com.datastax.driver.core.ConsistencyLevel;
23-
import com.datastax.driver.core.HostDistance;
24-
import com.datastax.driver.core.PoolingOptions;
25-
import com.datastax.driver.core.QueryOptions;
26-
import com.datastax.driver.core.ResultSet;
27-
import com.datastax.driver.core.Session;
28-
import com.datastax.driver.core.SocketOptions;
21+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22+
import org.apache.flink.table.runtime.types.CRow;
23+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
24+
import org.apache.flink.types.Row;
25+
import org.apache.flink.util.Collector;
26+
27+
import com.datastax.driver.core.*;
2928
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3029
import com.datastax.driver.core.policies.RetryPolicy;
3130
import com.dtstack.flink.sql.side.AllReqRow;
3231
import com.dtstack.flink.sql.side.FieldInfo;
3332
import com.dtstack.flink.sql.side.JoinInfo;
3433
import com.dtstack.flink.sql.side.SideTableInfo;
3534
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
36-
import org.apache.calcite.sql.JoinType;
37-
import org.apache.commons.collections.CollectionUtils;
38-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3935
import com.google.common.collect.Lists;
4036
import com.google.common.collect.Maps;
41-
import org.apache.flink.table.runtime.types.CRow;
42-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
43-
import org.apache.flink.types.Row;
44-
import org.apache.flink.util.Collector;
37+
import org.apache.calcite.sql.JoinType;
38+
import org.apache.commons.collections.CollectionUtils;
39+
import org.apache.commons.lang3.StringUtils;
4540
import org.slf4j.Logger;
4641
import org.slf4j.LoggerFactory;
4742

@@ -222,9 +217,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
222217
//重试策略
223218
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
224219

225-
for (String server : address.split(",")) {
226-
cassandraPort = Integer.parseInt(server.split(":")[1]);
227-
serversList.add(InetAddress.getByName(server.split(":")[0]));
220+
for (String server : StringUtils.split(address, ",")) {
221+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
222+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
228223
}
229224

230225
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -278,7 +273,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
278273
//load data from table
279274
String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
280275
ResultSet resultSet = session.execute(sql);
281-
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
276+
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
282277
for (com.datastax.driver.core.Row row : resultSet) {
283278
Map<String, Object> oneRow = Maps.newHashMap();
284279
for (String fieldName : sideFieldNames) {

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,44 +19,34 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22-
import com.datastax.driver.core.Cluster;
23-
import com.datastax.driver.core.ConsistencyLevel;
24-
import com.datastax.driver.core.HostDistance;
25-
import com.datastax.driver.core.PoolingOptions;
26-
import com.datastax.driver.core.QueryOptions;
27-
import com.datastax.driver.core.ResultSet;
28-
import com.datastax.driver.core.Session;
29-
import com.datastax.driver.core.SocketOptions;
22+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
23+
import org.apache.flink.configuration.Configuration;
24+
import org.apache.flink.streaming.api.functions.async.ResultFuture;
25+
import org.apache.flink.table.runtime.types.CRow;
26+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
27+
import org.apache.flink.types.Row;
28+
29+
import com.datastax.driver.core.*;
3030
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3131
import com.datastax.driver.core.policies.RetryPolicy;
3232
import com.dtstack.flink.sql.enums.ECacheContentType;
33-
import com.dtstack.flink.sql.side.AsyncReqRow;
34-
import com.dtstack.flink.sql.side.CacheMissVal;
35-
import com.dtstack.flink.sql.side.FieldInfo;
36-
import com.dtstack.flink.sql.side.JoinInfo;
37-
import com.dtstack.flink.sql.side.SideTableInfo;
33+
import com.dtstack.flink.sql.side.*;
3834
import com.dtstack.flink.sql.side.cache.CacheObj;
3935
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
4036
import com.google.common.base.Function;
37+
import com.google.common.collect.Lists;
4138
import com.google.common.util.concurrent.AsyncFunction;
4239
import com.google.common.util.concurrent.FutureCallback;
4340
import com.google.common.util.concurrent.Futures;
4441
import com.google.common.util.concurrent.ListenableFuture;
4542
import io.vertx.core.json.JsonArray;
46-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
47-
import org.apache.flink.configuration.Configuration;
48-
import com.google.common.collect.Lists;
49-
import org.apache.flink.streaming.api.functions.async.ResultFuture;
50-
import org.apache.flink.table.runtime.types.CRow;
51-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
52-
import org.apache.flink.types.Row;
43+
import org.apache.commons.lang3.StringUtils;
5344
import org.slf4j.Logger;
5445
import org.slf4j.LoggerFactory;
5546

5647
import java.net.InetAddress;
5748
import java.sql.Timestamp;
5849
import java.util.ArrayList;
59-
import java.util.Collections;
6050
import java.util.List;
6151
import java.util.Map;
6252

@@ -134,9 +124,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
134124
//重试策略
135125
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
136126

137-
for (String server : address.split(",")) {
138-
cassandraPort = Integer.parseInt(server.split(":")[1]);
139-
serversList.add(InetAddress.getByName(server.split(":")[0]));
127+
for (String server : StringUtils.split(address, ",")) {
128+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
129+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
140130
}
141131

142132
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,24 +38,20 @@
3838

3939
package com.dtstack.flink.sql.sink.cassandra;
4040

41-
import com.datastax.driver.core.Cluster;
42-
import com.datastax.driver.core.ConsistencyLevel;
43-
import com.datastax.driver.core.HostDistance;
44-
import com.datastax.driver.core.PoolingOptions;
45-
import com.datastax.driver.core.QueryOptions;
46-
import com.datastax.driver.core.ResultSet;
47-
import com.datastax.driver.core.Session;
48-
import com.datastax.driver.core.SocketOptions;
49-
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
50-
import com.datastax.driver.core.policies.RetryPolicy;
51-
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
5241
import org.apache.flink.api.common.typeinfo.TypeInformation;
5342
import org.apache.flink.api.java.tuple.Tuple;
5443
import org.apache.flink.api.java.tuple.Tuple2;
5544
import org.apache.flink.configuration.Configuration;
5645
import org.apache.flink.types.Row;
46+
47+
import com.datastax.driver.core.*;
48+
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
49+
import com.datastax.driver.core.policies.RetryPolicy;
50+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
51+
import org.apache.commons.lang3.StringUtils;
5752
import org.slf4j.Logger;
5853
import org.slf4j.LoggerFactory;
54+
5955
import java.io.IOException;
6056
import java.net.InetAddress;
6157
import java.sql.DriverManager;
@@ -145,9 +141,9 @@ public void open(int taskNumber, int numTasks) {
145141
//重试策略
146142
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
147143

148-
for (String server : address.split(",")) {
149-
cassandraPort = Integer.parseInt(server.split(":")[1]);
150-
serversList.add(InetAddress.getByName(server.split(":")[0]));
144+
for (String server : StringUtils.split(address, ",")) {
145+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
146+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
151147
}
152148

153149
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
2424
import com.google.common.collect.Maps;
25+
import org.apache.commons.lang3.StringUtils;
2526

2627
import java.util.List;
2728
import java.util.Map;
@@ -69,7 +70,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
6970
}
7071

7172
private Map parseProp(String propsStr){
72-
String[] strs = propsStr.trim().split("'\\s*,");
73+
String[] strs = StringUtils.split(propsStr.trim(), "'\\s*,");
7374
Map<String, Object> propMap = Maps.newHashMap();
7475
for(int i=0; i<strs.length; i++){
7576
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');

0 commit comments

Comments
 (0)