Skip to content

Commit 1af2223

Browse files
committed
[fix] partition sql error
1 parent c132206 commit 1af2223

File tree

1 file changed

+26
-9
lines changed

1 file changed

+26
-9
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 26 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
110110
private final List<String> staticPartitionField = new ArrayList<>();
111111

112112
private String prepareStatementSql;
113+
private List<String> newFieldNames;
113114

114115
private transient ScheduledExecutorService scheduler;
115116
private transient ScheduledFuture<?> scheduledFuture;
@@ -231,7 +232,6 @@ private void flush() throws SQLException {
231232
*/
232233
@Override
233234
public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
234-
LOG.info("Receive data : {}", record);
235235
try {
236236
if (!record.f0) {
237237
return;
@@ -258,13 +258,18 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
258258
buildStaticPartitionCondition(valueMap, staticPartitionField))
259259
);
260260

261+
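// Reorder the row data by field name. For example, the original row data (1, xxx, 20) corresponds to (id, name, age),
262+
// but because of the partition the order of the written fields becomes (name, id, age), so the row data must be reordered to (xxx, 1, 20).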
261263
Row rowValue = new Row(fieldTypeList.size());
262264
for (int i = 0; i < fieldTypeList.size(); i++) {
263-
rowValue.setField(i, copyRow.getField(i));
265+
rowValue.setField(i, valueMap.get(newFieldNames.get(i)));
264266
}
265267

266-
setRowToStatement(statement, fieldTypeList, rowValue, Objects.isNull(primaryKeys) ?
267-
null : primaryKeys.stream().mapToInt(fieldList::indexOf).toArray());
268+
if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
269+
setRowToStatement(statement, fieldTypeList, rowValue, primaryKeys.stream().mapToInt(fieldList::indexOf).toArray());
270+
} else {
271+
setRowToStatement(statement, fieldTypeList, rowValue, null);
272+
}
268273

269274
statement.addBatch();
270275

@@ -355,18 +360,19 @@ private String buildStaticPartitionCondition(Map<String, Object> rowData, List<S
355360
* @return INSERT INTO tableName(field1, field2) PARTITION($partitionCondition) VALUES (?, ?)
356361
*/
357362
private String buildStaticInsertSql(String schema, String tableName, List<String> fieldNames, List<String> fieldTypes, String partitionFields) {
358-
List<String> copyFieldNames = new ArrayList<>(fieldNames);
363+
newFieldNames = new ArrayList<>(fieldNames);
359364
for (int i = fieldNames.size() - 1; i >= 0; i--) {
360365
if (partitionFields.contains(fieldNames.get(i))) {
361-
copyFieldNames.remove(i);
366+
newFieldNames.remove(i);
362367
fieldTypes.remove(i);
363368
}
364369
}
365370

366-
String columns = copyFieldNames.stream()
371+
String columns = newFieldNames.stream()
367372
.map(this::quoteIdentifier)
368373
.collect(Collectors.joining(", "));
369374

375+
// PARTITION($partition)
370376
String partitionCondition = PARTITION_CONSTANT + "(" + PARTITION_CONDITION + ")";
371377

372378
return "INSERT INTO " + (Objects.isNull(schema) ? "" : quoteIdentifier(schema) + ".") + quoteIdentifier(tableName) +
@@ -378,9 +384,20 @@ private String buildStaticInsertSql(String schema, String tableName, List<String
378384
*
379385
* @return INSERT INTO tableName(field1, field2) PARTITION(pt) VALUES (?, ?)
380386
*/
381-
private String buildDynamicInsertSql(String schema, String tableName, List<String> fieldName, List<String> fieldTypes, String partitionFields) {
387+
private String buildDynamicInsertSql(String schema, String tableName, List<String> fieldNames, List<String> fieldTypes, String partitionFields) {
388+
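// newFieldNames -> the reordered fieldNames, kept so that the row data values can be reordered to match.
389+
// Partition fields need special handling. For example, the original field order may be (age, name, id), but because of the partition the SQL that is written is
390+
// INSERT INTO tableName(name, id) PARTITION(age) VALUES(?, ?, ?)
391+
// so the order in which the prepareStatement fields are actually set should be (name, id, age), and the corresponding field types must be reordered in the same way.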
392+
newFieldNames = new ArrayList<>(fieldNames);
393+
for (int i = fieldNames.size() - 1; i >= 0; i--) {
394+
if (partitionFields.contains(fieldNames.get(i))) {
395+
newFieldNames.add(newFieldNames.remove(i));
396+
fieldTypes.add(fieldTypes.remove(i));
397+
}
398+
}
382399

383-
String columns = fieldName.stream()
400+
String columns = fieldNames.stream()
384401
.filter(f -> !partitionFields.contains(f))
385402
.map(this::quoteIdentifier)
386403
.collect(Collectors.joining(", "));

0 commit comments

Comments
 (0)