Skip to content

Commit 4e43e74

Browse files
committed
[feat-269][elasticsearch6][sink]compatible old version of generate doc's id
1 parent 2d80515 commit 4e43e74

File tree

3 files changed

+61
-13
lines changed

3 files changed

+61
-13
lines changed

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
5353

5454
private String type;
5555

56-
private List<String> idFiledNames;
56+
private List<Object> ids;
57+
58+
// true means the doc's id is generated by position, e.g. "1[,1]"
59+
private boolean usePosition;
5760

5861
private List<String> fieldNames;
5962

@@ -63,12 +66,13 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
6366

6467
private transient Counter outDirtyRecords;
6568

66-
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<String> idFiledNames) {
69+
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Object> ids, boolean usePosition) {
6770
this.index = index;
6871
this.type = type;
6972
this.fieldNames = fieldNames;
7073
this.fieldTypes = fieldTypes;
71-
this.idFiledNames = idFiledNames;
74+
this.ids = ids;
75+
this.usePosition = usePosition;
7276
}
7377

7478
@Override
@@ -111,10 +115,21 @@ private IndexRequest createIndexRequest(Row element) {
111115
}
112116

113117
String idFieldStr = "";
114-
if (null != idFiledNames) {
115-
idFieldStr = idFiledNames.stream()
116-
.map(idFiledName -> dataMap.get(idFiledName).toString())
117-
.collect(Collectors.joining(ID_VALUE_SPLIT));
118+
if (null != ids) {
119+
if (!usePosition) {
120+
idFieldStr = ids.stream()
121+
.map(filedName -> (String) filedName)
122+
.map(filedName -> dataMap.get(filedName).toString())
123+
.collect(Collectors.joining(ID_VALUE_SPLIT));
124+
} else {
125+
// compatible with the old version of doc id generation
126+
// positions start at 1
127+
idFieldStr = ids.stream()
128+
.map(index -> (Integer) index)
129+
.filter(index -> index > 0 && index <= element.getArity())
130+
.map(index -> element.getField( index - 1).toString())
131+
.collect(Collectors.joining(ID_VALUE_SPLIT));
132+
}
118133
}
119134

120135
if (StringUtils.isEmpty(idFieldStr)) {

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6262

6363
private String type = "";
6464

65-
private List<String> idFiledNames;
65+
private List<Object> ids;
66+
67+
// true means the doc's id is generated by position, e.g. "1[,1]"
68+
private boolean usePosition = false;
6669

6770
protected String[] fieldNames;
6871

@@ -107,10 +110,12 @@ public TypeInformation<?>[] getFieldTypes() {
107110

108111
private RichSinkFunction createEsSinkFunction() {
109112

110-
// check whether id fields is exists in columns
111-
List<String> filedNamesLists = Arrays.asList(fieldNames);
112-
Preconditions.checkState(filedNamesLists.containsAll(idFiledNames), "elasticsearch6 type of id %s is should be exists in columns %s.", idFiledNames, filedNamesLists);
113-
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, filedNamesLists, Arrays.asList(columnTypes), idFiledNames);
113+
// when the doc's id is not generated by position, check that the id fields exist in the columns
114+
if (!usePosition) {
115+
List<String> filedNamesLists = Arrays.asList(fieldNames);
116+
Preconditions.checkState(filedNamesLists.containsAll(ids), "elasticsearch6 type of id %s is should be exists in columns %s.", ids, filedNamesLists);
117+
}
118+
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), ids, usePosition);
114119

115120
Map<String, String> userConfig = Maps.newHashMap();
116121
userConfig.put("cluster.name", clusterName);
@@ -156,8 +161,15 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
156161
registerTableName = esTableInfo.getName();
157162
parallelism = Objects.isNull(esTableInfo.getParallelism()) ? parallelism : esTableInfo.getParallelism();
158163

164+
// determine how the doc's id is generated
159165
if (!StringUtils.isEmpty(id)) {
160-
idFiledNames = Arrays.stream(StringUtils.split(id, ",")).map(String::valueOf).collect(Collectors.toList());
166+
if (!Es6Util.checkWhetherUsePosition(id)) {
167+
ids = Arrays.stream(StringUtils.split(id, ",")).map(String::valueOf).collect(Collectors.toList());
168+
} else {
169+
// compatible with the old version: generate the doc's id by position
170+
usePosition = true;
171+
ids = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());
172+
}
161173
}
162174
return this;
163175
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/Es6Util.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,20 @@
1818

1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.commons.lang3.math.NumberUtils;
2123
import org.apache.flink.types.Row;
2224
import org.apache.flink.util.Preconditions;
2325

2426
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2527

2628
import com.dtstack.flink.sql.util.DtStringUtil;
2729

30+
import java.util.Arrays;
2831
import java.util.HashMap;
2932
import java.util.List;
3033
import java.util.Map;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3135

3236
/**
3337
* @author yinxi
@@ -63,5 +67,22 @@ public static Map<String, Object> rowToJsonMap(Row row, List<String> fields, Lis
6367
return jsonMap;
6468
}
6569

70+
/**
71+
* check whether use position to generation doc's id
72+
* eg : |1,2,3 -> true
73+
* |id,name,addr -> false
74+
* @param ids
75+
* @return
76+
*/
77+
public static boolean checkWhetherUsePosition(String ids) {
78+
boolean flag = true;
79+
for( String id : StringUtils.split(ids, ",")) {
80+
if (!NumberUtils.isNumber(id)) {
81+
flag= false;
82+
break;
83+
}
84+
}
85+
return flag;
86+
}
6687

6788
}

0 commit comments

Comments
 (0)