Skip to content

Commit cfa0226

Browse files
异常时将数据写入日志
1 parent 2df63ae commit cfa0226

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

docs/elasticsearch6Sink.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ CREATE TABLE MyResult(
4848
aa INT,
4949
bb INT
5050
)WITH(
51-
type ='elasticsearch',
51+
type ='elasticsearch6',
5252
address ='172.16.10.47:9500',
5353
cluster='es_47_menghan',
5454
esType ='type1',

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/CustomerSinkFunc.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,19 @@ public CustomerSinkFunc(String index, String type, List<String> fieldNames, List
6868

6969
@Override
7070
public void process(Tuple2 tuple2, RuntimeContext ctx, RequestIndexer indexer) {
71-
try{
71+
try {
7272
Tuple2<Boolean, Row> tupleTrans = tuple2;
7373
Boolean retract = tupleTrans.getField(0);
7474
Row element = tupleTrans.getField(1);
75-
if(!retract){
75+
if (!retract) {
7676
return;
7777
}
7878

79-
8079
indexer.add(createIndexRequest(element));
8180
outRecords.inc();
82-
}catch (Throwable e){
83-
logger.error("", e);
81+
} catch (Throwable e) {
82+
logger.error("Failed to store source data {}. ", tuple2.getField(1));
83+
logger.error("Failed to create index request exception. ", e);
8484
}
8585
}
8686

0 commit comments

Comments
 (0)