Skip to content

Commit 4bdbe90

Browse files
committed
Merge branch 'v1.8.0_dev' into feat_1.8_codereview
# Conflicts: # cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java # core/src/main/java/com/dtstack/flink/sql/Main.java # core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java # core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java # core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java # core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java # elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java # hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/enums/EReplaceType.java # hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java # kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java # kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java # kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java # kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java # kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java # mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java # mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java # mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AbstractUpsertWriter.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java # redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java
2 parents 02f2c37 + 1bdc3eb commit 4bdbe90

File tree

108 files changed

+4918
-959
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+4918
-959
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
149149
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
150150
* savePointPath:任务恢复点的路径(默认无)
151151
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
152+
* restore.enable:任务失败后是否自动重启(默认是true)
153+
* failure.interval:衡量失败率的时间窗口,单位为分钟(默认6m)
154+
* delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
155+
* logLevel: 日志级别动态配置(默认info)
152156
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
153157

154158

@@ -181,6 +185,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
181185
* 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid
182186
* 必选:否
183187
* 默认值:false
188+
184189

185190
## 2 结构
186191
### 2.1 源表插件

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22+
import org.apache.flink.table.runtime.types.CRow;
23+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
24+
import org.apache.flink.types.Row;
25+
import org.apache.flink.util.Collector;
26+
2127
import com.datastax.driver.core.Cluster;
2228
import com.datastax.driver.core.ConsistencyLevel;
2329
import com.datastax.driver.core.HostDistance;
@@ -33,15 +39,11 @@
3339
import com.dtstack.flink.sql.side.JoinInfo;
3440
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
3541
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
36-
import org.apache.calcite.sql.JoinType;
37-
import org.apache.commons.collections.CollectionUtils;
38-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3942
import com.google.common.collect.Lists;
4043
import com.google.common.collect.Maps;
41-
import org.apache.flink.table.runtime.types.CRow;
42-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
43-
import org.apache.flink.types.Row;
44-
import org.apache.flink.util.Collector;
44+
import org.apache.calcite.sql.JoinType;
45+
import org.apache.commons.collections.CollectionUtils;
46+
import org.apache.commons.lang3.StringUtils;
4547
import org.slf4j.Logger;
4648
import org.slf4j.LoggerFactory;
4749

@@ -220,9 +222,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
220222
//重试策略
221223
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
222224

223-
for (String server : address.split(",")) {
224-
cassandraPort = Integer.parseInt(server.split(":")[1]);
225-
serversList.add(InetAddress.getByName(server.split(":")[0]));
225+
for (String server : StringUtils.split(address, ",")) {
226+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
227+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
226228
}
227229

228230
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -276,7 +278,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
276278
//load data from table
277279
String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
278280
ResultSet resultSet = session.execute(sql);
279-
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
281+
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
280282
for (com.datastax.driver.core.Row row : resultSet) {
281283
Map<String, Object> oneRow = Maps.newHashMap();
282284
for (String fieldName : sideFieldNames) {

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
23+
import org.apache.flink.configuration.Configuration;
24+
import org.apache.flink.streaming.api.functions.async.ResultFuture;
25+
import org.apache.flink.table.runtime.types.CRow;
26+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
27+
import org.apache.flink.types.Row;
28+
2229
import com.datastax.driver.core.Cluster;
2330
import com.datastax.driver.core.ConsistencyLevel;
2431
import com.datastax.driver.core.HostDistance;
@@ -38,18 +45,13 @@
3845
import com.dtstack.flink.sql.side.cache.CacheObj;
3946
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
4047
import com.google.common.base.Function;
48+
import com.google.common.collect.Lists;
4149
import com.google.common.util.concurrent.AsyncFunction;
4250
import com.google.common.util.concurrent.FutureCallback;
4351
import com.google.common.util.concurrent.Futures;
4452
import com.google.common.util.concurrent.ListenableFuture;
4553
import io.vertx.core.json.JsonArray;
46-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
47-
import org.apache.flink.configuration.Configuration;
48-
import com.google.common.collect.Lists;
49-
import org.apache.flink.streaming.api.functions.async.ResultFuture;
50-
import org.apache.flink.table.runtime.types.CRow;
51-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
52-
import org.apache.flink.types.Row;
54+
import org.apache.commons.lang3.StringUtils;
5355
import org.slf4j.Logger;
5456
import org.slf4j.LoggerFactory;
5557

@@ -133,9 +135,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
133135
//重试策略
134136
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
135137

136-
for (String server : address.split(",")) {
137-
cassandraPort = Integer.parseInt(server.split(":")[1]);
138-
serversList.add(InetAddress.getByName(server.split(":")[0]));
138+
for (String server : StringUtils.split(address, ",")) {
139+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
140+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
139141
}
140142

141143
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@
3838

3939
package com.dtstack.flink.sql.sink.cassandra;
4040

41+
import org.apache.flink.api.common.typeinfo.TypeInformation;
42+
import org.apache.flink.api.java.tuple.Tuple;
43+
import org.apache.flink.api.java.tuple.Tuple2;
44+
import org.apache.flink.configuration.Configuration;
45+
import org.apache.flink.types.Row;
46+
4147
import com.datastax.driver.core.Cluster;
4248
import com.datastax.driver.core.ConsistencyLevel;
4349
import com.datastax.driver.core.HostDistance;
@@ -48,14 +54,11 @@
4854
import com.datastax.driver.core.SocketOptions;
4955
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5056
import com.datastax.driver.core.policies.RetryPolicy;
51-
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
52-
import org.apache.flink.api.common.typeinfo.TypeInformation;
53-
import org.apache.flink.api.java.tuple.Tuple;
54-
import org.apache.flink.api.java.tuple.Tuple2;
55-
import org.apache.flink.configuration.Configuration;
56-
import org.apache.flink.types.Row;
57+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
58+
import org.apache.commons.lang3.StringUtils;
5759
import org.slf4j.Logger;
5860
import org.slf4j.LoggerFactory;
61+
5962
import java.io.IOException;
6063
import java.net.InetAddress;
6164
import java.sql.DriverManager;
@@ -145,9 +148,9 @@ public void open(int taskNumber, int numTasks) {
145148
//重试策略
146149
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
147150

148-
for (String server : address.split(",")) {
149-
cassandraPort = Integer.parseInt(server.split(":")[1]);
150-
serversList.add(InetAddress.getByName(server.split(":")[0]));
151+
for (String server : StringUtils.split(address, ",")) {
152+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
153+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
151154
}
152155

153156
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -216,7 +219,7 @@ private void insertWrite(Row row) {
216219
}
217220
} catch (Exception e) {
218221
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
219-
LOG.error("record insert failed ..", row.toString().substring(0, 100));
222+
LOG.error("record insert failed, total dirty num:{}, current record:{}", outDirtyRecords.getCount(), row.toString());
220223
LOG.error("", e);
221224
}
222225

clickhouse/clickhouse-side/clickhouse-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<configuration>
4242
<artifactSet>
4343
<excludes>
44-
44+
<exclude>org.slf4j</exclude>
4545
</excludes>
4646
</artifactSet>
4747
<filters>

clickhouse/clickhouse-side/clickhouse-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<configuration>
4242
<artifactSet>
4343
<excludes>
44-
44+
<exclude>org.slf4j</exclude>
4545
</excludes>
4646
</artifactSet>
4747
<filters>

clickhouse/clickhouse-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
<configuration>
4141
<artifactSet>
4242
<excludes>
43-
43+
<exclude>org.slf4j</exclude>
4444
</excludes>
4545
</artifactSet>
4646
<filters>

core/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
<calcite.server.version>1.16.0</calcite.server.version>
2121
<jackson.version>2.7.9</jackson.version>
2222
<guava.version>19.0</guava.version>
23+
<logger.tool.version>1.0.0-SNAPSHOT</logger.tool.version>
2324
</properties>
2425

2526
<dependencies>
@@ -122,11 +123,10 @@
122123
<version>4.12</version>
123124
</dependency>
124125
<dependency>
125-
<groupId>ch.qos.logback</groupId>
126-
<artifactId>logback-classic</artifactId>
127-
<version>1.1.7</version>
126+
<groupId>com.aiweiergou</groupId>
127+
<artifactId>tools-logger</artifactId>
128+
<version>${logger.tool.version}</version>
128129
</dependency>
129-
130130
</dependencies>
131131

132132
<build>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
3130
/**
3231
* Date: 2018/6/26
3332
* Company: www.dtstack.com
@@ -39,6 +38,7 @@ public class Main {
3938

4039
public static void main(String[] args) throws Exception {
4140
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
41+
ExecuteProcessHelper.setLogLevel(paramsInfo);
4242
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
4343
env.execute(paramsInfo.getName());
4444
LOG.info("program {} execution success", paramsInfo.getName());

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public class ConfigConstrant {
6363
public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
6464
public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";
6565

66+
public static final String RESTOREENABLE = "restore.enable";
67+
68+
public static final String LOG_LEVEL_KEY = "logLevel";
69+
6670

6771
// restart plocy
6872
public static final int FAILUEE_RATE = 3;
@@ -71,4 +75,9 @@ public class ConfigConstrant {
7175

7276
public static final int DELAY_INTERVAL = 10; //sec
7377

78+
public static final String FAILUREINTERVAL = "failure.interval"; //min
79+
80+
public static final String DELAYINTERVAL= "delay.interval"; //sec
81+
82+
7483
}

0 commit comments

Comments
 (0)