Skip to content

Commit 6272205

Browse files
committed
[feat] impala支持动态分区及对kudu表不做分区处理
1 parent f1626c7 commit 6272205

File tree

5 files changed

+107
-48
lines changed

5 files changed

+107
-48
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaDialect.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,23 @@
1818

1919
package com.dtstack.flink.sql.sink.impala;
2020

21+
import com.dtstack.flink.sql.sink.impala.table.ImpalaTableInfo;
2122
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
2223
import com.google.common.collect.Lists;
2324
import org.apache.commons.collections.CollectionUtils;
2425
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.flink.api.common.typeinfo.TypeInformation;
2627

27-
import java.util.*;
28+
import java.util.Arrays;
29+
import java.util.List;
30+
import java.util.Objects;
31+
import java.util.Optional;
2832
import java.util.stream.Collectors;
2933

3034
/**
3135
* Date: 2020/1/17
3236
* Company: www.dtstack.com
37+
*
3338
* @author maqi
3439
*/
3540
public class ImpalaDialect implements JDBCDialect {
@@ -41,9 +46,14 @@ public class ImpalaDialect implements JDBCDialect {
4146

4247
private List<String> primaryKeys;
4348

44-
public ImpalaDialect(TypeInformation[] fieldTypes, List<String> primaryKeys){
49+
private String storeType;
50+
51+
public ImpalaDialect(TypeInformation[] fieldTypes,
52+
List<String> primaryKeys,
53+
String storeType) {
4554
this.fieldTypes = fieldTypes;
4655
this.primaryKeys = primaryKeys;
56+
this.storeType = storeType;
4757
}
4858

4959
@Override
@@ -65,7 +75,7 @@ public String quoteIdentifier(String identifier) {
6575
public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
6676
//跳过primary key字段
6777
String setClause = Arrays.stream(fieldNames)
68-
.filter(f -> CollectionUtils.isNotEmpty(primaryKeys) ? !primaryKeys.contains(f) : true)
78+
.filter(f -> !CollectionUtils.isNotEmpty(primaryKeys) || !primaryKeys.contains(f))
6979
.map(f -> quoteIdentifier(f) + "=?")
7080
.collect(Collectors.joining(", "));
7181
String conditionClause = Arrays.stream(conditionFields)
@@ -83,14 +93,18 @@ public String getInsertIntoStatement(String schema, String tableName, String[] f
8393

8494
List<String> partitionFieldsList = Objects.isNull(partitionFields) ? Lists.newArrayList() : Arrays.asList(partitionFields);
8595

96+
if (storeType.equalsIgnoreCase(ImpalaTableInfo.KUDU_TYPE)) {
97+
return buildKuduInsertSql(schemaInfo, tableName, fieldNames, fieldTypes);
98+
}
99+
86100
String columns = Arrays.stream(fieldNames)
87101
.filter(f -> !partitionFieldsList.contains(f))
88102
.map(this::quoteIdentifier)
89103
.collect(Collectors.joining(", "));
90104

91105
String placeholders = Arrays.stream(fieldTypes)
92106
.map(f -> {
93-
if(String.class.getName().equals(f.getTypeClass().getName())){
107+
if (String.class.getName().equals(f.getTypeClass().getName())) {
94108
return "cast( ? as string)";
95109
}
96110
return "?";
@@ -106,4 +120,20 @@ public String getInsertIntoStatement(String schema, String tableName, String[] f
106120
return "INSERT INTO " + schemaInfo + quoteIdentifier(tableName) +
107121
"(" + columns + ")" + partitionStatement + " VALUES (" + placeholders + ")";
108122
}
123+
124+
private String buildKuduInsertSql(String schemaInfo, String tableName, String[] fieldNames, TypeInformation[] fieldTypes) { // kudu表的Insert语句
125+
String columns = Arrays.stream(fieldNames)
126+
.map(this::quoteIdentifier)
127+
.collect(Collectors.joining(", "));
128+
String placeholders = Arrays.stream(fieldTypes)
129+
.map(f -> {
130+
if (String.class.getName().equals(f.getTypeClass().getName())) {
131+
return "cast( ? as string)";
132+
}
133+
return "?";
134+
})
135+
.collect(Collectors.joining(", "));
136+
return "INSERT INTO " + schemaInfo + quoteIdentifier(tableName) +
137+
"(" + columns + ")" + " VALUES (" + placeholders + ")";
138+
}
109139
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import org.apache.hadoop.security.UserGroupInformation;
77

88
import java.io.IOException;
9-
import java.security.PrivilegedAction;
109
import java.security.PrivilegedExceptionAction;
1110
import java.util.List;
1211

@@ -24,28 +23,28 @@ public class ImpalaOutputFormat extends JDBCUpsertOutputFormat {
2423
private Integer authMech;
2524

2625
public ImpalaOutputFormat(
27-
JDBCOptions options,
28-
String[] fieldNames,
29-
String[] keyFields,
30-
String[] partitionFields,
31-
int[] fieldTypes,
32-
int flushMaxSize,
33-
long flushIntervalMills,
34-
boolean allReplace,
35-
String updateMode,
36-
Integer authMech,
37-
String keytabPath,
38-
String krb5confPath,
39-
String principal) {
26+
JDBCOptions options,
27+
String[] fieldNames,
28+
String[] keyFields,
29+
String[] partitionFields,
30+
int[] fieldTypes,
31+
int flushMaxSize,
32+
long flushIntervalMills,
33+
boolean allReplace,
34+
String updateMode,
35+
Integer authMech,
36+
String keytabPath,
37+
String krb5confPath,
38+
String principal) {
4039
super(options,
41-
fieldNames,
42-
keyFields,
43-
partitionFields,
44-
fieldTypes,
45-
flushMaxSize,
46-
flushIntervalMills,
47-
allReplace,
48-
updateMode);
40+
fieldNames,
41+
keyFields,
42+
partitionFields,
43+
fieldTypes,
44+
flushMaxSize,
45+
flushIntervalMills,
46+
allReplace,
47+
updateMode);
4948
this.authMech = authMech;
5049
this.keytabPath = keytabPath;
5150
this.krb5confPath = krb5confPath;
@@ -71,11 +70,11 @@ public Void run() throws IOException {
7170
super.open(taskNumber, numTasks);
7271
}
7372
}
74-
73+
7574
public static Builder impalaBuilder() {
7675
return new Builder();
7776
}
78-
77+
7978
public static class Builder {
8079
private Integer authMech;
8180
private String keytabPath;
@@ -178,24 +177,24 @@ public Builder setPrincipal(String principal) {
178177
this.principal = principal;
179178
return this;
180179
}
181-
180+
182181
public ImpalaOutputFormat build() {
183182
checkNotNull(options, "No options supplied.");
184183
checkNotNull(fieldNames, "No fieldNames supplied.");
185184
return new ImpalaOutputFormat(
186-
options,
187-
fieldNames,
188-
keyFields,
189-
partitionFields,
190-
fieldTypes,
191-
flushMaxSize,
192-
flushIntervalMills,
193-
allReplace,
194-
updateMode,
195-
authMech,
196-
keytabPath,
197-
krb5confPath,
198-
principal);
185+
options,
186+
fieldNames,
187+
keyFields,
188+
partitionFields,
189+
fieldTypes,
190+
flushMaxSize,
191+
flushIntervalMills,
192+
allReplace,
193+
updateMode,
194+
authMech,
195+
keytabPath,
196+
krb5confPath,
197+
principal);
199198
}
200199
}
201200
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,15 @@ public ImpalaSink() {
4141

4242
@Override
4343
public ImpalaOutputFormat getOutputFormat() {
44+
ImpalaDialect impalaDialect = new ImpalaDialect(
45+
getFieldTypes(),
46+
primaryKeys,
47+
impalaTableInfo.getStoreType()
48+
);
49+
4450
JDBCOptions jdbcOptions = JDBCOptions.builder()
4551
.setDbUrl(getImpalaJdbcUrl())
46-
.setDialect(new ImpalaDialect(getFieldTypes(), primaryKeys))
52+
.setDialect(impalaDialect)
4753
.setUsername(userName)
4854
.setPassword(password)
4955
.setTableName(tableName)

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/table/ImpalaSinkParser.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5959
Integer authMech = MathUtil.getIntegerVal(props.get(ImpalaTableInfo.AUTHMECH_KEY.toLowerCase()));
6060
authMech = authMech == null ? 0 : authMech;
6161
impalaTableInfo.setAuthMech(authMech);
62-
List authMechs = Arrays.asList(new Integer[]{0, 1, 2, 3});
62+
List<Integer> authMechs = Arrays.asList(0, 1, 2, 3);
6363

6464
if (!authMechs.contains(authMech)) {
6565
throw new IllegalArgumentException("The value of authMech is illegal, Please select 0, 1, 2, 3");
@@ -79,20 +79,26 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
7979
impalaTableInfo.setPassword(MathUtil.getString(props.get(ImpalaTableInfo.PASSWORD_KEY.toLowerCase())));
8080
}
8181

82+
String storeType = MathUtil.getString(props.get(ImpalaTableInfo.STORE_TYPE_KEY.toLowerCase()));
83+
impalaTableInfo.setStoreType(storeType);
84+
8285
String enablePartitionStr = (String) props.get(ImpalaTableInfo.ENABLEPARITION_KEY.toLowerCase());
8386
boolean enablePartition = MathUtil.getBoolean(enablePartitionStr == null ? "false" : enablePartitionStr);
8487
impalaTableInfo.setEnablePartition(enablePartition);
85-
if (enablePartition) {
88+
89+
if (!storeType.equalsIgnoreCase(ImpalaTableInfo.KUDU_TYPE) && enablePartition) {
8690
String partitionFields = MathUtil.getString(props.get(ImpalaTableInfo.PARTITIONFIELDS_KEY.toLowerCase()));
8791
impalaTableInfo.setPartitionFields(partitionFields);
8892
}
8993

94+
impalaTableInfo.setType(CURR_TYPE);
95+
9096
impalaTableInfo.check();
9197
return impalaTableInfo;
9298
}
9399

94100
@Override
95-
public Class dbTypeConvertToJavaType(String fieldType) {
101+
public Class dbTypeConvertToJavaType(String fieldType) {
96102
switch (fieldType.toLowerCase()) {
97103
case "boolean":
98104
return Boolean.class;

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/table/ImpalaTableInfo.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public class ImpalaTableInfo extends RdbTableInfo {
5050

5151
public static final String PARTITIONFIELDS_KEY = "partitionFields";
5252

53+
public static final String STORE_TYPE_KEY = "storeType";
54+
55+
public static final String KUDU_TYPE = "kudu";
56+
5357
private static final String CURR_TYPE = "impala";
5458

5559
private static final String PARTITION_FIELD_SPLIT_REGEX = ",";
@@ -72,6 +76,11 @@ public class ImpalaTableInfo extends RdbTableInfo {
7276

7377
private String[] partitionFields;
7478

79+
/**
80+
* If the type of storage is kudu, the logic of partition is ignore
81+
*/
82+
private String storeType;
83+
7584
public ImpalaTableInfo() {
7685
setType(CURR_TYPE);
7786
}
@@ -148,11 +157,20 @@ public void setPartitionFields(String partitionFields) {
148157
this.partitionFields = StringUtils.split(partitionFields, PARTITION_FIELD_SPLIT_REGEX);
149158
}
150159

160+
public void setStoreType(String storeType) {
161+
this.storeType = storeType;
162+
}
163+
164+
public String getStoreType() {
165+
return storeType;
166+
}
167+
151168
@Override
152169
public boolean check() {
153170
Preconditions.checkNotNull(this.getUrl(), "impala field of url is required");
154171
Preconditions.checkNotNull(this.getTableName(), "impala field of tableName is required");
155172
Preconditions.checkNotNull(this.getAuthMech(), "impala field of authMech is required");
173+
Preconditions.checkNotNull(this.getStoreType(), "impala field of storeType is required");
156174
Integer authMech = getAuthMech();
157175

158176
if (authMech == 1) {
@@ -163,12 +181,12 @@ public boolean check() {
163181
Preconditions.checkNotNull(this.getKrbServiceName(), "impala field of krbServiceName is required");
164182
} else if (authMech == 2) {
165183
Preconditions.checkNotNull(this.getUserName(), "impala field of userName is required");
166-
}else if (authMech == 3) {
184+
} else if (authMech == 3) {
167185
Preconditions.checkNotNull(this.getUserName(), "impala field of userName is required");
168186
Preconditions.checkNotNull(this.getPassword(), "impala field of password is required");
169187
}
170188

171-
if (isEnablePartition()) {
189+
if (!this.getStoreType().equalsIgnoreCase(KUDU_TYPE) && isEnablePartition()) {
172190
Preconditions.checkArgument(this.getPartitionFields().length > 0, "impala field of partitionFields is required");
173191
}
174192

0 commit comments

Comments
 (0)