Skip to content

Commit e0745d8

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_27373' into 1.8_test_3.10.x
2 parents cd4573c + 199c16e commit e0745d8

File tree

4 files changed

+51
-9
lines changed

4 files changed

+51
-9
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaDialect.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020

2121
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
2222
import com.google.common.collect.Lists;
23+
import org.apache.commons.collections.CollectionUtils;
2324
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.flink.api.common.typeinfo.TypeInformation;
2526

26-
import java.util.Arrays;
27-
import java.util.List;
28-
import java.util.Objects;
29-
import java.util.Optional;
27+
import java.util.*;
3028
import java.util.stream.Collectors;
3129

3230
/**
@@ -41,8 +39,11 @@ public class ImpalaDialect implements JDBCDialect {
4139

4240
private TypeInformation[] fieldTypes;
4341

44-
public ImpalaDialect(TypeInformation[] fieldTypes){
42+
private List<String> primaryKeys;
43+
44+
public ImpalaDialect(TypeInformation[] fieldTypes, List<String> primaryKeys){
4545
this.fieldTypes = fieldTypes;
46+
this.primaryKeys = primaryKeys;
4647
}
4748

4849
@Override
@@ -62,7 +63,17 @@ public String quoteIdentifier(String identifier) {
6263

6364
@Override
6465
public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
65-
throw new RuntimeException("impala does not support update sql, please remove primary key or use append mode");
66+
//跳过primary key字段
67+
String setClause = Arrays.stream(fieldNames)
68+
.filter(f -> CollectionUtils.isNotEmpty(primaryKeys) ? !primaryKeys.contains(f) : true)
69+
.map(f -> quoteIdentifier(f) + "=?")
70+
.collect(Collectors.joining(", "));
71+
String conditionClause = Arrays.stream(conditionFields)
72+
.map(f -> quoteIdentifier(f) + "=?")
73+
.collect(Collectors.joining(" AND "));
74+
return "UPDATE " + quoteIdentifier(tableName) +
75+
" SET " + setClause +
76+
" WHERE " + conditionClause;
6677
}
6778

6879
@Override

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public ImpalaSink() {
4848
public JDBCUpsertOutputFormat getOutputFormat() {
4949
JDBCOptions jdbcOptions = JDBCOptions.builder()
5050
.setDbUrl(getImpalaJdbcUrl())
51-
.setDialect(new ImpalaDialect(getFieldTypes()))
51+
.setDialect(new ImpalaDialect(getFieldTypes(), primaryKeys))
5252
.setUsername(userName)
5353
.setPassword(password)
5454
.setTableName(tableName)

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCTypeConvertUtils.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb;
2020

21+
import com.google.common.collect.Lists;
2122
import org.apache.flink.types.Row;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
@@ -51,6 +52,10 @@ public class JDBCTypeConvertUtils {
5152
* @see PreparedStatement
5253
*/
5354
public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, Row row) throws SQLException {
55+
setRecordToStatement(upload, typesArray, row, null);
56+
}
57+
58+
public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, Row row, int[] pkFieldIndices) throws SQLException {
5459
if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
5560
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
5661
}
@@ -62,10 +67,34 @@ public static void setRecordToStatement(PreparedStatement upload, int[] typesArr
6267
}
6368
} else {
6469
// types provided
70+
int placeIndex = 0;
6571
for (int i = 0; i < row.getArity(); i++) {
66-
setField(upload, typesArray[i], row.getField(i), i);
72+
if(isPrimaryKeyField(pkFieldIndices, i)){
73+
continue;
74+
}
75+
setField(upload, typesArray[i], row.getField(i), placeIndex ++);
76+
}
77+
78+
//填充whereClause中的主键占位符
79+
if (pkFieldIndices != null && pkFieldIndices.length > 0) {
80+
for (int j = 0; j < pkFieldIndices.length; j++) {
81+
int pkIndex = pkFieldIndices[j];
82+
setField(upload, typesArray[pkIndex], row.getField(pkIndex), placeIndex + j);
83+
}
84+
}
85+
}
86+
}
87+
88+
private static boolean isPrimaryKeyField(int[] pkFieldIndices, int fieldIndex) {
89+
if (pkFieldIndices == null || pkFieldIndices.length <= 0) {
90+
return false;
91+
}
92+
for (int index : pkFieldIndices) {
93+
if (index == fieldIndex) {
94+
return true;
6795
}
6896
}
97+
return false;
6998
}
7099

71100
public static void setField(PreparedStatement upload, int type, Object field, int index) throws SQLException {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AbstractUpsertWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ private static final class UpsertWriterUsingInsertUpdateStatement extends Abstra
268268
private final String existSql;
269269
private final String insertSql;
270270
private final String updateSql;
271+
private final int[] pkFields;
271272

272273
private transient PreparedStatement existStatement;
273274
private transient PreparedStatement insertStatement;
@@ -287,6 +288,7 @@ private UpsertWriterUsingInsertUpdateStatement(
287288
this.existSql = existSql;
288289
this.insertSql = insertSql;
289290
this.updateSql = updateSql;
291+
this.pkFields = pkFields;
290292
}
291293

292294
@Override
@@ -310,7 +312,7 @@ void processOneRowInBatch(Row pk, Row row) throws SQLException {
310312
resultSet.close();
311313
if (exist) {
312314
// do update
313-
setRecordToStatement(updateStatement, fieldTypes, row);
315+
setRecordToStatement(updateStatement, fieldTypes, row, pkFields);
314316
updateStatement.addBatch();
315317
} else {
316318
// do insert

0 commit comments

Comments
 (0)