Skip to content

Commit 284d52d

Browse files
committed
id allow as null
1 parent 586c74a commit 284d52d

File tree

4 files changed

+27
-28
lines changed

4 files changed

+27
-28
lines changed

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.ArrayList;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.stream.Collectors;
3839

3940
/**
4041
* Reason:
@@ -47,6 +48,8 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
4748

4849
private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);
4950

51+
private static final String ID_VALUE_SPLIT = "_";
52+
5053
private String index;
5154

5255
private String type;
@@ -93,27 +96,29 @@ public void setOutRecords(Counter outRecords) {
9396
}
9497

9598
private IndexRequest createIndexRequest(Row element) {
96-
97-
List<String> idFieldList = new ArrayList<>();
98-
for(int index : idFieldIndexList){
99-
if(index >= element.getArity()){
100-
continue;
101-
}
102-
103-
idFieldList.add(element.getField(index).toString());
104-
}
99+
// index start at 1,
100+
String idFieldStr = idFieldIndexList.stream()
101+
.filter(index -> index > 0 && index <= element.getArity())
102+
.map(index -> element.getField(index - 1).toString())
103+
.collect(Collectors.joining(ID_VALUE_SPLIT));
105104

106105
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
107106
int length = Math.min(element.getArity(), fieldNames.size());
108107
for(int i=0; i<length; i++){
109108
dataMap.put(fieldNames.get(i), element.getField(i));
110109
}
111110

112-
String id = StringUtils.join(idFieldList, sp);
111+
if (StringUtils.isEmpty(idFieldStr)) {
112+
return Requests.indexRequest()
113+
.index(index)
114+
.type(type)
115+
.source(dataMap);
116+
}
117+
113118
return Requests.indexRequest()
114119
.index(index)
115120
.type(type)
116-
.id(id)
121+
.id(idFieldStr)
117122
.source(dataMap);
118123
}
119124
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,13 @@ public boolean check() {
132132
Preconditions.checkNotNull(address, "elasticsearch type of address is required");
133133
Preconditions.checkNotNull(index, "elasticsearch type of index is required");
134134
Preconditions.checkNotNull(esType, "elasticsearch type of type is required");
135-
Preconditions.checkNotNull(id, "elasticsearch type of id is required");
136135
Preconditions.checkNotNull(clusterName, "elasticsearch type of clusterName is required");
137136

138-
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
139-
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
140-
});
137+
if (!StringUtils.isEmpty(id)) {
138+
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
139+
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
140+
});
141+
}
141142

142143
if (isAuthMesh()) {
143144
Preconditions.checkNotNull(userName, "elasticsearch type of userName is required");

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,6 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
127127
}
128128
}
129129

130-
public void setParallelism(int parallelism) {
131-
this.parallelism = parallelism;
132-
}
133-
134-
public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
135-
this.bulkFlushMaxActions = bulkFlushMaxActions;
136-
}
137-
138130
@Override
139131
public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
140132
esTableInfo = (ElasticsearchTableInfo) targetTableInfo;
@@ -143,8 +135,8 @@ public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
143135
type = esTableInfo.getEsType();
144136
columnTypes = esTableInfo.getFieldTypes();
145137
esAddressList = Arrays.asList(esTableInfo.getAddress().split(","));
146-
147138
String id = esTableInfo.getId();
139+
148140
if (!StringUtils.isEmpty(id)) {
149141
idIndexList = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());
150142
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,13 @@ public boolean check() {
128128
Preconditions.checkNotNull(address, "elasticsearch6 type of address is required");
129129
Preconditions.checkNotNull(index, "elasticsearch6 type of index is required");
130130
Preconditions.checkNotNull(esType, "elasticsearch6 type of type is required");
131-
Preconditions.checkNotNull(id, "elasticsearch6 type of id is required");
132131
Preconditions.checkNotNull(clusterName, "elasticsearch6 type of clusterName is required");
133132

134-
Arrays.stream(StringUtils.split(id, ",")).forEach(number ->{
135-
Preconditions.checkArgument(NumberUtils.isNumber(number),"id must be a numeric type");
136-
});
133+
if (!StringUtils.isEmpty(id)) {
134+
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
135+
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
136+
});
137+
}
137138

138139
if (isAuthMesh()) {
139140
Preconditions.checkNotNull(userName, "elasticsearch6 type of userName is required");

0 commit comments

Comments
 (0)