Skip to content

Commit 801fd6f

Browse files
committed
Merge remote-tracking branch 'origin/1.10_release_4.0.x' into 1.10_release_4.0.x
2 parents 7ab9a5d + ef35b0c commit 801fd6f

File tree

8 files changed

+140
-56
lines changed

8 files changed

+140
-56
lines changed

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public class ConfigConstrant {
5656
// default 200ms
5757
public static final String AUTO_WATERMARK_INTERVAL_KEY = "autoWatermarkInterval";
5858

59+
// window early trigger
60+
public static final String EARLY_TRIGGER = "early.trigger";
61+
5962
public static final String SQL_TTL_MINTIME = "sql.ttl.min";
6063
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
6164

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,21 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
121121
}
122122
}
123123

124+
/**
125+
* 设置TableEnvironment window提前触发
126+
* @param tableEnv
127+
* @param confProperties
128+
*/
129+
public static void streamTableEnvironmentEarlyTriggerConfig(TableEnvironment tableEnv, Properties confProperties) {
130+
confProperties = PropertiesUtils.propertiesTrim(confProperties);
131+
String triggerTime = confProperties.getProperty(ConfigConstrant.EARLY_TRIGGER);
132+
if (StringUtils.isNumeric(triggerTime)) {
133+
TableConfig qConfig = tableEnv.getConfig();
134+
qConfig.getConfiguration().setString("table.exec.emit.early-fire.enabled", "true");
135+
qConfig.getConfiguration().setString("table.exec.emit.early-fire.delay", triggerTime+"s");
136+
}
137+
}
138+
124139
/**
125140
* 设置TableEnvironment状态超时时间
126141
* @param tableEnv

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ public static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironmen
379379

380380
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, tableConfig);
381381
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
382+
StreamEnvConfigManager.streamTableEnvironmentEarlyTriggerConfig(tableEnv, confProperties);
382383
return tableEnv;
383384
}
384385

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
294294
SqlBasicCall buildAs = TableUtils.buildAsNodeByJoinInfo(joinInfo, null, null);
295295

296296
if(rightIsSide){
297-
addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentGroupByList, parentWhere, tableRef);
297+
addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentGroupByList, parentWhere, tableRef, fieldRef);
298298
}
299299

300300
SqlNode newLeftNode = joinNode.getLeft();
@@ -307,7 +307,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
307307

308308
//替换leftNode 为新的查询
309309
joinNode.setLeft(buildAs);
310-
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentGroupByList, parentWhere);
310+
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, fieldRef, parentSelectList, parentGroupByList, parentWhere);
311311
}
312312

313313
return joinInfo;
@@ -330,7 +330,8 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
330330
SqlNodeList parentSelectList,
331331
SqlNodeList parentGroupByList,
332332
SqlNode parentWhere,
333-
Map<String, String> tableRef){
333+
Map<String, String> tableRef,
334+
Map<String, String> fieldRef){
334335
//只处理维表
335336
if(!joinInfo.isRightIsSideTable()){
336337
return;
@@ -342,7 +343,7 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
342343
//替换左表为新的表名称
343344
joinNode.setLeft(buildAs);
344345

345-
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentGroupByList, parentWhere);
346+
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, fieldRef, parentSelectList, parentGroupByList, parentWhere);
346347
}
347348

348349
/**
@@ -357,6 +358,7 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
357358
public void replaceSelectAndWhereField(SqlBasicCall buildAs,
358359
SqlNode leftJoinNode,
359360
Map<String, String> tableRef,
361+
Map<String, String> fieldRef,
360362
SqlNodeList parentSelectList,
361363
SqlNodeList parentGroupByList,
362364
SqlNode parentWhere){
@@ -370,23 +372,22 @@ public void replaceSelectAndWhereField(SqlBasicCall buildAs,
370372
}
371373

372374
//替换select field 中的对应字段
373-
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
374375
for(SqlNode sqlNode : parentSelectList.getList()){
375376
for(String tbTmp : fromTableNameSet) {
376-
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);
377+
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldRef);
377378
}
378379
}
379380

380381
//TODO 应该根据上面的查询字段的关联关系来替换
381382
//替换where 中的条件相关
382383
for(String tbTmp : fromTableNameSet){
383-
TableUtils.replaceWhereCondition(parentWhere, tbTmp, newLeftTableName, fieldReplaceRef);
384+
TableUtils.replaceWhereCondition(parentWhere, tbTmp, newLeftTableName, fieldRef);
384385
}
385386

386387
if(parentGroupByList != null){
387388
for(SqlNode sqlNode : parentGroupByList.getList()){
388389
for(String tbTmp : fromTableNameSet) {
389-
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);
390+
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldRef);
390391
}
391392
}
392393
}
@@ -444,16 +445,15 @@ private void extractTemporaryQuery(SqlNode node, String tableAlias,
444445
queueInfo.offer(sqlBasicCall);
445446

446447
//替换select中的表结构
447-
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
448448
for(SqlNode tmpSelect : parentSelectList.getList()){
449449
for(String tbTmp : fromTableNameSet) {
450-
TableUtils.replaceSelectFieldTable(tmpSelect, tbTmp, tableAlias, fieldReplaceRef);
450+
TableUtils.replaceSelectFieldTable(tmpSelect, tbTmp, tableAlias, fieldRef);
451451
}
452452
}
453453

454454
//替换where 中的条件相关
455455
for(String tbTmp : fromTableNameSet){
456-
TableUtils.replaceWhereCondition(parentWhere, tbTmp, tableAlias, fieldReplaceRef);
456+
TableUtils.replaceWhereCondition(parentWhere, tbTmp, tableAlias, fieldRef);
457457
}
458458

459459
for(String tbTmp : fromTableNameSet){

core/src/main/java/com/dtstack/flink/sql/util/JDBCUtils.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,23 @@
1919

2020
package com.dtstack.flink.sql.util;
2121

22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.sql.Connection;
2226
import java.sql.DriverManager;
27+
import java.sql.ResultSet;
28+
import java.sql.SQLException;
29+
import java.sql.Statement;
30+
import java.util.Objects;
2331

2432
public class JDBCUtils {
33+
private static final Logger LOG = LoggerFactory.getLogger(JDBCUtils.class);
34+
2535
private static final Object LOCK = new Object();
2636

27-
public static void forName(String clazz, ClassLoader classLoader) {
28-
synchronized (LOCK){
37+
public static void forName(String clazz, ClassLoader classLoader) {
38+
synchronized (LOCK) {
2939
try {
3040
Class.forName(clazz, true, classLoader);
3141
DriverManager.setLoginTimeout(10);
@@ -44,4 +54,75 @@ public synchronized static void forName(String clazz) {
4454
throw new RuntimeException(e);
4555
}
4656
}
57+
58+
/**
59+
* 关闭连接资源
60+
*
61+
* @param rs ResultSet
62+
* @param stmt Statement
63+
* @param conn Connection
64+
* @param commit
65+
*/
66+
public static void closeConnectionResource(ResultSet rs, Statement stmt, Connection conn, boolean commit) {
67+
if (Objects.nonNull(rs)) {
68+
try {
69+
rs.close();
70+
} catch (SQLException e) {
71+
LOG.warn("Close resultSet error: {}", e.getMessage());
72+
}
73+
}
74+
75+
if (Objects.nonNull(stmt)) {
76+
try {
77+
stmt.close();
78+
} catch (SQLException e) {
79+
LOG.warn("Close statement error:{}", e.getMessage());
80+
}
81+
}
82+
83+
if (Objects.nonNull(conn)) {
84+
try {
85+
if (commit) {
86+
commit(conn);
87+
} else {
88+
rollBack(conn);
89+
}
90+
91+
conn.close();
92+
} catch (SQLException e) {
93+
LOG.warn("Close connection error:{}", e.getMessage());
94+
}
95+
}
96+
}
97+
98+
/**
99+
* 手动提交事物
100+
*
101+
* @param conn Connection
102+
*/
103+
public static void commit(Connection conn) {
104+
try {
105+
if (!conn.isClosed() && !conn.getAutoCommit()) {
106+
conn.commit();
107+
}
108+
} catch (SQLException e) {
109+
LOG.warn("commit error:{}", e.getMessage());
110+
}
111+
}
112+
113+
/**
114+
* 手动回滚事物
115+
*
116+
* @param conn Connection
117+
*/
118+
public static void rollBack(Connection conn) {
119+
try {
120+
if (!conn.isClosed() && !conn.getAutoCommit()) {
121+
conn.rollback();
122+
}
123+
} catch (SQLException e) {
124+
LOG.warn("rollBack error:{}", e.getMessage());
125+
}
126+
}
127+
47128
}

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ public static void getFromTableInfo(SqlNode fromTable, Set<String> tableNameSet)
312312
public static void replaceSelectFieldTable(SqlNode selectNode,
313313
String oldTbName,
314314
String newTbName,
315-
HashBiMap<String, String> fieldReplaceRef) {
315+
Map<String, String> fieldReplaceRef) {
316316
if (selectNode.getKind() == AS) {
317317
SqlNode leftNode = ((SqlBasicCall) selectNode).getOperands()[0];
318318
replaceSelectFieldTable(leftNode, oldTbName, newTbName, fieldReplaceRef);
@@ -406,22 +406,13 @@ public static void replaceSelectFieldTable(SqlNode selectNode,
406406
private static void replaceOneSelectField(SqlIdentifier sqlIdentifier,
407407
String newTbName,
408408
String oldTbName,
409-
HashBiMap<String, String> fieldReplaceRef){
409+
Map<String, String> fieldReplaceRef){
410410
SqlIdentifier newField = sqlIdentifier.setName(0, newTbName);
411411
String fieldName = sqlIdentifier.names.get(1);
412-
String fieldKey = oldTbName + "_" + fieldName;
413-
414-
if(!fieldReplaceRef.containsKey(fieldKey)){
415-
if(fieldReplaceRef.inverse().get(fieldName) != null){
416-
//换一个名字
417-
String mappingFieldName = ParseUtils.dealDuplicateFieldName(fieldReplaceRef, fieldName);
418-
newField = newField.setName(1, mappingFieldName);
419-
fieldReplaceRef.put(fieldKey, mappingFieldName);
420-
} else {
421-
fieldReplaceRef.put(fieldKey, fieldName);
422-
}
423-
}else {
424-
newField = newField.setName(1, fieldReplaceRef.get(fieldKey));
412+
String fieldKey = oldTbName + "." + fieldName;
413+
if(fieldReplaceRef.get(fieldKey) != null){
414+
String newFieldName = fieldReplaceRef.get(fieldKey).split("\\.")[1];
415+
newField = newField.setName(1, newFieldName);
425416
}
426417

427418
sqlIdentifier.assignNamesFrom(newField);
@@ -522,7 +513,7 @@ public static String getTargetRefField(Map<String, String> refFieldMap, String c
522513
return preFieldName;
523514
}
524515

525-
public static void replaceWhereCondition(SqlNode parentWhere, String oldTbName, String newTbName, HashBiMap<String, String> fieldReplaceRef){
516+
public static void replaceWhereCondition(SqlNode parentWhere, String oldTbName, String newTbName, Map<String, String> fieldReplaceRef){
526517

527518
if(parentWhere == null){
528519
return;
@@ -538,7 +529,7 @@ public static void replaceWhereCondition(SqlNode parentWhere, String oldTbName,
538529
}
539530
}
540531

541-
private static void replaceConditionNode(SqlNode selectNode, String oldTbName, String newTbName, HashBiMap<String, String> fieldReplaceRef) {
532+
private static void replaceConditionNode(SqlNode selectNode, String oldTbName, String newTbName, Map<String, String> fieldReplaceRef) {
542533
if(selectNode.getKind() == IDENTIFIER){
543534
SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode;
544535

localTest/src/main/java/com/dtstack/flink/sql/localTest/LocalTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ public static void main(String[] args) throws Exception {
5050
setLogLevel("INFO");
5151

5252
List<String> propertiesList = new ArrayList<>();
53-
String sqlPath = "/Users/wtz/dtstack/job/flinkStreamSQL/sql/TestDemo/JoinDemoTwo.sql";
53+
String sqlPath = "/Users/chuixue/Desktop/tmp/sqlFile.sql";
5454
Map<String, Object> conf = new HashMap<>();
5555
JSONObject properties = new JSONObject();
5656

5757
//其他参数配置
5858
properties.put("time.characteristic", "eventTime");
5959
properties.put("timezone", TimeZone.getDefault());
60+
properties.put("early.trigger", "1");
6061

6162
// 任务配置参数
6263
conf.put("-sql", URLEncoder.encode(readSQL(sqlPath), StandardCharsets.UTF_8.name()));

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flink.sql.side.BaseSideInfo;
2323
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
25+
import com.dtstack.flink.sql.util.JDBCUtils;
2526
import com.dtstack.flink.sql.util.RowDataComplete;
2627
import com.dtstack.flink.sql.util.RowDataConvert;
2728
import com.google.common.collect.Lists;
@@ -125,36 +126,26 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
125126
}
126127

127128
private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
128-
RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
129-
Connection connection = null;
129+
queryAndFillData(tmpCache, getConnectionWithRetry((RdbSideTableInfo) sideInfo.getSideTableInfo()));
130+
}
130131

131-
try {
132-
for (int i = 0; i < CONN_RETRY_NUM; i++) {
132+
private Connection getConnectionWithRetry(RdbSideTableInfo tableInfo) throws SQLException {
133+
String connInfo = "url:" + tableInfo.getUrl() + "; userName:" + tableInfo.getUserName();
134+
String errorMsg = null;
135+
for (int i = 0; i < CONN_RETRY_NUM; i++) {
136+
try {
137+
return getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
138+
} catch (Exception e) {
133139
try {
134-
connection = getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
135-
break;
136-
} catch (Exception e) {
137-
if (i == CONN_RETRY_NUM - 1) {
138-
throw new RuntimeException("", e);
139-
}
140-
try {
141-
String connInfo = "url:" + tableInfo.getUrl() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword();
142-
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
143-
Thread.sleep(5 * 1000);
144-
} catch (InterruptedException e1) {
145-
LOG.error("", e1);
146-
}
140+
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
141+
errorMsg = e.getCause().toString();
142+
Thread.sleep(5 * 1000);
143+
} catch (InterruptedException e1) {
144+
LOG.error("", e1);
147145
}
148146
}
149-
queryAndFillData(tmpCache, connection);
150-
} catch (Exception e) {
151-
LOG.error("", e);
152-
throw new SQLException(e);
153-
} finally {
154-
if (connection != null) {
155-
connection.close();
156-
}
157147
}
148+
throw new SQLException("get conn fail. connInfo: " + connInfo + "\ncause by: " + errorMsg);
158149
}
159150

160151
private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, Connection connection) throws SQLException {
@@ -188,6 +179,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
188179
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())
189180
.add(oneRow);
190181
}
182+
JDBCUtils.closeConnectionResource(resultSet, statement, connection, false);
191183
}
192184

193185
public int getFetchSize() {

0 commit comments

Comments
 (0)