Skip to content

Commit 84b25b3

Browse files
committed
[hotfix-34999][elasticsearch5][sink]optimize es5's pom and adjusted generation strategy of doc's id to field name.
1 parent ea50a0a commit 84b25b3

File tree

6 files changed

+39
-55
lines changed

6 files changed

+39
-55
lines changed

elasticsearch5/elasticsearch5-sink/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<dependency>
1717
<groupId>org.elasticsearch.client</groupId>
1818
<artifactId>transport</artifactId>
19-
<version>5.3.3</version>
19+
<version>${elasticsearch.version}</version>
2020
</dependency>
2121

2222
<dependency>
@@ -34,7 +34,7 @@
3434
<dependency>
3535
<groupId>org.elasticsearch.client</groupId>
3636
<artifactId>x-pack-transport</artifactId>
37-
<version>5.3.3</version>
37+
<version>${elasticsearch.version}</version>
3838
</dependency>
3939

4040
<dependency>

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,20 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
5353

5454
private String type;
5555

56-
private List<Integer> idFieldIndexList;
56+
private List<String> idFiledNames;
5757

5858
private List<String> fieldNames;
5959

6060
private List<String> fieldTypes;
6161

6262
public transient Counter outRecords;
6363

64-
/** 默认分隔符为'_' */
65-
private char sp = '_';
66-
67-
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes){
64+
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<String> idFiledNames){
6865
this.index = index;
6966
this.type = type;
7067
this.fieldNames = fieldNames;
7168
this.fieldTypes = fieldTypes;
72-
this.idFieldIndexList = idFieldIndexes;
69+
this.idFiledNames = idFiledNames;
7370
}
7471

7572
@Override
@@ -95,21 +92,20 @@ public void setOutRecords(Counter outRecords) {
9592
}
9693

9794
private IndexRequest createIndexRequest(Row element) {
98-
String idFieldStr = "";
99-
if (null != idFieldIndexList) {
100-
// index start at 1,
101-
idFieldStr = idFieldIndexList.stream()
102-
.filter(index -> index > 0 && index <= element.getArity())
103-
.map(index -> element.getField(index - 1).toString())
104-
.collect(Collectors.joining(ID_VALUE_SPLIT));
105-
}
10695

10796
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
10897
int length = Math.min(element.getArity(), fieldNames.size());
10998
for(int i=0; i<length; i++){
11099
dataMap.put(fieldNames.get(i), element.getField(i));
111100
}
112101

102+
String idFieldStr = "";
103+
if (null != idFiledNames) {
104+
idFieldStr = idFiledNames.stream()
105+
.map(idFiledName -> dataMap.get(idFiledName).toString())
106+
.collect(Collectors.joining(ID_VALUE_SPLIT));
107+
}
108+
113109
if (StringUtils.isEmpty(idFieldStr)) {
114110
return Requests.indexRequest()
115111
.index(index)

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

2323
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
24+
import com.google.common.base.Preconditions;
2425
import org.apache.flink.api.common.typeinfo.TypeInformation;
2526
import org.apache.flink.api.java.tuple.Tuple2;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -68,7 +69,7 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6869

6970
private String type = "";
7071

71-
private List<Integer> idIndexList;
72+
private List<String> idFiledNames;
7273

7374
protected String[] fieldNames;
7475

@@ -113,6 +114,10 @@ public TypeInformation<?>[] getFieldTypes() {
113114

114115
private RichSinkFunction createEsSinkFunction(){
115116

117+
// check whether id fields is exists in columns
118+
List<String> filedNamesLists = Arrays.asList(fieldNames);
119+
Preconditions.checkState(filedNamesLists.containsAll(idFiledNames), "elasticsearch5 type of id %s is should be exists in columns %s.", idFiledNames, filedNamesLists);
120+
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, filedNamesLists, Arrays.asList(columnTypes), idFiledNames);
116121

117122
Map<String, String> userConfig = new HashMap<>();
118123
userConfig.put("cluster.name", clusterName);
@@ -142,8 +147,6 @@ private RichSinkFunction createEsSinkFunction(){
142147
userConfig.put("xpack.security.user", authPassword);
143148
}
144149

145-
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
146-
147150
return new MetricElasticsearchSink(userConfig, transports, customerSinkFunc, esTableInfo);
148151
}
149152

@@ -179,13 +182,13 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
179182
type = elasticsearchTableInfo.getEsType();
180183
String id = elasticsearchTableInfo.getId();
181184
String[] idField = StringUtils.split(id, ",");
182-
idIndexList = new ArrayList<>();
185+
idFiledNames = new ArrayList<>();
183186
registerTableName = elasticsearchTableInfo.getName();
184187
parallelism = Objects.isNull(elasticsearchTableInfo.getParallelism()) ?
185188
parallelism : elasticsearchTableInfo.getParallelism();
186189

187190
for(int i = 0; i < idField.length; ++i) {
188-
idIndexList.add(Integer.valueOf(idField[i]));
191+
idFiledNames.add(String.valueOf(idField[i]));
189192
}
190193

191194
columnTypes = elasticsearchTableInfo.getFieldTypes();

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/MetricElasticsearchSink.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,6 @@ public void open(Configuration parameters) throws Exception {
6060
initMetric();
6161
}
6262

63-
/*public void setXPackTransportClient() throws Exception {
64-
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
65-
Settings settings = Settings.builder().put(userConfig).put("xpack.security.user", authPassword).build();
66-
Class clz = Class.forName("org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase");
67-
Field clientField = clz.getDeclaredField("client");
68-
clientField.setAccessible(true);
69-
PreBuiltXPackTransportClient transportClient = new PreBuiltXPackTransportClient(settings);
70-
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
71-
transportClient.addTransportAddress(transport);
72-
}
73-
74-
clientField.set(this, transportClient);
75-
}*/
76-
7763
public void initMetric() {
7864
Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
7965
customerSinkFunc.setOutRecords(counter);

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,6 @@ public boolean check() {
134134
Preconditions.checkNotNull(esType, "elasticsearch type of type is required");
135135
Preconditions.checkNotNull(clusterName, "elasticsearch type of clusterName is required");
136136

137-
if (!StringUtils.isEmpty(id)) {
138-
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
139-
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
140-
});
141-
}
142-
143137
if (isAuthMesh()) {
144138
Preconditions.checkNotNull(userName, "elasticsearch type of userName is required");
145139
Preconditions.checkNotNull(password, "elasticsearch type of password is required");

elasticsearch5/pom.xml

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,29 @@
1111

1212
<artifactId>sql.elasticsearch5</artifactId>
1313
<packaging>pom</packaging>
14+
15+
<properties>
16+
<elasticsearch.version>5.3.3</elasticsearch.version>
17+
</properties>
18+
1419
<modules>
1520
<module>elasticsearch5-sink</module>
1621
</modules>
17-
<dependencies>
18-
<dependency>
19-
<groupId>junit</groupId>
20-
<artifactId>junit</artifactId>
21-
<version>3.8.1</version>
22-
<scope>test</scope>
23-
</dependency>
22+
<dependencies>
23+
<dependency>
24+
<groupId>junit</groupId>
25+
<artifactId>junit</artifactId>
26+
<version>3.8.1</version>
27+
<scope>test</scope>
28+
</dependency>
2429

25-
<dependency>
26-
<groupId>com.dtstack.flink</groupId>
27-
<artifactId>sql.core</artifactId>
28-
<version>1.0-SNAPSHOT</version>
29-
<scope>provided</scope>
30-
</dependency>
30+
<dependency>
31+
<groupId>com.dtstack.flink</groupId>
32+
<artifactId>sql.core</artifactId>
33+
<version>1.0-SNAPSHOT</version>
34+
<scope>provided</scope>
35+
</dependency>
3136

32-
</dependencies>
37+
</dependencies>
3338

3439
</project>

0 commit comments

Comments
 (0)