4141import java .sql .PreparedStatement ;
4242import java .sql .SQLException ;
4343import java .util .ArrayList ;
44- import java .util .Arrays ;
4544import java .util .List ;
4645import java .util .Map ;
4746import java .util .Objects ;
@@ -110,8 +109,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
110109 // fields of the static partition, i.e. the names matched by ${field} in partitionFields
111110 private final List <String > staticPartitionField = new ArrayList <>();
112111
113- // static partition sql like 'INSERT INTO tableName(field1, field2) PARTITION(pt=xx) VALUES(?, ?)'
114- private String staticPartitionSql = "" ;
112+ private String prepareStatementSql ;
115113
116114 private transient ScheduledExecutorService scheduler ;
117115 private transient ScheduledFuture <?> scheduledFuture ;
@@ -123,6 +121,7 @@ public void configure(Configuration parameters) {
123121 @ Override
124122 public void open (int taskNumber , int numTasks ) throws IOException {
125123 openConnect ();
124+ setStatementSql ();
126125 initScheduledTask (batchWaitInterval );
127126 initMetric ();
128127 }
@@ -156,55 +155,58 @@ private void openConnect() throws IOException {
156155 return null ;
157156 });
158157 } catch (InterruptedException | IOException e ) {
159- e . printStackTrace ( );
158+ throw new IllegalArgumentException ( "connect impala with kerberos error!" , e );
160159 }
161160 } else {
162161 openJdbc ();
163162 }
164163 }
165164
166165 /**
167- * get jdbc connection and create statement
166+ * Choose the SQL statement to prepare according to the update mode, store type and partition settings
168167 */
169- private void openJdbc () {
170- JDBCUtils .forName (DRIVER_NAME , getClass ().getClassLoader ());
171- try {
172- connection = DriverManager .getConnection (dbUrl , userName , password );
173- connection .setAutoCommit (false );
168+ private void setStatementSql () {
169+ //update mode
170+ if (updateMode .equalsIgnoreCase (UPDATE_MODE )) {
171+ prepareStatementSql = buildUpdateSql (schema , tableName , fieldList , primaryKeys );
172+ return ;
173+ }
174174
175- //update mode
176- if (updateMode .equalsIgnoreCase (UPDATE_MODE )) {
177- statement = connection .prepareStatement (
178- buildUpdateSql (schema , tableName , fieldList , primaryKeys )
179- );
180- return ;
181- }
175+ // kudu
176+ if (storeType .equalsIgnoreCase (KUDU_TYPE )) {
177+ prepareStatementSql = buildKuduInsertSql (schema , tableName , fieldList , fieldTypeList );
178+ return ;
179+ }
182180
183- // kudu
184- if (storeType .equalsIgnoreCase (KUDU_TYPE )) {
185- statement = connection .prepareStatement (
186- buildKuduInsertSql (schema , tableName , fieldList , fieldTypeList )
187- );
188- return ;
189- }
181+ // match ${field} from partitionFields
182+ Matcher matcher = STATIC_PARTITION_PATTERN .matcher (partitionFields );
183+ while (matcher .find ()) {
184+ LOG .info ("find static partition field: {}" , matcher .group (1 ));
185+ staticPartitionField .add (matcher .group (1 ));
186+ }
190187
191- // match ${field} from partitionFields
192- Matcher matcher = STATIC_PARTITION_PATTERN .matcher (partitionFields );
193- while (matcher .find ()) {
194- staticPartitionField .add (matcher .group (1 ));
195- }
188+ // dynamic
189+ if (enablePartition && staticPartitionField .isEmpty ()) {
190+ prepareStatementSql = buildDynamicInsertSql (schema , tableName , fieldList , fieldTypeList , partitionFields );
191+ }
192+ // static
193+ if (enablePartition && !staticPartitionField .isEmpty ()) {
194+ prepareStatementSql = buildStaticInsertSql (schema , tableName , fieldList , fieldTypeList , partitionFields );
195+ }
196196
197- // dynamic
198- if (enablePartition && staticPartitionField .isEmpty ()) {
199- statement = connection .prepareStatement (
200- buildDynamicInsertSql (schema , tableName , fieldList , fieldTypeList , partitionFields )
201- );
202- }
203- // static
204- if (enablePartition && !staticPartitionField .isEmpty ()) {
205- staticPartitionSql = buildStaticInsertSql (schema , tableName , fieldList , fieldTypeList , partitionFields );
206- }
197+ if (Objects .isNull (prepareStatementSql )) {
198+ throw new IllegalArgumentException ("build prepareStatement sql error!" );
199+ }
200+ }
207201
202+ /**
203+ * Open the Impala JDBC connection and disable auto-commit
204+ */
205+ private void openJdbc () {
206+ JDBCUtils .forName (DRIVER_NAME , getClass ().getClassLoader ());
207+ try {
208+ connection = DriverManager .getConnection (dbUrl , userName , password );
209+ connection .setAutoCommit (false );
208210 } catch (SQLException sqlException ) {
209211 throw new RuntimeException ("get impala jdbc connection failed!" );
210212 }
@@ -229,11 +231,17 @@ private void flush() throws SQLException {
229231 */
230232 @ Override
231233 public void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
234+ LOG .info ("Receive data : {}" , record );
232235 try {
236+ if (!record .f0 ) {
237+ return ;
238+ }
239+
233240 Map <String , Object > valueMap = Maps .newHashMap ();
234241
235242 if (outRecords .getCount () % RECEIVE_DATA_PRINT_FREQUENCY == 0 || LOG .isDebugEnabled ()) {
236243 LOG .info ("Receive data : {}" , record );
244+ LOG .info ("Statement Sql is: {}" , prepareStatementSql );
237245 }
238246 // Receive data
239247 outRecords .inc ();
@@ -244,39 +252,30 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
244252 valueMap .put (fieldList .get (i ), copyRow .getField (i ));
245253 }
246254
247- // build static partition statement from row data
248- if (Objects .isNull (statement ) || !staticPartitionSql .isEmpty ()) {
249- statement = connection .prepareStatement (
250- staticPartitionSql .replace (PARTITION_CONDITION ,
251- buildStaticPartitionCondition (valueMap , staticPartitionField ))
252- );
253- }
255+ // substitute the PARTITION_CONDITION placeholder with the static partition condition built from this row
256+ statement = connection .prepareStatement (
257+ prepareStatementSql .replace (PARTITION_CONDITION ,
258+ buildStaticPartitionCondition (valueMap , staticPartitionField ))
259+ );
254260
255261 Row rowValue = new Row (fieldTypeList .size ());
256262 for (int i = 0 ; i < fieldTypeList .size (); i ++) {
257263 rowValue .setField (i , copyRow .getField (i ));
258264 }
259265
260- if (updateMode .equalsIgnoreCase (UPDATE_MODE )) {
261- setRowToStatement (statement , fieldTypeList , rowValue , primaryKeys .stream ().mapToInt (fieldList ::indexOf ).toArray ());
262- } else {
263- setRowToStatement (statement , fieldTypeList , rowValue );
264- }
266+ setRowToStatement (statement , fieldTypeList , rowValue , Objects .isNull (primaryKeys ) ?
267+ null : primaryKeys .stream ().mapToInt (fieldList ::indexOf ).toArray ());
265268
266269 statement .addBatch ();
267270
268- if (batchCount .incrementAndGet () > batchSize ) {
271+ if (batchCount .incrementAndGet () >= batchSize ) {
269272 flush ();
270273 }
271274 } catch (Exception e ) {
272275 throw new RuntimeException ("Writing records to impala failed." , e );
273276 }
274277 }
275278
276- private void setRowToStatement (PreparedStatement statement , List <String > fieldTypeList , Row row ) throws SQLException {
277- JDBCTypeConvertUtils .setRecordToStatement (statement , JDBCTypeConvertUtils .getSqlTypeFromFieldType (fieldTypeList ), row );
278- }
279-
280279 private void setRowToStatement (PreparedStatement statement , List <String > fieldTypeList , Row row , int [] pkFields ) throws SQLException {
281280 JDBCTypeConvertUtils .setRecordToStatement (statement , JDBCTypeConvertUtils .getSqlTypeFromFieldType (fieldTypeList ), row , pkFields );
282281 }
@@ -319,7 +318,7 @@ public void close() throws IOException {
319318 * @param tableName tableName
320319 * @param fieldList fieldList
321320 * @param fieldTypes fieldTypes
322- * @return INSERT INTO kuduTable(field1, fields2) VALUES ( v1, v2 )
321+ * @return INSERT INTO kuduTable(field1, field2) VALUES (?, ?)
323322 */
324323 private String buildKuduInsertSql (String schema ,
325324 String tableName ,
@@ -330,16 +329,8 @@ private String buildKuduInsertSql(String schema,
330329 .map (this ::quoteIdentifier )
331330 .collect (Collectors .joining (", " ));
332331
333- String placeholders = fieldTypes .stream ().map (
334- f -> {
335- if (STRING_TYPE .equals (f .toUpperCase ())) {
336- return "cast(? as string)" ;
337- }
338- return "?" ;
339- }).collect (Collectors .joining (", " ));
340-
341332 return "INSERT INTO " + (Objects .isNull (schema ) ? "" : quoteIdentifier (schema ) + "." ) + quoteIdentifier (tableName ) +
342- "(" + columns + ")" + " VALUES (" + placeholders + ")" ;
333+ "(" + columns + ")" + " VALUES (" + buildSqlPlaceholders ( fieldTypes ) + ")" ;
343334 }
344335
345336 /**
@@ -372,22 +363,14 @@ private String buildStaticInsertSql(String schema, String tableName, List<String
372363 }
373364 }
374365
375- String placeholders = fieldTypes .stream ().map (
376- f -> {
377- if (STRING_TYPE .equals (f .toUpperCase ())) {
378- return "cast(? as string)" ;
379- }
380- return "?" ;
381- }).collect (Collectors .joining (", " ));
382-
383366 String columns = copyFieldNames .stream ()
384367 .map (this ::quoteIdentifier )
385368 .collect (Collectors .joining (", " ));
386369
387370 String partitionCondition = PARTITION_CONSTANT + "(" + PARTITION_CONDITION + ")" ;
388371
389372 return "INSERT INTO " + (Objects .isNull (schema ) ? "" : quoteIdentifier (schema ) + "." ) + quoteIdentifier (tableName ) +
390- " (" + columns + ") " + partitionCondition + " VALUES (" + placeholders + ")" ;
373+ " (" + columns + ") " + partitionCondition + " VALUES (" + buildSqlPlaceholders ( fieldTypes ) + ")" ;
391374 }
392375
393376 /**
@@ -397,14 +380,6 @@ private String buildStaticInsertSql(String schema, String tableName, List<String
397380 */
398381 private String buildDynamicInsertSql (String schema , String tableName , List <String > fieldName , List <String > fieldTypes , String partitionFields ) {
399382
400- String placeholders = fieldTypes .stream ().map (
401- f -> {
402- if (STRING_TYPE .equals (f .toUpperCase ())) {
403- return "cast(? as string)" ;
404- }
405- return "?" ;
406- }).collect (Collectors .joining (", " ));
407-
408383 String columns = fieldName .stream ()
409384 .filter (f -> !partitionFields .contains (f ))
410385 .map (this ::quoteIdentifier )
@@ -413,7 +388,23 @@ private String buildDynamicInsertSql(String schema, String tableName, List<Strin
413388 String partitionCondition = PARTITION_CONSTANT + "(" + partitionFields + ")" ;
414389
415390 return "INSERT INTO " + (Objects .isNull (schema ) ? "" : quoteIdentifier (schema ) + "." ) + quoteIdentifier (tableName ) +
416- " (" + columns + ") " + partitionCondition + " VALUES (" + placeholders + ")" ;
391+ " (" + columns + ") " + partitionCondition + " VALUES (" + buildSqlPlaceholders (fieldTypes ) + ")" ;
392+ }
393+
394+ /**
395+ * Build the comma-separated placeholder list for a VALUES clause from the field types
396+ *
397+ * @param fieldTypes field types
398+ * @return placeholder list such as '?, ?, cast(? as string)'
399+ */
400+ private String buildSqlPlaceholders (List <String > fieldTypes ) {
401+ return fieldTypes .stream ().map (
402+ f -> {
403+ if (STRING_TYPE .equals (f .toUpperCase ())) {
404+ return "cast(? as string)" ;
405+ }
406+ return "?" ;
407+ }).collect (Collectors .joining (", " ));
417408 }
418409
419410 /**
@@ -436,7 +427,7 @@ private String buildUpdateSql(String schema, String tableName, List<String> fiel
436427 + quoteIdentifier (tableName ) + " SET " + setClause + " WHERE " + conditionClause ;
437428 }
438429
439- public String quoteIdentifier (String identifier ) {
430+ private String quoteIdentifier (String identifier ) {
440431 return "`" + identifier + "`" ;
441432 }
442433
0 commit comments