
Commit 1d6d097

Merge branch 'v1.8.0_dev' into 'feat_1.8_oceanbase'

# Conflicts:
#   core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java
#   launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

2 parents adcf847 + fe59047 commit 1d6d097

File tree

110 files changed (+2912, -1789 lines)


.gitlab-ci.yml

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+build:
+  stage: test
+  script:
+    - mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
+    - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
+    - sh ci/sonar_notify.sh
+  only:
+    - v1.8.0_dev
+  tags:
+    - dt-insight-engine

README.md

Lines changed: 1 addition & 23 deletions

@@ -7,34 +7,11 @@
 > > * Supports all native Flink SQL syntax
 > > * Exports input and output performance metrics to Prometheus

-## New features:
-* 1. Kafka source tables support the NOT NULL syntax and string-to-time conversion.
-* 2. RDB side tables ping the DB periodically to keep connections from dropping; rdbsink checks the connection before writing.
-* 3. Async side tables support non-equi joins, e.g. <>, <, >.
-* 4. Added Kafka array parsing
-* 5. Added support for Kafka 1.0 and above
-* 6. Added postgresql, kudu, and clickhouse side-table and result-table support
-* 7. Plugins can be loaded as dependencies; see the pluginLoadMode parameter
-* 8. Support for CEP processing
-* 9. Support for UDAFs
-* 10. Support for predicate pushdown
-* 11. Support for state TTL
-
-## Bug fixes:
-* 1. Fixed failure to parse ORDER BY and UNION syntax in SQL.
-* 2. Fixed the submission failure exception in yarnPer mode.
-* 3. Miscellaneous bug fixes
-
 # Supported
 * Source tables: kafka 0.9, 0.10, 0.11, 1.x
 * Side tables: mysql, SQlServer, oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver
 * Result tables: mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver

-# Roadmap
-* Side-table snapshots
-* Kafka Avro format
-* topN
-
 ## 1 Quick start
 ### 1.1 Run modes

@@ -205,6 +182,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * [impala result-table plugin](docs/impalaSink.md)
 * [db2 result-table plugin](docs/db2Sink.md)
 * [sqlserver result-table plugin](docs/sqlserverSink.md)
+* [kafka result-table plugin](docs/kafkaSink.md)

 ### 2.3 Side-table plugins
 * [hbase side-table plugin](docs/hbaseSide.md)

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion

@@ -267,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
                 String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
                         + ",pwd:" + tableInfo.getPassword();
                 LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
-                Thread.sleep(5 * 1000);
+                Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
             } catch (InterruptedException e1) {
                 LOG.error("", e1);
             }
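The only change here is replacing a magic number with a named constant. The constant's definition is not part of this hunk; a plausible sketch, assuming it keeps the original 5-second value:

    // Assumed declaration of the constant referenced above; the actual
    // definition lives elsewhere in CassandraAllReqRow. The value mirrors
    // the `5 * 1000` literal this hunk removes.
    private static final long LOAD_DATA_ERROR_SLEEP_TIME = 5 * 1000L;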

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 2 deletions

@@ -216,7 +216,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
         connCassandraDB(cassandraSideTableInfo);

         String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
-        System.out.println("sqlCondition:" + sqlCondition);
+        LOG.info("sqlCondition:{}", sqlCondition);

         ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
                 new AsyncFunction<Session, ResultSet>() {
@@ -265,7 +265,6 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
             public void onFailure(Throwable t) {
                 LOG.error("Failed to retrieve the data: %s%n",
                         t.getMessage());
-                System.out.println("Failed to retrieve the data: " + t.getMessage());
                 cluster.closeAsync();
                 resultFuture.completeExceptionally(t);
             }
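A note on the SLF4J calls these hunks introduce: the `{}` placeholder is filled from a trailing argument, not by concatenation; concatenating the value onto the format string leaves a literal `{}` in the output and forfeits lazy formatting. A minimal self-contained sketch of the correct pattern:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            String sqlCondition = "select id from db.side_table ALLOW FILTERING";
            // Correct: the argument replaces {} and is only formatted
            // when INFO is actually enabled.
            LOG.info("sqlCondition:{}", sqlCondition);
            // Incorrect: logs a literal "{}" and always pays for the
            // concatenation, even when INFO is disabled.
            LOG.info("sqlCondition:{}" + sqlCondition);
        }
    }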

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 6 additions & 2 deletions

@@ -30,6 +30,8 @@
 import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.List;

@@ -42,6 +44,8 @@
 public class CassandraAsyncSideInfo extends BaseSideInfo {

     private static final long serialVersionUID = -4403313049809013362L;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());
+

     public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
@@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
         }

         sqlCondition = "select ${selectField} from ${tableName}";
-
         sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
-        System.out.println("---------side_exe_sql-----\n" + sqlCondition);
+
+        LOG.info("---------side_exe_sql-----\n{}", sqlCondition);
     }


ci/sonar_notify.sh

Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
+#!/bin/bash
+# See the DingTalk robot docs: https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
+sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
+curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
+   -H "Content-Type: application/json" \
+   -d "{
+     \"msgtype\": \"markdown\",
+     \"markdown\": {
+        \"title\":\"sonar code quality\",
+        \"text\": \"## sonar code-quality report: \n
+> [sonar dashboard](http://172.16.100.198:9000/dashboard?id=dt-insight-engine/flinkStreamSQL) \n
+> ${sonarreport} \n\"
+     }
+   }"

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion

@@ -51,7 +51,11 @@ public void open(Configuration parameters) throws Exception {
                 .put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
                 .put("user", rdbSideTableInfo.getUserName())
                 .put("password", rdbSideTableInfo.getPassword())
-                .put("provider_class", DT_PROVIDER_CLASS);
+                .put("provider_class", DT_PROVIDER_CLASS)
+                .put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
+                .put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
+                .put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
+
         System.setProperty("vertx.disableFileCPResolving", "true");
         VertxOptions vo = new VertxOptions();
         vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());
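These three keys enable C3P0-style connection testing in the Vert.x JDBC pool, which is the keep-alive behavior the README change advertises for RDB side tables. The constants' values are not shown in this hunk; a sketch of what they plausibly look like (names from the hunk, values assumed):

    // Assumed values for the constants referenced above; the actual
    // definitions live elsewhere in the commit.
    public static final String PREFERRED_TEST_QUERY_SQL = "select 1";
    // Validate idle connections on a fixed period (seconds).
    public static final int DEFAULT_IDLE_CONNECTION_TEST_PEROID = 60;
    // Also validate each connection when it is returned to the pool.
    public static final boolean DEFAULT_TEST_CONNECTION_ON_CHECKIN = true;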

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ public void open(int taskNumber, int numTasks) throws IOException {

     @Override
     public void writeRecord(Tuple2 tuple2) throws IOException {
-        System.out.println("received original data:" + tuple2);
+        LOG.info("received original data:{}", tuple2);
         Tuple2<Boolean, Row> tupleTrans = tuple2;
         Boolean retract = tupleTrans.getField(0);
         if (!retract) {
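For context on the surrounding lines: a Flink retract stream encodes every change as a `Tuple2<Boolean, Row>`, where `true` means add/update and `false` means retract, which is why the sink reads field 0 before handling the row. A minimal hedged sketch of unpacking such a record:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.types.Row;

    public class RetractSketch {
        // How a retract-stream record is typically unpacked.
        static void handle(Tuple2<Boolean, Row> record) {
            boolean accumulate = record.getField(0); // true = add/update, false = retract
            Row row = record.getField(1);
            if (accumulate) {
                System.out.println("upsert: " + row);
            } else {
                System.out.println("retract: " + row);
            }
        }

        public static void main(String[] args) {
            handle(Tuple2.of(true, Row.of(1, "alice")));
            handle(Tuple2.of(false, Row.of(1, "alice")));
        }
    }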

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java

Lines changed: 1 addition & 1 deletion

@@ -95,7 +95,7 @@ public static <T> TablePrintUtil build(List<T> data) {
             try {
                 value = obj.getClass().getMethod(colList.get(j).getMethodName).invoke(data.get(i)).toString();
             } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-                e.printStackTrace();
+                LOG.error("", e);
             }
             item[j] = value == null ? "null" : value;
         }
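The line being changed sits inside a loop that resolves each column's getter by name and invokes it reflectively. A self-contained sketch of that pattern with the logging replacement applied (class and getter names here are illustrative):

    import java.lang.reflect.InvocationTargetException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ReflectiveGetterSketch {
        private static final Logger LOG = LoggerFactory.getLogger(ReflectiveGetterSketch.class);

        public static class User {
            public String getName() { return "alice"; }
        }

        public static void main(String[] args) {
            User user = new User();
            String value = null;
            try {
                // Resolve and invoke the getter by name, as the table printer does.
                value = user.getClass().getMethod("getName").invoke(user).toString();
            } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
                // Log the failure instead of printing a raw stack trace to stderr.
                LOG.error("failed to invoke getter", e);
            }
            System.out.println(value == null ? "null" : value);
        }
    }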

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 5 additions & 6 deletions

@@ -113,14 +113,13 @@ public JobExecutionResult execute(String jobName) throws Exception {
             LOG.info("Running job on local embedded Flink mini cluster");
         }

-        MiniCluster exec = new MiniCluster(configBuilder.build());
-        try {
+        try (MiniCluster exec = new MiniCluster(configBuilder.build())) {
             exec.start();
-            return exec.executeJobBlocking(jobGraph);
-        }
-        finally {
+            JobExecutionResult jobExecutionResult = exec.executeJobBlocking(jobGraph);
             transformations.clear();
-            exec.closeAsync();
+            return jobExecutionResult;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 }
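The rewrite compiles because MiniCluster is AutoCloseable in this Flink version, so try-with-resources closes it on every exit path and the manual finally/closeAsync bookkeeping goes away. A minimal sketch of the pattern with a stand-in resource:

    // Stand-in resource to illustrate the try-with-resources shape above.
    class Cluster implements AutoCloseable {
        void start() { System.out.println("started"); }
        String run() { return "result"; }
        @Override
        public void close() { System.out.println("closed on every exit path"); }
    }

    public class TryWithResourcesSketch {
        public static void main(String[] args) {
            // close() runs whether run() returns normally or throws,
            // so no explicit finally block is needed.
            try (Cluster exec = new Cluster()) {
                exec.start();
                System.out.println(exec.run());
            }
        }
    }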
