Skip to content

Commit f7c1b1e

Browse files
committed
解决kudu类型转换的问题和去掉share包
1 parent a8364e9 commit f7c1b1e

File tree

8 files changed

+85
-8
lines changed

8 files changed

+85
-8
lines changed

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import org.apache.calcite.sql.JoinType;
99
import org.apache.commons.collections.CollectionUtils;
1010
import org.apache.flink.api.java.typeutils.RowTypeInfo;
11-
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
12-
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
11+
import com.google.common.collect.Lists;
12+
import com.google.common.collect.Maps;
1313
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
1414
import org.apache.flink.types.Row;
1515
import org.apache.flink.util.Collector;

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.apache.calcite.sql.SqlNode;
1111
import org.apache.commons.collections.CollectionUtils;
1212
import org.apache.flink.api.java.typeutils.RowTypeInfo;
13-
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
13+
import com.google.common.collect.Lists;
1414

1515
import java.util.List;
1616

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
import com.stumbleupon.async.Deferred;
99
import io.vertx.core.json.JsonArray;
1010
import org.apache.flink.api.java.typeutils.RowTypeInfo;
11-
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
11+
import com.google.common.collect.Lists;
1212
import org.apache.flink.configuration.Configuration;
13-
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
13+
import com.google.common.collect.Maps;
1414
import org.apache.flink.streaming.api.functions.async.ResultFuture;
1515
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
1616
import org.apache.flink.types.Row;

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.apache.calcite.sql.SqlKind;
1111
import org.apache.calcite.sql.SqlNode;
1212
import org.apache.flink.api.java.typeutils.RowTypeInfo;
13-
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
13+
import com.google.common.collect.Lists;
1414

1515
import java.util.List;
1616

kudu/kudu-side/kudu-side-core/src/main/java/com/dtstack/flink/sql/side/kudu/table/KuduSideParser.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import com.dtstack.flink.sql.table.TableInfo;
55
import com.dtstack.flink.sql.util.MathUtil;
66

7+
import java.math.BigDecimal;
8+
import java.sql.Date;
9+
import java.sql.Timestamp;
710
import java.util.Map;
811

912
import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
@@ -71,4 +74,40 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
7174
return kuduSideTableInfo;
7275

7376
}
77+
78+
@Override
79+
public Class dbTypeConvertToJavaType(String fieldType) {
80+
81+
switch (fieldType.toLowerCase()) {
82+
case "boolean":
83+
case "bool":
84+
return Boolean.class;
85+
case "int8":
86+
return Byte.class;
87+
case "int16":
88+
return Short.class;
89+
case "int":
90+
case "int32":
91+
return Integer.class;
92+
case "long":
93+
case "int64":
94+
return Long.class;
95+
case "varchar":
96+
case "binary":
97+
case "string":
98+
return String.class;
99+
case "float":
100+
return Float.class;
101+
case "double":
102+
return Double.class;
103+
case "date":
104+
return Date.class;
105+
case "unixtime_micros":
106+
return Timestamp.class;
107+
case "decimal":
108+
return BigDecimal.class;
109+
}
110+
111+
throw new RuntimeException("不支持 " + fieldType + " 类型");
112+
}
74113
}

kudu/kudu-side/kudu-side-core/src/main/java/com/dtstack/flink/sql/side/kudu/table/KuduSideTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.dtstack.flink.sql.side.kudu.table;
22

33
import com.dtstack.flink.sql.side.SideTableInfo;
4-
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
4+
import com.google.common.base.Preconditions;
55

66
public class KuduSideTableInfo extends SideTableInfo {
77

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduSinkParser.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import com.dtstack.flink.sql.table.TableInfo;
66
import com.dtstack.flink.sql.util.MathUtil;
77

8+
import java.math.BigDecimal;
9+
import java.sql.Date;
10+
import java.sql.Timestamp;
811
import java.util.Map;
912

1013
import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
@@ -51,4 +54,39 @@ private KuduOutputFormat.WriteMode transWriteMode(String writeMode) {
5154
return KuduOutputFormat.WriteMode.UPSERT;
5255
}
5356
}
57+
58+
@Override
59+
public Class dbTypeConvertToJavaType(String fieldType) {
60+
switch (fieldType.toLowerCase()) {
61+
case "boolean":
62+
case "bool":
63+
return Boolean.class;
64+
case "int8":
65+
return Byte.class;
66+
case "int16":
67+
return Short.class;
68+
case "int":
69+
case "int32":
70+
return Integer.class;
71+
case "long":
72+
case "int64":
73+
return Long.class;
74+
case "varchar":
75+
case "binary":
76+
case "string":
77+
return String.class;
78+
case "float":
79+
return Float.class;
80+
case "double":
81+
return Double.class;
82+
case "date":
83+
return Date.class;
84+
case "unixtime_micros":
85+
return Timestamp.class;
86+
case "decimal":
87+
return BigDecimal.class;
88+
}
89+
90+
throw new RuntimeException("不支持 " + fieldType + " 类型");
91+
}
5492
}

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.dtstack.flink.sql.sink.kudu.KuduOutputFormat;
44
import com.dtstack.flink.sql.table.TargetTableInfo;
5-
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
5+
import com.google.common.base.Preconditions;
66

77
public class KuduTableInfo extends TargetTableInfo {
88

0 commit comments

Comments
 (0)