Skip to content

Commit 73befd6

Browse files
解决与1.8_test_3.1.0x分支的冲突 (Resolve conflicts with the 1.8_test_3.1.0x branch)
1 parent c810a67 commit 73befd6

File tree

38 files changed

+87
-143
lines changed

38 files changed

+87
-143
lines changed

README.md

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -149,9 +149,6 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
149149
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
150150
* savePointPath:任务恢复点的路径(默认无)
151151
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
152-
* restore.enable:是否失败重启(默认是true)
153-
* failure.interval:衡量失败率的时间段,单位分钟(默认6m)
154-
* delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
155152
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
156153

157154

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,6 @@
4343
import com.google.common.collect.Maps;
4444
import org.apache.calcite.sql.JoinType;
4545
import org.apache.commons.collections.CollectionUtils;
46-
import org.apache.commons.lang3.StringUtils;
4746
import org.slf4j.Logger;
4847
import org.slf4j.LoggerFactory;
4948

@@ -224,9 +223,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
224223
//重试策略
225224
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
226225

227-
for (String server : StringUtils.split(address, ",")) {
228-
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
229-
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
226+
for (String server : address.split(",")) {
227+
cassandraPort = Integer.parseInt(server.split(":")[1]);
228+
serversList.add(InetAddress.getByName(server.split(":")[0]));
230229
}
231230

232231
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -280,7 +279,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
280279
//load data from table
281280
String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
282281
ResultSet resultSet = session.execute(sql);
283-
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
282+
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
284283
for (com.datastax.driver.core.Row row : resultSet) {
285284
Map<String, Object> oneRow = Maps.newHashMap();
286285
for (String fieldName : sideFieldNames) {

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,6 @@
5151
import com.google.common.util.concurrent.Futures;
5252
import com.google.common.util.concurrent.ListenableFuture;
5353
import io.vertx.core.json.JsonArray;
54-
import org.apache.commons.lang3.StringUtils;
5554
import org.slf4j.Logger;
5655
import org.slf4j.LoggerFactory;
5756

@@ -135,9 +134,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
135134
//重试策略
136135
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
137136

138-
for (String server : StringUtils.split(address, ",")) {
139-
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
140-
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
137+
for (String server : address.split(",")) {
138+
cassandraPort = Integer.parseInt(server.split(":")[1]);
139+
serversList.add(InetAddress.getByName(server.split(":")[0]));
141140
}
142141

143142
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,6 @@
5555
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5656
import com.datastax.driver.core.policies.RetryPolicy;
5757
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
58-
import org.apache.commons.lang3.StringUtils;
5958
import org.slf4j.Logger;
6059
import org.slf4j.LoggerFactory;
6160

@@ -148,9 +147,9 @@ public void open(int taskNumber, int numTasks) {
148147
//重试策略
149148
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
150149

151-
for (String server : StringUtils.split(address, ",")) {
152-
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
153-
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
150+
for (String server : address.split(",")) {
151+
cassandraPort = Integer.parseInt(server.split(":")[1]);
152+
serversList.add(InetAddress.getByName(server.split(":")[0]));
154153
}
155154

156155
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -219,7 +218,7 @@ private void insertWrite(Row row) {
219218
}
220219
} catch (Exception e) {
221220
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
222-
LOG.error("record insert failed ..", row.toString().substring(0, 100));
221+
LOG.error("record insert failed, total dirty num:{}, current record:{}", outDirtyRecords.getCount(), row.toString());
223222
LOG.error("", e);
224223
}
225224

core/pom.xml

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
<calcite.server.version>1.16.0</calcite.server.version>
2121
<jackson.version>2.7.9</jackson.version>
2222
<guava.version>19.0</guava.version>
23+
<logback.version>1.1.7</logback.version>
2324
</properties>
2425

2526
<dependencies>
@@ -121,10 +122,11 @@
121122
<artifactId>junit</artifactId>
122123
<version>4.12</version>
123124
</dependency>
125+
124126
<dependency>
125127
<groupId>ch.qos.logback</groupId>
126128
<artifactId>logback-classic</artifactId>
127-
<version>1.1.7</version>
129+
<version>${logback.version}</version>
128130
</dependency>
129131

130132
</dependencies>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -21,14 +21,13 @@
2121

2222

2323

24+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
25+
2426
import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
2527
import com.dtstack.flink.sql.exec.ParamsInfo;
26-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

30-
import ch.qos.logback.classic.Level;
31-
import ch.qos.logback.classic.LoggerContext;
3231

3332
/**
3433
* Date: 2018/6/26
@@ -45,10 +44,4 @@ public static void main(String[] args) throws Exception {
4544
env.execute(paramsInfo.getName());
4645
LOG.info("program {} execution success", paramsInfo.getName());
4746
}
48-
private static void setLogLevel(String level){
49-
LoggerContext loggerContext= (LoggerContext) LoggerFactory.getILoggerFactory();
50-
//设置全局日志级别
51-
ch.qos.logback.classic.Logger logger = loggerContext.getLogger("root");
52-
logger.setLevel(Level.toLevel(level, Level.INFO));
53-
}
5447
}

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -63,14 +63,12 @@ public class ConfigConstrant {
6363
public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
6464
public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";
6565

66-
public static final String RESTOREENABLE = "restore.enable";
67-
6866

6967
// restart policy
7068
public static final int failureRate = 3;
7169

72-
public static final String FAILUREINTERVAL = "failure.interval"; //min
70+
public static final int failureInterval = 6; //min
7371

74-
public static final String DELAYINTERVAL= "delay.interval"; //sec
72+
public static final int delayInterval = 10; //sec
7573

7674
}

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 12 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -18,12 +18,6 @@
1818

1919
package com.dtstack.flink.sql.environment;
2020

21-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
22-
import com.dtstack.flink.sql.enums.EStateBackend;
23-
import com.dtstack.flink.sql.util.MathUtil;
24-
import com.dtstack.flink.sql.util.PropertiesUtils;
25-
import org.apache.commons.lang3.BooleanUtils;
26-
import org.apache.commons.lang3.StringUtils;
2721
import org.apache.flink.api.common.ExecutionConfig;
2822
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2923
import org.apache.flink.api.common.time.Time;
@@ -40,6 +34,13 @@
4034
import org.apache.flink.table.api.StreamQueryConfig;
4135
import org.apache.flink.table.api.java.StreamTableEnvironment;
4236

37+
import com.dtstack.flink.sql.constrant.ConfigConstrant;
38+
import com.dtstack.flink.sql.enums.EStateBackend;
39+
import com.dtstack.flink.sql.util.MathUtil;
40+
import com.dtstack.flink.sql.util.PropertiesUtils;
41+
import org.apache.commons.lang3.BooleanUtils;
42+
import org.apache.commons.lang3.StringUtils;
43+
4344
import java.io.IOException;
4445
import java.lang.reflect.InvocationTargetException;
4546
import java.lang.reflect.Method;
@@ -101,15 +102,11 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
101102
}
102103
});
103104

104-
if(isRestore(confProperties).get()){
105-
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
106-
ConfigConstrant.failureRate,
107-
Time.of(getFailureInterval(confProperties).get(), TimeUnit.MINUTES),
108-
Time.of(getDelayInterval(confProperties).get(), TimeUnit.SECONDS)
109-
));
110-
} else {
111-
streamEnv.setRestartStrategy(RestartStrategies.noRestart());
112-
}
105+
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
106+
ConfigConstrant.failureRate,
107+
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
108+
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
109+
));
113110

114111
// checkpoint config
115112
Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);
@@ -167,20 +164,6 @@ public static Optional<Long> getAutoWatermarkInterval(Properties properties) {
167164
return StringUtils.isNotBlank(autoWatermarkInterval) ? Optional.of(Long.valueOf(autoWatermarkInterval)) : Optional.empty();
168165
}
169166

170-
public static Optional<Boolean> isRestore(Properties properties){
171-
String restoreEnable = properties.getProperty(ConfigConstrant.RESTOREENABLE, "true");
172-
return Optional.of(Boolean.valueOf(restoreEnable));
173-
}
174-
175-
public static Optional<Integer> getDelayInterval(Properties properties){
176-
String delayInterval = properties.getProperty(ConfigConstrant.DELAYINTERVAL, "10");
177-
return Optional.of(Integer.valueOf(delayInterval));
178-
}
179-
public static Optional<Integer> getFailureInterval(Properties properties){
180-
String failureInterval = properties.getProperty(ConfigConstrant.FAILUREINTERVAL, "6");
181-
return Optional.of(Integer.valueOf(failureInterval));
182-
}
183-
184167
/**
185168
* #ProcessingTime(默认), IngestionTime, EventTime
186169
* @param properties

core/src/main/java/com/dtstack/flink/sql/exec/ApiResult.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66

7+
import java.util.UUID;
8+
79
/**
810
* API调用结果返回
911
* Date: 2020/2/24

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 2 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -20,13 +20,11 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.factory.DTThreadFactory;
24-
import org.apache.calcite.sql.JoinType;
2523
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2624
import org.apache.flink.configuration.Configuration;
2725
import org.apache.flink.table.runtime.types.CRow;
28-
import org.apache.flink.types.Row;
29-
import org.apache.flink.util.Collector;
26+
27+
import com.dtstack.flink.sql.factory.DTThreadFactory;
3028

3129
import java.sql.SQLException;
3230
import java.util.concurrent.Executors;
@@ -67,13 +65,4 @@ public void open(Configuration parameters) throws Exception {
6765
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
6866
}
6967

70-
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out){
71-
if(sideInput == null && sideInfo.getJoinType() != JoinType.LEFT){
72-
return;
73-
}
74-
75-
Row row = fillData(value.row(), sideInput);
76-
out.collect(new CRow(row, value.change()));
77-
}
78-
7968
}

0 commit comments

Comments (0)