Skip to content

Commit ead6172

Browse files
committed
Merge branch 'v1.8.0_dev_absParser' into 'v1.8.0_dev'
V1.8.0 dev abs parser See merge request !169
2 parents ecc7b00 + ddb99c7 commit ead6172

File tree

5 files changed

+33
-42
lines changed

5 files changed

+33
-42
lines changed

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ public class CassandraSideParser extends AbsSideTableParser {
6868

6969
public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";
7070

71-
static {
72-
keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
73-
keyHandlerMap.put(SIDE_SIGN_KEY, CassandraSideParser::dealSideSign);
71+
public CassandraSideParser() {
72+
addParserHandler(SIDE_SIGN_KEY, SIDE_TABLE_SIGN, this::dealSideSign);
7473
}
7574

7675
@Override
@@ -97,7 +96,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9796
return cassandraSideTableInfo;
9897
}
9998

100-
private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
99+
private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
101100
}
102101

103102
public Class dbTypeConvertToJavaType(String fieldType) {

core/src/main/java/com/dtstack/flink/sql/table/AbsSideTableParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,11 @@ public abstract class AbsSideTableParser extends AbsTableParser {
4040

4141
private final static Pattern SIDE_TABLE_SIGN = Pattern.compile("(?i)^PERIOD\\s+FOR\\s+SYSTEM_TIME$");
4242

43-
static {
44-
keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
45-
keyHandlerMap.put(SIDE_SIGN_KEY, AbsSideTableParser::dealSideSign);
43+
public AbsSideTableParser() {
44+
addParserHandler(SIDE_SIGN_KEY, SIDE_TABLE_SIGN, this::dealSideSign);
4645
}
4746

48-
private static void dealSideSign(Matcher matcher, TableInfo tableInfo){
47+
private void dealSideSign(Matcher matcher, TableInfo tableInfo){
4948
//FIXME SIDE_TABLE_SIGN current just used as a sign for side table; and do nothing
5049
}
5150

core/src/main/java/com/dtstack/flink/sql/table/AbsSourceParser.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package com.dtstack.flink.sql.table;
2222

23-
import com.dtstack.flink.sql.util.ClassUtil;
2423
import com.dtstack.flink.sql.util.MathUtil;
2524

2625
import java.util.regex.Matcher;
@@ -46,26 +45,21 @@ public abstract class AbsSourceParser extends AbsTableParser {
4645
private static Pattern waterMarkKeyPattern = Pattern.compile("(?i)^\\s*WATERMARK\\s+FOR\\s+(\\S+)\\s+AS\\s+withOffset\\(\\s*(\\S+)\\s*,\\s*(\\d+)\\s*\\)$");
4746
private static Pattern notNullKeyPattern = Pattern.compile("(?i)^(\\w+)\\s+(\\w+)\\s+NOT\\s+NULL?$");
4847

49-
static {
50-
keyPatternMap.put(VIRTUAL_KEY, virtualFieldKeyPattern);
51-
keyPatternMap.put(WATERMARK_KEY, waterMarkKeyPattern);
52-
keyPatternMap.put(NOTNULL_KEY, notNullKeyPattern);
53-
keyPatternMap.put(NEST_JSON_FIELD_KEY, nestJsonFieldKeyPattern);
54-
55-
keyHandlerMap.put(VIRTUAL_KEY, AbsSourceParser::dealVirtualField);
56-
keyHandlerMap.put(WATERMARK_KEY, AbsSourceParser::dealWaterMark);
57-
keyHandlerMap.put(NOTNULL_KEY, AbsSourceParser::dealNotNull);
58-
keyHandlerMap.put(NEST_JSON_FIELD_KEY, AbsSourceParser::dealNestField);
48+
public AbsSourceParser() {
49+
addParserHandler(VIRTUAL_KEY, virtualFieldKeyPattern, this::dealVirtualField);
50+
addParserHandler(WATERMARK_KEY, waterMarkKeyPattern, this::dealWaterMark);
51+
addParserHandler(NOTNULL_KEY, notNullKeyPattern, this::dealNotNull);
52+
addParserHandler(NEST_JSON_FIELD_KEY, nestJsonFieldKeyPattern, this::dealNestField);
5953
}
6054

61-
static void dealVirtualField(Matcher matcher, TableInfo tableInfo){
55+
protected void dealVirtualField(Matcher matcher, TableInfo tableInfo){
6256
SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
6357
String fieldName = matcher.group(2);
6458
String expression = matcher.group(1);
6559
sourceTableInfo.addVirtualField(fieldName, expression);
6660
}
6761

68-
static void dealWaterMark(Matcher matcher, TableInfo tableInfo){
62+
protected void dealWaterMark(Matcher matcher, TableInfo tableInfo){
6963
SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
7064
String eventTimeField = matcher.group(1);
7165
//FIXME Temporarily resolve the second parameter row_time_field
@@ -74,10 +68,10 @@ static void dealWaterMark(Matcher matcher, TableInfo tableInfo){
7468
sourceTableInfo.setMaxOutOrderness(offset);
7569
}
7670

77-
static void dealNotNull(Matcher matcher, TableInfo tableInfo) {
71+
protected void dealNotNull(Matcher matcher, TableInfo tableInfo) {
7872
String fieldName = matcher.group(1);
7973
String fieldType = matcher.group(2);
80-
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
74+
Class fieldClass= dbTypeConvertToJavaType(fieldType);
8175
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
8276
fieldExtraInfo.setNotNull(true);
8377

@@ -93,11 +87,11 @@ static void dealNotNull(Matcher matcher, TableInfo tableInfo) {
9387
* @param matcher
9488
* @param tableInfo
9589
*/
96-
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
90+
protected void dealNestField(Matcher matcher, TableInfo tableInfo) {
9791
String physicalField = matcher.group(1);
9892
String fieldType = matcher.group(3);
9993
String mappingField = matcher.group(4);
100-
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
94+
Class fieldClass= dbTypeConvertToJavaType(fieldType);
10195
boolean notNull = matcher.group(5) != null;
10296
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
10397
fieldExtraInfo.setNotNull(notNull);

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.table;
2222

@@ -43,13 +43,12 @@ public abstract class AbsTableParser {
4343

4444
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
4545

46-
public static Map<String, Pattern> keyPatternMap = Maps.newHashMap();
46+
private Map<String, Pattern> patternMap = Maps.newHashMap();
4747

48-
public static Map<String, ITableFieldDealHandler> keyHandlerMap = Maps.newHashMap();
48+
private Map<String, ITableFieldDealHandler> handlerMap = Maps.newHashMap();
4949

50-
static {
51-
keyPatternMap.put(PRIMARY_KEY, primaryKeyPattern);
52-
keyHandlerMap.put(PRIMARY_KEY, AbsTableParser::dealPrimaryKey);
50+
public AbsTableParser() {
51+
addParserHandler(PRIMARY_KEY, primaryKeyPattern, this::dealPrimaryKey);
5352
}
5453

5554
protected boolean fieldNameNeedsUpperCase() {
@@ -59,12 +58,12 @@ protected boolean fieldNameNeedsUpperCase() {
5958
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception;
6059

6160
public boolean dealKeyPattern(String fieldRow, TableInfo tableInfo){
62-
for(Map.Entry<String, Pattern> keyPattern : keyPatternMap.entrySet()){
61+
for(Map.Entry<String, Pattern> keyPattern : patternMap.entrySet()){
6362
Pattern pattern = keyPattern.getValue();
6463
String key = keyPattern.getKey();
6564
Matcher matcher = pattern.matcher(fieldRow);
6665
if(matcher.find()){
67-
ITableFieldDealHandler handler = keyHandlerMap.get(key);
66+
ITableFieldDealHandler handler = handlerMap.get(key);
6867
if(handler == null){
6968
throw new RuntimeException("parse field [" + fieldRow + "] error.");
7069
}
@@ -110,7 +109,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
110109
tableInfo.finish();
111110
}
112111

113-
public static void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
112+
public void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
114113
String primaryFields = matcher.group(1).trim();
115114
String[] splitArry = primaryFields.split(",");
116115
List<String> primaryKes = Lists.newArrayList(splitArry);
@@ -121,4 +120,8 @@ public Class dbTypeConvertToJavaType(String fieldType) {
121120
return ClassUtil.stringConvertClass(fieldType);
122121
}
123122

123+
protected void addParserHandler(String parserName, Pattern pattern, ITableFieldDealHandler handler) {
124+
patternMap.put(parserName, pattern);
125+
handlerMap.put(parserName, handler);
126+
}
124127
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import com.dtstack.flink.sql.table.AbsSideTableParser;
2424
import com.dtstack.flink.sql.table.TableInfo;
25-
import com.dtstack.flink.sql.util.ClassUtil;
2625
import com.dtstack.flink.sql.util.MathUtil;
2726

2827
import java.util.Map;
@@ -54,13 +53,10 @@ public class HbaseSideParser extends AbsSideTableParser {
5453

5554
public static final String CACHE = "cache";
5655

57-
58-
static {
59-
keyPatternMap.put(FIELD_KEY, FIELD_PATTERN);
60-
keyHandlerMap.put(FIELD_KEY, HbaseSideParser::dealField);
56+
public HbaseSideParser() {
57+
addParserHandler(FIELD_KEY, FIELD_PATTERN, this::dealField);
6158
}
6259

63-
6460
@Override
6561
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
6662
HbaseSideTableInfo hbaseTableInfo = new HbaseSideTableInfo();
@@ -81,7 +77,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
8177
* @param matcher
8278
* @param tableInfo
8379
*/
84-
private static void dealField(Matcher matcher, TableInfo tableInfo){
80+
private void dealField(Matcher matcher, TableInfo tableInfo){
8581

8682
HbaseSideTableInfo sideTableInfo = (HbaseSideTableInfo) tableInfo;
8783
String filedDefineStr = matcher.group(1);
@@ -97,7 +93,7 @@ private static void dealField(Matcher matcher, TableInfo tableInfo){
9793
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
9894
String fieldName = String.join(" ", filedNameArr);
9995
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
100-
Class fieldClass = ClassUtil.stringConvertClass(filedInfoArr[1].trim());
96+
Class fieldClass = dbTypeConvertToJavaType(filedInfoArr[1].trim());
10197

10298
sideTableInfo.addColumnRealName(fieldName);
10399
sideTableInfo.addField(aliasStr);

0 commit comments

Comments
 (0)