
Commit be19965

Merge branch 'hotfix_1.8_3.10.2_21725' into 'v1.8.0_dev'
Replace String.split() with StringUtils.split() throughout the framework. See merge request !230
2 parents 24bf610 + a817b62 commit be19965

File tree

35 files changed: +273 −190 lines
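Context for the review: java.lang.String#split takes a regular expression and keeps interior empty tokens, while org.apache.commons.lang3.StringUtils#split takes a set of separator characters, merges adjacent separators, never returns empty tokens, and tolerates null input. A minimal sketch of the two contracts (the sample string is illustrative, not from this repository):

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

public class SplitContracts {
    public static void main(String[] args) {
        String csv = "a,,b,";

        // String#split: regex separator; interior empty token kept,
        // trailing empty tokens dropped.
        System.out.println(Arrays.toString(csv.split(",")));              // [a, , b]

        // StringUtils#split: separator characters; adjacent separators
        // merge, so no empty tokens, and null input returns null.
        System.out.println(Arrays.toString(StringUtils.split(csv, ","))); // [a, b]
        System.out.println(StringUtils.split((String) null, ","));        // null
    }
}

For single literal separators such as ',' and ':' the two calls agree on well-formed input, which covers most replacements below; the regex-shaped separators ("'\\s*,", "\\s", "\\s+", "\\?", "\\.") are flagged under their files.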

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 13 additions & 11 deletions

@@ -18,6 +18,12 @@

 package com.dtstack.flink.sql.side.cassandra;

+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -33,15 +39,11 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
-import org.apache.calcite.sql.JoinType;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
+import org.apache.calcite.sql.JoinType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -222,9 +224,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -278,7 +280,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
         //load data from table
         String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
         ResultSet resultSet = session.execute(sql);
-        String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
+        String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
         for (com.datastax.driver.core.Row row : resultSet) {
             Map<String, Object> oneRow = Maps.newHashMap();
             for (String fieldName : sideFieldNames) {
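The identical address-parsing loop recurs in the async side table and the sink below. A standalone sketch of the new form, with an invented address value:

import org.apache.commons.lang3.StringUtils;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

public class AddressParseSketch {
    public static void main(String[] args) throws Exception {
        String address = "10.0.0.1:9042,10.0.0.2:9042"; // hypothetical config value
        List<InetAddress> serversList = new ArrayList<>();
        int cassandraPort = 0;

        for (String server : StringUtils.split(address, ",")) {
            // ',' and ':' are single literal characters, so this behaves
            // like the old server.split(":") on well-formed input, without
            // regex compilation and without an NPE on a null address.
            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
        }
        System.out.println(serversList + ", port=" + cassandraPort);
    }
}

The single-char overload StringUtils.split(server, ':') would state the intent even more directly, and splitting once into a local array would avoid scanning each server string twice.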

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 12 additions & 11 deletions

@@ -19,6 +19,13 @@

 package com.dtstack.flink.sql.side.cassandra;

+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -38,25 +45,19 @@
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.vertx.core.json.JsonArray;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.net.InetAddress;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;

@@ -134,9 +135,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 11 additions & 8 deletions

@@ -38,6 +38,12 @@

 package com.dtstack.flink.sql.sink.cassandra;

+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -49,13 +55,10 @@
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.sql.DriverManager;
@@ -145,9 +148,9 @@ public void open(int taskNumber, int numTasks) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 2 additions & 1 deletion

@@ -22,6 +22,7 @@

 import com.dtstack.flink.sql.util.DtStringUtil;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;

 import java.util.List;
 import java.util.Map;
@@ -69,7 +70,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
     }

     private Map parseProp(String propsStr){
-        String[] strs = propsStr.trim().split("'\\s*,");
+        String[] strs = StringUtils.split(propsStr.trim(), "'\\s*,");
         Map<String, Object> propMap = Maps.newHashMap();
         for(int i=0; i<strs.length; i++){
             List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
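One caveat in this file: "'\\s*," was a regular expression under String.split (a quote, optional whitespace, then a comma), but StringUtils.split treats its second argument as a plain set of separator characters, here ', \, s, * and ,. The letter s inside any value now acts as a delimiter, so the two calls disagree on realistic properties strings. A hypothetical probe (property names invented) showing the divergence:

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

public class PropsSplitProbe {
    public static void main(String[] args) {
        String props = "type ='kafka', zookeeperQuorum ='host:2181'";

        // Regex split: breaks only at the quote-comma boundary between properties.
        System.out.println(Arrays.toString(props.split("'\\s*,")));
        // => [type ='kafka,  zookeeperQuorum ='host:2181']

        // Character-set split: also breaks at every quote and at the 's' in "host".
        System.out.println(Arrays.toString(StringUtils.split(props, "'\\s*,")));
        // => [type =, kafka,  zookeeperQuorum =, ho, t:2181]
    }
}

If the old boundary is the intended one, this call needs to stay regex-based (ideally a precompiled Pattern) rather than move to StringUtils.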

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 17 additions & 15 deletions

@@ -20,6 +20,20 @@

 package com.dtstack.flink.sql.side;

+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.runtime.CRowKeySelector;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.runtime.types.CRowTypeInfo;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.parser.CreateTmpTableParser;
@@ -49,19 +63,7 @@
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.StreamQueryConfig;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.CRowKeySelector;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -891,9 +893,9 @@ private TypeInformation<Row> projectedTypeInfo(int[] fields, TableSchema schema)
     private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
         List<String> fieldNames = new LinkedList<>();
         String fieldsInfo = result.getFieldsInfoStr();
-        String[] fields = fieldsInfo.split(",");
+        String[] fields = StringUtils.split(fieldsInfo, ",");
         for (int i = 0; i < fields.length; i++) {
-            String[] filed = fields[i].split("\\s");
+            String[] filed = StringUtils.split(fields[i], "\\s");
             if (filed.length < 2 || fields.length != table.getSchema().getColumnNames().length){
                 return false;
             } else {

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 1 deletion

@@ -92,7 +92,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
             throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
         }

-        String[] filedInfoArr = fieldRow.split("\\s+");
+        String[] filedInfoArr = StringUtils.split(fieldRow, "\\s+");
         if(filedInfoArr.length < 2 ){
             throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
         }
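The same caveat applies to the whitespace patterns: "\\s" in SideSqlExec above and "\\s+" here are regexes under String.split, but as StringUtils separator sets they mean "split on '\', 's' (and '+')", so any identifier containing the letter s is cut apart. Commons Lang has a documented whitespace form, StringUtils.split(str) with no separator argument. A sketch with an invented field row:

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

public class FieldRowSplitProbe {
    public static void main(String[] args) {
        String fieldRow = "user_id varchar"; // hypothetical DDL fragment

        System.out.println(Arrays.toString(fieldRow.split("\\s+")));
        // => [user_id, varchar]

        System.out.println(Arrays.toString(StringUtils.split(fieldRow, "\\s+")));
        // 's' is now a separator char: => [u, er_id varchar]

        // Whitespace overload, equivalent to the old regex intent:
        System.out.println(Arrays.toString(StringUtils.split(fieldRow)));
        // => [user_id, varchar]
    }
}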

core/src/main/java/com/dtstack/flink/sql/table/TableInfoParser.java

Lines changed: 2 additions & 1 deletion

@@ -29,6 +29,7 @@
 import com.dtstack.flink.sql.util.MathUtil;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -110,7 +111,7 @@ public TableInfo parseWithTableType(int tableType, CreateTableParser.SqlParserRe
      * @return
      */
     private static boolean checkIsSideTable(String tableField){
-        String[] fieldInfos = tableField.split(",");
+        String[] fieldInfos = StringUtils.split(tableField, ",");
         for(String field : fieldInfos){
             Matcher matcher = SIDE_PATTERN.matcher(field.trim());
             if(matcher.find()){

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 3 additions & 3 deletions

@@ -180,14 +180,14 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b
             return dbUrl;
         }

-        String[] splits = dbUrl.split("\\?");
+        String[] splits = StringUtils.split(dbUrl, "\\?");
         String preStr = splits[0];
         Map<String, String> params = Maps.newHashMap();
         if(splits.length > 1){
             String existsParamStr = splits[1];
-            String[] existsParams = existsParamStr.split("&");
+            String[] existsParams = StringUtils.split(existsParamStr, "&");
             for(String oneParam : existsParams){
-                String[] kv = oneParam.split("=");
+                String[] kv = StringUtils.split(oneParam, "=");
                 if(kv.length != 2){
                     throw new RuntimeException("illegal dbUrl:" + dbUrl);
                 }
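Here the escapes are harmless by accident: as a separator set, "\\?" means "split on '\' or '?'", and JDBC URLs rarely contain a backslash, so the result normally matches the old literal-? split; '&' and '=' are single literal characters anyway. The one observable shift is empty tokens: "=b".split("=") returns two elements (empty key accepted), while StringUtils.split("=b", "=") returns one, so the kv.length != 2 guard now also rejects empty keys. A sketch with an illustrative URL:

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

public class JdbcUrlSplitSketch {
    public static void main(String[] args) {
        String dbUrl = "jdbc:mysql://localhost:3306/db?useSSL=false&charset=utf8"; // invented

        // Separator set {'\', '?'}; no backslash present, so this matches
        // the old dbUrl.split("\\?").
        String[] splits = StringUtils.split(dbUrl, "\\?");
        System.out.println(Arrays.toString(splits));
        // => [jdbc:mysql://localhost:3306/db, useSSL=false&charset=utf8]

        for (String oneParam : StringUtils.split(splits[1], "&")) {
            String[] kv = StringUtils.split(oneParam, "=");
            System.out.println(kv[0] + " -> " + kv[1]);
        }
        // The char overload StringUtils.split(dbUrl, '?') would drop the
        // accidental '\' separator and state the intent directly.
    }
}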

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 8 additions & 6 deletions

@@ -20,10 +20,6 @@

 package com.dtstack.flink.sql.sink.elasticsearch;

-import com.dtstack.flink.sql.sink.IStreamSinkGener;
-import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
-import com.dtstack.flink.sql.table.TargetTableInfo;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -34,8 +30,14 @@
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+
+import com.dtstack.flink.sql.sink.IStreamSinkGener;
+import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
+import com.dtstack.flink.sql.table.TargetTableInfo;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -116,7 +118,7 @@ private RichSinkFunction createEsSinkFunction(){
         List<InetSocketAddress> transports = new ArrayList<>();

         for(String address : esAddressList){
-            String[] infoArray = address.split(":");
+            String[] infoArray = StringUtils.split(address, ":");
             int port = 9300;
             String host = infoArray[0];
             if(infoArray.length > 1){
@@ -167,7 +169,7 @@ public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
             esTableInfo = elasticsearchTableInfo;
             clusterName = elasticsearchTableInfo.getClusterName();
             String address = elasticsearchTableInfo.getAddress();
-            String[] addr = address.split(",");
+            String[] addr = StringUtils.split(address, ",");
             esAddressList = Arrays.asList(addr);
             index = elasticsearchTableInfo.getIndex();
             type = elasticsearchTableInfo.getEsType();

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/EsUtil.java

Lines changed: 4 additions & 2 deletions

@@ -18,10 +18,12 @@

 package com.dtstack.flink.sql.sink.elasticsearch;

-import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;

+import com.dtstack.flink.sql.util.DtStringUtil;
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +42,7 @@ public static Map<String, Object> rowToJsonMap(Row row, List<String> fields, Lis
         int i = 0;
         for(; i < fields.size(); ++i) {
             String field = fields.get(i);
-            String[] parts = field.split("\\.");
+            String[] parts = StringUtils.split(field, "\\.");
             Map<String, Object> currMap = jsonMap;
             for(int j = 0; j < parts.length - 1; ++j) {
                 String key = parts[j];
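As in DtStringUtil, "\\." survives the translation by luck: the separator set is {'\', '.'} and nested Elasticsearch field names contain only dots, so the output matches the old regex split. A sketch with a hypothetical nested field name:

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

public class FieldPathSplitSketch {
    public static void main(String[] args) {
        String field = "address.city.zip"; // invented nested field

        System.out.println(Arrays.toString(field.split("\\.")));              // [address, city, zip]
        System.out.println(Arrays.toString(StringUtils.split(field, "\\."))); // [address, city, zip]

        // Char overload: same result, without '\' in the separator set.
        System.out.println(Arrays.toString(StringUtils.split(field, '.')));   // [address, city, zip]
    }
}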
