
Commit 625002e

Merge pull request #185 from data-integrations/ensure-batches-every-10k-records
Added logic to ensure batches are submitted to the engine every 1k records.
2 parents 90d5b13 + 4081484 commit 625002e

2 files changed: +25 -2 lines changed

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 13 additions & 2 deletions

@@ -47,6 +47,7 @@
 import io.cdap.plugin.db.batch.config.DatabaseSinkConfig;
 import io.cdap.plugin.util.DBUtils;
 import io.cdap.plugin.util.DriverCleanup;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.lib.db.DBConfiguration;
 import org.slf4j.Logger;
@@ -158,8 +159,18 @@ public void prepareRun(BatchSinkContext context) {
       configAccessor.setTransactionIsolationLevel(dbSinkConfig.getTransactionIsolationLevel());
     }
 
-    context.addOutput(Output.of(dbSinkConfig.getReferenceName(), new SinkOutputFormatProvider(ETLDBOutputFormat.class,
-                                                                                              configAccessor.getConfiguration())));
+    // Get Hadoop configuration object
+    Configuration configuration = configAccessor.getConfiguration();
+
+    // Configure batch size if specified in pipeline arguments.
+    if (context.getArguments().has(ETLDBOutputFormat.COMMIT_BATCH_SIZE)) {
+      configuration.set(ETLDBOutputFormat.COMMIT_BATCH_SIZE,
+                        context.getArguments().get(ETLDBOutputFormat.COMMIT_BATCH_SIZE));
+    }
+
+    context.addOutput(Output.of(dbSinkConfig.getReferenceName(),
+                                new SinkOutputFormatProvider(ETLDBOutputFormat.class,
+                                                             configuration)));
   }
 
   /**
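Taken together, this change lets the sink read the commit batch size from the pipeline's runtime arguments and hand it to the output format through the Hadoop Configuration. A minimal sketch of that round trip, assuming the plugin classes are on the classpath (the value "5000" is just a hypothetical override, not something in this commit):

import io.cdap.plugin.db.batch.sink.ETLDBOutputFormat;
import org.apache.hadoop.conf.Configuration;

public class CommitBatchSizeExample {
  public static void main(String[] args) {
    Configuration configuration = new Configuration();
    // prepareRun copies the pipeline argument into the Hadoop configuration...
    configuration.set(ETLDBOutputFormat.COMMIT_BATCH_SIZE, "5000");
    // ...and getRecordWriter later reads it back, falling back to the default of 1000.
    int batchSize = configuration.getInt(ETLDBOutputFormat.COMMIT_BATCH_SIZE,
                                         ETLDBOutputFormat.DEFAULT_COMMIT_BATCH_SIZE);
    System.out.println(batchSize); // prints 5000
  }
}

In practice this means the flush interval can be tuned per run by setting the runtime argument io.cdap.plugin.db.output.commit.batch.size, with no plugin redeployment.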

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/ETLDBOutputFormat.java

Lines changed: 12 additions & 0 deletions

@@ -50,6 +50,9 @@
  * @param <V> - Value passed to this class to be written. The value is ignored.
  */
 public class ETLDBOutputFormat<K extends DBWritable, V> extends DBOutputFormat<K, V> {
+  // Batch size before submitting a batch to the SQL engine. If set to 0, no batches will be submitted until commit.
+  public static final String COMMIT_BATCH_SIZE = "io.cdap.plugin.db.output.commit.batch.size";
+  public static final int DEFAULT_COMMIT_BATCH_SIZE = 1000;
 
   private static final Logger LOG = LoggerFactory.getLogger(ETLDBOutputFormat.class);
 
@@ -63,6 +66,7 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOE
     DBConfiguration dbConf = new DBConfiguration(conf);
     String tableName = dbConf.getOutputTableName();
     String[] fieldNames = dbConf.getOutputFieldNames();
+    final int batchSize = conf.getInt(COMMIT_BATCH_SIZE, DEFAULT_COMMIT_BATCH_SIZE);
 
     if (fieldNames == null) {
       fieldNames = new String[dbConf.getOutputFieldCount()];
@@ -74,6 +78,7 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOE
     return new DBRecordWriter(connection, statement) {
 
       private boolean emptyData = true;
+      private long numWrittenRecords = 0;
 
       //Implementation of the close method below is the exact implementation in DBOutputFormat except that
       //we check if there is any data to be written and if not, we skip executeBatch call.
@@ -116,6 +121,13 @@ public void write(K key, V value) {
         try {
           key.write(getStatement());
           getStatement().addBatch();
+          numWrittenRecords++;
+
+          // Submit a batch to the SQL engine every batchSize records (1,000 by default).
+          // This is done to reduce memory usage in the worker, as processed records can now be GC'd.
+          if (batchSize > 0 && numWrittenRecords % batchSize == 0) {
+            getStatement().executeBatch();
+          }
         } catch (SQLException e) {
          LOG.warn("Failed to write value to database", e);
         }
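The flush-every-N-records pattern above is not Hadoop-specific; it works for any JDBC batch writer. A self-contained sketch under assumed names (the events table, its id and payload columns, and the BatchedWriter class are all illustrative, not part of the plugin):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Buffers inserts and flushes them to the database every batchSize records. */
final class BatchedWriter implements AutoCloseable {
  private final Connection connection;
  private final PreparedStatement statement;
  private final int batchSize; // 0 disables intermediate flushes, mirroring the plugin's behavior
  private long numWrittenRecords = 0;

  BatchedWriter(Connection connection, int batchSize) throws SQLException {
    this.connection = connection;
    this.connection.setAutoCommit(false);
    this.statement = connection.prepareStatement("INSERT INTO events (id, payload) VALUES (?, ?)");
    this.batchSize = batchSize;
  }

  void write(long id, String payload) throws SQLException {
    statement.setLong(1, id);
    statement.setString(2, payload);
    statement.addBatch();
    numWrittenRecords++;
    // Periodic executeBatch lets the driver release buffered rows instead of
    // holding every record in memory until the final commit.
    if (batchSize > 0 && numWrittenRecords % batchSize == 0) {
      statement.executeBatch();
    }
  }

  @Override
  public void close() throws SQLException {
    statement.executeBatch(); // flush the final partial batch
    connection.commit();
    statement.close();
  }
}

Note that with auto-commit off, executeBatch only ships buffered statements to the engine; the transaction still commits once at close, so the atomicity of the write is unchanged.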
