Skip to content

Commit 396b5b2

Browse files
committed
Merge branch 'feat_1.10_4.0.x_impalaKudu' into '1.10_test_4.0.x'
Feat 1.10 4.0.x impala kudu See merge request dt-insight-engine/flinkStreamSQL!151
2 parents 5482ff1 + 6c88bca commit 396b5b2

File tree

1 file changed

+12
-22
lines changed

1 file changed

+12
-22
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ private void executeUpdateBatch() throws SQLException {
289289
LOG.debug("impala jdbc execute batch error ", e);
290290
connection.rollback();
291291
connection.commit();
292-
cleanBatchWhenError();
292+
updateStatement.clearBatch();
293293
executeUpdate(connection);
294294
}
295295
}
@@ -317,10 +317,6 @@ public void executeUpdate(Connection connection) {
317317
rows.clear();
318318
}
319319

320-
private void cleanBatchWhenError() throws SQLException {
321-
updateStatement.clearBatch();
322-
}
323-
324320
private void putRowIntoMap(Map<String, ArrayList<String>> rowDataMap, Tuple2<String, String> rowData) {
325321
Set<String> keySet = rowDataMap.keySet();
326322
ArrayList<String> tempRowArray;
@@ -361,6 +357,7 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
361357
* Quote a specific type of value, like string, timestamp
362358
* before: 1, cast(tiezhu as string), cast(2001-01-09 01:05:01 as timestamp), cast(123 as int)
363359
* after: 1, cast('tiezhu' as string), cast('2001-01-09 01:05:01' as timestamp), cast(123 as int)
360+
* if cast value is null, then cast(null as type)
364361
*
365362
* @param valueCondition original value condition
366363
* @return quoted condition
@@ -374,13 +371,16 @@ private String valueConditionAddQuotation(String valueCondition) {
374371
while (matcher.find()) {
375372
String value = matcher.group(1);
376373
String type = matcher.group(2);
374+
377375
if (Arrays.asList(NEED_QUOTE_TYPE).contains(type)) {
378-
valueConditionCopy[0] = valueConditionCopy[0].replace(value, "'" + value + "'");
376+
if (!"null".equals(value)) {
377+
valueConditionCopy[0] = valueConditionCopy[0].replace(value, "'" + value + "'");
378+
}
379379
}
380380
}
381381
}
382382
);
383-
return valueConditionCopy[0];
383+
return "(" + valueConditionCopy[0] + ")";
384384
}
385385

386386
@Override
@@ -514,7 +514,6 @@ private synchronized void executeBatchSql(Statement statement,
514514
.replace(PARTITION_CONSTANT, "")
515515
.replace(TABLE_FIELDS_CONDITION, tableFieldsCondition);
516516
String substring = executeSql.substring(0, executeSql.length() - 2);
517-
LOG.info("current execute sql: {}", substring);
518517
statement.execute(substring);
519518
return;
520519
}
@@ -530,7 +529,6 @@ private synchronized void executeBatchSql(Statement statement,
530529
executeSql = executeSql.replace(PARTITION_CONDITION, partitionCondition.toString())
531530
.replace(TABLE_FIELDS_CONDITION, tableFieldsCondition)
532531
.replace(VALUES_CONDITION, String.join(", ", valuesConditionList));
533-
LOG.info("current execute sql: {}", executeSql);
534532
statement.execute(executeSql);
535533
partitionCondition.delete(0, partitionCondition.length());
536534
} catch (SQLException sqlException) {
@@ -580,24 +578,16 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
580578
* @return condition like '(?, ?, cast(? as string))' and '?' will be replaced with row data
581579
*/
582580
private String buildValuesCondition(List<String> fieldTypes, Row row) {
583-
String valuesCondition = "(" + fieldTypes.stream().map(
581+
String valuesCondition = fieldTypes.stream().map(
584582
f -> {
585-
switch (f.toUpperCase()) {
586-
case STRING_TYPE:
587-
return "cast(? as string)";
588-
case TIMESTAMP_TYPE:
589-
return "cast(? as timestamp)";
590-
default:
591-
return "?";
583+
if (Arrays.asList(NEED_QUOTE_TYPE).contains(f.toLowerCase())) {
584+
return String.format("cast(? as %s)", f.toLowerCase());
592585
}
593-
}).collect(Collectors.joining(", ")) + ")";
586+
return "?";
587+
}).collect(Collectors.joining(", "));
594588
for (int i = 0; i < row.getArity(); i++) {
595589
valuesCondition = valuesCondition.replaceFirst("\\?", Objects.isNull(row.getField(i)) ? "null" : row.getField(i).toString());
596590
}
597-
Matcher matcher = TYPE_PATTERN.matcher(valuesCondition);
598-
while (matcher.find()) {
599-
valuesCondition = valuesCondition.replace(matcher.group(1), "'" + matcher.group(1) + "'");
600-
}
601591
return valuesCondition;
602592
}
603593

0 commit comments

Comments
 (0)