Skip to content

Commit 1687594

Browse files
committed
Merge branch 'feat_1.8_elasticsearch6-sink' into 'v1.8.0_dev'
添加对es6 结果表的支持(es添加脏数据指标的输出) es添加脏数据指标的输出 See merge request !233
2 parents be19965 + dbd490a commit 1687594

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/CustomerSinkFunc.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
5353

5454
private List<String> fieldTypes;
5555

56-
public transient Counter outRecords;
56+
private transient Counter outRecords;
57+
58+
private transient Counter outDirtyRecords;
5759

5860
/** 默认分隔符为'_' */
5961
private char sp = '_';
@@ -79,6 +81,7 @@ public void process(Tuple2 tuple2, RuntimeContext ctx, RequestIndexer indexer) {
7981
indexer.add(createIndexRequest(element));
8082
outRecords.inc();
8183
} catch (Throwable e) {
84+
outDirtyRecords.inc();
8285
logger.error("Failed to store source data {}. ", tuple2.getField(1));
8386
logger.error("Failed to create index request exception. ", e);
8487
}
@@ -88,6 +91,10 @@ public void setOutRecords(Counter outRecords) {
8891
this.outRecords = outRecords;
8992
}
9093

94+
public void setOutDirtyRecords(Counter outDirtyRecords) {
95+
this.outDirtyRecords = outDirtyRecords;
96+
}
97+
9198
private IndexRequest createIndexRequest(Row element) {
9299

93100
List<String> idFieldList = new ArrayList<>();

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/MetricElasticsearch6Sink.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ public void open(Configuration parameters) throws Exception {
6363

6464
public void initMetric() {
6565
Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
66+
Counter outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
67+
6668
customerSinkFunc.setOutRecords(counter);
69+
customerSinkFunc.setOutDirtyRecords(outDirtyRecords);
6770
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(counter, 20));
6871
}
6972
}

0 commit comments

Comments
 (0)