Skip to content

Commit 9f48daf

Browse files
committed
Merge branch '1.5_v3.8.0' into v1.5.0_dev
# Conflicts: # kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java # kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java # kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java
2 parents dab993c + 7b068c1 commit 9f48daf

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818

1919

20+
2021
package com.dtstack.flink.sql.side;
2122

2223
import com.dtstack.flink.sql.enums.ECacheType;
@@ -238,7 +239,7 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
238239
replaceFieldName(sqlSource, mappingTable, targetTableName, tableAlias);
239240
break;
240241
case AS:
241-
SqlNode asNode = ((SqlBasicCall) sqlNode).getOperands()[0];
242+
SqlNode asNode = ((SqlBasicCall)sqlNode).getOperands()[0];
242243
replaceFieldName(asNode, mappingTable, targetTableName, tableAlias);
243244
break;
244245
case SELECT:
@@ -354,7 +355,16 @@ private SqlNode replaceOrderByTableName(SqlNode orderNode, String tableAlias) {
354355
private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String, String> mappingTable, String tableAlias){
355356
if(groupNode.getKind() == IDENTIFIER){
356357
SqlIdentifier sqlIdentifier = (SqlIdentifier) groupNode;
358+
// 如果没有表别名前缀,直接返回字段名称
359+
if (sqlIdentifier.names.size() == 1) {
360+
return groupNode;
361+
}
357362
String mappingFieldName = mappingTable.get(sqlIdentifier.getComponent(0).getSimple(), sqlIdentifier.getComponent(1).getSimple());
363+
// 如果有表别名前缀,但是在宽表中找不到映射,只需要设置别名,不需要替换映射
364+
if (null == mappingFieldName){
365+
// return sqlIdentifier.setName(0, tableAlias);
366+
throw new RuntimeException("Column '" + sqlIdentifier.getComponent(1).getSimple() + "' not found in table '" + sqlIdentifier.getComponent(0).getSimple() + "'");
367+
}
358368
sqlIdentifier = sqlIdentifier.setName(0, tableAlias);
359369
return sqlIdentifier.setName(1, mappingFieldName);
360370
}else if(groupNode instanceof SqlBasicCall){
@@ -569,6 +579,7 @@ private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias,
569579
if(CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))){
570580
return true;
571581
}
582+
572583
return false;
573584
}
574585

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public Row deserialize(byte[] message) throws IOException {
148148
return row;
149149
} catch (Exception e) {
150150
//add metric of dirty data
151-
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0) {
151+
if (dirtyDataCounter.getCount() % dirtyDataFrequency == 0 || LOG.isDebugEnabled()) {
152152
LOG.info("dirtyData: " + new String(message));
153153
LOG.error(" ", e);
154154
}

v1.8.0_dev_bugfix_joinnpe

Whitespace-only changes.

0 commit comments

Comments
 (0)