@@ -212,10 +212,10 @@ public void write(DataOutput out) throws IOException {
212212 * @param stmt the {@link PreparedStatement} to write the {@link StructuredRecord} to
213213 */
214214 public void write (PreparedStatement stmt ) throws SQLException {
215- for (int i = 0 ; i < columnTypes .size (); i ++) {
216- ColumnType columnType = columnTypes .get (i );
215+ for (int fieldIndex = 0 ; fieldIndex < columnTypes .size (); fieldIndex ++) {
216+ ColumnType columnType = columnTypes .get (fieldIndex );
217217 Schema .Field field = record .getSchema ().getField (columnType .getName ());
218- writeToDB (stmt , field , i );
218+ writeToDB (stmt , field , fieldIndex );
219219 }
220220 }
221221
@@ -274,26 +274,54 @@ private void writeToDataOut(DataOutput out, Schema.Field field) throws IOExcepti
274274 }
275275 }
276276
277- protected void writeToDB (PreparedStatement stmt , @ Nullable Schema .Field field , int fieldIndex ) throws SQLException {
277+ private void writeToDB (PreparedStatement stmt , @ Nullable Schema .Field field , int fieldIndex ) throws SQLException {
278+ if (shouldWriteNullField (field )) {
279+ writeNullToDB (stmt , fieldIndex );
280+ } else {
281+ Schema nonNullableSchema = getNonNullableSchema (field );
282+ writeNonNullToDB (stmt , nonNullableSchema , field .getName (), fieldIndex );
283+ }
284+ }
278285
286+ /**
287+ * This method returns true in case a field can support writeNullToDB for the current field.
288+ * By default, this method returns true when field or field value is set to null.
289+ *
290+ * @param field Field
291+ * @return true if null value of the field can be written to DB
292+ */
293+ protected boolean shouldWriteNullField (Schema .Field field ) {
294+ return (field == null || record .get (field .getName ()) == null );
295+ }
296+
297+ /**
298+ * This method handle the null field and null field values, by internally using the PreparedStatement.setNull
299+ * method. Any class requiring a custom handling to write null value for any type should override this method.
300+ *
301+ * @param stmt PreparedStatement object for writing to db
302+ * @param fieldIndex Field index in the columnTypes
303+ * @throws SQLException Exception while calling PreparedStatement.setNull
304+ */
305+ protected void writeNullToDB (PreparedStatement stmt , int fieldIndex ) throws SQLException {
279306 int sqlIndex = fieldIndex + 1 ;
280307 int sqlType = columnTypes .get (fieldIndex ).getType ();
281- if (field == null ) {
282- // Some of the fields can be absent in the record
283- stmt .setNull (sqlIndex , sqlType );
284- return ;
285- }
308+ stmt .setNull (sqlIndex , sqlType );
309+ }
286310
287- String fieldName = field .getName ();
288- Schema fieldSchema = getNonNullableSchema (field );
289- Schema .Type fieldType = fieldSchema .getType ();
290- Schema .LogicalType fieldLogicalType = fieldSchema .getLogicalType ();
291- Object fieldValue = record .get (fieldName );
311+ /**
312+ * Write Non null values to DB.
313+ *
314+ * @param stmt PreparedStatement object for writing to db
315+ * @param fieldSchema Non-Nullable schema of the field
316+ * @param fieldName Current Field from record's schema
317+ * @param fieldIndex Field index in the columnTypes
318+ * @throws SQLException Exception while calling PreparedStatement.set... calls
319+ */
320+ protected void writeNonNullToDB (PreparedStatement stmt , Schema fieldSchema ,
321+ String fieldName , int fieldIndex ) throws SQLException {
292322
293- if (fieldValue == null ) {
294- stmt .setNull (sqlIndex , columnTypes .get (fieldIndex ).getType ());
295- return ;
296- }
323+ int sqlIndex = fieldIndex + 1 ;
324+ Schema .LogicalType fieldLogicalType = fieldSchema .getLogicalType ();
297325
298326 if (fieldLogicalType != null ) {
299327 switch (fieldLogicalType ) {
@@ -318,6 +346,8 @@ protected void writeToDB(PreparedStatement stmt, @Nullable Schema.Field field, i
318346 return ;
319347 }
320348
349+ Schema .Type fieldType = fieldSchema .getType ();
350+ Object fieldValue = record .get (fieldName );
321351 switch (fieldType ) {
322352 case NULL :
323353 stmt .setNull (sqlIndex , columnTypes .get (fieldIndex ).getType ());
@@ -330,8 +360,7 @@ protected void writeToDB(PreparedStatement stmt, @Nullable Schema.Field field, i
330360 stmt .setBoolean (sqlIndex , (Boolean ) fieldValue );
331361 break ;
332362 case INT :
333- // write short or int appropriately
334- writeInt (stmt , fieldIndex , sqlIndex , fieldValue );
363+ stmt .setInt (sqlIndex , (Integer ) fieldValue );
335364 break ;
336365 case LONG :
337366 long fieldValueLong = ((Number ) fieldValue ).longValue ();
@@ -364,16 +393,6 @@ protected void writeBytes(PreparedStatement stmt, int fieldIndex, int sqlIndex,
364393 stmt .setBytes (sqlIndex , byteValue );
365394 }
366395
367- protected void writeInt (PreparedStatement stmt , int fieldIndex , int sqlIndex , Object fieldValue ) throws SQLException {
368- Integer intValue = (Integer ) fieldValue ;
369- int parameterType = columnTypes .get (fieldIndex ).getType ();
370- if (Types .TINYINT == parameterType || Types .SMALLINT == parameterType ) {
371- stmt .setShort (sqlIndex , intValue .shortValue ());
372- return ;
373- }
374- stmt .setInt (sqlIndex , intValue );
375- }
376-
377396 @ Override
378397 public void setConf (Configuration conf ) {
379398 this .conf = conf ;
0 commit comments