Skip to content

Commit 76f133f

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_31988' into 1.10_release_4.0.x
2 parents 3c28f19 + 2b2c7b1 commit 76f133f

File tree

1 file changed

+46
-41
lines changed

1 file changed

+46
-41
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
162162
init();
163163
initMetric();
164164
} catch (Exception e) {
165-
throw new RemoteException("impala output format open error!", e);
165+
throw new RuntimeException("impala output format open error!", e);
166166
}
167167
}
168168

@@ -189,19 +189,23 @@ private void init() throws SQLException {
189189
}
190190

191191
private void initScheduledTask(Long batchWaitInterval) {
192-
if (batchWaitInterval != 0) {
193-
this.scheduler = new ScheduledThreadPoolExecutor(1,
194-
new DTThreadFactory("impala-upsert-output-format"));
195-
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
196-
synchronized (ImpalaOutputFormat.this) {
197-
try {
198-
flush();
199-
} catch (Exception e) {
200-
LOG.error("Writing records to impala jdbc failed.", e);
201-
throw new RuntimeException("Writing records to impala jdbc failed.", e);
192+
try {
193+
if (batchWaitInterval != 0) {
194+
this.scheduler = new ScheduledThreadPoolExecutor(1,
195+
new DTThreadFactory("impala-upsert-output-format"));
196+
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
197+
synchronized (ImpalaOutputFormat.this) {
198+
try {
199+
flush();
200+
} catch (Exception e) {
201+
LOG.error("Writing records to impala jdbc failed.", e);
202+
throw new RuntimeException("Writing records to impala jdbc failed.", e);
203+
}
202204
}
203-
}
204-
}, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
205+
}, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
206+
}
207+
} catch (Exception e) {
208+
throw new RuntimeException(e);
205209
}
206210
}
207211

@@ -235,7 +239,7 @@ private void openJdbc() {
235239
}
236240
}
237241

238-
private synchronized void flush() throws SQLException {
242+
private void flush() throws Exception {
239243
if (batchCount > 0) {
240244
if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
241245
executeUpdateBatch();
@@ -387,7 +391,7 @@ private String valueConditionAddQuotation(String valueCondition) {
387391
}
388392

389393
@Override
390-
public synchronized void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
394+
public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
391395
try {
392396
if (!record.f0) {
393397
return;
@@ -433,7 +437,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> record) throws IOExcep
433437
// Receive data
434438
outRecords.inc();
435439
} catch (Exception e) {
436-
throw new RuntimeException("Writing records to impala failed.", e);
440+
throw new IOException("Writing records to impala failed.", e);
437441
}
438442
}
439443

@@ -469,7 +473,7 @@ public void close() throws IOException {
469473
updateStatement.close();
470474
}
471475
} catch (SQLException e) {
472-
throw new RemoteException("impala connection close failed!");
476+
throw new RuntimeException("impala connection close failed!", e);
473477
} finally {
474478
connection = null;
475479
statement = null;
@@ -489,17 +493,16 @@ public void close() throws IOException {
489493
* @param fieldNames field name list
490494
* @param partitionFields partition fields
491495
* @param rowDataMap row data map
492-
* @throws SQLException throw sql exception
493496
*/
494-
private synchronized void executeBatchSql(Statement statement,
495-
String tempSql,
496-
String schema,
497-
String tableName,
498-
String storeType,
499-
Boolean enablePartition,
500-
List<String> fieldNames,
501-
String partitionFields,
502-
Map<String, ArrayList<String>> rowDataMap) throws SQLException {
497+
private void executeBatchSql(Statement statement,
498+
String tempSql,
499+
String schema,
500+
String tableName,
501+
String storeType,
502+
Boolean enablePartition,
503+
List<String> fieldNames,
504+
String partitionFields,
505+
Map<String, ArrayList<String>> rowDataMap) {
503506
StringBuilder valuesCondition = new StringBuilder();
504507
StringBuilder partitionCondition = new StringBuilder();
505508
String tableFieldsCondition = buildTableFieldsCondition(fieldNames, partitionFields);
@@ -510,21 +513,25 @@ private synchronized void executeBatchSql(Statement statement,
510513

511514
// kudu ${partitionCondition} is null
512515
if (storeType.equalsIgnoreCase(KUDU_TYPE) || !enablePartition) {
513-
rowData = rowDataMap.get(NO_PARTITION);
514-
rowData.forEach(row -> valuesCondition.append(row).append(", "));
515-
String executeSql = tempSql.replace(VALUES_CONDITION, valuesCondition.toString())
516-
.replace(PARTITION_CONDITION, partitionCondition.toString())
517-
.replace(PARTITION_CONSTANT, "")
518-
.replace(TABLE_FIELDS_CONDITION, tableFieldsCondition);
519-
String substring = executeSql.substring(0, executeSql.length() - 2);
520-
statement.execute(substring);
516+
try {
517+
rowData = rowDataMap.get(NO_PARTITION);
518+
rowData.forEach(row -> valuesCondition.append(row).append(", "));
519+
String executeSql = tempSql.replace(VALUES_CONDITION, valuesCondition.toString())
520+
.replace(PARTITION_CONDITION, partitionCondition.toString())
521+
.replace(PARTITION_CONSTANT, "")
522+
.replace(TABLE_FIELDS_CONDITION, tableFieldsCondition);
523+
String substring = executeSql.substring(0, executeSql.length() - 2);
524+
statement.execute(substring);
525+
} catch (Exception e) {
526+
throw new RuntimeException("execute impala SQL error!", e);
527+
}
521528
return;
522529
}
523530

524531
// partition sql
525532
Set<String> keySet = rowDataMap.keySet();
526533
String finalTempSql = tempSql;
527-
keySet.forEach(key -> {
534+
for (String key : keySet) {
528535
try {
529536
String executeSql = String.copyValueOf(finalTempSql.toCharArray());
530537
ArrayList<String> valuesConditionList = rowDataMap.get(key);
@@ -535,9 +542,9 @@ private synchronized void executeBatchSql(Statement statement,
535542
statement.execute(executeSql);
536543
partitionCondition.delete(0, partitionCondition.length());
537544
} catch (SQLException sqlException) {
538-
throw new RuntimeException("execute impala partition SQL error! ", sqlException);
545+
throw new RuntimeException("execute impala SQL error! ", sqlException);
539546
}
540-
});
547+
}
541548
}
542549

543550
/**
@@ -583,7 +590,7 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
583590
private String buildValuesCondition(List<String> fieldTypes, Row row) {
584591
String valuesCondition = fieldTypes.stream().map(
585592
f -> {
586-
for(String item : NEED_QUOTE_TYPE) {
593+
for (String item : NEED_QUOTE_TYPE) {
587594
if (f.toLowerCase().contains(item)) {
588595
return String.format("cast(? as %s)", f.toLowerCase());
589596
}
@@ -754,7 +761,5 @@ public ImpalaOutputFormat build() {
754761

755762
return format;
756763
}
757-
758764
}
759-
760765
}

0 commit comments

Comments
 (0)