Skip to content

Commit e0f2726

Browse files
committed
[fix-31383][hbase]hbase dim table just filter stream data, need not select field
1 parent 22499cc commit e0f2726

File tree

2 files changed

+54
-0
lines changed

2 files changed

+54
-0
lines changed

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,33 @@ public HbaseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldIn
4444
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4545
}
4646

47+
@Override
48+
public void parseSelectFields(JoinInfo joinInfo) {
49+
String sideTableName = joinInfo.getSideTableName();
50+
String nonSideTableName = joinInfo.getNonSideTable();
51+
List<String> fields = Lists.newArrayList();
52+
int sideTableFieldIndex = 0;
53+
54+
for( int i=0; i<outFieldInfoList.size(); i++){
55+
FieldInfo fieldInfo = outFieldInfoList.get(i);
56+
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
57+
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
58+
fields.add(sideFieldName);
59+
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
60+
sideFieldIndex.put(i, sideTableFieldIndex);
61+
sideFieldNameIndex.put(i, sideFieldName);
62+
sideTableFieldIndex++;
63+
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
64+
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
65+
inFieldIndex.put(i, nonSideIndex);
66+
}else{
67+
throw new RuntimeException("unknown table " + fieldInfo.getTable());
68+
}
69+
}
70+
71+
sideSelectFields = String.join(",", fields);
72+
}
73+
4774
@Override
4875
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
4976
rowKeyBuilder = new RowKeyBuilder();

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncSideInfo.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,33 @@ public HbaseAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
3333
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3434
}
3535

36+
@Override
37+
public void parseSelectFields(JoinInfo joinInfo) {
38+
String sideTableName = joinInfo.getSideTableName();
39+
String nonSideTableName = joinInfo.getNonSideTable();
40+
List<String> fields = Lists.newArrayList();
41+
int sideTableFieldIndex = 0;
42+
43+
for( int i=0; i<outFieldInfoList.size(); i++){
44+
FieldInfo fieldInfo = outFieldInfoList.get(i);
45+
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
46+
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
47+
fields.add(sideFieldName);
48+
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
49+
sideFieldIndex.put(i, sideTableFieldIndex);
50+
sideFieldNameIndex.put(i, sideFieldName);
51+
sideTableFieldIndex++;
52+
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
53+
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
54+
inFieldIndex.put(i, nonSideIndex);
55+
}else{
56+
throw new RuntimeException("unknown table " + fieldInfo.getTable());
57+
}
58+
}
59+
60+
sideSelectFields = String.join(",", fields);
61+
}
62+
3663
@Override
3764
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
3865
rowKeyBuilder = new RowKeyBuilder();

0 commit comments

Comments
 (0)