Skip to content

Commit 1fc0d53

Browse files
author
xuchao
committed
Merge branch 'v1.8.0_dev' into tmp_1.8_v3.10.0_beta3
2 parents 14446f4 + 59b996b commit 1fc0d53

File tree

58 files changed

+776
-659
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+776
-659
lines changed

.gitlab-ci.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
build:
2+
stage: test
3+
script:
4+
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
5+
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.branch.name="v1.8.0_dev" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
6+
- sh ci/sonar_notify.sh
7+
only:
8+
- v1.8.0_dev
9+
tags:
10+
- dt-insight-engine

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
216216
connCassandraDB(cassandraSideTableInfo);
217217

218218
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
219-
System.out.println("sqlCondition:" + sqlCondition);
219+
LOG.info("sqlCondition:{}" + sqlCondition);
220220

221221
ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
222222
new AsyncFunction<Session, ResultSet>() {
@@ -265,7 +265,6 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
265265
public void onFailure(Throwable t) {
266266
LOG.error("Failed to retrieve the data: %s%n",
267267
t.getMessage());
268-
System.out.println("Failed to retrieve the data: " + t.getMessage());
269268
cluster.closeAsync();
270269
resultFuture.completeExceptionally(t);
271270
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.calcite.sql.SqlNode;
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import com.google.common.collect.Lists;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3335

3436
import java.util.List;
3537

@@ -42,6 +44,8 @@
4244
public class CassandraAsyncSideInfo extends BaseSideInfo {
4345

4446
private static final long serialVersionUID = -4403313049809013362L;
47+
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());
48+
4549

4650
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4751
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
@@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
6367
}
6468

6569
sqlCondition = "select ${selectField} from ${tableName}";
66-
6770
sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
68-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
71+
72+
LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
6973
}
7074

7175

cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

ci/sonar_notify.sh

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/bin/bash
2+
#参考钉钉文档 https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
3+
sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
4+
curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
5+
-H "Content-Type: application/json" \
6+
-d "{
7+
\"msgtype\": \"markdown\",
8+
\"markdown\": {
9+
\"title\":\"sonar代码质量\",
10+
\"text\": \"## sonar代码质量报告: \n
11+
> [sonar地址](http://172.16.100.198:9000/dashboard?id=dt-insight-engine/flinkStreamSQL) \n
12+
> ${sonarreport} \n\"
13+
}
14+
}"

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,18 @@ public void open(Configuration parameters) throws Exception {
4848
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
4949
clickhouseClientConfig.put("url", rdbSideTableInfo.getUrl())
5050
.put("driver_class", CLICKHOUSE_DRIVER)
51-
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
51+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5252
.put("user", rdbSideTableInfo.getUserName())
5353
.put("password", rdbSideTableInfo.getPassword())
54-
.put("provider_class", DT_PROVIDER_CLASS);
54+
.put("provider_class", DT_PROVIDER_CLASS)
55+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
56+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
57+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
58+
5559
System.setProperty("vertx.disableFileCPResolving", "true");
5660
VertxOptions vo = new VertxOptions();
57-
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
58-
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
61+
vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());
62+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
5963
vo.setFileResolverCachingEnabled(false);
6064
Vertx vertx = Vertx.vertx(vo);
6165
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
5656

5757
@Override
5858
public void writeRecord(Tuple2 tuple2) throws IOException {
59-
System.out.println("received oriainal data:" + tuple2);
59+
LOG.info("received oriainal data:{}" + tuple2);
6060
Tuple2<Boolean, Row> tupleTrans = tuple2;
6161
Boolean retract = tupleTrans.getField(0);
6262
if (!retract) {

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public static <T> TablePrintUtil build(List<T> data) {
9595
try {
9696
value = obj.getClass().getMethod(colList.get(j).getMethodName).invoke(data.get(i)).toString();
9797
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
98-
e.printStackTrace();
98+
LOG.error("", e);
9999
}
100100
item[j] = value == null ? "null" : value;
101101
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,8 @@ public class ExecuteProcessHelper {
9595

9696
public static ParamsInfo parseParams(String[] args) throws Exception {
9797
LOG.info("------------program params-------------------------");
98-
System.out.println("------------program params-------------------------");
9998
Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
100-
Arrays.stream(args).forEach(System.out::println);
10199
LOG.info("-------------------------------------------");
102-
System.out.println("----------------------------------------");
103100

104101
OptionParser optionParser = new OptionParser(args);
105102
Options options = optionParser.getOptions();
@@ -206,7 +203,6 @@ private static void sqlTranslation(String localSqlPluginPath,
206203
if (LOG.isInfoEnabled()) {
207204
LOG.info("exe-sql:\n" + result.getExecSql());
208205
}
209-
210206
boolean isSide = false;
211207
for (String tableName : result.getTargetTableList()) {
212208
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -228,12 +224,10 @@ private static void sqlTranslation(String localSqlPluginPath,
228224
//sql-dimensional table contains the dimension table of execution
229225
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
230226
} else {
231-
System.out.println("----------exec sql without dimension join-----------");
232-
System.out.println("----------real sql exec is--------------------------");
233-
System.out.println(result.getExecSql());
227+
LOG.info("----------exec sql without dimension join-----------");
228+
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
234229
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
235230
if (LOG.isInfoEnabled()) {
236-
System.out.println();
237231
LOG.info("exec sql: " + result.getExecSql());
238232
}
239233
}
@@ -355,6 +349,7 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
355349
return env;
356350
}
357351

352+
358353
public static void setLogLevel(ParamsInfo paramsInfo){
359354
String logLevel = paramsInfo.getConfProp().getProperty(ConfigConstrant.LOG_LEVEL_KEY);
360355
if(org.apache.commons.lang3.StringUtils.isBlank(logLevel)){

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2929
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3030
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
31-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
3231
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
32+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
3334
import org.apache.flink.types.Row;
3435

3536
import java.io.IOException;

0 commit comments

Comments
 (0)