 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -52,6 +53,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static com.dtstack.flink.sql.sink.rdb.JDBCTypeConvertUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -71,6 +73,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
 
     private static final Integer DEFAULT_CONN_TIME_OUT = 60;
     private static final int RECEIVE_DATA_PRINT_FREQUENCY = 1000;
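+    // only every 1000th dirty record is logged (see executeUpdate) to keep log volume down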
+    private static final int DIRTY_DATA_PRINT_FREQUENCY = 1000;
 
     private static final String KUDU_TYPE = "kudu";
     private static final String UPDATE_MODE = "update";
@@ -84,6 +87,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
 
     private transient volatile boolean closed = false;
     private int batchCount = 0;
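+    // rows buffered since the last flush; replayed into the prepared statement by executeBatch()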
+    private transient List<Row> rows;
 
     protected String keytabPath;
     protected String krb5confPath;
@@ -110,6 +114,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
 
     private String prepareStatementSql;
     private List<String> newFieldNames;
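+    // carries the dirty-record metric that executeUpdate increments when a single-row retry fails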
+    private transient AbstractDtRichOutputFormat<?> metricOutputFormat;
 
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
@@ -120,6 +125,7 @@ public void configure(Configuration parameters) {
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
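+        // the buffer is transient, so it has to be recreated every time the task is opened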
+        this.rows = new ArrayList<>();
         openConnect();
         setStatementSql();
         initScheduledTask(batchWaitInterval);
@@ -214,7 +220,7 @@ private void openJdbc() {
 
     private void flush() throws SQLException {
         if (Objects.nonNull(statement)) {
-            statement.executeBatch();
+            executeBatch();
             batchCount = 0;
             statement.clearBatch();
         }
@@ -237,51 +243,104 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
                 return;
             }
 
-            Map<String, Object> valueMap = Maps.newHashMap();
-
             if (outRecords.getCount() % RECEIVE_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
                 LOG.info("Receive data : {}", record);
-                LOG.info("Statement Sql is: {}", prepareStatementSql);
             }
-            // Receive data
-            outRecords.inc();
 
-            Row copyRow = Row.copy(record.f1);
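+            // prepare the statement lazily from the first record, because the static
+            // partition condition in the SQL depends on actual field values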
+            if (Objects.isNull(statement)) {
+                Map<String, Object> valueMap = Maps.newHashMap();
+                Row row = Row.copy(record.f1);
 
-            for (int i = 0; i < copyRow.getArity(); i++) {
-                valueMap.put(fieldList.get(i), copyRow.getField(i));
-            }
-
-            //replace $partitionCondition
-            statement = connection.prepareStatement(
-                prepareStatementSql.replace(PARTITION_CONDITION,
-                    buildStaticPartitionCondition(valueMap, staticPartitionField))
-            );
-
-            // Reorder row data by field name: e.g. the original row data (1, xxx, 20) maps to (id, name, age),
-            // but partitioning changes the write field order to (name, id, age), so it must be reordered to (xxx, 1, 20).
-            Row rowValue = new Row(fieldTypeList.size());
-            for (int i = 0; i < fieldTypeList.size(); i++) {
-                rowValue.setField(i, valueMap.get(newFieldNames.get(i)));
-            }
+                for (int i = 0; i < row.getArity(); i++) {
+                    valueMap.put(fieldList.get(i), row.getField(i));
+                }
 
-            if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
-                setRowToStatement(statement, fieldTypeList, rowValue, primaryKeys.stream().mapToInt(fieldList::indexOf).toArray());
-            } else {
-                setRowToStatement(statement, fieldTypeList, rowValue, null);
+                // replace $partitionCondition
+                statement = connection.prepareStatement(
+                    prepareStatementSql.replace(PARTITION_CONDITION,
+                        buildStaticPartitionCondition(valueMap, staticPartitionField))
+                );
             }
 
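+            // only buffer the row here; the JDBC work happens in executeBatch() once the batch is full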
+            rows.add(record.f1);
             batchCount++;
-            statement.addBatch();
 
-            if (batchCount > batchSize) {
+            if (batchCount >= batchSize) {
                 flush();
             }
+
+            // Receive data
+            outRecords.inc();
         } catch (Exception e) {
             throw new RuntimeException("Writing records to impala failed.", e);
         }
     }
 
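+    /**
+     * Replays the buffered rows into the prepared statement and executes them as one JDBC batch.
+     * If the batch fails, it is rolled back and the rows are retried one by one via
+     * {@link #executeUpdate(Connection)}, so a single bad row does not discard the whole batch.
+     */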
+    private void executeBatch() throws SQLException {
+        try {
+            rows.forEach(row -> {
+                try {
+                    Map<String, Object> valueMap = new HashMap<>();
+
+                    for (int i = 0; i < row.getArity(); i++) {
+                        valueMap.put(fieldList.get(i), row.getField(i));
+                    }
+                    // Reorder row data by field name: e.g. the original row data (1, xxx, 20) maps to (id, name, age),
+                    // but partitioning changes the write field order to (name, id, age), so it must be reordered to (xxx, 1, 20).
+                    Row rowValue = new Row(fieldTypeList.size());
+                    for (int i = 0; i < fieldTypeList.size(); i++) {
+                        rowValue.setField(i, valueMap.get(newFieldNames.get(i)));
+                    }
+
+                    if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
+                        setRowToStatement(statement, fieldTypeList, rowValue, primaryKeys.stream().mapToInt(fieldList::indexOf).toArray());
+                    } else {
+                        setRowToStatement(statement, fieldTypeList, rowValue, null);
+                    }
+                    statement.addBatch();
+                } catch (Exception e) {
+                    throw new RuntimeException("impala jdbc execute batch error!", e);
+                }
+            });
+            statement.executeBatch();
+            connection.commit();
+            rows.clear();
+        } catch (Exception e) {
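+            // the batch failed as a whole: roll back, drop the batched statements,
+            // and fall back to row-by-row writes so dirty rows can be isolated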
+            LOG.debug("impala jdbc execute batch error", e);
+            connection.rollback();
+            connection.commit();
+            cleanBatchWhenError();
+            executeUpdate(connection);
+        }
+    }
+
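+    /**
+     * Fallback path: writes the buffered rows one at a time, committing each row and
+     * counting rows that still fail as dirty records instead of failing the job.
+     */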
+    public void executeUpdate(Connection connection) {
+        rows.forEach(row -> {
+            try {
+                setRecordToStatement(statement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypeList), row);
+                statement.executeUpdate();
+                connection.commit();
+            } catch (Exception e) {
+                try {
+                    connection.rollback();
+                    connection.commit();
+                } catch (SQLException e1) {
+                    throw new RuntimeException(e1);
+                }
+                if (metricOutputFormat.outDirtyRecords.getCount() % DIRTY_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
+                    LOG.error("record insert failed, this row is {}", row.toString());
+                    LOG.error("", e);
+                }
+                metricOutputFormat.outDirtyRecords.inc();
+            }
+        });
+        rows.clear();
+    }
+
+    private void cleanBatchWhenError() throws SQLException {
+        statement.clearBatch();
+    }
+
     private void setRowToStatement(PreparedStatement statement, List<String> fieldTypeList, Row row, int[] pkFields) throws SQLException {
         JDBCTypeConvertUtils.setRecordToStatement(statement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypeList), row, pkFields);
     }