Skip to content

Commit c52ffa9

Browse files
author
dapeng
committed
hbase 解析列族的问题
1 parent abfb083 commit c52ffa9

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.dtstack.flink.sql.table.AbstractTableInfo;
2727
import com.dtstack.flink.sql.util.DtStringUtil;
2828
import com.dtstack.flink.sql.util.MathUtil;
29+
import com.google.common.collect.Maps;
2930
import org.apache.commons.lang3.StringUtils;
3031

3132
import java.util.LinkedHashMap;
@@ -98,14 +99,23 @@ public void parseFieldsInfo(String fieldsInfo, HbaseTableInfo tableInfo){
9899
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
99100
Class fieldClass = dbTypeConvertToJavaType(fieldType);
100101
String[] columnFamily = StringUtils.split(fieldName.trim(), ":");
101-
columnFamilies.put(fieldName.trim(),columnFamily[1]);
102102
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
103103
tableInfo.addField(columnFamily[1]);
104104
tableInfo.addFieldClass(fieldClass);
105105
tableInfo.addFieldType(fieldType);
106106
tableInfo.addFieldExtraInfo(null);
107107
}
108-
tableInfo.setColumnNameFamily(columnFamilies);
108+
tableInfo.setColumnNameFamily(parseColumnFamily(tableInfo.getPhysicalFields()));
109109
tableInfo.finish();
110110
}
111+
112+
private Map<String, String> parseColumnFamily(Map<String, String> physicalFieldMap){
113+
Map<String, String> columnFamiles = Maps.newHashMap();
114+
physicalFieldMap.values().forEach(x -> {
115+
String[] columnFamily = StringUtils.split(x.trim(), ":");
116+
columnFamiles.put(x, columnFamily[1]);
117+
});
118+
119+
return columnFamiles;
120+
}
111121
}

0 commit comments

Comments
 (0)