Skip to content

Commit 2d80515

Browse files
committed
[feat-269][elasticsearch5][sink]compatible old version of generate doc's id
1 parent a7eb863 commit 2d80515

File tree

3 files changed

+61
-14
lines changed

3 files changed

+61
-14
lines changed

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,24 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
5353

5454
private String type;
5555

56-
private List<String> idFiledNames;
56+
private List<Object> ids;
57+
58+
// true means the doc's id is generated from 1-based column positions, e.g. "1[,2]"
59+
private boolean usePosition;
5760

5861
private List<String> fieldNames;
5962

6063
private List<String> fieldTypes;
6164

6265
public transient Counter outRecords;
6366

64-
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<String> idFiledNames){
67+
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Object> ids, boolean usePosition) {
6568
this.index = index;
6669
this.type = type;
6770
this.fieldNames = fieldNames;
6871
this.fieldTypes = fieldTypes;
69-
this.idFiledNames = idFiledNames;
72+
this.ids = ids;
73+
this.usePosition = usePosition;
7074
}
7175

7276
@Override
@@ -100,10 +104,21 @@ private IndexRequest createIndexRequest(Row element) {
100104
}
101105

102106
String idFieldStr = "";
103-
if (null != idFiledNames) {
104-
idFieldStr = idFiledNames.stream()
105-
.map(idFiledName -> dataMap.get(idFiledName).toString())
106-
.collect(Collectors.joining(ID_VALUE_SPLIT));
107+
if (null != ids) {
108+
if (!usePosition) {
109+
idFieldStr = ids.stream()
110+
.map(filedName -> (String) filedName)
111+
.map(filedName -> dataMap.get(filedName).toString())
112+
.collect(Collectors.joining(ID_VALUE_SPLIT));
113+
} else {
114+
// backward compatibility: the old version generated the doc's id from column positions
115+
// positions start at 1
116+
idFieldStr = ids.stream()
117+
.map(index -> (Integer) index)
118+
.filter(index -> index > 0 && index <= element.getArity())
119+
.map(index -> element.getField( index - 1).toString())
120+
.collect(Collectors.joining(ID_VALUE_SPLIT));
121+
}
107122
}
108123

109124
if (StringUtils.isEmpty(idFieldStr)) {

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.List;
4848
import java.util.Map;
4949
import java.util.Objects;
50+
import java.util.stream.Collectors;
5051

5152
/**
5253
* table output elastic5plugin
@@ -69,7 +70,10 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6970

7071
private String type = "";
7172

72-
private List<String> idFiledNames;
73+
private List<Object> ids;
74+
75+
// true means the doc's id is generated from 1-based column positions, e.g. "1[,2]"
76+
private boolean usePosition = false;
7377

7478
protected String[] fieldNames;
7579

@@ -114,10 +118,12 @@ public TypeInformation<?>[] getFieldTypes() {
114118

115119
private RichSinkFunction createEsSinkFunction(){
116120

117-
// check whether id fields is exists in columns
118-
List<String> filedNamesLists = Arrays.asList(fieldNames);
119-
Preconditions.checkState(filedNamesLists.containsAll(idFiledNames), "elasticsearch5 type of id %s is should be exists in columns %s.", idFiledNames, filedNamesLists);
120-
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, filedNamesLists, Arrays.asList(columnTypes), idFiledNames);
121+
// when the ids are column names (not positions), check that every id field exists in the columns
122+
if (!usePosition) {
123+
List<String> filedNamesLists = Arrays.asList(fieldNames);
124+
Preconditions.checkState(filedNamesLists.containsAll(ids), "elasticsearch6 type of id %s is should be exists in columns %s.", ids, filedNamesLists);
125+
}
126+
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), ids, usePosition);
121127

122128
Map<String, String> userConfig = new HashMap<>();
123129
userConfig.put("cluster.name", clusterName);
@@ -182,15 +188,21 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
182188
type = elasticsearchTableInfo.getEsType();
183189
String id = elasticsearchTableInfo.getId();
184190
String[] idField = StringUtils.split(id, ",");
185-
idFiledNames = new ArrayList<>();
186191
registerTableName = elasticsearchTableInfo.getName();
187192
parallelism = Objects.isNull(elasticsearchTableInfo.getParallelism()) ?
188193
parallelism : elasticsearchTableInfo.getParallelism();
189194

190195
for(int i = 0; i < idField.length; ++i) {
191-
idFiledNames.add(String.valueOf(idField[i]));
196+
if (!EsUtil.checkWhetherUsePosition(id)) {
197+
ids = Arrays.stream(org.apache.commons.lang.StringUtils.split(id, ",")).map(String::valueOf).collect(Collectors.toList());
198+
} else {
199+
// backward compatibility: the old version generated the doc's id from column positions
200+
usePosition = true;
201+
ids = Arrays.stream(org.apache.commons.lang.StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());
202+
}
192203
}
193204

205+
194206
columnTypes = elasticsearchTableInfo.getFieldTypes();
195207

196208
return this;

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/EsUtil.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.commons.lang3.math.NumberUtils;
2123
import org.apache.flink.types.Row;
2224
import org.apache.flink.util.Preconditions;
2325

@@ -62,5 +64,23 @@ public static Map<String, Object> rowToJsonMap(Row row, List<String> fields, Lis
6264
return jsonMap;
6365
}
6466

67+
/**
68+
* check whether use position to generation doc's id
69+
* eg : |1,2,3 -> true
70+
* |id,name,addr -> false
71+
* @param ids
72+
* @return
73+
*/
74+
public static boolean checkWhetherUsePosition(String ids) {
75+
boolean flag = true;
76+
for( String id : StringUtils.split(ids, ",")) {
77+
if (!NumberUtils.isNumber(id)) {
78+
flag= false;
79+
break;
80+
}
81+
}
82+
return flag;
83+
}
84+
6585

6686
}

0 commit comments

Comments
 (0)