Skip to content

Commit 8030b4a

Browse files
committed
es id null check
1 parent 22c9bb7 commit 8030b4a

File tree

2 files changed

+16
-10
lines changed
  • elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch
  • elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch

2 files changed

+16
-10
lines changed

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,14 @@ public void setOutRecords(Counter outRecords) {
9696
}
9797

9898
private IndexRequest createIndexRequest(Row element) {
99-
// index start at 1,
100-
String idFieldStr = idFieldIndexList.stream()
101-
.filter(index -> index > 0 && index <= element.getArity())
102-
.map(index -> element.getField(index - 1).toString())
103-
.collect(Collectors.joining(ID_VALUE_SPLIT));
99+
String idFieldStr = "";
100+
if (null != idFieldIndexList) {
101+
// index start at 1,
102+
idFieldStr = idFieldIndexList.stream()
103+
.filter(index -> index > 0 && index <= element.getArity())
104+
.map(index -> element.getField(index - 1).toString())
105+
.collect(Collectors.joining(ID_VALUE_SPLIT));
106+
}
104107

105108
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
106109
int length = Math.min(element.getArity(), fieldNames.size());

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,14 @@ public void setOutDirtyRecords(Counter outDirtyRecords) {
9595
}
9696

9797
private IndexRequest createIndexRequest(Row element) {
98-
// index start at 1,
99-
String idFieldStr = idFieldIndexList.stream()
100-
.filter(index -> index > 0 && index <= element.getArity())
101-
.map(index -> element.getField(index - 1).toString())
102-
.collect(Collectors.joining(ID_VALUE_SPLIT));
98+
String idFieldStr = "";
99+
if (null != idFieldIndexList) {
100+
// index start at 1,
101+
idFieldStr = idFieldIndexList.stream()
102+
.filter(index -> index > 0 && index <= element.getArity())
103+
.map(index -> element.getField(index - 1).toString())
104+
.collect(Collectors.joining(ID_VALUE_SPLIT));
105+
}
103106

104107
Map<String, Object> dataMap = Es6Util.rowToJsonMap(element, fieldNames, fieldTypes);
105108
int length = Math.min(element.getArity(), fieldNames.size());

0 commit comments

Comments
 (0)