Skip to content

Commit f1e98f8

Browse files
committed
[hotfix-34999][elasticsearch6][sink]optimize es6's pom and adjusted generation strategy of doc's id to field name.
1 parent 84b25b3 commit f1e98f8

File tree

8 files changed

+25
-30
lines changed

8 files changed

+25
-30
lines changed

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
<properties>
1717
<sql.side.elasticsearch6.core.version>1.0-SNAPSHOT</sql.side.elasticsearch6.core.version>
18-
<elasticsearch.version>6.8.6</elasticsearch.version>
1918
</properties>
2019

2120

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
<properties>
1717
<sql.side.elasticsearch6.core.version>1.0-SNAPSHOT</sql.side.elasticsearch6.core.version>
18-
<elasticsearch.version>6.8.6</elasticsearch.version>
1918
</properties>
2019

2120

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515
<version>1.0-SNAPSHOT</version>
1616
<packaging>jar</packaging>
1717

18-
<properties>
19-
<elasticsearch.version>6.8.6</elasticsearch.version>
20-
</properties>
21-
2218
<dependencies>
2319
<dependency>
2420
<groupId>org.elasticsearch.client</groupId>

elasticsearch6/elasticsearch6-sink/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@
1212
<artifactId>sql.sink.elasticsearch6</artifactId>
1313
<name>elasticsearch6-sink</name>
1414

15-
<properties>
16-
<elasticsearch.version>6.8.6</elasticsearch.version>
17-
</properties>
18-
1915
<dependencies>
2016
<dependency>
2117
<groupId>org.apache.flink</groupId>

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
5353

5454
private String type;
5555

56-
private List<Integer> idFieldIndexList;
56+
private List<String> idFiledNames;
5757

5858
private List<String> fieldNames;
5959

@@ -63,12 +63,12 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
6363

6464
private transient Counter outDirtyRecords;
6565

66-
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes) {
66+
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<String> idFiledNames) {
6767
this.index = index;
6868
this.type = type;
6969
this.fieldNames = fieldNames;
7070
this.fieldTypes = fieldTypes;
71-
this.idFieldIndexList = idFieldIndexes;
71+
this.idFiledNames = idFiledNames;
7272
}
7373

7474
@Override
@@ -99,14 +99,6 @@ public void setOutDirtyRecords(Counter outDirtyRecords) {
9999
}
100100

101101
private IndexRequest createIndexRequest(Row element) {
102-
String idFieldStr = "";
103-
if (null != idFieldIndexList) {
104-
// index start at 1,
105-
idFieldStr = idFieldIndexList.stream()
106-
.filter(index -> index > 0 && index <= element.getArity())
107-
.map(index -> element.getField(index - 1).toString())
108-
.collect(Collectors.joining(ID_VALUE_SPLIT));
109-
}
110102

111103
Map<String, Object> dataMap = Es6Util.rowToJsonMap(element, fieldNames, fieldTypes);
112104
int length = Math.min(element.getArity(), fieldNames.size());
@@ -118,6 +110,12 @@ private IndexRequest createIndexRequest(Row element) {
118110
dataMap.put(fieldNames.get(i), element.getField(i));
119111
}
120112

113+
String idFieldStr = "";
114+
if (null != idFiledNames) {
115+
idFieldStr = idFiledNames.stream()
116+
.map(idFiledName -> dataMap.get(idFiledName).toString())
117+
.collect(Collectors.joining(ID_VALUE_SPLIT));
118+
}
121119

122120
if (StringUtils.isEmpty(idFieldStr)) {
123121
return Requests.indexRequest()

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

2121
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
22+
import com.google.common.base.Preconditions;
2223
import org.apache.flink.api.common.typeinfo.TypeInformation;
2324
import org.apache.flink.api.java.tuple.Tuple2;
2425
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -61,7 +62,7 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6162

6263
private String type = "";
6364

64-
private List<Integer> idIndexList;
65+
private List<String> idFiledNames;
6566

6667
protected String[] fieldNames;
6768

@@ -105,6 +106,12 @@ public TypeInformation<?>[] getFieldTypes() {
105106

106107

107108
private RichSinkFunction createEsSinkFunction() {
109+
110+
// check whether id fields is exists in columns
111+
List<String> filedNamesLists = Arrays.asList(fieldNames);
112+
Preconditions.checkState(filedNamesLists.containsAll(idFiledNames), "elasticsearch6 type of id %s is should be exists in columns %s.", idFiledNames, filedNamesLists);
113+
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, filedNamesLists, Arrays.asList(columnTypes), idFiledNames);
114+
108115
Map<String, String> userConfig = Maps.newHashMap();
109116
userConfig.put("cluster.name", clusterName);
110117
// This instructs the sink to emit after every element, otherwise they would be buffered
@@ -118,7 +125,7 @@ private RichSinkFunction createEsSinkFunction() {
118125
return new HttpHost(host.trim(), port, ES_DEFAULT_SCHEMA);
119126
}).collect(Collectors.toList());
120127

121-
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
128+
122129
return new MetricElasticsearch6Sink(userConfig, transports, customerSinkFunc, esTableInfo);
123130
}
124131

@@ -150,7 +157,7 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
150157
parallelism = Objects.isNull(esTableInfo.getParallelism()) ? parallelism : esTableInfo.getParallelism();
151158

152159
if (!StringUtils.isEmpty(id)) {
153-
idIndexList = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());
160+
idFiledNames = Arrays.stream(StringUtils.split(id, ",")).map(String::valueOf).collect(Collectors.toList());
154161
}
155162
return this;
156163
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,6 @@ public boolean check() {
130130
Preconditions.checkNotNull(esType, "elasticsearch6 type of type is required");
131131
Preconditions.checkNotNull(clusterName, "elasticsearch6 type of clusterName is required");
132132

133-
if (!StringUtils.isEmpty(id)) {
134-
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
135-
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
136-
});
137-
}
138-
139133
if (isAuthMesh()) {
140134
Preconditions.checkNotNull(userName, "elasticsearch6 type of userName is required");
141135
Preconditions.checkNotNull(password, "elasticsearch6 type of password is required");

elasticsearch6/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111

1212
<artifactId>sql.elasticsearch6</artifactId>
1313
<packaging>pom</packaging>
14+
15+
16+
<properties>
17+
<elasticsearch.version>6.8.6</elasticsearch.version>
18+
</properties>
19+
1420
<modules>
1521
<module>elasticsearch6-side</module>
1622
<module>elasticsearch6-sink</module>

0 commit comments

Comments
 (0)