
Commit d29b2cf

Merge branch 'feat_1.10_4.0.x_impalaKudu' into '1.10_test_4.0.x'

1.10 4.0.x impala kudu

See merge request dt-insight-engine/flinkStreamSQL!144

2 parents: 9c67123 + 6272205

File tree: 11 files changed, +157 -79 lines


.gitignore

Lines changed: 3 additions & 1 deletion

@@ -14,4 +14,6 @@ lib/
 .DS_Store
 bin/nohup.out
 .DS_Store
-bin/sideSql.txt
+bin/sideSql.txt
+*.keytab
+krb5.conf

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 15 additions & 1 deletion

@@ -490,7 +490,7 @@ private Set<String> extractSelectFieldFromJoinCondition(Set<Tuple2<String, Strin
         }

         if(tableRef.containsKey(field.f0)){
-            if(fromTableNameSet.contains(tableRef.get(field.f0))){
+            if(checkContainIterationTableName(fromTableNameSet, field.f0, tableRef)){
                 extractFieldList.add(tableRef.get(field.f0) + "." + field.f1);
             }
         }
@@ -499,6 +499,20 @@ private Set<String> extractSelectFieldFromJoinCondition(Set<Tuple2<String, Strin
         return extractFieldList;
     }

+    private boolean checkContainIterationTableName(Set<String> fromTableNameSet, String checkTableName, Map<String, String> mappingTableName) {
+        for (int i = 0; i < mappingTableName.size() + 1; i++) {
+            if (fromTableNameSet.contains(checkTableName)) {
+                return true;
+            }
+
+            checkTableName = mappingTableName.get(checkTableName);
+            if (checkTableName == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private Set<String> extractFieldFromGroupByList(SqlNodeList parentGroupByList,
                                                     Set<String> fromTableNameSet,
                                                     Map<String, String> tableRef){
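The new helper replaces a single alias lookup with a bounded walk along the alias chain, so a field whose table alias points at another alias (rather than directly at a table in the FROM set) is still resolved. A minimal sketch of the difference, as a fragment assuming java.util imports; the alias names are invented, not from the commit:

    Set<String> fromTableNameSet = new HashSet<>(Collections.singletonList("orders"));
    Map<String, String> tableRef = new HashMap<>();
    tableRef.put("o1", "o2");      // sub-query alias -> intermediate alias
    tableRef.put("o2", "orders");  // intermediate alias -> table present in FROM

    // Old check: fromTableNameSet.contains(tableRef.get("o1")) -> contains("o2") -> false
    // New check: checkContainIterationTableName(fromTableNameSet, "o1", tableRef)
    //   walks o1 -> o2 -> orders and returns true; the mappingTableName.size() + 1
    //   loop bound stops a cyclic alias mapping from looping forever.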

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 23 additions & 15 deletions

@@ -26,6 +26,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.tuple.Tuple2;

 import java.util.List;
 import java.util.Map;
@@ -49,6 +50,8 @@ public abstract class AbstractTableParser {
     private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
     private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
     private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
+    private static Pattern typePattern = Pattern.compile("(\\S+)\\s+(\\w+.*)");
+

     private Map<String, Pattern> patternMap = Maps.newHashMap();

@@ -95,23 +98,16 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
                 throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
             }

-            String[] fieldInfoArr = fieldRow.split("\\s+");
-
-            String errorMsg = String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow);
-            Preconditions.checkState(fieldInfoArr.length >= 2, errorMsg);
-
             boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
             if (isMatcherKey) {
                 continue;
             }

-            //Compatible situation may arise in space in the fieldName
-            String[] filedNameArr = new String[fieldInfoArr.length - 1];
-            System.arraycopy(fieldInfoArr, 0, filedNameArr, 0, fieldInfoArr.length - 1);
-            String fieldName = String.join(" ", filedNameArr);
-            String fieldType = fieldInfoArr[fieldInfoArr.length - 1 ].trim();
+            Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName());
+            String fieldName = t.f0;
+            String fieldType = t.f1;

-            Class fieldClass = null;
+            Class fieldClass;
             AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;

             Matcher matcher = charTypePattern.matcher(fieldType);
@@ -123,7 +119,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
                 fieldClass = dbTypeConvertToJavaType(fieldType);
             }

-            tableInfo.addPhysicalMappings(fieldInfoArr[0], fieldInfoArr[0]);
+            tableInfo.addPhysicalMappings(fieldName, fieldName);
             tableInfo.addField(fieldName);
             tableInfo.addFieldClass(fieldClass);
             tableInfo.addFieldType(fieldType);
@@ -133,11 +129,23 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
         tableInfo.finish();
     }

+    private Tuple2<String, String> extractType(String fieldRow, String tableName) {
+        Matcher matcher = typePattern.matcher(fieldRow);
+        if (matcher.matches()) {
+            String fieldName = matcher.group(1);
+            String fieldType = matcher.group(2);
+            return Tuple2.of(fieldName, fieldType);
+        } else {
+            String errorMsg = String.format("table [%s] field [%s] format error.", tableName, fieldRow);
+            throw new RuntimeException(errorMsg);
+        }
+    }
+
     public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
         String primaryFields = matcher.group(1).trim();
-        String[] splitArry = primaryFields.split(",");
-        List<String> primaryKes = Lists.newArrayList(splitArry);
-        tableInfo.setPrimaryKeys(primaryKes);
+        String[] splitArray = primaryFields.split(",");
+        List<String> primaryKeys = Lists.newArrayList(splitArray);
+        tableInfo.setPrimaryKeys(primaryKeys);
     }

     /**
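For reference, a standalone sketch of how the new typePattern splits a field declaration into name and type: the first whitespace-free token is the field name and everything after the first run of whitespace is the type expression. The sample rows below are invented, not taken from the commit:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TypePatternDemo {
        public static void main(String[] args) {
            // Same regex as the new typePattern field.
            Pattern typePattern = Pattern.compile("(\\S+)\\s+(\\w+.*)");
            for (String row : new String[]{"id INT", "price DECIMAL(10,2)", "name VARCHAR(255)"}) {
                Matcher m = typePattern.matcher(row);
                if (m.matches()) {
                    // group(1) = field name, group(2) = full type expression
                    System.out.println(m.group(1) + " -> " + m.group(2));
                    // prints: id -> INT, price -> DECIMAL(10,2), name -> VARCHAR(255)
                }
            }
        }
    }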

docs/plugin/hbaseSink.md

Lines changed: 4 additions & 4 deletions

@@ -7,7 +7,7 @@ CREATE TABLE MyResult(
     type ='hbase',
     zookeeperQuorum ='ip:port[,ip:port]',
     tableName ='tableName',
-    rowKey ='colName[,colName]',
+    rowKey ='colName[+colName]',
     parallelism ='1',
     zookeeperParent ='/hbase'
 )
@@ -34,7 +34,7 @@ hbase2.0
 |zookeeperQuorum | hbase zk address; separate multiple entries with commas|||
 |zookeeperParent | zkParent path|||
 |tableName | name of the hbase table to write to|||
-|rowkey | columns that form the hbase rowkey; separate multiple values with commas|||
+|rowkey | columns that form the hbase rowkey; join multiple values with '+'|||
 |updateMode|APPEND: never retracts, only emits incremental data; UPSERT: deletes the retracted row first, then updates|no|APPEND|
 |parallelism | parallelism setting||1|
 |kerberosAuthEnable | whether to enable kerberos authentication||false|
@@ -76,7 +76,7 @@ CREATE TABLE MyResult(
     tableName ='myresult',
     partitionedJoin ='false',
     parallelism ='1',
-    rowKey='name,channel'
+    rowKey='name+channel'
 );

 insert
@@ -141,7 +141,7 @@ into

 ## 6.hbase data
 ### Notes on the data
-hbase rowkey construction rule: the values of the declared rowkey fields form the key, with multiple fields joined by '-'
+hbase rowkey construction rule: the values of the declared rowkey fields form the key, with multiple fields joined by '+'
 ### Sample data
 hbase(main):007:0> scan 'myresult'
 ROW COLUMN+CELL
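The separator change shows up directly in the stored rowkeys. With the new rowKey='name+channel' and an invented result row where name='roc' and channel='daishu', the written rowkey would be roc+daishu; under the old rowKey='name,channel' syntax the same row was keyed roc-daishu, since fields used to be joined with '-'.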

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 5 additions & 6 deletions

@@ -215,14 +215,13 @@ private Put getPutByRow(Row record) {
         Put put = new Put(rowKey.getBytes());
         for (int i = 0; i < record.getArity(); ++i) {
             Object fieldVal = record.getField(i);
-            byte[] val = null;
             if (fieldVal != null) {
-                val = fieldVal.toString().getBytes();
-            }
-            byte[] cf = families[i].getBytes();
-            byte[] qualifier = qualifiers[i].getBytes();
+                byte[] val = fieldVal.toString().getBytes();
+                byte[] cf = families[i].getBytes();
+                byte[] qualifier = qualifiers[i].getBytes();

-            put.addColumn(cf, qualifier, val);
+                put.addColumn(cf, qualifier, val);
+            }
         }
         return put;
     }
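Note the behavioral side of this cleanup: before the change a null field still reached put.addColumn(cf, qualifier, null), writing a cell with a null/empty value; after it, null fields are skipped and no cell is created for that column. Sketched as comments, with the Row/Put types as in the patch:

    // before: fieldVal == null  =>  put.addColumn(cf, qualifier, null)  // cell written, empty value
    // after:  fieldVal == null  =>  loop continues, no addColumn call   // column absent from the Put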

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/RowKeyBuilder.java

Lines changed: 0 additions & 4 deletions

@@ -110,10 +110,6 @@ public static String[] splitIgnoreQuotaBrackets(String str, String delimiter){
     public ReplaceInfo getReplaceInfo(String field){

         field = field.trim();
-        if(field.length() <= 0){
-            throw new RuntimeException(field + " \n" +
-                    "Format defined exceptions");
-        }

         // check whether this is a constant, i.e. an identifier wrapped in ''
         if(field.startsWith("'") && field.endsWith("'")){

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaDialect.java

Lines changed: 34 additions & 4 deletions

@@ -18,18 +18,23 @@

 package com.dtstack.flink.sql.sink.impala;

+import com.dtstack.flink.sql.sink.impala.table.ImpalaTableInfo;
 import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
 import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;

-import java.util.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;

 /**
  * Date: 2020/1/17
  * Company: www.dtstack.com
+ *
  * @author maqi
  */
 public class ImpalaDialect implements JDBCDialect {
@@ -41,9 +46,14 @@ public class ImpalaDialect implements JDBCDialect {

     private List<String> primaryKeys;

-    public ImpalaDialect(TypeInformation[] fieldTypes, List<String> primaryKeys){
+    private String storeType;
+
+    public ImpalaDialect(TypeInformation[] fieldTypes,
+                         List<String> primaryKeys,
+                         String storeType) {
         this.fieldTypes = fieldTypes;
         this.primaryKeys = primaryKeys;
+        this.storeType = storeType;
     }

     @Override
@@ -65,7 +75,7 @@ public String quoteIdentifier(String identifier) {
     public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
         // skip primary key fields
         String setClause = Arrays.stream(fieldNames)
-                .filter(f -> CollectionUtils.isNotEmpty(primaryKeys) ? !primaryKeys.contains(f) : true)
+                .filter(f -> !CollectionUtils.isNotEmpty(primaryKeys) || !primaryKeys.contains(f))
                 .map(f -> quoteIdentifier(f) + "=?")
                 .collect(Collectors.joining(", "));
         String conditionClause = Arrays.stream(conditionFields)
@@ -83,14 +93,18 @@ public String getInsertIntoStatement(String schema, String tableName, String[] f

         List<String> partitionFieldsList = Objects.isNull(partitionFields) ? Lists.newArrayList() : Arrays.asList(partitionFields);

+        if (storeType.equalsIgnoreCase(ImpalaTableInfo.KUDU_TYPE)) {
+            return buildKuduInsertSql(schemaInfo, tableName, fieldNames, fieldTypes);
+        }
+
         String columns = Arrays.stream(fieldNames)
                 .filter(f -> !partitionFieldsList.contains(f))
                 .map(this::quoteIdentifier)
                 .collect(Collectors.joining(", "));

         String placeholders = Arrays.stream(fieldTypes)
                 .map(f -> {
-                    if(String.class.getName().equals(f.getTypeClass().getName())){
+                    if (String.class.getName().equals(f.getTypeClass().getName())) {
                         return "cast( ? as string)";
                     }
                     return "?";
@@ -106,4 +120,20 @@ public String getInsertIntoStatement(String schema, String tableName, String[] f
         return "INSERT INTO " + schemaInfo + quoteIdentifier(tableName) +
                 "(" + columns + ")" + partitionStatement + " VALUES (" + placeholders + ")";
     }
+
+    private String buildKuduInsertSql(String schemaInfo, String tableName, String[] fieldNames, TypeInformation[] fieldTypes) { // INSERT statement for Kudu tables
+        String columns = Arrays.stream(fieldNames)
+                .map(this::quoteIdentifier)
+                .collect(Collectors.joining(", "));
+        String placeholders = Arrays.stream(fieldTypes)
+                .map(f -> {
+                    if (String.class.getName().equals(f.getTypeClass().getName())) {
+                        return "cast( ? as string)";
+                    }
+                    return "?";
+                })
+                .collect(Collectors.joining(", "));
+        return "INSERT INTO " + schemaInfo + quoteIdentifier(tableName) +
+                "(" + columns + ")" + " VALUES (" + placeholders + ")";
+    }
 }
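Two things worth noting in this dialect change. First, the rewritten filter in getUpdateStatement is a plain De Morgan simplification of the old ternary: keep a field when the primary-key list is empty, or when the field is not in it. Second, the Kudu path omits the partition clause that the regular Impala path appends. Assuming quoteIdentifier wraps names in backticks (not confirmed by this diff) and a table with invented fields (id INT, name STRING), buildKuduInsertSql would yield roughly:

    // Illustrative output of buildKuduInsertSql (schema/table/field names invented):
    //   INSERT INTO myschema.`result_table`(`id`, `name`) VALUES (?, cast( ? as string))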

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 37 additions & 38 deletions (unused-import and whitespace cleanup)

@@ -6,7 +6,6 @@
 import org.apache.hadoop.security.UserGroupInformation;

 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;

@@ -24,28 +23,28 @@ public class ImpalaOutputFormat extends JDBCUpsertOutputFormat {
     private Integer authMech;

     public ImpalaOutputFormat(
-        JDBCOptions options,
-        String[] fieldNames,
-        String[] keyFields,
-        String[] partitionFields,
-        int[] fieldTypes,
-        int flushMaxSize,
-        long flushIntervalMills,
-        boolean allReplace,
-        String updateMode,
-        Integer authMech,
-        String keytabPath,
-        String krb5confPath,
-        String principal) {
+            JDBCOptions options,
+            String[] fieldNames,
+            String[] keyFields,
+            String[] partitionFields,
+            int[] fieldTypes,
+            int flushMaxSize,
+            long flushIntervalMills,
+            boolean allReplace,
+            String updateMode,
+            Integer authMech,
+            String keytabPath,
+            String krb5confPath,
+            String principal) {
         super(options,
-            fieldNames,
-            keyFields,
-            partitionFields,
-            fieldTypes,
-            flushMaxSize,
-            flushIntervalMills,
-            allReplace,
-            updateMode);
+                fieldNames,
+                keyFields,
+                partitionFields,
+                fieldTypes,
+                flushMaxSize,
+                flushIntervalMills,
+                allReplace,
+                updateMode);
         this.authMech = authMech;
         this.keytabPath = keytabPath;
         this.krb5confPath = krb5confPath;
@@ -71,11 +70,11 @@ public Void run() throws IOException {
             super.open(taskNumber, numTasks);
         }
     }
-
+
     public static Builder impalaBuilder() {
         return new Builder();
     }
-
+
     public static class Builder {
         private Integer authMech;
         private String keytabPath;
@@ -178,24 +177,24 @@ public Builder setPrincipal(String principal) {
             this.principal = principal;
             return this;
         }
-
+
         public ImpalaOutputFormat build() {
             checkNotNull(options, "No options supplied.");
             checkNotNull(fieldNames, "No fieldNames supplied.");
             return new ImpalaOutputFormat(
-                options,
-                fieldNames,
-                keyFields,
-                partitionFields,
-                fieldTypes,
-                flushMaxSize,
-                flushIntervalMills,
-                allReplace,
-                updateMode,
-                authMech,
-                keytabPath,
-                krb5confPath,
-                principal);
+                    options,
+                    fieldNames,
+                    keyFields,
+                    partitionFields,
+                    fieldTypes,
+                    flushMaxSize,
+                    flushIntervalMills,
+                    allReplace,
+                    updateMode,
+                    authMech,
+                    keytabPath,
+                    krb5confPath,
+                    principal);
         }
     }
 }
