
Commit d8cf464

Merge branch 'feat_1.10_4.0.x_impalaKudu' into '1.10_test_4.0.x'
[fix] Data not written when the task ends before batchWaitInterval has elapsed

See merge request dt-insight-engine/flinkStreamSQL!149

2 parents f5cbef2 + d369012
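For context, the pattern behind this fix is a row buffer shared between the writer thread and the batchWaitInterval timer: buffer operations are synchronized, and close() drains the buffer before the timer is cancelled. Below is a minimal, self-contained sketch of that pattern; BatchedWriter and its members are illustrative names only, not code from ImpalaOutputFormat, which batches SQL fragments rather than plain strings.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative buffered writer: rows are flushed when the batch is full or when
// the batchWaitInterval timer fires, whichever comes first.
public class BatchedWriter implements AutoCloseable {

    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    private final ScheduledExecutorService scheduler;
    private final ScheduledFuture<?> scheduledFuture;

    public BatchedWriter(int batchSize, long batchWaitIntervalMs) {
        this.batchSize = batchSize;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        // The timer thread and the writer thread both call flush(), so write()
        // and flush() are synchronized to keep the buffer consistent.
        this.scheduledFuture = scheduler.scheduleWithFixedDelay(
                this::flush, batchWaitIntervalMs, batchWaitIntervalMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void write(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Stand-in for the real JDBC batch execution.
        System.out.println("flushing " + buffer.size() + " rows");
        buffer.clear();
    }

    @Override
    public void close() {
        // Drain the buffer before cancelling the timer, so rows that arrived
        // after the last tick are not lost when the task ends early.
        flush();
        scheduledFuture.cancel(false);
        scheduler.shutdown();
    }

    public static void main(String[] args) {
        try (BatchedWriter writer = new BatchedWriter(100, 1000L)) {
            writer.write("row-1");
            writer.write("row-2");
        }
        // close() flushed both rows even though batchWaitInterval never elapsed,
        // which is the scenario the commit message describes.
    }
}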

File tree

1 file changed: +57 -46 lines

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 57 additions & 46 deletions
@@ -139,7 +139,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
     // so the field order actually set in executeSql should be (name, id, age), and the corresponding type order needs to be rebuilt as well
     private List<String> valueFieldNames;
     private transient AbstractDtRichOutputFormat<?> metricOutputFormat;
-    private List<Tuple2<String, String>> rowDataList;
     private List<Row> rows;

     private transient ScheduledExecutorService scheduler;
@@ -151,36 +150,39 @@ public void configure(Configuration parameters) {

     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        rowDataList = new ArrayList<>();
-        rowDataMap = new HashMap<>();
-        rows = new ArrayList<>();
-        metricOutputFormat = this;
-        openConnect();
-        initScheduledTask(batchWaitInterval);
-        init();
-        initMetric();
+        try {
+            rowDataMap = new HashMap<>();
+            rows = new ArrayList<>();
+            metricOutputFormat = this;
+            openConnect();
+            initScheduledTask(batchWaitInterval);
+            init();
+            initMetric();
+        } catch (Exception e) {
+            throw new RemoteException("impala output format open error!", e);
+        }
     }

-    private void init() {
-        try {
-            if (Objects.nonNull(partitionFields)) {
-                // match ${field} from partitionFields
-                Matcher matcher = STATIC_PARTITION_PATTERN.matcher(partitionFields);
-                while (matcher.find()) {
-                    LOG.info("find static partition field: {}", matcher.group(1));
-                    staticPartitionFields.add(matcher.group(1));
-                }
+    private void init() throws SQLException {
+        if (Objects.nonNull(partitionFields)) {
+            // match ${field} from partitionFields
+            Matcher matcher = STATIC_PARTITION_PATTERN.matcher(partitionFields);
+            while (matcher.find()) {
+                LOG.info("find static partition field: {}", matcher.group(1));
+                staticPartitionFields.add(matcher.group(1));
             }
+        }

-            if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
-                updateStatement = connection.prepareStatement(buildUpdateSql(schema, tableName, fieldNames, primaryKeys));
-                return;
+        if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+            if (!storeType.equalsIgnoreCase(KUDU_TYPE)) {
+                throw new IllegalArgumentException("update mode not support for non-kudu table!");
             }

-            valueFieldNames = rebuildFieldNameListAndTypeList(fieldNames, staticPartitionFields, fieldTypes, partitionFields);
-        } catch (Exception e) {
-            throw new RuntimeException("init impala job error", e);
+            updateStatement = connection.prepareStatement(buildUpdateSql(schema, tableName, fieldNames, primaryKeys));
+            return;
         }
+
+        valueFieldNames = rebuildFieldNameListAndTypeList(fieldNames, staticPartitionFields, fieldTypes, partitionFields);
     }

     private void initScheduledTask(Long batchWaitInterval) {
@@ -223,22 +225,21 @@ private void openJdbc() {
         JDBCUtils.forName(DRIVER_NAME, getClass().getClassLoader());
         try {
             connection = DriverManager.getConnection(dbUrl, userName, password);
-            connection.setAutoCommit(false);
             statement = connection.createStatement();
+            connection.setAutoCommit(false);
         } catch (SQLException sqlException) {
             throw new RuntimeException("get impala jdbc connection failed!", sqlException);
         }
     }

-    private void flush() throws SQLException {
+    private synchronized void flush() throws SQLException {
         if (batchCount > 0) {
             if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
                 executeUpdateBatch();
             }
-            if (!rowDataList.isEmpty()) {
+            if (!rowDataMap.isEmpty()) {
                 String templateSql =
                         "INSERT INTO tableName ${tableFieldsCondition} PARTITION ${partitionCondition} VALUES ${valuesCondition}";
-                rowDataList.forEach(rowDataTuple2 -> putRowIntoMap(rowDataMap, rowDataTuple2));
                 executeBatchSql(
                         statement,
                         templateSql,
@@ -250,7 +251,6 @@ private void flush() throws SQLException {
                         partitionFields,
                         rowDataMap
                 );
-                rowDataList.clear();
                 rowDataMap.clear();
             }
         }
@@ -354,7 +354,7 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
     }

     @Override
-    public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
+    public synchronized void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
         try {
             if (!record.f0) {
                 return;
@@ -388,7 +388,7 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
                 rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
             }
             rowTuple2.f1 = buildValuesCondition(fieldTypes, rowValue);
-            rowDataList.add(rowTuple2);
+            putRowIntoMap(rowDataMap, rowTuple2);
         }

         batchCount++;
@@ -409,11 +409,6 @@ public void close() throws IOException {
         if (closed) {
             return;
         }
-        // cancel scheduled task
-        if (this.scheduledFuture != null) {
-            scheduledFuture.cancel(false);
-            this.scheduler.shutdown();
-        }
         // flush any SQL that has not been executed yet
         if (batchCount > 0) {
             try {
@@ -422,15 +417,30 @@ public void close() throws IOException {
                 throw new RuntimeException("Writing records to impala failed.", e);
             }
         }
+        // cancel scheduled task
+        if (this.scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            this.scheduler.shutdown();
+        }
         // close connection
         try {
             if (connection != null && connection.isValid(DEFAULT_CONN_TIME_OUT)) {
                 connection.close();
             }
+
+            if (statement != null && !statement.isClosed()) {
+                statement.close();
+            }
+
+            if (updateStatement != null && !updateStatement.isClosed()) {
+                updateStatement.close();
+            }
         } catch (SQLException e) {
             throw new RemoteException("impala connection close failed!");
         } finally {
             connection = null;
+            statement = null;
+            updateStatement = null;
         }
         closed = true;
     }
@@ -448,15 +458,15 @@ public void close() throws IOException {
      * @param rowDataMap row data map
      * @throws SQLException throw sql exception
      */
-    private void executeBatchSql(Statement statement,
-                                 String tempSql,
-                                 String schema,
-                                 String tableName,
-                                 String storeType,
-                                 Boolean enablePartition,
-                                 List<String> fieldNames,
-                                 String partitionFields,
-                                 Map<String, ArrayList<String>> rowDataMap) throws SQLException {
+    private synchronized void executeBatchSql(Statement statement,
+                                              String tempSql,
+                                              String schema,
+                                              String tableName,
+                                              String storeType,
+                                              Boolean enablePartition,
+                                              List<String> fieldNames,
+                                              String partitionFields,
+                                              Map<String, ArrayList<String>> rowDataMap) throws SQLException {
         StringBuilder valuesCondition = new StringBuilder();
         StringBuilder partitionCondition = new StringBuilder();
         String tableFieldsCondition = buildTableFieldsCondition(fieldNames, partitionFields);
@@ -492,6 +502,7 @@ private void executeBatchSql(Statement statement,
                         .replace(VALUES_CONDITION, String.join(", ", valuesConditionList));
                 LOG.info("current execute sql: {}", executeSql);
                 statement.execute(executeSql);
+                partitionCondition.delete(0, partitionCondition.length());
             } catch (SQLException sqlException) {
                 throw new RuntimeException("execute impala partition SQL error! ", sqlException);
             }
@@ -547,7 +558,7 @@ private String buildValuesCondition(List<String> fieldTypes, Row row) {
             return "?";
         }).collect(Collectors.joining(", ")) + ")";
         for (int i = 0; i < row.getArity(); i++) {
-            valuesCondition = valuesCondition.replaceFirst("\\?", row.getField(i).toString());
+            valuesCondition = valuesCondition.replaceFirst("\\?", Objects.isNull(row.getField(i)) ? "null" : row.getField(i).toString());
         }
         Matcher matcher = STRING_TYPE_PATTERN.matcher(valuesCondition);
         while (matcher.find()) {
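The last hunk above inlines each row's field values into the VALUES clause and now renders a missing field as the SQL literal null instead of failing on row.getField(i).toString(). A rough, self-contained sketch of just that rendering step follows; the class and method names are hypothetical, and the real code additionally post-processes string-typed values via STRING_TYPE_PATTERN.

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-alone version of the null-safe VALUES rendering shown in the diff.
public class ValuesConditionSketch {

    static String buildValuesCondition(List<Object> fields) {
        // One "?" placeholder per field, e.g. "(?, ?, ?)".
        String valuesCondition = "(" + fields.stream()
                .map(f -> "?")
                .collect(Collectors.joining(", ")) + ")";
        for (Object field : fields) {
            // Replace each placeholder; a null field becomes the literal null
            // rather than triggering a NullPointerException.
            valuesCondition = valuesCondition.replaceFirst("\\?",
                    Objects.isNull(field) ? "null" : field.toString());
        }
        return valuesCondition;
    }

    public static void main(String[] args) {
        System.out.println(buildValuesCondition(Arrays.asList(1, null, "beijing")));
        // prints: (1, null, beijing)
    }
}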
