Skip to content

Commit b9f115f

Browse files
committed
解决kudu维表多主键过滤bug
1 parent e2bc06f commit b9f115f

File tree

1 file changed

+9
-11
lines changed
  • kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu

1 file changed

+9
-11
lines changed

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -229,14 +229,12 @@ private KuduScanner getConn(KuduSideTableInfo tableInfo) {
229229
table = client.openTable(tableName);
230230
}
231231
Schema schema = table.getSchema();
232-
LOG.info("connect kudu is successed!");
233232
KuduScanner.KuduScannerBuilder tokenBuilder = client.newScannerBuilder(table);
234233
return buildScanner(tokenBuilder, schema, tableInfo);
235-
} catch (
236-
Exception e) {
234+
} catch (Exception e) {
237235
LOG.error("connect kudu is error:" + e.getMessage());
236+
throw new RuntimeException(e);
238237
}
239-
return null;
240238
}
241239

242240

@@ -280,13 +278,15 @@ private KuduScanner buildScanner(KuduScanner.KuduScannerBuilder builder, Schema
280278
String[] primaryKey = splitString(primaryKeys);
281279
String[] lowerBounds = splitString(lowerBoundPrimaryKey);
282280
String[] upperBounds = splitString(upperBoundPrimaryKey);
281+
PartialRow lowerPartialRow = schema.newPartialRow();
282+
PartialRow upperPartialRow = schema.newPartialRow();
283283
for (int i = 0; i < primaryKey.length; i++) {
284284
Integer index = columnName.get(primaryKey[i]);
285-
if (null != index) {
286-
builder.lowerBound(primaryKeyRange(columnSchemas.get(index).getType(), primaryKey[i], lowerBounds[i], schema));
287-
builder.exclusiveUpperBound(primaryKeyRange(columnSchemas.get(index).getType(), primaryKey[i], upperBounds[i], schema));
288-
}
285+
primaryKeyRange(lowerPartialRow, columnSchemas.get(index).getType(), primaryKey[i], lowerBounds[i]);
286+
primaryKeyRange(upperPartialRow, columnSchemas.get(index).getType(), primaryKey[i], upperBounds[i]);
289287
}
288+
builder.lowerBound(lowerPartialRow);
289+
builder.exclusiveUpperBound(upperPartialRow);
290290
}
291291
List<String> projectColumns = Arrays.asList(sideFieldNames);
292292
return builder.setProjectedColumnNames(projectColumns).build();
@@ -296,8 +296,7 @@ private String[] splitString(String data) {
296296
return data.split(",");
297297
}
298298

299-
private PartialRow primaryKeyRange(Type type, String primaryKey, String value, Schema schema) {
300-
PartialRow partialRow = schema.newPartialRow();
299+
private void primaryKeyRange(PartialRow partialRow, Type type, String primaryKey, String value) {
301300
switch (type) {
302301
case STRING:
303302
partialRow.addString(primaryKey, value);
@@ -332,7 +331,6 @@ private PartialRow primaryKeyRange(Type type, String primaryKey, String value, S
332331
default:
333332
throw new IllegalArgumentException("Illegal var type: " + type);
334333
}
335-
return partialRow;
336334
}
337335

338336
private void setMapValue(Type type, Map<String, Object> oneRow, String sideFieldName, RowResult result) {

0 commit comments

Comments
 (0)