Skip to content

Commit e7a7184

Browse files
committed
解决kudu join条件解析问题
1 parent 2411ac6 commit e7a7184

File tree

4 files changed

+23
-10
lines changed

4 files changed

+23
-10
lines changed

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.dtstack.flink.sql.side.SideInfo;
66
import com.dtstack.flink.sql.side.SideTableInfo;
77
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
8+
import com.dtstack.flink.sql.util.ParseUtils;
89
import org.apache.calcite.sql.SqlBasicCall;
910
import org.apache.calcite.sql.SqlIdentifier;
1011
import org.apache.calcite.sql.SqlKind;
@@ -30,11 +31,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
3031
SqlNode conditionNode = joinInfo.getCondition();
3132

3233
List<SqlNode> sqlNodeList = Lists.newArrayList();
33-
if (conditionNode.getKind() == SqlKind.AND) {
34-
sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
35-
} else {
36-
sqlNodeList.add(conditionNode);
37-
}
34+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
3835

3936
for (SqlNode sqlNode : sqlNodeList) {
4037
dealOneEqualCon(sqlNode, sideTableName);

kudu/kudu-side/kudu-side-core/src/main/java/com/dtstack/flink/sql/side/kudu/table/KuduSideParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ public Class dbTypeConvertToJavaType(String fieldType) {
9393
case "int64":
9494
return Long.class;
9595
case "varchar":
96-
case "binary":
9796
case "string":
9897
return String.class;
9998
case "float":
@@ -106,6 +105,8 @@ public Class dbTypeConvertToJavaType(String fieldType) {
106105
return Timestamp.class;
107106
case "decimal":
108107
return BigDecimal.class;
108+
case "binary":
109+
return byte[].class;
109110
}
110111

111112
throw new RuntimeException("不支持 " + fieldType + " 类型");

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.types.Row;
2626
import org.apache.kudu.client.*;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

2830
import java.io.IOException;
2931
import java.math.BigDecimal;
@@ -32,6 +34,10 @@
3234

3335
public class KuduOutputFormat extends MetricOutputFormat {
3436

37+
private static final long serialVersionUID = 1L;
38+
39+
private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
40+
3541
public enum WriteMode {INSERT, UPDATE, UPSERT}
3642

3743
private String kuduMasters;
@@ -104,9 +110,17 @@ public void writeRecord(Tuple2 record) throws IOException {
104110

105111
Operation operation = toOperation(writeMode, row);
106112
AsyncKuduSession session = client.newSession();
107-
session.apply(operation);
108-
session.close();
109-
outRecords.inc();
113+
114+
try {
115+
session.apply(operation);
116+
session.close();
117+
outRecords.inc();
118+
} catch (KuduException e) {
119+
outDirtyRecords.inc();
120+
LOG.error("record insert failed ..", row.toString().substring(0, 100));
121+
LOG.error("", e);
122+
}
123+
110124
}
111125

112126
@Override

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduSinkParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ public Class dbTypeConvertToJavaType(String fieldType) {
7272
case "int64":
7373
return Long.class;
7474
case "varchar":
75-
case "binary":
7675
case "string":
7776
return String.class;
7877
case "float":
@@ -85,6 +84,8 @@ public Class dbTypeConvertToJavaType(String fieldType) {
8584
return Timestamp.class;
8685
case "decimal":
8786
return BigDecimal.class;
87+
case "binary":
88+
return byte[].class;
8889
}
8990

9091
throw new RuntimeException("不支持 " + fieldType + " 类型");

0 commit comments

Comments
 (0)