Skip to content

Commit 25675b0

Browse files
committed
Merge remote-tracking branch 'origin/1.5_v3.7.1' into 1.5_v3.7.2
2 parents 6fd5ee8 + c822d28 commit 25675b0

File tree

7 files changed

+208
-65
lines changed

7 files changed

+208
-65
lines changed

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class MetricConstant {
4545

4646
public static final String DT_NUM_RECORDS_OUT = "dtNumRecordsOut";
4747

48+
public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
49+
4850
public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
4951

5052
public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.calcite.sql.SqlJoin;
2727
import org.apache.calcite.sql.SqlKind;
2828
import org.apache.calcite.sql.SqlNode;
29+
import org.apache.calcite.sql.SqlOrderBy;
2930
import org.apache.calcite.sql.SqlSelect;
3031
import org.apache.calcite.sql.parser.SqlParseException;
3132
import org.apache.calcite.sql.parser.SqlParser;
@@ -132,6 +133,10 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
132133
parseNode(unionRight, sqlParseResult);
133134
}
134135
break;
136+
case ORDER_BY:
137+
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
138+
parseNode(sqlOrderBy.query, sqlParseResult);
139+
break;
135140
default:
136141
//do nothing
137142
break;

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.calcite.sql.SqlKind;
3232
import org.apache.calcite.sql.SqlNode;
3333
import org.apache.calcite.sql.SqlOperator;
34+
import org.apache.calcite.sql.SqlOrderBy;
3435
import org.apache.calcite.sql.SqlSelect;
3536
import org.apache.calcite.sql.parser.SqlParseException;
3637
import org.apache.calcite.sql.parser.SqlParser;
@@ -110,6 +111,20 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
110111
aliasInfo.setAlias(alias.toString());
111112

112113
return aliasInfo;
114+
115+
case UNION:
116+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
117+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
118+
119+
parseSql(unionLeft, sideTableSet, queueInfo);
120+
121+
parseSql(unionRight, sideTableSet, queueInfo);
122+
123+
break;
124+
125+
case ORDER_BY:
126+
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
127+
parseSql(sqlOrderBy.query, sideTableSet, queueInfo);
113128
}
114129

115130
return "";

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.calcite.sql.SqlLiteral;
3636
import org.apache.calcite.sql.SqlNode;
3737
import org.apache.calcite.sql.SqlNodeList;
38+
import org.apache.calcite.sql.SqlOrderBy;
3839
import org.apache.calcite.sql.SqlSelect;
3940
import org.apache.calcite.sql.fun.SqlCase;
4041
import org.apache.calcite.sql.parser.SqlParseException;
@@ -240,11 +241,50 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
240241
}
241242

242243
break;
244+
245+
case UNION:
246+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
247+
248+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
249+
250+
replaceFieldName(unionLeft, mappingTable, targetTableName, tableAlias);
251+
252+
replaceFieldName(unionRight, mappingTable, targetTableName, tableAlias);
253+
254+
break;
255+
256+
case ORDER_BY:
257+
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
258+
replaceFieldName(sqlOrderBy.query, mappingTable, targetTableName, tableAlias);
259+
SqlNodeList orderFiledList = sqlOrderBy.orderList;
260+
for (int i=0 ;i<orderFiledList.size();i++) {
261+
SqlNode replaceNode = replaceOrderByTableName(orderFiledList.get(i), tableAlias);
262+
orderFiledList.set(i, replaceNode);
263+
}
243264
default:
244265
break;
245266
}
246267
}
247268

269+
private SqlNode replaceOrderByTableName(SqlNode orderNode, String tableAlias) {
270+
if(orderNode.getKind() == IDENTIFIER){
271+
SqlIdentifier sqlIdentifier = (SqlIdentifier) orderNode;
272+
if (sqlIdentifier.names.size() == 1) {
273+
return orderNode;
274+
}
275+
return sqlIdentifier.setName(0, tableAlias);
276+
} else if (orderNode instanceof SqlBasicCall) {
277+
SqlBasicCall sqlBasicCall = (SqlBasicCall) orderNode;
278+
for(int i=0; i<sqlBasicCall.getOperandList().size(); i++){
279+
SqlNode sqlNode = sqlBasicCall.getOperandList().get(i);
280+
sqlBasicCall.getOperands()[i] = replaceOrderByTableName(sqlNode , tableAlias);
281+
}
282+
return sqlBasicCall;
283+
} else {
284+
return orderNode;
285+
}
286+
}
287+
248288
private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String, String> mappingTable, String tableAlias){
249289
if(groupNode.getKind() == IDENTIFIER){
250290
SqlIdentifier sqlIdentifier = (SqlIdentifier) groupNode;
@@ -359,7 +399,7 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
359399
SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode;
360400

361401
if(sqlIdentifier.names.size() == 1){
362-
return null;
402+
return selectNode;
363403
}
364404

365405
String mappingFieldName = mappingTable.get(sqlIdentifier.getComponent(0).getSimple(), sqlIdentifier.getComponent(1).getSimple());

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
3434

3535
protected transient Meter outRecordsRate;
3636

37+
protected transient Counter outDirtyRecords;
38+
3739
public void initMetric() {
3840
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
41+
outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
3942
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
4043
}
4144

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.dtstack.flink.sql.util;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.sql.DriverManager;
7+
8+
public class JDBCUtils {
9+
10+
private static final Logger LOG = LoggerFactory.getLogger(ClassUtil.class);
11+
12+
public final static String lock_str = "jdbc_lock_str";
13+
14+
public static void forName(String clazz, ClassLoader classLoader) {
15+
synchronized (lock_str){
16+
try {
17+
Class.forName(clazz, true, classLoader);
18+
DriverManager.setLoginTimeout(10);
19+
} catch (Exception e) {
20+
throw new RuntimeException(e);
21+
}
22+
}
23+
}
24+
25+
26+
public synchronized static void forName(String clazz) {
27+
try {
28+
Class<?> driverClass = Class.forName(clazz);
29+
driverClass.newInstance();
30+
} catch (Exception e) {
31+
throw new RuntimeException(e);
32+
}
33+
}
34+
}

0 commit comments

Comments
 (0)