Commit f64ebfe

[fix] update mode error
1 parent 7660530 commit f64ebfe

1 file changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 41 additions & 42 deletions
@@ -98,7 +98,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
     private int batchCount = 0;
 
     // |------------------------------------------------|
-    // | partitionCondition | Array of row data |
+    // | partitionCondition |Array of valueCondition|
     // |------------------------------------------------|
     // | ptOne, ptTwo, ptThree | [(v1, v2, v3, v4, v5)]| DP
     // |------------------------------------------------|
@@ -110,8 +110,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
     // |------------------------------------------------|
     private transient Map<String, ArrayList<String>> rowDataMap;
 
-    private final String templateSql = "INSERT INTO tableName ${tableFieldsCondition} PARTITION ${partitionCondition} VALUES ${valuesCondition}";
-
     protected String keytabPath;
     protected String krb5confPath;
     protected String principal;
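
Context for the comment table above: rowDataMap groups buffered rows by partition condition, with each key holding the rendered value tuples for that partition. The body of putRowIntoMap is not shown in this commit, so the stand-in below is only an illustrative sketch of that grouping:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;

    public class RowDataMapSketch {
        // partitionCondition -> array of valueCondition, e.g.
        // "ptOne, ptTwo, ptThree" -> ["(v1, v2, v3, v4, v5)", ...]
        private final Map<String, ArrayList<String>> rowDataMap = new HashMap<>();

        // illustrative stand-in for putRowIntoMap: file one rendered
        // value tuple under its partition condition
        void putRowIntoMap(String partitionCondition, String valueCondition) {
            rowDataMap.computeIfAbsent(partitionCondition, k -> new ArrayList<>())
                    .add(valueCondition);
        }
    }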
@@ -140,7 +138,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
     // INSERT INTO tableName(name, id) PARTITION(age) VALUES(?, ?, ?)
     // so the field order actually set on executeSql should be (name, id, age), and the corresponding type order must be rebuilt to match
     private List<String> valueFieldNames;
-    private transient final AbstractDtRichOutputFormat<?> metricOutputFormat = this;
+    private transient AbstractDtRichOutputFormat<?> metricOutputFormat;
     private List<Tuple2<String, String>> rowDataList;
     private List<Row> rows;
 
@@ -156,6 +154,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
         rowDataList = new ArrayList<>();
         rowDataMap = new HashMap<>();
         rows = new ArrayList<>();
+        metricOutputFormat = this;
         openConnect();
         initScheduledTask(batchWaitInterval);
         init();
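
The field change above and this open() assignment belong together: Java deserialization does not rerun field initializers, so a transient field set at its declaration comes back null once Flink ships the operator to a task manager, and a transient final field cannot be repaired afterwards. A minimal sketch of the failure mode (the class below is illustrative, not from the repository):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class TransientInitSketch implements Serializable {
        // mirrors "transient final ... metricOutputFormat = this;":
        // the initializer runs at construction, never on deserialization
        private final transient Object self = this;

        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new TransientInitSketch());
            }
            TransientInitSketch copy;
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy = (TransientInitSketch) in.readObject();
            }
            // prints "null": transient fields come back as defaults, and a
            // final one could not be reassigned in open(), hence the fix
            System.out.println(copy.self);
        }
    }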
@@ -175,6 +174,7 @@ private void init() {
 
         if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
             updateStatement = connection.prepareStatement(buildUpdateSql(schema, tableName, fieldNames, primaryKeys));
+            return;
         }
 
         valueFieldNames = rebuildFieldNameListAndTypeList(fieldNames, staticPartitionFields, fieldTypes, partitionFields);
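
The early return above is the heart of the fix: in update mode only the prepared UPDATE statement is needed, and falling through into the partition-oriented setup mixed insert-path state into the update path. A toy reproduction of the control-flow difference (method and message strings below are made up):

    public class EarlyReturnSketch {
        static void initWithoutReturn(String mode) {
            if ("update".equalsIgnoreCase(mode)) {
                System.out.println("prepare UPDATE statement");
            }
            // insert-only setup still runs in update mode -- the bug
            System.out.println("rebuild field order for partitioned INSERT");
        }

        static void initWithReturn(String mode) {
            if ("update".equalsIgnoreCase(mode)) {
                System.out.println("prepare UPDATE statement");
                return; // the fix: skip the insert-only setup entirely
            }
            System.out.println("rebuild field order for partitioned INSERT");
        }

        public static void main(String[] args) {
            initWithoutReturn("update"); // prints both lines
            initWithReturn("update");    // prints only the UPDATE line
        }
    }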
@@ -231,49 +231,48 @@ private void openJdbc() {
     }
 
     private void flush() throws SQLException {
-        if (!rowDataList.isEmpty() && batchCount > 0) {
-            rowDataList.forEach(rowDataTuple2 -> putRowIntoMap(rowDataMap, rowDataTuple2));
-            executeBatchSql(
-                    statement,
-                    templateSql,
-                    schema,
-                    tableName,
-                    storeType,
-                    enablePartition,
-                    valueFieldNames,
-                    partitionFields,
-                    rowDataMap
-            );
-            rowDataList.clear();
-            rowDataMap.clear();
-            batchCount = 0;
+        if (batchCount > 0) {
+            if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+                executeUpdateBatch();
+            }
+            if (!rowDataList.isEmpty()) {
+                String templateSql =
+                        "INSERT INTO tableName ${tableFieldsCondition} PARTITION ${partitionCondition} VALUES ${valuesCondition}";
+                rowDataList.forEach(rowDataTuple2 -> putRowIntoMap(rowDataMap, rowDataTuple2));
+                executeBatchSql(
+                        statement,
+                        templateSql,
+                        schema,
+                        tableName,
+                        storeType,
+                        enablePartition,
+                        valueFieldNames,
+                        partitionFields,
+                        rowDataMap
+                );
+                rowDataList.clear();
+                rowDataMap.clear();
+            }
         }
+        batchCount = 0;
+
     }
 
-    private void executeBatch() throws SQLException {
+    /**
+     * execute batch update statement
+     *
+     * @throws SQLException throw sql exception
+     */
+    private void executeUpdateBatch() throws SQLException {
         try {
             rows.forEach(row -> {
                 try {
-                    Map<String, Object> valueMap = new HashMap<>();
-
-                    for (int i = 0; i < row.getArity(); i++) {
-                        valueMap.put(fieldNames.get(i), row.getField(i));
-                    }
-                    // regroup the row data by field name, e.g. original row data (1, xxx, 20) corresponds to (id, name, age);
-                    // but because of the partition the written field order becomes (name, id, age), so the row data must be regrouped into (xxx, 1, 20)
-                    Row rowValue = new Row(fieldTypes.size());
-                    for (int i = 0; i < fieldTypes.size(); i++) {
-                        rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
-                    }
-
-                    if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
-                        JDBCTypeConvertUtils.setRecordToStatement(
-                                updateStatement,
-                                JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypes),
-                                rowValue,
-                                primaryKeys.stream().mapToInt(fieldNames::indexOf).toArray()
-                        );
-                    }
+                    JDBCTypeConvertUtils.setRecordToStatement(
+                            updateStatement,
+                            JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypes),
+                            row,
+                            primaryKeys.stream().mapToInt(fieldNames::indexOf).toArray()
+                    );
                     updateStatement.addBatch();
                 } catch (Exception e) {
                     throw new RuntimeException("impala jdbc execute batch error!", e);
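
What the deleted reordering loop was for: the partitioned INSERT template writes fields in valueFieldNames order, so rows arriving in fieldNames order had to be rebuilt; the UPDATE statement, by contrast, appears to be generated from fieldNames directly, which is why the raw row can now be bound as-is. A small sketch of the two orderings (field names and values here are made up):

    import java.util.Arrays;
    import java.util.List;

    public class FieldOrderSketch {
        public static void main(String[] args) {
            // declared order, used by the UPDATE path
            List<String> fieldNames = Arrays.asList("id", "name", "age");
            // rebuilt order with the partition column last, used by the INSERT path
            List<String> valueFieldNames = Arrays.asList("name", "id", "age");

            Object[] row = {1, "xxx", 20}; // arrives in fieldNames order

            // what the removed loop did: regroup (1, xxx, 20) into (xxx, 1, 20)
            Object[] reordered = valueFieldNames.stream()
                    .map(f -> row[fieldNames.indexOf(f)])
                    .toArray();

            System.out.println(Arrays.toString(reordered)); // [xxx, 1, 20] -> INSERT
            System.out.println(Arrays.toString(row));       // [1, xxx, 20] -> UPDATE binds directly
        }
    }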
@@ -447,7 +446,7 @@ public void close() throws IOException {
      * @param fieldNames field name list
      * @param partitionFields partition fields
      * @param rowDataMap row data map
-     * @throws SQLException
+     * @throws SQLException throw sql exception
      */
     private void executeBatchSql(Statement statement,
                                  String tempSql,

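For reference, executeBatchSql (whose Javadoc is touched above) receives the ${...} template that flush() now builds inline. The commit does not show how the placeholders expand, so the substitution below is only a plausible sketch with made-up values:

    public class TemplateSketch {
        public static void main(String[] args) {
            String templateSql = "INSERT INTO tableName ${tableFieldsCondition} "
                    + "PARTITION ${partitionCondition} VALUES ${valuesCondition}";

            // made-up values; the real ones derive from schema, valueFieldNames
            // and the partition-grouped rowDataMap assembled in flush()
            String sql = templateSql
                    .replace("tableName", "impala_db.demo_table")
                    .replace("${tableFieldsCondition}", "(name, id)")
                    .replace("${partitionCondition}", "(age)")
                    .replace("${valuesCondition}", "('xxx', 1, 20)");

            System.out.println(sql);
            // INSERT INTO impala_db.demo_table (name, id) PARTITION (age) VALUES ('xxx', 1, 20)
        }
    }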