Commit 7302b16

Merge remote-tracking branch 'origin/v1.9.0_dev' into v1.10.0_dev

# Conflicts:
#   core/pom.xml
#   core/src/main/java/com/dtstack/flink/sql/Main.java
#   core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java
#   core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java
#   elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/pom.xml
#   elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/pom.xml
#   elasticsearch6/pom.xml
#   kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
#   kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
#   kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
#   launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java
#   launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
#   launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java
#   launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

2 parents b55d97f + bbc22fe · commit 7302b16

File tree: 368 files changed (+12738 / -9107 lines)


README.md

Lines changed: 5 additions & 0 deletions
@@ -149,6 +149,10 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * taskmanager.slots: number of slots per taskmanager in per_job mode (default 1)
 * savePointPath: path of the savepoint to restore the job from (default none)
 * allowNonRestoredState: whether the savepoint may contain state that cannot be restored (default false)
+* restore.enable: whether to restart after a failure (default true)
+* failure.interval: time window over which the failure rate is measured, in minutes (default 6m)
+* delay.interval: delay between two consecutive restart attempts, in seconds (default 10s)
+* logLevel: dynamic log-level configuration (default info)
 * [prometheus-related parameters](docs/prometheus.md): in per_job mode, metrics can be pushed to an external monitoring component, e.g. prometheus pushgateway
 
 
@@ -181,6 +185,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * Description: extra runtime parameters for yarn session mode, [see the Flink CLI docs](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html); currently only yid is supported
 * Required: no
 * Default: false
+
 
 ## 2 Structure
 ### 2.1 Source table plugins
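The restart options added above (restore.enable, failure.interval, delay.interval) correspond to the knobs of Flink's failure-rate restart strategy. Below is a minimal, hypothetical sketch of how a launcher could translate them into a restart strategy; the property names come from the README, but the wiring, the Properties object, and the fixed failure count of 3 are illustrative assumptions, not this project's actual launcher code.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class RestartStrategyExample {
    public static void main(String[] args) {
        // Hypothetical holder for the submit.sh options described in the README.
        Properties confProperties = new Properties();
        confProperties.setProperty("restore.enable", "true");
        confProperties.setProperty("failure.interval", "6");   // minutes
        confProperties.setProperty("delay.interval", "10");    // seconds

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        if (Boolean.parseBoolean(confProperties.getProperty("restore.enable", "true"))) {
            int failureInterval = Integer.parseInt(confProperties.getProperty("failure.interval", "6"));
            int delayInterval = Integer.parseInt(confProperties.getProperty("delay.interval", "10"));
            // Tolerate a limited number of failures (3 here, an assumption) within the
            // failure window, with a fixed delay between consecutive restart attempts.
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    3,
                    Time.of(failureInterval, TimeUnit.MINUTES),
                    Time.of(delayInterval, TimeUnit.SECONDS)));
        } else {
            env.setRestartStrategy(RestartStrategies.noRestart());
        }
    }
}
```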

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 24 additions & 24 deletions
@@ -18,6 +18,11 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -28,19 +33,16 @@
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
-import com.dtstack.flink.sql.side.AllReqRow;
+import com.dtstack.flink.sql.side.BaseAllReqRow;
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
-import org.apache.calcite.sql.JoinType;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
+import org.apache.calcite.sql.JoinType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,14 +61,12 @@
  *
  * @author xuqianjin
  */
-public class CassandraAllReqRow extends AllReqRow {
+public class CassandraAllReqRow extends BaseAllReqRow {
 
     private static final long serialVersionUID = 54015343561288219L;
 
     private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);
 
-    private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";
-
     private static final int CONN_RETRY_NUM = 3;
 
     private static final int FETCH_SIZE = 1000;
@@ -76,7 +76,7 @@ public class CassandraAllReqRow extends AllReqRow {
 
     private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
 
-    public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+    public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
@@ -124,14 +124,14 @@ protected void reloadCache() {
 
 
     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = input.f1.getField(conValIndex);
             if (equalObj == null) {
-                if(sideInfo.getJoinType() == JoinType.LEFT){
-                    Row data = fillData(value, null);
-                    out.collect(data);
+                if (sideInfo.getJoinType() == JoinType.LEFT) {
+                    Row data = fillData(input.f1, null);
+                    out.collect(Tuple2.of(input.f0, data));
                 }
                 return;
             }
@@ -143,8 +143,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(input.f1, null);
+                out.collect(Tuple2.of(input.f0, row));
             } else {
                 return;
             }
@@ -153,7 +153,7 @@
         }
 
         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(Tuple2.of(input.f0, fillData(input.f1, one)));
         }
 
     }
@@ -216,9 +216,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }
 
         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -272,7 +272,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
         //load data from table
         String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
         ResultSet resultSet = session.execute(sql);
-        String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
+        String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
         for (com.datastax.driver.core.Row row : resultSet) {
             Map<String, Object> oneRow = Maps.newHashMap();
             for (String fieldName : sideFieldNames) {
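The new flatMap signature threads a retract flag through the side join: the Boolean in Tuple2<Boolean, Row> marks whether a record is an accumulate or a retract message, and the operator re-wraps every enriched Row with the incoming flag. The sketch below mirrors that pattern in a self-contained form; RetractAwareJoinFunction, JOIN_KEY_INDEX, lookup, and the row widths are hypothetical stand-ins, not classes from this repository.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

// Hypothetical operator mirroring the pattern: never drop the Boolean flag,
// always re-wrap the enriched Row with the flag that arrived on the input.
public class RetractAwareJoinFunction extends RichFlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {

    private static final int JOIN_KEY_INDEX = 0; // assumed position of the join key

    @Override
    public void flatMap(Tuple2<Boolean, Row> input, Collector<Tuple2<Boolean, Row>> out) {
        Object key = input.f1.getField(JOIN_KEY_INDEX);
        if (key == null) {
            // LEFT-join style handling: emit the probe row padded with nulls, keeping the flag.
            out.collect(Tuple2.of(input.f0, fillData(input.f1, null)));
            return;
        }
        for (Row matched : lookup(key)) {
            out.collect(Tuple2.of(input.f0, fillData(input.f1, matched)));
        }
    }

    private List<Row> lookup(Object key) {
        // Stand-in for the cached side-table lookup.
        return Arrays.asList(Row.of(key, "sideValue"));
    }

    private Row fillData(Row probe, Row side) {
        int sideArity = 2; // assumed width of the side-table row
        Row result = new Row(probe.getArity() + sideArity);
        for (int i = 0; i < probe.getArity(); i++) {
            result.setField(i, probe.getField(i));
        }
        for (int i = 0; i < sideArity; i++) {
            result.setField(probe.getArity() + i, side == null ? null : side.getField(i));
        }
        return result;
    }
}
```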

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 5 additions & 5 deletions
@@ -20,8 +20,8 @@
 
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.BaseSideInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.dtstack.flink.sql.util.ParseUtils;
 import org.apache.calcite.sql.SqlNode;
@@ -37,16 +37,16 @@
  *
  * @author xuqianjin
  */
-public class CassandraAllSideInfo extends SideInfo {
+public class CassandraAllSideInfo extends BaseSideInfo {
 
     private static final long serialVersionUID = -8690814317653033557L;
 
-    public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+    public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }
 
     @Override
-    public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
+    public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
         CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
 
         sqlCondition = "select ${selectField} from ${tableName} ";
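The side SQL is assembled from a ${selectField}/${tableName} template, with the table name qualified by the keyspace, as the async variant further below also shows. A tiny standalone illustration of that substitution with made-up values:

```java
public class SideSqlTemplateExample {
    public static void main(String[] args) {
        // Hypothetical values standing in for CassandraSideTableInfo configuration.
        String sideSelectFields = "id, name, city";
        String database = "test_keyspace";
        String tableName = "user_side";

        String sqlCondition = "select ${selectField} from ${tableName} ";
        sqlCondition = sqlCondition
                .replace("${tableName}", database + "." + tableName)
                .replace("${selectField}", sideSelectFields);

        // Prints: select id, name, city from test_keyspace.user_side
        System.out.println(sqlCondition);
    }
}
```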

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 28 additions & 28 deletions
@@ -19,6 +19,12 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -30,32 +36,27 @@
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.enums.ECacheContentType;
-import com.dtstack.flink.sql.side.AsyncReqRow;
+import com.dtstack.flink.sql.side.BaseAsyncReqRow;
 import com.dtstack.flink.sql.side.CacheMissVal;
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.vertx.core.json.JsonArray;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -65,7 +66,7 @@
  *
  * @author xuqianjin
 */
-public class CassandraAsyncReqRow extends AsyncReqRow {
+public class CassandraAsyncReqRow extends BaseAsyncReqRow {
 
     private static final long serialVersionUID = 6631584128079864735L;
 
@@ -81,7 +82,7 @@ public class CassandraAsyncReqRow extends AsyncReqRow {
     private transient ListenableFuture session;
     private transient CassandraSideTableInfo cassandraSideTableInfo;
 
-    public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+    public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
@@ -133,9 +134,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
         //retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }
 
         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -160,17 +161,17 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
160161
}
161162

162163
@Override
163-
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
164-
Row inputRow = Row.copy(input);
164+
public void asyncInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
165+
Tuple2<Boolean, Row> inputCopy = Tuple2.of(input.f0, input.f1);
165166
JsonArray inputParams = new JsonArray();
166167
StringBuffer stringBuffer = new StringBuffer();
167168
String sqlWhere = " where ";
168169

169170
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
170171
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
171-
Object equalObj = inputRow.getField(conValIndex);
172+
Object equalObj = inputCopy.f1.getField(conValIndex);
172173
if (equalObj == null) {
173-
dealMissKey(inputRow, resultFuture);
174+
dealMissKey(inputCopy, resultFuture);
174175
return;
175176
}
176177
inputParams.add(equalObj);
@@ -194,13 +195,13 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
194195
if (val != null) {
195196

196197
if (ECacheContentType.MissVal == val.getType()) {
197-
dealMissKey(inputRow, resultFuture);
198+
dealMissKey(inputCopy, resultFuture);
198199
return;
199200
} else if (ECacheContentType.MultiLine == val.getType()) {
200-
List<Row> rowList = Lists.newArrayList();
201+
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
201202
for (Object jsonArray : (List) val.getContent()) {
202-
Row row = fillData(inputRow, jsonArray);
203-
rowList.add(row);
203+
Row row = fillData(inputCopy.f1, jsonArray);
204+
rowList.add(Tuple2.of(inputCopy.f0, row));
204205
}
205206
resultFuture.complete(rowList);
206207
} else {
@@ -214,7 +215,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
214215
connCassandraDB(cassandraSideTableInfo);
215216

216217
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
217-
System.out.println("sqlCondition:" + sqlCondition);
218+
LOG.info("sqlCondition:{}" + sqlCondition);
218219

219220
ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
220221
new AsyncFunction<Session, ResultSet>() {
@@ -238,20 +239,20 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
238239
cluster.closeAsync();
239240
if (rows.size() > 0) {
240241
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
241-
List<Row> rowList = Lists.newArrayList();
242+
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
242243
for (com.datastax.driver.core.Row line : rows) {
243-
Row row = fillData(inputRow, line);
244+
Row row = fillData(inputCopy.f1, line);
244245
if (openCache()) {
245246
cacheContent.add(line);
246247
}
247-
rowList.add(row);
248+
rowList.add(Tuple2.of(inputCopy.f0,row));
248249
}
249250
resultFuture.complete(rowList);
250251
if (openCache()) {
251252
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
252253
}
253254
} else {
254-
dealMissKey(inputRow, resultFuture);
255+
dealMissKey(inputCopy, resultFuture);
255256
if (openCache()) {
256257
putCache(key, CacheMissVal.getMissKeyObj());
257258
}
@@ -263,7 +264,6 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
263264
public void onFailure(Throwable t) {
264265
LOG.error("Failed to retrieve the data: %s%n",
265266
t.getMessage());
266-
System.out.println("Failed to retrieve the data: " + t.getMessage());
267267
cluster.closeAsync();
268268
resultFuture.completeExceptionally(t);
269269
}
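Both Cassandra side implementations replace String.split with org.apache.commons.lang3.StringUtils.split. The behavioral differences worth knowing: StringUtils.split treats its second argument as a set of separator characters rather than a regex, skips empty tokens, and returns null for null input. A small standalone comparison:

```java
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;

public class SplitComparison {
    public static void main(String[] args) {
        String address = "host1:9042,host2:9042";

        // commons-lang3: splits on separator characters, drops empty tokens, null-safe.
        System.out.println(Arrays.toString(StringUtils.split(address, ",")));
        // -> [host1:9042, host2:9042]

        // JDK String.split: the argument is a regex; leading/embedded empty strings are kept.
        System.out.println(Arrays.toString(",host1:9042".split(",")));
        // -> [, host1:9042]
        System.out.println(Arrays.toString(StringUtils.split(",host1:9042", ",")));
        // -> [host1:9042]
    }
}
```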

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 11 additions & 7 deletions
@@ -20,8 +20,8 @@
 
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.BaseSideInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.dtstack.flink.sql.util.ParseUtils;
 import org.apache.calcite.sql.SqlBasicCall;
@@ -30,6 +30,8 @@
 import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
@@ -39,16 +41,18 @@
  *
  * @author xuqianjin
 */
-public class CassandraAsyncSideInfo extends SideInfo {
+public class CassandraAsyncSideInfo extends BaseSideInfo {
 
     private static final long serialVersionUID = -4403313049809013362L;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());
 
-    public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+
+    public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }
 
     @Override
-    public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
+    public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
         CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
 
         String sideTableName = joinInfo.getSideTableName();
@@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
         }
 
         sqlCondition = "select ${selectField} from ${tableName}";
-
         sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
-        System.out.println("---------side_exe_sql-----\n" + sqlCondition);
+
+        LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
     }
 
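The println calls across these classes give way to SLF4J loggers. For reference, SLF4J substitutes a {} placeholder only when the value is passed as a separate argument; when the value is concatenated into the format string, the message is built eagerly and the literal {} remains in the output. A short, self-contained sketch (the logger and message here are illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SideSqlLogging {
    private static final Logger LOG = LoggerFactory.getLogger(SideSqlLogging.class);

    public static void main(String[] args) {
        String sqlCondition = "select id, name from test_keyspace.user_side";

        // Placeholder form: the argument replaces {} and is only formatted
        // when the INFO level is enabled.
        LOG.info("---------side_exe_sql-----\n{}", sqlCondition);

        // Concatenation also logs, but builds the string eagerly and keeps the
        // literal {} in the output, since no argument is supplied for it.
        LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
    }
}
```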
