Commit c99b326

Author: yinxi
Merge branch '1.8_test_3.1.0x_mergeHotfix21725' into '1.8_test_3.10.x'
Replace String.split() with StringUtils.split(). See merge request !245
2 parents 3856dc4 + fdc614b; commit c99b326
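
For context, the behavioral differences between the two calls (this comparison is mine, not part of the commit): StringUtils.split from commons-lang3 is null-safe, treats separators as literal characters instead of compiling a regex on every call, and collapses adjacent separators instead of emitting empty tokens. A minimal standalone sketch:

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

// Standalone comparison sketch; not part of the commit.
public class SplitComparison {
    public static void main(String[] args) {
        // String.split compiles its argument as a regex;
        // StringUtils.split matches separator characters literally.
        System.out.println(Arrays.toString("a.b.c".split("\\.")));            // [a, b, c] (regex must be escaped)
        System.out.println(Arrays.toString(StringUtils.split("a.b.c", "."))); // [a, b, c] (no escaping needed)

        // String.split keeps interior empty tokens; StringUtils.split drops them.
        System.out.println(Arrays.toString("h1,,h2".split(",")));              // [h1, , h2]
        System.out.println(Arrays.toString(StringUtils.split("h1,,h2", ","))); // [h1, h2]

        // StringUtils.split is null-safe where String.split would NPE.
        System.out.println(StringUtils.split(null, ","));                      // null
    }
}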

File tree: 31 files changed, +89 −61 lines


cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 5 additions & 4 deletions

@@ -43,6 +43,7 @@
 import com.google.common.collect.Maps;
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -223,9 +224,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
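
Isolated from the hunk above, the address-parsing loop now behaves as sketched below; the sample address is made up, and the loop still assumes every entry carries an explicit port (a bare "host" throws ArrayIndexOutOfBoundsException, exactly as before):

import org.apache.commons.lang3.StringUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the loop in getConn(...); sample address is made up.
public class CassandraAddressParse {
    public static void main(String[] args) throws UnknownHostException {
        String address = "127.0.0.1:9042,localhost:9042";
        List<InetAddress> serversList = new ArrayList<>();
        int cassandraPort = 0;
        // A leading or doubled comma now yields no empty token, whereas
        // address.split(",") would hand an empty host to getByName.
        for (String server : StringUtils.split(address, ",")) {
            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
        }
        System.out.println(serversList + ", port=" + cassandraPort);
    }
}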

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 4 additions & 3 deletions

@@ -51,6 +51,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.vertx.core.json.JsonArray;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -134,9 +135,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 4 additions & 3 deletions

@@ -55,6 +55,7 @@
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -147,9 +148,9 @@ public void open(int taskNumber, int numTasks) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 1 deletion

@@ -63,6 +63,7 @@
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -892,7 +893,7 @@ private TypeInformation<Row> projectedTypeInfo(int[] fields, TableSchema schema)
     private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
         List<String> fieldNames = new LinkedList<>();
         String fieldsInfo = result.getFieldsInfoStr();
-        String[] fields = fieldsInfo.split(",");
+        String[] fields = StringUtils.split(fieldsInfo, ",");
         for (int i = 0; i < fields.length; i++) {
             String[] filed = fields[i].split("\\s");
             if (filed.length < 2 || fields.length != table.getSchema().getColumnNames().length){
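
Note that the hunk above keeps fields[i].split("\\s"): StringUtils.split takes a set of literal separator characters, not a regex, so the whitespace split is not a drop-in swap. The closest commons-lang3 equivalent is the one-argument overload, which splits on whitespace and never yields empty tokens; a sketch of the difference (the field string is made up):

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

// Sketch only; "name  varchar" is a made-up fields-info fragment.
public class WhitespaceSplit {
    public static void main(String[] args) {
        String field = "name  varchar";
        // Regex split: the double space yields an empty middle token.
        System.out.println(Arrays.toString(field.split("\\s")));       // [name, , varchar]
        // Whitespace overload: adjacent whitespace counts as one separator.
        System.out.println(Arrays.toString(StringUtils.split(field))); // [name, varchar]
    }
}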

core/src/main/java/com/dtstack/flink/sql/table/TableInfoParser.java

Lines changed: 2 additions & 1 deletion

@@ -29,6 +29,7 @@
 import com.dtstack.flink.sql.util.MathUtil;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;

 import java.util.Map;
 import java.util.regex.Matcher;

@@ -110,7 +111,7 @@ public TableInfo parseWithTableType(int tableType, CreateTableParser.SqlParserRe
      * @return
      */
     private static boolean checkIsSideTable(String tableField){
-        String[] fieldInfos = tableField.split(",");
+        String[] fieldInfos = StringUtils.split(tableField, ",");
         for(String field : fieldInfos){
             Matcher matcher = SIDE_PATTERN.matcher(field.trim());
             if(matcher.find()){

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 2 additions & 2 deletions

@@ -185,9 +185,9 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b
         Map<String, String> params = Maps.newHashMap();
         if(splits.length > 1){
             String existsParamStr = splits[1];
-            String[] existsParams = existsParamStr.split("&");
+            String[] existsParams = StringUtils.split(existsParamStr, "&");
             for(String oneParam : existsParams){
-                String[] kv = oneParam.split("=");
+                String[] kv = StringUtils.split(oneParam, "=");
                 if(kv.length != 2){
                     throw new RuntimeException("illegal dbUrl:" + dbUrl);
                 }
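
One subtle change in this hunk: String.split keeps leading empty tokens while StringUtils.split drops them, so a malformed parameter such as "=value" now fails the kv.length != 2 check instead of slipping through with an empty key. A quick sketch:

import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;

public class ParamSplit {
    public static void main(String[] args) {
        System.out.println(Arrays.toString("=value".split("=")));              // [, value] -> length 2, empty key accepted
        System.out.println(Arrays.toString(StringUtils.split("=value", "="))); // [value]   -> length 1, now rejected
        System.out.println(Arrays.toString(StringUtils.split("k=v", "=")));    // [k, v]    -> unchanged for well-formed params
    }
}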

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 2 additions & 2 deletions

@@ -118,7 +118,7 @@ private RichSinkFunction createEsSinkFunction(){
         List<InetSocketAddress> transports = new ArrayList<>();

         for(String address : esAddressList){
-            String[] infoArray = address.split(":");
+            String[] infoArray = StringUtils.split(address, ":");
             int port = 9300;
             String host = infoArray[0];
             if(infoArray.length > 1){

@@ -169,7 +169,7 @@ public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
         esTableInfo = elasticsearchTableInfo;
         clusterName = elasticsearchTableInfo.getClusterName();
         String address = elasticsearchTableInfo.getAddress();
-        String[] addr = address.split(",");
+        String[] addr = StringUtils.split(address, ",");
         esAddressList = Arrays.asList(addr);
         index = elasticsearchTableInfo.getIndex();
         type = elasticsearchTableInfo.getEsType();
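
The first hunk preserves the default-port fallback: an address without ":" yields a single-element array, so port stays 9300. A standalone sketch with made-up host names:

import org.apache.commons.lang3.StringUtils;

public class EsAddressParse {
    public static void main(String[] args) {
        for (String address : StringUtils.split("es1:9301,es2", ",")) {
            String[] infoArray = StringUtils.split(address, ":");
            int port = 9300;            // transport default, as in the hunk above
            String host = infoArray[0];
            if (infoArray.length > 1) {
                port = Integer.parseInt(infoArray[1]);
            }
            System.out.println(host + " -> " + port); // es1 -> 9301, es2 -> 9300
        }
    }
}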

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 1 addition & 1 deletion

@@ -281,7 +281,7 @@ public HbaseOutputFormat finish() {
         String[] columns = keySet.toArray(new String[keySet.size()]);
         for (int i = 0; i < columns.length; ++i) {
             String col = columns[i];
-            String[] part = col.split(":");
+            String[] part = StringUtils.split(col, ":");
             families[i] = part[0];
             qualifiers[i] = part[1];
         }

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 3 additions & 2 deletions

@@ -26,6 +26,7 @@
 import com.dtstack.flink.sql.table.TableInfo;
 import com.dtstack.flink.sql.util.DtStringUtil;
 import com.dtstack.flink.sql.util.MathUtil;
+import org.apache.commons.lang3.StringUtils;

 import java.util.LinkedHashMap;
 import java.util.List;

@@ -68,7 +69,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
         hbaseTableInfo.setHost((String) props.get(HBASE_ZOOKEEPER_QUORUM.toLowerCase()));
         hbaseTableInfo.setParent((String) props.get(ZOOKEEPER_PARENT.toLowerCase()));
         String rk = (String) props.get(HBASE_ROWKEY.toLowerCase());
-        hbaseTableInfo.setRowkey(rk.split(","));
+        hbaseTableInfo.setRowkey(StringUtils.split(rk, ","));
         String updateMode = (String) props.getOrDefault(UPDATE_KEY, EUpdateMode.APPEND.name());
         hbaseTableInfo.setUpdateMode(updateMode);
         return hbaseTableInfo;

@@ -96,7 +97,7 @@ public void parseFieldsInfo(String fieldsInfo, HbaseTableInfo tableInfo){
         String fieldName = String.join(" ", filedNameArr);
         String fieldType = filedInfoArr[filedInfoArr.length - 1].trim();
         Class fieldClass = dbTypeConvertToJavaType(fieldType);
-        String[] columnFamily = fieldName.trim().split(":");
+        String[] columnFamily = StringUtils.split(fieldName.trim(), ":");
         columnFamilies.put(fieldName.trim(), columnFamily[1]);
         tableInfo.addPhysicalMappings(filedInfoArr[0], filedInfoArr[0]);
         tableInfo.addField(columnFamily[1]);
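
Both HBase hunks above split "family:qualifier" names; for well-formed names the literal-character split is equivalent to the old regex one, and a missing qualifier still fails fast at part[1]. A sketch with a made-up column name:

import org.apache.commons.lang3.StringUtils;

public class ColumnSplit {
    public static void main(String[] args) {
        String col = "cf:name";
        String[] part = StringUtils.split(col, ":");
        System.out.println("family=" + part[0] + " qualifier=" + part[1]); // family=cf qualifier=name
        // "cf:" or "cf" would yield a single-element array and an
        // ArrayIndexOutOfBoundsException at part[1], same as before.
    }
}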

impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java

Lines changed: 3 additions & 2 deletions

@@ -22,6 +22,7 @@
 import com.dtstack.flink.sql.table.TableInfo;
 import com.dtstack.flink.sql.util.MathUtil;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -93,9 +94,9 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
         impalaSideTableInfo.setEnablePartition(enablePartition);
         if (enablePartition) {
             String partitionfieldsStr = MathUtil.getString(props.get(ImpalaSideTableInfo.PARTITIONFIELDS_KEY.toLowerCase()));
-            impalaSideTableInfo.setPartitionfields(partitionfieldsStr.split(","));
+            impalaSideTableInfo.setPartitionfields(StringUtils.split(partitionfieldsStr, ","));
             String partitionfieldTypesStr = MathUtil.getString(props.get(ImpalaSideTableInfo.PARTITIONFIELDTYPES_KEY.toLowerCase()));
-            impalaSideTableInfo.setPartitionFieldTypes(partitionfieldTypesStr.split(","));
+            impalaSideTableInfo.setPartitionFieldTypes(StringUtils.split(partitionfieldTypesStr, ","));
             String partitionfieldValuesStr = MathUtil.getString(props.get(ImpalaSideTableInfo.PARTITIONVALUES_KEY.toLowerCase()));
             impalaSideTableInfo.setPartitionValues(setPartitionFieldValues(partitionfieldValuesStr));
         }
