Skip to content

Commit 03e0a70

Browse files
committed
impala type to java type
1 parent eef733c commit 03e0a70

File tree

5 files changed

+84
-21
lines changed

5 files changed

+84
-21
lines changed

core/pom.xml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,6 @@
116116
<version>${flink.version}</version>
117117
</dependency>
118118

119-
<!-- test for predicate -->
120-
<dependency>
121-
<groupId>com.dtstack.flink</groupId>
122-
<artifactId>sql.side.rdb</artifactId>
123-
<version>1.0-SNAPSHOT</version>
124-
<scope>test</scope>
125-
</dependency>
126119

127120
</dependencies>
128121

core/src/test/java/com/dtstack/flink/sql/side/SidePredicatesParserTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package com.dtstack.flink.sql.side;
2020

21-
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2221
import org.apache.calcite.sql.parser.SqlParseException;
2322
import org.junit.Assert;
2423
import org.junit.Test;
@@ -55,7 +54,13 @@ public void testfillPredicatesForSideTable() throws SqlParseException {
5554
" MyTable.a='1' and s.d='1' and s.d <> '3' and s.c LIKE '%xx%' and s.c in ('1','2') and s.c between '10' and '23' and s.d is not null\n";
5655

5756

58-
SideTableInfo sideTableInfo = new RdbSideTableInfo();
57+
SideTableInfo sideTableInfo = new SideTableInfo(){
58+
@Override
59+
public boolean check() {
60+
return false;
61+
}
62+
};
63+
5964
sideTableInfo.setName("sideTable");
6065

6166
Map<String, SideTableInfo> sideTableMap = new HashMap<>();

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.SideInfo;
2423
import com.dtstack.flink.sql.side.SideTableInfo;
2524
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2625
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
27-
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2826
import io.vertx.core.Vertx;
2927
import io.vertx.core.VertxOptions;
3028
import io.vertx.core.json.JsonObject;

impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@
2323
import com.dtstack.flink.sql.util.MathUtil;
2424
import com.fasterxml.jackson.databind.ObjectMapper;
2525

26-
import java.io.IOException;
27-
import java.util.*;
26+
import java.math.BigDecimal;
27+
import java.sql.Date;
28+
import java.sql.Timestamp;
29+
import java.util.Arrays;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
2834

2935
/**
3036
* Reason:
@@ -110,8 +116,36 @@ public Map setPartitionFieldValues(String partitionfieldValuesStr){
110116
e.printStackTrace();
111117
throw new RuntimeException(e);
112118
}
119+
}
113120

121+
@Override
122+
public Class dbTypeConvertToJavaType(String fieldType) {
123+
switch (fieldType.toLowerCase()) {
124+
case "boolean":
125+
return Boolean.class;
126+
case "char":
127+
return Character.class;
128+
case "double":
129+
return Double.class;
130+
case "float":
131+
return Float.class;
132+
case "tinyint":
133+
return Byte.class;
134+
case "smallint":
135+
return Short.class;
136+
case "int":
137+
return Integer.class;
138+
case "bigint":
139+
return Long.class;
140+
case "decimal":
141+
return BigDecimal.class;
142+
case "string":
143+
case "varchar":
144+
return String.class;
145+
case "timestamp":
146+
return Timestamp.class;
147+
}
114148

115-
149+
throw new RuntimeException("不支持 " + fieldType + " 类型");
116150
}
117151
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/table/ImpalaSinkParser.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.dtstack.flink.sql.table.TableInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
2424

25+
import java.math.BigDecimal;
26+
import java.sql.Timestamp;
2527
import java.util.ArrayList;
2628
import java.util.Arrays;
2729
import java.util.List;
@@ -55,37 +57,68 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
5557
impalaTableInfo.setSchema(MathUtil.getString(props.get(ImpalaTableInfo.SCHEMA_KEY.toLowerCase())));
5658

5759
Integer authMech = MathUtil.getIntegerVal(props.get(ImpalaTableInfo.AUTHMECH_KEY.toLowerCase()));
58-
authMech = authMech == null? 0 : authMech;
60+
authMech = authMech == null ? 0 : authMech;
5961
impalaTableInfo.setAuthMech(authMech);
6062
List authMechs = Arrays.asList(new Integer[]{0, 1, 2, 3});
6163

62-
if (!authMechs.contains(authMech) ){
64+
if (!authMechs.contains(authMech)) {
6365
throw new IllegalArgumentException("The value of authMech is illegal, Please select 0, 1, 2, 3");
6466
} else if (authMech == 1) {
6567
impalaTableInfo.setPrincipal(MathUtil.getString(props.get(ImpalaTableInfo.PRINCIPAL_KEY.toLowerCase())));
6668
impalaTableInfo.setKeyTabFilePath(MathUtil.getString(props.get(ImpalaTableInfo.KEYTABFILEPATH_KEY.toLowerCase())));
6769
impalaTableInfo.setKrb5FilePath(MathUtil.getString(props.get(ImpalaTableInfo.KRB5FILEPATH_KEY.toLowerCase())));
6870
String krbRealm = MathUtil.getString(props.get(ImpalaTableInfo.KRBREALM_KEY.toLowerCase()));
69-
krbRealm = krbRealm == null? "HADOOP.COM" : krbRealm;
71+
krbRealm = krbRealm == null ? "HADOOP.COM" : krbRealm;
7072
impalaTableInfo.setKrbRealm(krbRealm);
7173
impalaTableInfo.setKrbHostFQDN(MathUtil.getString(props.get(impalaTableInfo.KRBHOSTFQDN_KEY.toLowerCase())));
7274
impalaTableInfo.setKrbServiceName(MathUtil.getString(props.get(impalaTableInfo.KRBSERVICENAME_KEY.toLowerCase())));
73-
} else if (authMech == 2 ) {
75+
} else if (authMech == 2) {
7476
impalaTableInfo.setUserName(MathUtil.getString(props.get(ImpalaTableInfo.USER_NAME_KEY.toLowerCase())));
7577
} else if (authMech == 3) {
7678
impalaTableInfo.setUserName(MathUtil.getString(props.get(ImpalaTableInfo.USER_NAME_KEY.toLowerCase())));
7779
impalaTableInfo.setPassword(MathUtil.getString(props.get(ImpalaTableInfo.PASSWORD_KEY.toLowerCase())));
7880
}
7981

80-
String enablePartitionStr = (String) props.get(ImpalaTableInfo.ENABLEPARITION_KEY.toLowerCase());
81-
boolean enablePartition = MathUtil.getBoolean(enablePartitionStr == null? "false":enablePartitionStr);
82+
String enablePartitionStr = (String) props.get(ImpalaTableInfo.ENABLEPARITION_KEY.toLowerCase());
83+
boolean enablePartition = MathUtil.getBoolean(enablePartitionStr == null ? "false" : enablePartitionStr);
8284
impalaTableInfo.setEnablePartition(enablePartition);
83-
if(enablePartition){
85+
if (enablePartition) {
8486
String partitionFields = MathUtil.getString(props.get(ImpalaTableInfo.PARTITIONFIELDS_KEY.toLowerCase()));
8587
impalaTableInfo.setPartitionFields(partitionFields);
8688
}
8789

8890
impalaTableInfo.check();
8991
return impalaTableInfo;
9092
}
93+
94+
@Override
95+
public Class dbTypeConvertToJavaType(String fieldType) {
96+
switch (fieldType.toLowerCase()) {
97+
case "boolean":
98+
return Boolean.class;
99+
case "char":
100+
return Character.class;
101+
case "double":
102+
return Double.class;
103+
case "float":
104+
return Float.class;
105+
case "tinyint":
106+
return Byte.class;
107+
case "smallint":
108+
return Short.class;
109+
case "int":
110+
return Integer.class;
111+
case "bigint":
112+
return Long.class;
113+
case "decimal":
114+
return BigDecimal.class;
115+
case "string":
116+
case "varchar":
117+
return String.class;
118+
case "timestamp":
119+
return Timestamp.class;
120+
}
121+
122+
throw new RuntimeException("不支持 " + fieldType + " 类型");
123+
}
91124
}

0 commit comments

Comments
 (0)