Skip to content

Commit 1f93b38

Browse files
author
dapeng
committed
Merge remote-tracking branch 'origin/hotfix_1.8_3.10.x_25279' into 1.8_zy_3.10.x
2 parents c4a8144 + ee766b8 commit 1f93b38

File tree

3 files changed

+4
-2
lines changed

3 files changed

+4
-2
lines changed

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public HbaseAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldIn
8989

9090
@Override
9191
public void open(Configuration parameters) throws Exception {
92+
super.open(parameters);
9293
SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
9394
HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
9495
ExecutorService executorService =new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE,
@@ -147,7 +148,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
147148
return;
148149
} else if (ECacheContentType.SingleLine == val.getType()) {
149150
try {
150-
Row row = fillData(inputCopy.row(), val);
151+
Row row = fillData(inputCopy.row(), val.getContent());
151152
resultFuture.complete(Collections.singleton(new CRow(row, inputCopy.change())));
152153
} catch (Exception e) {
153154
dealFillDataError(resultFuture, e, inputCopy);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void asyncGetData(String tableName, String rowKeyStr, CRow input, ResultF
9595

9696
Row row = fillData(input.row(), sideVal);
9797
if(openCache){
98-
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
98+
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, sideVal));
9999
}
100100
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
101101
} catch (Exception e) {

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public HbaseSideParser() {
6161
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
6262
HbaseSideTableInfo hbaseTableInfo = new HbaseSideTableInfo();
6363
hbaseTableInfo.setName(tableName);
64+
parseCacheProp(hbaseTableInfo, props);
6465
parseFieldsInfo(fieldsInfo, hbaseTableInfo);
6566
hbaseTableInfo.setTableName((String) props.get(TABLE_NAME_KEY.toLowerCase()));
6667
hbaseTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase())));

0 commit comments

Comments
 (0)