Commit bfc1f0e

Resolve conflicts between the elasticsearch-side branch and test_1.8_3.1.0x
1 parent c50bd17 commit bfc1f0e

File tree

35 files changed: +78 -123 lines changed


README.md

Lines changed: 0 additions & 3 deletions

@@ -149,9 +149,6 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * taskmanager.slots: number of slots per TaskManager in per_job mode (default 1)
 * savePointPath: path of the savepoint used to restore the job (default: none)
 * allowNonRestoredState: whether the savepoint allows non-restored state (default false)
-* restore.enable: whether to restart on failure (default true)
-* failure.interval: time window over which the failure rate is measured, in minutes (default 6m)
-* delay.interval: delay between two consecutive restart attempts, in seconds (default 10s)
 * [prometheus-related parameters](docs/prometheus.md): in per_job mode, metrics can be written to an external monitoring component, e.g. prometheus pushgateway


cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 4 additions & 5 deletions

@@ -43,7 +43,6 @@
 import com.google.common.collect.Maps;
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -224,9 +223,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : StringUtils.split(address, ",")) {
-            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
-            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
+        for (String server : address.split(",")) {
+            cassandraPort = Integer.parseInt(server.split(":")[1]);
+            serversList.add(InetAddress.getByName(server.split(":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

@@ -280,7 +279,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
         //load data from table
         String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
         ResultSet resultSet = session.execute(sql);
-        String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
+        String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
         for (com.datastax.driver.core.Row row : resultSet) {
             Map<String, Object> oneRow = Maps.newHashMap();
             for (String fieldName : sideFieldNames) {
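
A note on the replacement's semantics: commons-lang3's StringUtils.split drops empty tokens, while the JDK's regex-based String.split keeps interior ones. A minimal sketch of the difference on the comma-separated address list parsed above (values are illustrative):

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;

// Illustrative only: how the two split flavors treat empty tokens.
public class SplitSemanticsDemo {
    public static void main(String[] args) {
        String address = "host1:9042,,host2:9042";

        // commons-lang3: empty tokens are dropped -> [host1:9042, host2:9042]
        System.out.println(Arrays.toString(StringUtils.split(address, ",")));

        // JDK regex split: interior empty tokens survive -> [host1:9042, , host2:9042]
        System.out.println(Arrays.toString(address.split(",")));
    }
}

In practice, a stray double comma in the configured address now yields an empty entry, and its server.split(":")[1] lookup throws ArrayIndexOutOfBoundsException where the old code silently skipped it.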

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 3 additions & 4 deletions

@@ -51,7 +51,6 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.vertx.core.json.JsonArray;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -135,9 +134,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : StringUtils.split(address, ",")) {
-            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
-            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
+        for (String server : address.split(",")) {
+            cassandraPort = Integer.parseInt(server.split(":")[1]);
+            serversList.add(InetAddress.getByName(server.split(":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 3 additions & 4 deletions

@@ -55,7 +55,6 @@
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -148,9 +147,9 @@ public void open(int taskNumber, int numTasks) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

-        for (String server : StringUtils.split(address, ",")) {
-            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
-            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
+        for (String server : address.split(",")) {
+            cassandraPort = Integer.parseInt(server.split(":")[1]);
+            serversList.add(InetAddress.getByName(server.split(":")[0]));
         }

         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
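
Relatedly, the server.split(":")[1] port parse repeated across all three Cassandra classes assumes every configured address carries an explicit port. A hedged defensive sketch — the helper class, its name, and the default port are illustrative assumptions, not part of this commit:

import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative helper: tolerate addresses with or without an explicit port.
public final class AddressParser {

    // Assumption: fall back to Cassandra's default native-protocol port.
    private static final int DEFAULT_CASSANDRA_PORT = 9042;

    public static int parsePort(String server) {
        String[] parts = server.split(":");
        // "host:9042" -> 9042; bare "host" -> default instead of an index error.
        return parts.length > 1 ? Integer.parseInt(parts[1].trim()) : DEFAULT_CASSANDRA_PORT;
    }

    public static InetAddress parseHost(String server) throws UnknownHostException {
        return InetAddress.getByName(server.split(":")[0].trim());
    }
}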

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 2 additions & 4 deletions

@@ -63,14 +63,12 @@ public class ConfigConstrant {
     public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
     public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";

-    public static final String RESTOREENABLE = "restore.enable";
-

     // restart policy
     public static final int failureRate = 3;

-    public static final String FAILUREINTERVAL = "failure.interval"; //min
+    public static final int failureInterval = 6; //min

-    public static final String DELAYINTERVAL= "delay.interval"; //sec
+    public static final int delayInterval = 10; //sec

 }

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 12 additions & 29 deletions

@@ -18,12 +18,6 @@

 package com.dtstack.flink.sql.environment;

-import com.dtstack.flink.sql.constrant.ConfigConstrant;
-import com.dtstack.flink.sql.enums.EStateBackend;
-import com.dtstack.flink.sql.util.MathUtil;
-import com.dtstack.flink.sql.util.PropertiesUtils;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;

@@ -40,6 +34,13 @@
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.java.StreamTableEnvironment;

+import com.dtstack.flink.sql.constrant.ConfigConstrant;
+import com.dtstack.flink.sql.enums.EStateBackend;
+import com.dtstack.flink.sql.util.MathUtil;
+import com.dtstack.flink.sql.util.PropertiesUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;

@@ -101,15 +102,11 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
             }
         });

-        if(isRestore(confProperties).get()){
-            streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
-                    ConfigConstrant.failureRate,
-                    Time.of(getFailureInterval(confProperties).get(), TimeUnit.MINUTES),
-                    Time.of(getDelayInterval(confProperties).get(), TimeUnit.SECONDS)
-            ));
-        } else {
-            streamEnv.setRestartStrategy(RestartStrategies.noRestart());
-        }
+        streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
+                ConfigConstrant.failureRate,
+                Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
+                Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
+        ));

         // checkpoint config
         Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);

@@ -167,20 +164,6 @@ public static Optional<Long> getAutoWatermarkInterval(Properties properties) {
         return StringUtils.isNotBlank(autoWatermarkInterval) ? Optional.of(Long.valueOf(autoWatermarkInterval)) : Optional.empty();
     }

-    public static Optional<Boolean> isRestore(Properties properties){
-        String restoreEnable = properties.getProperty(ConfigConstrant.RESTOREENABLE, "true");
-        return Optional.of(Boolean.valueOf(restoreEnable));
-    }
-
-    public static Optional<Integer> getDelayInterval(Properties properties){
-        String delayInterval = properties.getProperty(ConfigConstrant.DELAYINTERVAL, "10");
-        return Optional.of(Integer.valueOf(delayInterval));
-    }
-    public static Optional<Integer> getFailureInterval(Properties properties){
-        String failureInterval = properties.getProperty(ConfigConstrant.FAILUREINTERVAL, "6");
-        return Optional.of(Integer.valueOf(failureInterval));
-    }
-
     /**
      * #ProcessingTime (default), IngestionTime, EventTime
      * @param properties
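
With the property lookups gone, restart behavior is now fixed at build time: restart after each failure with a 10-second delay, and fail the job permanently once more than 3 failures fall inside any 6-minute window. A standalone reference sketch of the equivalent Flink call (the wrapper class is illustrative):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

// Reference sketch: the failure-rate restart strategy this commit hardcodes.
public class RestartStrategyExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,                            // max failures per interval (failureRate)
                Time.of(6, TimeUnit.MINUTES), // measuring window (failureInterval)
                Time.of(10, TimeUnit.SECONDS) // delay between restart attempts (delayInterval)
        ));
    }
}

Note that jobs which previously set restore.enable=false to get RestartStrategies.noRestart() will now always restart.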

core/src/main/java/com/dtstack/flink/sql/table/TableInfoParser.java

Lines changed: 1 addition & 2 deletions

@@ -29,7 +29,6 @@
 import com.dtstack.flink.sql.util.MathUtil;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;

 import java.util.Map;
 import java.util.regex.Matcher;

@@ -111,7 +110,7 @@ public TableInfo parseWithTableType(int tableType, CreateTableParser.SqlParserRe
      * @return
      */
     private static boolean checkIsSideTable(String tableField){
-        String[] fieldInfos = StringUtils.split(tableField, ",");
+        String[] fieldInfos = tableField.split(",");
         for(String field : fieldInfos){
             Matcher matcher = SIDE_PATTERN.matcher(field.trim());
             if(matcher.find()){

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 3 additions & 3 deletions

@@ -180,14 +180,14 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b
            return dbUrl;
        }

-       String[] splits = StringUtils.split(dbUrl, "\\?");
+       String[] splits = dbUrl.split("\\?");
        String preStr = splits[0];
        Map<String, String> params = Maps.newHashMap();
        if(splits.length > 1){
            String existsParamStr = splits[1];
-           String[] existsParams = StringUtils.split(existsParamStr, "&");
+           String[] existsParams = existsParamStr.split("&");
            for(String oneParam : existsParams){
-               String[] kv = StringUtils.split(oneParam, "=");
+               String[] kv = oneParam.split("=");
                if(kv.length != 2){
                    throw new RuntimeException("illegal dbUrl:" + dbUrl);
                }
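
This hunk changes more than empty-token handling: StringUtils.split(dbUrl, "\\?") treats each character of the two-character string \? as a separator, so the old code split on backslashes as well as question marks, while dbUrl.split("\\?") reads \\? as a regex escape for a literal question mark. A worked comparison (the URL is illustrative):

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;

// Illustrative only: the second argument means different things to the two APIs.
public class JdbcUrlSplitDemo {
    public static void main(String[] args) {
        String dbUrl = "jdbc:mysql://localhost:3306/db?useSSL=false";

        // Old: "\\?" is a *set* of separator chars ('\' and '?').
        // -> [jdbc:mysql://localhost:3306/db, useSSL=false]
        System.out.println(Arrays.toString(StringUtils.split(dbUrl, "\\?")));

        // New: "\\?" is a regex matching a literal '?'. Same result for this URL,
        // but a '\' in the URL no longer acts as a separator.
        System.out.println(Arrays.toString(dbUrl.split("\\?")));
    }
}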

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 2 additions & 2 deletions

@@ -118,7 +118,7 @@ private RichSinkFunction createEsSinkFunction(){
        List<InetSocketAddress> transports = new ArrayList<>();

        for(String address : esAddressList){
-           String[] infoArray = StringUtils.split(address, ":");
+           String[] infoArray = address.split(":");
            int port = 9300;
            String host = infoArray[0];
            if(infoArray.length > 1){

@@ -169,7 +169,7 @@ public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
        esTableInfo = elasticsearchTableInfo;
        clusterName = elasticsearchTableInfo.getClusterName();
        String address = elasticsearchTableInfo.getAddress();
-       String[] addr = StringUtils.split(address, ",");
+       String[] addr = address.split(",");
        esAddressList = Arrays.asList(addr);
        index = elasticsearchTableInfo.getIndex();
        type = elasticsearchTableInfo.getEsType();

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/EsUtil.java

Lines changed: 1 addition & 2 deletions

@@ -22,7 +22,6 @@
 import org.apache.flink.util.Preconditions;

 import com.dtstack.flink.sql.util.DtStringUtil;
-import org.apache.commons.lang3.StringUtils;

 import java.util.HashMap;
 import java.util.List;

@@ -42,7 +41,7 @@ public static Map<String, Object> rowToJsonMap(Row row, List<String> fields, Lis
        int i = 0;
        for(; i < fields.size(); ++i) {
            String field = fields.get(i);
-           String[] parts = StringUtils.split(field, "\\.");
+           String[] parts = field.split("\\.");
            Map<String, Object> currMap = jsonMap;
            for(int j = 0; j < parts.length - 1; ++j) {
                String key = parts[j];