
Commit 7660530

[feat] support update mode

1 parent e95e32a · commit 7660530

File tree

1 file changed (+112, -26)

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 112 additions & 26 deletions
@@ -20,6 +20,7 @@
 
 import com.dtstack.flink.sql.factory.DTThreadFactory;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
+import com.dtstack.flink.sql.sink.rdb.JDBCTypeConvertUtils;
 import com.dtstack.flink.sql.table.AbstractTableInfo;
 import com.dtstack.flink.sql.util.JDBCUtils;
 import com.dtstack.flink.sql.util.KrbUtils;
@@ -37,6 +38,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -53,6 +55,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static com.dtstack.flink.sql.sink.rdb.JDBCTypeConvertUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -89,6 +92,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>>
 
     protected transient Connection connection;
     protected transient Statement statement;
+    protected transient PreparedStatement updateStatement;
 
     private transient volatile boolean closed = false;
    private int batchCount = 0;
@@ -138,6 +142,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>>
     private List<String> valueFieldNames;
     private transient final AbstractDtRichOutputFormat<?> metricOutputFormat = this;
     private List<Tuple2<String, String>> rowDataList;
+    private List<Row> rows;
 
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
@@ -150,23 +155,32 @@ public void configure(Configuration parameters) {
     public void open(int taskNumber, int numTasks) throws IOException {
         rowDataList = new ArrayList<>();
         rowDataMap = new HashMap<>();
+        rows = new ArrayList<>();
         openConnect();
         initScheduledTask(batchWaitInterval);
         init();
         initMetric();
     }
 
     private void init() {
-        if (Objects.nonNull(partitionFields)) {
-            // match ${field} from partitionFields
-            Matcher matcher = STATIC_PARTITION_PATTERN.matcher(partitionFields);
-            while (matcher.find()) {
-                LOG.info("find static partition field: {}", matcher.group(1));
-                staticPartitionFields.add(matcher.group(1));
+        try {
+            if (Objects.nonNull(partitionFields)) {
+                // match ${field} from partitionFields
+                Matcher matcher = STATIC_PARTITION_PATTERN.matcher(partitionFields);
+                while (matcher.find()) {
+                    LOG.info("find static partition field: {}", matcher.group(1));
+                    staticPartitionFields.add(matcher.group(1));
+                }
+            }
+
+            if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+                updateStatement = connection.prepareStatement(buildUpdateSql(schema, tableName, fieldNames, primaryKeys));
             }
-        }
 
-        valueFieldNames = rebuildFieldNameListAndTypeList(fieldNames, staticPartitionFields, fieldTypes, partitionFields);
+            valueFieldNames = rebuildFieldNameListAndTypeList(fieldNames, staticPartitionFields, fieldTypes, partitionFields);
+        } catch (Exception e) {
+            throw new RuntimeException("init impala job error", e);
+        }
     }
 
     private void initScheduledTask(Long batchWaitInterval) {
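
(Note: the init() hunk above extracts ${field} tokens from partitionFields with STATIC_PARTITION_PATTERN, whose definition is outside this diff. A minimal standalone sketch follows, assuming a "\$\{(.*?)\}"-style pattern; the pattern and the sample partition spec are illustrative, not quoted from the file.)

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StaticPartitionSketch {
    public static void main(String[] args) {
        // Assumed shape of STATIC_PARTITION_PATTERN; the real constant lives elsewhere in ImpalaOutputFormat.
        Pattern staticPartitionPattern = Pattern.compile("\\$\\{(.*?)}");
        // Hypothetical partition spec with two static partition fields as ${...} placeholders.
        Matcher matcher = staticPartitionPattern.matcher("pt_date='${pt_date}',pt_hour='${pt_hour}'");
        while (matcher.find()) {
            // group(1) is the bare field name, mirroring staticPartitionFields.add(matcher.group(1)).
            System.out.println(matcher.group(1)); // prints pt_date, then pt_hour
        }
    }
}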
@@ -236,6 +250,74 @@ private void flush() throws SQLException {
         }
     }
 
+    private void executeBatch() throws SQLException {
+        try {
+            rows.forEach(row -> {
+                try {
+                    Map<String, Object> valueMap = new HashMap<>();
+
+                    for (int i = 0; i < row.getArity(); i++) {
+                        valueMap.put(fieldNames.get(i), row.getField(i));
+                    }
+                    // Reassemble the row data by field name. For example, the original row data (1, xxx, 20) corresponds to (id, name, age),
+                    // but because of partitioning the write order of the fields becomes (name, id, age), so the row data must be reordered to (xxx, 1, 20).
+                    Row rowValue = new Row(fieldTypes.size());
+                    for (int i = 0; i < fieldTypes.size(); i++) {
+                        rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
+                    }
+
+                    if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+                        JDBCTypeConvertUtils.setRecordToStatement(
+                                updateStatement,
+                                JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypes),
+                                rowValue,
+                                primaryKeys.stream().mapToInt(fieldNames::indexOf).toArray()
+                        );
+                    }
+                    updateStatement.addBatch();
+                } catch (Exception e) {
+                    throw new RuntimeException("impala jdbc execute batch error!", e);
+                }
+            });
+            updateStatement.executeBatch();
+            connection.commit();
+            rows.clear();
+        } catch (Exception e) {
+            LOG.debug("impala jdbc execute batch error ", e);
+            connection.rollback();
+            connection.commit();
+            cleanBatchWhenError();
+            executeUpdate(connection);
+        }
+    }
+
+    public void executeUpdate(Connection connection) {
+        rows.forEach(row -> {
+            try {
+                setRecordToStatement(updateStatement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypes), row);
+                updateStatement.executeUpdate();
+                connection.commit();
+            } catch (Exception e) {
+                try {
+                    connection.rollback();
+                    connection.commit();
+                } catch (SQLException e1) {
+                    throw new RuntimeException(e1);
+                }
+                if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
+                    LOG.error("record insert failed ,this row is {}", row.toString());
+                    LOG.error("", e);
+                }
+                metricOutputFormat.outDirtyRecords.inc();
+            }
+        });
+        rows.clear();
+    }
+
+    private void cleanBatchWhenError() throws SQLException {
+        updateStatement.clearBatch();
+    }
+
     private void putRowIntoMap(Map<String, ArrayList<String>> rowDataMap, Tuple2<String, String> rowData) {
         Set<String> keySet = rowDataMap.keySet();
         ArrayList<String> tempRowArray;
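
(Note: buildUpdateSql(...) and the four-argument setRecordToStatement(...) overload used in executeBatch() are defined outside this diff, so the exact UPDATE shape is an assumption. A plausible reading is that the primary-key index array re-binds the key values after the SET parameters to fill the WHERE clause. Below is a minimal plain-JDBC sketch under that assumption; the SQL, URL, and field names are hypothetical.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class UpdateBindSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical SQL in the shape buildUpdateSql(schema, tableName, fieldNames, primaryKeys) might emit.
        String sql = "UPDATE demo_schema.demo_table SET id = ?, name = ?, age = ? WHERE id = ?";
        Object[] rowValue = {1, "xxx", 20}; // values in write order
        int[] pkIndices = {0};              // primaryKeys.stream().mapToInt(fieldNames::indexOf).toArray()

        // Hypothetical connection URL; any JDBC source works for this sketch.
        try (Connection connection = DriverManager.getConnection("jdbc:impala://host:21050/demo_schema");
             PreparedStatement updateStatement = connection.prepareStatement(sql)) {
            // Bind the SET parameters in field order ...
            for (int i = 0; i < rowValue.length; i++) {
                updateStatement.setObject(i + 1, rowValue[i]);
            }
            // ... then re-bind the primary-key values for the WHERE clause.
            for (int i = 0; i < pkIndices.length; i++) {
                updateStatement.setObject(rowValue.length + 1 + i, rowValue[pkIndices[i]]);
            }
            updateStatement.addBatch();    // batched path, as in executeBatch()
            updateStatement.executeBatch();
            connection.commit();
        }
    }
}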
@@ -283,28 +365,32 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
             LOG.info("Receive data : {}", record);
         }
 
-        Map<String, Object> valueMap = Maps.newHashMap();
-        Row row = Row.copy(record.f1);
+        if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+            rows.add(Row.copy(record.f1));
+        } else {
+            Map<String, Object> valueMap = Maps.newHashMap();
+            Row row = Row.copy(record.f1);
 
-        for (int i = 0; i < row.getArity(); i++) {
-            valueMap.put(fieldNames.get(i), row.getField(i));
-        }
+            for (int i = 0; i < row.getArity(); i++) {
+                valueMap.put(fieldNames.get(i), row.getField(i));
+            }
 
-        Tuple2<String, String> rowTuple2 = new Tuple2<>();
-        if (storeType.equalsIgnoreCase(KUDU_TYPE) || !enablePartition) {
-            rowTuple2.f0 = NO_PARTITION;
-        } else {
-            rowTuple2.f0 = buildPartitionCondition(valueMap, partitionFields, staticPartitionFields);
-        }
+            Tuple2<String, String> rowTuple2 = new Tuple2<>();
+            if (storeType.equalsIgnoreCase(KUDU_TYPE) || !enablePartition) {
+                rowTuple2.f0 = NO_PARTITION;
+            } else {
+                rowTuple2.f0 = buildPartitionCondition(valueMap, partitionFields, staticPartitionFields);
+            }
 
-        // Reassemble the row data by field name. For example, the original row data (1, xxx, 20) corresponds to (id, name, age),
-        // but because of partitioning the write order of the fields becomes (name, id, age), so the row data must be reordered to (xxx, 1, 20).
-        Row rowValue = new Row(fieldTypes.size());
-        for (int i = 0; i < fieldTypes.size(); i++) {
-            rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
+            // Reassemble the row data by field name. For example, the original row data (1, xxx, 20) corresponds to (id, name, age),
+            // but because of partitioning the write order of the fields becomes (name, id, age), so the row data must be reordered to (xxx, 1, 20).
+            Row rowValue = new Row(fieldTypes.size());
+            for (int i = 0; i < fieldTypes.size(); i++) {
+                rowValue.setField(i, valueMap.get(valueFieldNames.get(i)));
+            }
+            rowTuple2.f1 = buildValuesCondition(fieldTypes, rowValue);
+            rowDataList.add(rowTuple2);
         }
-        rowTuple2.f1 = buildValuesCondition(fieldTypes, rowValue);
-        rowDataList.add(rowTuple2);
 
         batchCount++;
 
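(Note: the subtlest part of writeRecord() and executeBatch() is the field reordering described in the comments: values arrive in declared column order but must be emitted in the partition-adjusted order held by valueFieldNames. Below is a self-contained sketch of that reassembly, reusing the comment's own (1, xxx, 20) example; the field names are illustrative.)

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowReorderSketch {
    public static void main(String[] args) {
        // Declared column order of the incoming row: (id, name, age).
        List<String> fieldNames = Arrays.asList("id", "name", "age");
        // Write order after partition handling, as valueFieldNames would hold it: (name, id, age).
        List<String> valueFieldNames = Arrays.asList("name", "id", "age");
        Object[] row = {1, "xxx", 20};

        // Index the incoming values by field name ...
        Map<String, Object> valueMap = new HashMap<>();
        for (int i = 0; i < row.length; i++) {
            valueMap.put(fieldNames.get(i), row[i]);
        }
        // ... then emit them in write order: (1, xxx, 20) becomes (xxx, 1, 20).
        Object[] reordered = new Object[valueFieldNames.size()];
        for (int i = 0; i < reordered.length; i++) {
            reordered[i] = valueMap.get(valueFieldNames.get(i));
        }
        System.out.println(Arrays.toString(reordered)); // [xxx, 1, 20]
    }
}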