Skip to content

Commit a831177

Browse files
author
dapeng
committed
维表all的类型
1 parent e463694 commit a831177

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.side.FieldInfo;
2626
import com.dtstack.flink.sql.side.JoinInfo;
2727
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
28+
import com.dtstack.flink.sql.side.hbase.utils.HbaseUtils;
2829
import org.apache.calcite.sql.JoinType;
2930
import org.apache.commons.collections.map.HashedMap;
3031
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -165,6 +166,7 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
165166

166167
private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLException {
167168
AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
169+
Map<String, String> colRefType = ((HbaseAllSideInfo)sideInfo).getColRefType();
168170
HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
169171
Configuration conf = new Configuration();
170172
conf.set("hbase.zookeeper.quorum", hbaseSideTableInfo.getHost());
@@ -177,14 +179,12 @@ private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLExcep
177179
resultScanner = table.getScanner(new Scan());
178180
for (Result r : resultScanner) {
179181
Map<String, Object> kv = new HashedMap();
180-
for (Cell cell : r.listCells())
181-
{
182+
for (Cell cell : r.listCells()) {
182183
String family = Bytes.toString(CellUtil.cloneFamily(cell));
183184
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
184-
String value = Bytes.toString(CellUtil.cloneValue(cell));
185185
StringBuilder key = new StringBuilder();
186186
key.append(family).append(":").append(qualifier);
187-
187+
Object value = HbaseUtils.convertByte(CellUtil.cloneValue(cell), colRefType.get(key.toString()));
188188
kv.put(aliasNameInversion.get(key.toString()), value);
189189
}
190190
tmpCache.put(new String(r.getRow()), kv);

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,22 @@
2424
import com.dtstack.flink.sql.side.JoinInfo;
2525
import com.dtstack.flink.sql.side.BaseSideInfo;
2626
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
27+
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
2728
import com.dtstack.flink.sql.util.ParseUtils;
29+
import com.google.common.collect.Maps;
2830
import org.apache.calcite.sql.SqlNode;
2931
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3032
import com.google.common.collect.Lists;
3133

3234
import java.util.List;
35+
import java.util.Map;
3336

3437
public class HbaseAllSideInfo extends BaseSideInfo {
3538

3639
private RowKeyBuilder rowKeyBuilder;
3740

41+
private Map<String, String> colRefType;
42+
3843
public HbaseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
3944
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4045
}
@@ -48,6 +53,14 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
4853

4954
rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0));
5055

56+
HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
57+
colRefType = Maps.newHashMap();
58+
for(int i=0; i<hbaseSideTableInfo.getColumnRealNames().length; i++){
59+
String realColName = hbaseSideTableInfo.getColumnRealNames()[i];
60+
String colType = hbaseSideTableInfo.getFieldTypes()[i];
61+
colRefType.put(realColName, colType);
62+
}
63+
5164
String sideTableName = joinInfo.getSideTableName();
5265
SqlNode conditionNode = joinInfo.getCondition();
5366

@@ -67,4 +80,8 @@ public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
6780
this.rowKeyBuilder = rowKeyBuilder;
6881
}
6982

83+
public Map<String, String> getColRefType() {
84+
return colRefType;
85+
}
86+
7087
}

0 commit comments

Comments
 (0)