Commit c4ab92c

[feat][hbase-sink] hbase add batchSize and batchInterval.

1 parent 9f1efa6 commit c4ab92c
File tree

4 files changed: +143, -45 lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 98 additions & 34 deletions
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-
 
 package com.dtstack.flink.sql.sink.hbase;
 
-import com.dtstack.flink.sql.enums.EUpdateMode;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.hbase.AuthUtil;
@@ -35,21 +34,23 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.security.PrivilegedAction;
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author: jingzhen@dtstack.com
@@ -85,6 +86,15 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
 
     private transient ChoreService choreService;
 
+    private Integer batchSize;
+    private Long batchWaitInterval;
+
+    private transient ScheduledExecutorService executor;
+    private transient ScheduledFuture scheduledFuture;
+
+    private final List<Row> records = new ArrayList<>();
+
+
     @Override
     public void configure(Configuration parameters) {
         LOG.warn("---configure---");
@@ -98,10 +108,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
         table = conn.getTable(TableName.valueOf(tableName));
         LOG.warn("---open end(get table from hbase) ---");
         initMetric();
+
+        // schedule the periodic batch-flush task
+        if (batchWaitInterval > 0) {
+            this.executor = Executors.newScheduledThreadPool(
+                    1, new ExecutorThreadFactory("hbase-sink-flusher"));
+            this.scheduledFuture = this.executor.scheduleAtFixedRate(() -> {
+                if (!records.isEmpty()) {
+                    dealBatchOperation(records);
+                    records.clear();
+                }
+            }, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
+        }
     }
 
-    private void openConn(){
-        try{
+    private void openConn() {
+        try {
             if (kerberosAuthEnable) {
                 LOG.info("open kerberos conn");
                 openKerberosConn();
@@ -111,7 +133,7 @@ private void openConn(){
                 conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);
                 conn = ConnectionFactory.createConnection(conf);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
 
@@ -151,16 +173,59 @@ private void openKerberosConn() throws Exception {
     }
 
 
-
     @Override
     public void writeRecord(Tuple2 tuple2) {
         Tuple2<Boolean, Row> tupleTrans = tuple2;
         Boolean retract = tupleTrans.f0;
         Row row = tupleTrans.f1;
         if (retract) {
-            dealInsert(row);
-        } else if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
-            dealDelete(row);
+            if (this.batchSize != 0) {
+                writeBatchRecord(row);
+            } else {
+                dealInsert(row);
+            }
+        }
+    }
+
+    public void writeBatchRecord(Row row) {
+        records.add(row);
+        // flush once the buffer has accumulated batchSize rows
+        if (records.size() == this.batchSize) {
+            dealBatchOperation(records);
+            // clear the buffer after the batch has been written
+            records.clear();
+        }
+    }
+
+    protected void dealBatchOperation(List<Row> records) {
+        // A null in the result array means that the call for that action failed, even after retries.
+        Object[] results = new Object[records.size()];
+        try {
+            List<Put> puts = new ArrayList<>();
+            for (Row record : records) {
+                puts.add(getPutByRow(record));
+            }
+            table.batch(puts, results);
+
+            // check whether each row was written successfully
+            for (int i = 0; i < results.length; i++) {
+                if (results[i] == null) {
+                    if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
+                        LOG.error("record insert failed ..{}", records.get(i).toString());
+                    }
+                    // count the dirty record
+                    outDirtyRecords.inc();
+                } else {
+                    // count the successfully written record
+                    outRecords.inc();
+                }
+            }
+            // periodically log the flushed batch
+            if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
+                LOG.info(records.toString());
+            }
+        } catch (IOException | InterruptedException e) {
+            LOG.error("", e);
         }
     }
@@ -187,26 +252,6 @@ protected void dealInsert(Row record) {
187252
outRecords.inc();
188253
}
189254

190-
protected void dealDelete(Row record) {
191-
String rowKey = buildRowKey(record);
192-
if (!StringUtils.isEmpty(rowKey)) {
193-
Delete delete = new Delete(Bytes.toBytes(rowKey));
194-
try {
195-
table.delete(delete);
196-
} catch (IOException e) {
197-
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
198-
LOG.error("record insert failed ..{}", record.toString());
199-
LOG.error("", e);
200-
}
201-
outDirtyRecords.inc();
202-
}
203-
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
204-
LOG.info(record.toString());
205-
}
206-
outRecords.inc();
207-
}
208-
}
209-
210255
private Put getPutByRow(Row record) {
211256
String rowKey = buildRowKey(record);
212257
if (StringUtils.isEmpty(rowKey)) {
@@ -244,9 +289,9 @@ private String getRowKeyValues(Row record) {
         return rowKeyBuilder.getRowKey(row);
     }
 
-    private Map<String, Object> rowConvertMap(Row record){
+    private Map<String, Object> rowConvertMap(Row record) {
         Map<String, Object> rowValue = Maps.newHashMap();
-        for(int i = 0; i < columnNames.length; i++){
+        for (int i = 0; i < columnNames.length; i++) {
             rowValue.put(columnNames[i], record.getField(i));
         }
         return rowValue;
@@ -258,7 +303,15 @@ public void close() throws IOException {
             conn.close();
             conn = null;
         }
+
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+        }
     }
+
 
     private HbaseOutputFormat() {
     }
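
As written, close() stops the timer but does not flush rows still sitting in records, and the connection is closed before the cancel runs. A hedged sketch of a variant that drains the buffer while the connection is still open (it reuses only the fields and methods shown above and is not part of this commit):

    @Override
    public void close() throws IOException {
        // Stop the timer first so no scheduled flush races the final drain.
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            if (executor != null) {
                executor.shutdownNow();
            }
        }
        // Drain remaining buffered rows while the HBase connection is still open.
        if (!records.isEmpty()) {
            dealBatchOperation(records);
            records.clear();
        }
        if (conn != null) {
            conn.close();
            conn = null;
        }
    }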

@@ -345,6 +398,15 @@ public HbaseOutputFormatBuilder setClientKeytabFile(String clientKeytabFile) {
             return this;
         }
 
+        public HbaseOutputFormatBuilder setBatchSize(Integer batchSize) {
+            format.batchSize = batchSize;
+            return this;
+        }
+
+        public HbaseOutputFormatBuilder setBatchWaitInterval(Long batchWaitInterval) {
+            format.batchWaitInterval = batchWaitInterval;
+            return this;
+        }
 
         public HbaseOutputFormat finish() {
             Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified");
@@ -405,6 +467,8 @@ public String toString() {
                     ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' +
                     ", clientPrincipal='" + clientPrincipal + '\'' +
                     ", clientKeytabFile='" + clientKeytabFile + '\'' +
+                    ", batchSize='" + batchSize + '\'' +
+                    ", batchWaitInterval='" + batchWaitInterval + '\'' +
                     '}';
         }
 

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 8 additions & 0 deletions
@@ -65,6 +65,8 @@ public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
     private String clientKeytabFile;
     private int parallelism = 1;
 
+    protected String batchSize;
+    protected String batchWaitInterval;
 
     public HbaseSink() {
         // TO DO NOTHING
@@ -94,6 +96,9 @@ public HbaseSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
         if (tmpSinkParallelism != null) {
             this.parallelism = tmpSinkParallelism;
         }
+
+        this.batchSize = hbaseTableInfo.getBatchSize();
+        this.batchWaitInterval = hbaseTableInfo.getBatchWaitInterval();
         return this;
     }
 
@@ -119,6 +124,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
         builder.setClientPrincipal(clientPrincipal);
         builder.setClientKeytabFile(clientKeytabFile);
 
+        builder.setBatchSize(Integer.parseInt(batchSize));
+        builder.setBatchWaitInterval(Long.parseLong(batchWaitInterval));
+
         HbaseOutputFormat outputFormat = builder.finish();
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
         DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction).name(registerTabName);
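
batchSize and batchWaitInterval arrive here as strings, so Integer.parseInt and Long.parseLong rely on the parser below always supplying well-formed defaults ("100" and "1000"). Should that guarantee ever be relaxed, a defensive parse along these lines would avoid a NumberFormatException at job startup; the helper name is hypothetical and not part of this commit:

    // Hypothetical helper: fall back to a default when the value is absent or malformed.
    private static int parseIntOrDefault(String value, int defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }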

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 17 additions & 11 deletions
@@ -64,6 +64,9 @@ public class HbaseSinkParser extends AbstractTableParser {
     public static final String CLIENT_PRINCIPAL_KEY = "clientPrincipal";
     public static final String CLIENT_KEYTABFILE_KEY = "clientKeytabFile";
 
+    public static final String BATCH_SIZE = "batchSize";
+    public static final String BATCH_WAIT_INTERVAL = "batchWaitInterval";
+
 
     @Override
     protected boolean fieldNameNeedsUpperCase() {
@@ -75,23 +78,26 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
         HbaseTableInfo hbaseTableInfo = new HbaseTableInfo();
         hbaseTableInfo.setName(tableName);
         parseFieldsInfo(fieldsInfo, hbaseTableInfo);
-        hbaseTableInfo.setTableName((String) props.get(TABLE_NAME_KEY.toLowerCase()));
+        hbaseTableInfo.setTableName(MathUtil.getString(props.get(TABLE_NAME_KEY.toLowerCase())));
         hbaseTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase())));
-        hbaseTableInfo.setHost((String) props.get(HBASE_ZOOKEEPER_QUORUM.toLowerCase()));
-        hbaseTableInfo.setParent((String)props.get(ZOOKEEPER_PARENT.toLowerCase()));
-        String rk = (String) props.get(HBASE_ROWKEY.toLowerCase());
+        hbaseTableInfo.setHost(MathUtil.getString(props.get(HBASE_ZOOKEEPER_QUORUM.toLowerCase())));
+        hbaseTableInfo.setParent(MathUtil.getString(props.get(ZOOKEEPER_PARENT.toLowerCase())));
+        String rk = MathUtil.getString(props.get(HBASE_ROWKEY.toLowerCase()));
         hbaseTableInfo.setRowkey(rk);
-        String updateMode = (String) props.getOrDefault(UPDATE_KEY, EUpdateMode.APPEND.name());
+        String updateMode = MathUtil.getString(props.getOrDefault(UPDATE_KEY, EUpdateMode.APPEND.name()));
         hbaseTableInfo.setUpdateMode(updateMode);
 
         hbaseTableInfo.setKerberosAuthEnable(MathUtil.getBoolean(props.get(KERBEROS_AUTH_ENABLE_KEY.toLowerCase()), false));
-        hbaseTableInfo.setRegionserverKeytabFile((String) props.get(REGIONSERVER_KEYTAB_FILE_KEY.toLowerCase()));
-        hbaseTableInfo.setRegionserverPrincipal((String) props.get(REGIONSERVER_PRINCIPAL_KEY.toLowerCase()));
-        hbaseTableInfo.setSecurityKrb5Conf((String) props.get(SECURITY_KRB5_CONF_KEY.toLowerCase()));
-        hbaseTableInfo.setZookeeperSaslClient((String) props.get(ZOOKEEPER_SASL_CLINT_KEY.toLowerCase()));
+        hbaseTableInfo.setRegionserverKeytabFile(MathUtil.getString(props.get(REGIONSERVER_KEYTAB_FILE_KEY.toLowerCase())));
+        hbaseTableInfo.setRegionserverPrincipal((MathUtil.getString(props.get(REGIONSERVER_PRINCIPAL_KEY.toLowerCase()))));
+        hbaseTableInfo.setSecurityKrb5Conf(MathUtil.getString(props.get(SECURITY_KRB5_CONF_KEY.toLowerCase())));
+        hbaseTableInfo.setZookeeperSaslClient(MathUtil.getString(props.get(ZOOKEEPER_SASL_CLINT_KEY.toLowerCase())));
+
+        hbaseTableInfo.setClientPrincipal(MathUtil.getString(props.get(CLIENT_PRINCIPAL_KEY.toLowerCase())));
+        hbaseTableInfo.setClientKeytabFile(MathUtil.getString(props.get(CLIENT_KEYTABFILE_KEY.toLowerCase())));
 
-        hbaseTableInfo.setClientPrincipal((String) props.get(CLIENT_PRINCIPAL_KEY.toLowerCase()));
-        hbaseTableInfo.setClientKeytabFile((String) props.get(CLIENT_KEYTABFILE_KEY.toLowerCase()));
+        hbaseTableInfo.setBatchSize(MathUtil.getString(props.getOrDefault(BATCH_SIZE.toLowerCase(), "100")));
+        hbaseTableInfo.setBatchWaitInterval(MathUtil.getString(props.getOrDefault(BATCH_WAIT_INTERVAL.toLowerCase(), "1000")));
         return hbaseTableInfo;
     }
 
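
The new options are looked up under lowercased keys ("batchsize", "batchwaitinterval") and default to "100" and "1000" when absent. A small self-contained sketch of that defaulting behavior, with String.valueOf standing in for MathUtil.getString and illustrative values:

import java.util.HashMap;
import java.util.Map;

public class BatchDefaultsDemo {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // "batchsize" deliberately omitted: the parser falls back to "100".
        props.put("batchwaitinterval", "500");

        String batchSize = String.valueOf(props.getOrDefault("batchsize", "100"));
        String batchWaitInterval = String.valueOf(props.getOrDefault("batchwaitinterval", "1000"));

        System.out.println(batchSize);         // prints 100 (default applied)
        System.out.println(batchWaitInterval); // prints 500 (explicit value kept)
    }
}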

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseTableInfo.java

Lines changed: 20 additions & 0 deletions
@@ -71,6 +71,10 @@ public class HbaseTableInfo extends AbstractTargetTableInfo {
 
     private String clientKeytabFile;
 
+    private String batchSize;
+
+    private String batchWaitInterval;
+
     private Map<String,Object> hbaseConfig = Maps.newHashMap();
 
     public HbaseTableInfo(){
@@ -233,4 +237,20 @@ public void setClientKeytabFile(String clientKeytabFile) {
         this.clientKeytabFile = clientKeytabFile;
     }
 
+    public String getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(String batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public String getBatchWaitInterval() {
+        return batchWaitInterval;
+    }
+
+    public void setBatchWaitInterval(String batchWaitInterval) {
+        this.batchWaitInterval = batchWaitInterval;
+    }
+
 }
