Skip to content

Commit ab54fc5

Browse files
committed
kafka array analysis
1 parent fa0a242 commit ab54fc5

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public abstract class AbstractTableParser {
4646
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4747

4848
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
49-
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
49+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
5050
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
5151
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5252

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.source.IStreamSourceGener;
2222
import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
2323
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
24+
import com.dtstack.flink.sql.util.DataTypeUtils;
2425
import com.dtstack.flink.sql.util.DtStringUtil;
2526
import com.dtstack.flink.sql.util.PluginUtil;
2627
import org.apache.commons.lang3.StringUtils;
@@ -72,10 +73,18 @@ protected String generateOperatorName(String tabName, String topicName) {
7273
}
7374

7475
protected TypeInformation<Row> getRowTypeInformation(KafkaSourceTableInfo kafkaSourceTableInfo) {
76+
String[] fieldTypes = kafkaSourceTableInfo.getFieldTypes();
7577
Class<?>[] fieldClasses = kafkaSourceTableInfo.getFieldClasses();
76-
TypeInformation[] types = IntStream.range(0, fieldClasses.length)
77-
.mapToObj(i -> TypeInformation.of(fieldClasses[i]))
78-
.toArray(TypeInformation[]::new);
78+
TypeInformation[] types =
79+
IntStream.range(0, fieldClasses.length)
80+
.mapToObj(i -> {
81+
if (fieldClasses[i].isArray()) {
82+
return DataTypeUtils.convertToArray(fieldTypes[i]);
83+
}
84+
return TypeInformation.of(fieldClasses[i]);
85+
})
86+
.toArray(TypeInformation[]::new);
87+
7988

8089
return new RowTypeInfo(types, kafkaSourceTableInfo.getFields());
8190
}

0 commit comments

Comments
 (0)