
Commit 6795386

[fix-33847][core][kafka] add array and map type in the kafka sink.
1 parent 42f99ab commit 6795386

7 files changed (+77, -31 lines)


core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 3 deletions
```diff
@@ -334,9 +334,7 @@ public static Set<URL> registerTable(SqlTree sqlTree,
         } else if (tableInfo instanceof AbstractTargetTableInfo) {

             TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath, pluginLoadMode);
-            TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
-            tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
-
+            tableEnv.registerTableSink(tableInfo.getName(), tableSink);
             URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
             pluginClassPathSets.add(sinkTablePathUrl);
         } else if (tableInfo instanceof AbstractSideTableInfo) {
```

core/src/main/java/com/dtstack/flink/sql/function/FunctionManager.java

Lines changed: 0 additions & 9 deletions
```diff
@@ -124,13 +124,4 @@ public static void registerAggregateUDF(String classPath, String funcName, Table
         }
     }

-
-    public static TypeInformation[] transformTypes(Class[] fieldTypes) {
-        TypeInformation[] types = new TypeInformation[fieldTypes.length];
-        for (int i = 0; i < fieldTypes.length; i++) {
-            types[i] = TypeInformation.of(fieldTypes[i]);
-        }
-
-        return types;
-    }
 }
```

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java

Lines changed: 6 additions & 0 deletions
```diff
@@ -25,6 +25,8 @@
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;

 /**
  * Reason: TODO ADD REASON (optional)
@@ -41,6 +43,10 @@ public static Class<?> stringConvertClass(String str) {
         if (lowerStr.startsWith("array")) {
             return Array.newInstance(Integer.class, 0).getClass();
         }
+        if (lowerStr.startsWith("map")) {
+            Map m = new HashMap();
+            return m.getClass();
+        }

         switch (lowerStr) {
             case "boolean":
```

core/src/main/java/com/dtstack/flink/sql/util/DataTypeUtils.java

Lines changed: 29 additions & 16 deletions
```diff
@@ -20,6 +20,7 @@

 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -30,6 +31,8 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

+import static org.apache.commons.lang3.StringUtils.split;
+
 /**
  * @program: flink.sql
  * @author: wuren
@@ -39,28 +42,13 @@ public class DataTypeUtils {

     private final static Pattern COMPOSITE_TYPE_PATTERN = Pattern.compile("(.+?)<(.+)>");
     private final static String ARRAY = "ARRAY";
+    private final static String MAP = "MAP";
     private final static String ROW = "ROW";
     private final static char FIELD_DELIMITER = ',';
     private final static char TYPE_DELIMITER = ' ';

     private DataTypeUtils() {}

-    /**
-     * Only ARRAY is supported for now; MAP and other types can be added later
-     * @param compositeTypeString
-     * @return
-     */
-    public static TypeInformation convertToCompositeType(String compositeTypeString) {
-        Matcher matcher = matchCompositeType(compositeTypeString);
-        final String errorMsg = "type " + compositeTypeString + "is not support!";
-        Preconditions.checkState(matcher.find(), errorMsg);
-
-        String normalizedType = normalizeType(matcher.group(1));
-        Preconditions.checkState(ARRAY.equals(normalizedType), errorMsg);
-
-        return convertToArray(compositeTypeString);
-    }
-
     /**
      * For now ARRAY only supports ROW and other basic element types
      * @param arrayTypeString
@@ -86,6 +74,30 @@ public static TypeInformation convertToArray(String arrayTypeString) {
         return Types.OBJECT_ARRAY(elementType);
     }

+    /**
+     * For now MAP only supports basic key/value types
+     * @param mapTypeString
+     * @return
+     */
+    public static TypeInformation convertToMap(String mapTypeString) {
+        Matcher matcher = matchCompositeType(mapTypeString);
+        final String errorMsg = mapTypeString + " convert to map type error!";
+        Preconditions.checkState(matcher.find(), errorMsg);
+
+        String normalizedType = normalizeType(matcher.group(1));
+        Preconditions.checkState(MAP.equals(normalizedType), errorMsg);
+
+        String kvTypeString = matcher.group(2);
+        String[] kvTypeStringList = StringUtils.split(kvTypeString, ",");
+        final String mapTypeErrorMsg = "There can only be key and value two types in map declaration.";
+        Preconditions.checkState(kvTypeStringList.length == 2, mapTypeErrorMsg);
+        String keyTypeString = normalizeType(kvTypeStringList[0]);
+        String valueTypeString = normalizeType(kvTypeStringList[1]);
+        TypeInformation keyType = convertToAtomicType(keyTypeString);
+        TypeInformation valueType = convertToAtomicType(valueTypeString);
+        return Types.MAP(keyType, valueType);
+    }
+
     /**
      * For now ROW only supports basic types
      * @param rowTypeString
@@ -104,6 +116,7 @@ public static RowTypeInfo convertToRow(String rowTypeString) {
         return new RowTypeInfo(info.f0, info.f1);
     }

+
     private static Tuple2<TypeInformation[], String[]> genFieldInfo(Iterable<String> fieldInfoStrs) {
         ArrayList<TypeInformation> types = Lists.newArrayList();
         ArrayList<String> fieldNames = Lists.newArrayList();
```
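
A minimal usage sketch of the new converter (class name hypothetical; assumes convertToAtomicType recognizes VARCHAR and INT):

```java
import com.dtstack.flink.sql.util.DataTypeUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class ConvertToMapSketch {
    public static void main(String[] args) {
        // COMPOSITE_TYPE_PATTERN splits "MAP<VARCHAR, INT>" into the outer
        // keyword and the inner "VARCHAR, INT"; the keyword is checked against
        // MAP, and the key/value strings are converted to atomic
        // TypeInformation before being wrapped in Types.MAP(...).
        TypeInformation mapType = DataTypeUtils.convertToMap("MAP<VARCHAR, INT>");
        System.out.println(mapType); // expected: Map<String, Integer>
    }
}
```

Because the key/value list comes from a plain split on ',', nested composites such as MAP<VARCHAR, ROW<...>> would not survive this parse, which is what the "basic types only" Javadoc restriction reflects.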

docs/plugin/kafkaSink.md

Lines changed: 26 additions & 0 deletions
````diff
@@ -221,3 +221,29 @@ into
 from
     MyTable a
 ```
+## MAP type example
+The Kafka sink now supports the MAP type
+```sql
+CREATE TABLE ods(
+    id INT,
+    name STRING
+) WITH (
+    ...
+);
+
+CREATE TABLE dwd (
+    id INT,
+    dids MAP<STRING, INT>
+) WITH (
+    type ='kafka',
+    bootstrapServers ='localhost:9092',
+    offsetReset ='latest',
+    groupId='wuren_foo',
+    topic ='luna_foo',
+    parallelism ='1'
+);
+
+INSERT INTO dwd
+SELECT ods.id, MAP['foo', 1, 'bar', 2] AS dids
+FROM ods;
+```
````

docs/pluginsInfo.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,9 +1,9 @@
 ### 1 Plugin list
 #### 1.1 Source table plugins
 * [kafka source table plugin](plugin/kafkaSource.md)
-* [kafka result table plugin](plugin/kafkaSink.md)

 #### 1.2 Result table plugins
+* [kafka result table plugin](plugin/kafkaSink.md)
 * [elasticsearch result table plugin](plugin/elasticsearchSink.md)
 * [hbase result table plugin](plugin/hbaseSink.md)
 * [mysql result table plugin](plugin/mysqlSink.md)
```

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaSink.java

Lines changed: 14 additions & 2 deletions
```diff
@@ -21,6 +21,7 @@
 import com.dtstack.flink.sql.enums.EUpdateMode;
 import com.dtstack.flink.sql.sink.IStreamSinkGener;
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
+import com.dtstack.flink.sql.util.DataTypeUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,7 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;

+import java.util.HashMap;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.IntStream;
@@ -77,11 +79,21 @@ protected Properties getKafkaProperties(KafkaSinkTableInfo KafkaSinkTableInfo) {
         }
         return props;
     }
-
+    // TODO The source has the same method; the two could be merged later
     protected TypeInformation[] getTypeInformations(KafkaSinkTableInfo kafka11SinkTableInfo) {
+        String[] fieldTypes = kafka11SinkTableInfo.getFieldTypes();
         Class<?>[] fieldClasses = kafka11SinkTableInfo.getFieldClasses();
         TypeInformation[] types = IntStream.range(0, fieldClasses.length)
-                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
+                .mapToObj(
+                        i -> {
+                            if (fieldClasses[i].isArray()) {
+                                return DataTypeUtils.convertToArray(fieldTypes[i]);
+                            }
+                            if (fieldClasses[i] == new HashMap().getClass()) {
+                                return DataTypeUtils.convertToMap(fieldTypes[i]);
+                            }
+                            return TypeInformation.of(fieldClasses[i]);
+                        })
                 .toArray(TypeInformation[]::new);
         return types;
     }
```
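
Taken together, the sink now resolves each field in two stages: ClassUtil maps the DDL type string to a Java class, and composite classes are routed back through DataTypeUtils with the original string so element and key/value types are preserved. A compact sketch of that dispatch (hypothetical field values; assumes the classes from this commit):

```java
import com.dtstack.flink.sql.util.DataTypeUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.HashMap;

public class SinkTypeDispatchSketch {
    public static void main(String[] args) {
        // Hypothetical field metadata as KafkaSinkTableInfo would supply it.
        String[] fieldTypes = {"INT", "MAP<VARCHAR, INT>"};
        Class<?>[] fieldClasses = {Integer.class, HashMap.class};

        TypeInformation[] types = new TypeInformation[fieldClasses.length];
        for (int i = 0; i < fieldClasses.length; i++) {
            if (fieldClasses[i].isArray()) {
                types[i] = DataTypeUtils.convertToArray(fieldTypes[i]); // ARRAY<...> fields
            } else if (fieldClasses[i] == HashMap.class) {
                types[i] = DataTypeUtils.convertToMap(fieldTypes[i]);   // MAP<...> fields
            } else {
                types[i] = TypeInformation.of(fieldClasses[i]);         // atomic fields
            }
        }
        System.out.println(types[1]); // expected: Map<String, Integer>
    }
}
```

One nit: the committed comparison allocates a HashMap just to obtain its class; fieldClasses[i] == HashMap.class, as used in the sketch, is the allocation-free equivalent.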
