Skip to content

Commit bffeb0d

Browse files
FlechazoWHiLany
authored andcommitted
[hotfix-35505][kudu]
1.add [mutationBufferMaxOps]; 2.if catch exception with "MANUAL_FLUSH is enabled but the buffer is too big", then session flush;
1 parent 9474b14 commit bffeb0d

File tree

4 files changed

+42
-10
lines changed

4 files changed

+42
-10
lines changed

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.kudu;
2020

21+
import com.dtstack.flink.sql.exception.ExceptionTrace;
2122
import com.dtstack.flink.sql.factory.DTThreadFactory;
2223
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2324
import com.dtstack.flink.sql.sink.kudu.table.KuduTableInfo;
@@ -60,6 +61,8 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean,
6061
private static final long serialVersionUID = 1L;
6162

6263
private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
64+
private static final String MANUAL_FLUSH_BUFFER_BIG_MSG = "MANUAL_FLUSH is enabled but the buffer is too big";
65+
6366
protected String[] fieldNames;
6467
TypeInformation<?>[] fieldTypes;
6568
boolean enableKrb;
@@ -76,6 +79,8 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean,
7679

7780
private Integer defaultOperationTimeoutMs;
7881

82+
private int mutationBufferMaxOps;
83+
7984
/**
8085
* kerberos
8186
*/
@@ -186,9 +191,7 @@ private KuduSession buildSessionWithFlushMode(String flushMode, KuduClient kuduC
186191
KuduSession kuduSession = kuduClient.newSession();
187192
if (flushMode.equalsIgnoreCase(KuduTableInfo.KuduFlushMode.MANUAL_FLUSH.name())) {
188193
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
189-
kuduSession.setMutationBufferSpace(
190-
Integer.parseInt(String.valueOf(Math.round(batchSize * 1.2)))
191-
);
194+
kuduSession.setMutationBufferSpace(mutationBufferMaxOps);
192195
}
193196

194197
if (flushMode.equalsIgnoreCase(KuduTableInfo.KuduFlushMode.AUTO_FLUSH_SYNC.name())) {
@@ -197,8 +200,9 @@ private KuduSession buildSessionWithFlushMode(String flushMode, KuduClient kuduC
197200
}
198201

199202
if (flushMode.equalsIgnoreCase(KuduTableInfo.KuduFlushMode.AUTO_FLUSH_BACKGROUND.name())) {
200-
LOG.warn("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode.");
203+
LOG.warn("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode. Only [batchWaitInterval] will effect.");
201204
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
205+
kuduSession.setFlushInterval(batchWaitInterval);
202206
}
203207

204208
return kuduSession;
@@ -213,21 +217,26 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
213217
Row row = record.getField(1);
214218

215219
try {
216-
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
217-
LOG.info("Receive data : {}", row);
218-
}
219220
if (rowCount.getAndIncrement() >= batchSize) {
220221
flush();
221222
}
222223
// At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation, then get the response from kudu server.
223224
if (flushMode.equalsIgnoreCase(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC.name())) {
224225
dealResponse(session.apply(toOperation(writeMode, row)));
226+
} else {
227+
session.apply(toOperation(writeMode, row));
225228
}
226-
227-
session.apply(toOperation(writeMode, row));
228229
outRecords.inc();
230+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
231+
LOG.info("Receive data : {}", row);
232+
}
229233
} catch (KuduException e) {
230-
throw new RuntimeException(e);
234+
// 如果出现了buffer is too big的问题,需要将当前buffer里的数据flush掉即可。
235+
if (ExceptionTrace.traceOriginalCause(e).contains(MANUAL_FLUSH_BUFFER_BIG_MSG)) {
236+
flush();
237+
} else {
238+
throw new RuntimeException(e);
239+
}
231240
}
232241
}
233242

@@ -483,6 +492,11 @@ public KuduOutputFormatBuilder setFlushMode(String flushMode) {
483492
return this;
484493
}
485494

495+
public KuduOutputFormatBuilder setMutationBufferMaxOps(Integer mutationBufferMaxOps) {
496+
kuduOutputFormat.mutationBufferMaxOps = mutationBufferMaxOps;
497+
return this;
498+
}
499+
486500
public KuduOutputFormat finish() {
487501
if (kuduOutputFormat.kuduMasters == null) {
488502
throw new IllegalArgumentException("No kuduMasters supplied.");

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class KuduSink implements RetractStreamTableSink<Row>, Serializable, IStr
3636
private Integer batchSize;
3737
private Integer batchWaitInterval;
3838
private String flushMode;
39+
private Integer mutationBufferMaxOps;
3940

4041
@Override
4142
public KuduSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
@@ -55,6 +56,9 @@ public KuduSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
5556
this.batchSize = kuduTableInfo.getBatchSize();
5657
this.batchWaitInterval = kuduTableInfo.getBatchWaitInterval();
5758
this.flushMode = kuduTableInfo.getFlushMode();
59+
this.mutationBufferMaxOps = Objects.isNull(kuduTableInfo.getMutationBufferMaxOps()) ?
60+
Integer.parseInt(String.valueOf(Math.round(batchSize * 1.2))) :
61+
kuduTableInfo.getMutationBufferMaxOps();
5862
return this;
5963
}
6064

@@ -80,6 +84,7 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
8084
.setBatchSize(this.batchSize)
8185
.setBatchWaitInterval(this.batchWaitInterval)
8286
.setFlushMode(this.flushMode)
87+
.setMutationBufferMaxOps(this.mutationBufferMaxOps)
8388
.finish();
8489
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(kuduOutputFormat);
8590
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduSinkParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public class KuduSinkParser extends AbstractTableParser {
3232

3333
public static final String BATCH_WAIT_INTERVAL_KEY = "batchWaitInterval";
3434

35+
public static final String BUFFER_MAX_KEY = "mutationBufferMaxOps";
36+
3537
public static final Integer DEFAULT_BATCH_WAIT_INTERVAL = 60 * 1000;
3638

3739
public static final String SESSION_FLUSH_MODE_KEY = "flushMode";
@@ -50,6 +52,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5052
kuduTableInfo.setDefaultOperationTimeoutMs(MathUtil.getIntegerVal(props.get(OPERATION_TIMEOUT_MS.toLowerCase())));
5153
kuduTableInfo.setBatchSize(MathUtil.getIntegerVal(props.getOrDefault(BATCH_SIZE_KEY.toLowerCase(), DEFAULT_BATCH_SIZE)));
5254
kuduTableInfo.setBatchWaitInterval(MathUtil.getIntegerVal(props.getOrDefault(BATCH_WAIT_INTERVAL_KEY.toLowerCase(), DEFAULT_BATCH_WAIT_INTERVAL)));
55+
kuduTableInfo.setMutationBufferMaxOps(MathUtil.getIntegerVal(props.get(BUFFER_MAX_KEY.toLowerCase())));
5356

5457
if (Objects.isNull(props.get(SESSION_FLUSH_MODE_KEY.toLowerCase()))) {
5558
if (kuduTableInfo.getBatchSize() > 1) {

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduTableInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public class KuduTableInfo extends AbstractTargetTableInfo implements KerberosTa
1919

2020
private Integer defaultOperationTimeoutMs;
2121

22+
private Integer mutationBufferMaxOps;
23+
2224
/**
2325
* kerberos
2426
*/
@@ -80,6 +82,14 @@ public void setDefaultOperationTimeoutMs(Integer defaultOperationTimeoutMs) {
8082
this.defaultOperationTimeoutMs = defaultOperationTimeoutMs;
8183
}
8284

85+
public Integer getMutationBufferMaxOps() {
86+
return mutationBufferMaxOps;
87+
}
88+
89+
public void setMutationBufferMaxOps(Integer mutationBufferMaxOps) {
90+
this.mutationBufferMaxOps = mutationBufferMaxOps;
91+
}
92+
8393
@Override
8494
public boolean check() {
8595
Preconditions.checkNotNull(kuduMasters, "kudu field of kuduMasters is required");

0 commit comments

Comments
 (0)