Commit 586c74a

es id number check
1 parent 110df8d commit 586c74a

File tree: 6 files changed, 56 additions, 62 deletions

File renamed without changes.

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 8 additions & 0 deletions
@@ -23,6 +23,10 @@
 
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
+import java.util.Arrays;
 
 /**
  * @date 2018/09/12
@@ -131,6 +135,10 @@ public boolean check() {
         Preconditions.checkNotNull(id, "elasticsearch type of id is required");
         Preconditions.checkNotNull(clusterName, "elasticsearch type of clusterName is required");
 
+        Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
+            Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
+        });
+
         if (isAuthMesh()) {
             Preconditions.checkNotNull(userName, "elasticsearch type of userName is required");
             Preconditions.checkNotNull(password, "elasticsearch type of password is required");
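
Note on the new check: every comma-separated id entry must parse as a number, since the entries are later converted with Integer::valueOf in ElasticsearchSink.genStreamSink; the check fails fast at table-definition time. Below is a minimal, self-contained sketch of the same validation; the class name and sample id strings are illustrative only and not part of this commit.

    import org.apache.commons.lang.StringUtils;
    import org.apache.commons.lang3.math.NumberUtils;
    import java.util.Arrays;

    // Illustrative sketch of the id validation added above (not part of the commit).
    public class IdCheckSketch {
        public static void main(String[] args) {
            String validId = "1,2,3";     // every token is numeric -> passes
            String invalidId = "1,name";  // "name" is not numeric -> would trip the Preconditions check
            Arrays.stream(StringUtils.split(validId, ","))
                    .forEach(token -> System.out.println(token + " numeric? " + NumberUtils.isNumber(token)));
            Arrays.stream(StringUtils.split(invalidId, ","))
                    .forEach(token -> System.out.println(token + " numeric? " + NumberUtils.isNumber(token)));
        }
    }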

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 18 additions & 20 deletions
@@ -31,9 +31,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @author yinxi
@@ -42,6 +42,8 @@
 public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
 
     private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);
+    /** Separator used to join the field values that make up the document id */
+    private static final String ID_VALUE_SPLIT = "_";
 
     private String index;
 
@@ -57,10 +59,7 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
 
     private transient Counter outDirtyRecords;
 
-    /** The default separator is '_' */
-    private char sp = '_';
-
-    public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes){
+    public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes) {
         this.index = index;
         this.type = type;
         this.fieldNames = fieldNames;
@@ -96,31 +95,30 @@ public void setOutDirtyRecords(Counter outDirtyRecords) {
     }
 
     private IndexRequest createIndexRequest(Row element) {
+        // id field index starts at 1
+        String idFieldStr = idFieldIndexList.stream()
+                .filter(index -> index > 0 && index <= element.getArity())
+                .map(index -> element.getField(index - 1).toString())
+                .collect(Collectors.joining(ID_VALUE_SPLIT));
 
-        List<String> idFieldList = new ArrayList<>();
-        for(int index : idFieldIndexList){
-            if(index >= element.getArity()){
-                continue;
-            }
-
-            idFieldList.add(element.getField(index).toString());
-        }
-
-        Map<String, Object> dataMap = Es6Util.rowToJsonMap(element,fieldNames,fieldTypes);
+        Map<String, Object> dataMap = Es6Util.rowToJsonMap(element, fieldNames, fieldTypes);
         int length = Math.min(element.getArity(), fieldNames.size());
-        for(int i=0; i<length; i++){
+        for (int i = 0; i < length; i++) {
             dataMap.put(fieldNames.get(i), element.getField(i));
         }
 
-        if (idFieldList.size() == 0) {
-            return Requests.indexRequest().index(index).type(type).source(dataMap);
+
+        if (StringUtils.isEmpty(idFieldStr)) {
+            return Requests.indexRequest()
+                    .index(index)
+                    .type(type)
+                    .source(dataMap);
         }
 
-        String id = StringUtils.join(idFieldList, sp);
         return Requests.indexRequest()
                 .index(index)
                 .type(type)
-                .id(id)
+                .id(idFieldStr)
                 .source(dataMap);
     }
 }
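
For context, the refactored createIndexRequest builds the document id by joining the 1-based id field values with "_" and filtering out out-of-range indexes. The sketch below mirrors that stream in isolation; the Row values and index list are made up for illustration, and it assumes a Flink version where Row.of is available.

    import org.apache.flink.types.Row;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Illustrative sketch of the id-joining logic above (values and indexes are made up).
    public class IdJoinSketch {
        private static final String ID_VALUE_SPLIT = "_";

        public static void main(String[] args) {
            Row element = Row.of("1001", "alice", 32);                 // arity = 3
            List<Integer> idFieldIndexList = Arrays.asList(1, 2, 9);   // 9 is out of range and gets filtered out

            String idFieldStr = idFieldIndexList.stream()
                    .filter(index -> index > 0 && index <= element.getArity())
                    .map(index -> element.getField(index - 1).toString())
                    .collect(Collectors.joining(ID_VALUE_SPLIT));

            System.out.println(idFieldStr);                            // prints "1001_alice"
        }
    }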

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 21 additions & 41 deletions
@@ -35,21 +35,19 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpHost;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @author yinxi
  * @date 2020/1/9 - 15:08
  */
 public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSinkGener<ElasticsearchSink> {
 
-    private final Logger logger = LoggerFactory.getLogger(ElasticsearchSink.class);
+    private final int ES_DEFAULT_PORT = 9200;
+    private final String ES_DEFAULT_SCHEMA = "http";
 
     private String clusterName;
 
@@ -103,32 +101,20 @@ public TypeInformation<?>[] getFieldTypes() {
 
 
     private RichSinkFunction createEsSinkFunction() {
-
-
         Map<String, String> userConfig = Maps.newHashMap();
         userConfig.put("cluster.name", clusterName);
         // This instructs the sink to emit after every element, otherwise they would be buffered
         userConfig.put(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "" + bulkFlushMaxActions);
-        List<HttpHost> transports = new ArrayList<>();
-
-        for (String address : esAddressList) {
-            String[] infoArray = address.split(":");
-            int port = 9200;
-            String host = infoArray[0];
-            if (infoArray.length > 1) {
-                port = Integer.valueOf(infoArray[1].trim());
-            }
-
-            try {
-                transports.add(new HttpHost(host.trim(), port, "http"));
-            } catch (Exception e) {
-                logger.error("", e);
-                throw new RuntimeException(e);
-            }
-        }
 
-        CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
+        List<HttpHost> transports = esAddressList.stream()
+                .map(address -> address.split(":"))
+                .map(addressArray -> {
+                    String host = addressArray[0].trim();
+                    int port = addressArray.length > 1 ? Integer.valueOf(addressArray[1].trim()) : ES_DEFAULT_PORT;
+                    return new HttpHost(host.trim(), port, ES_DEFAULT_SCHEMA);
+                }).collect(Collectors.toList());
 
+        CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
         return new MetricElasticsearch6Sink(userConfig, transports, customerSinkFunc, esTableInfo);
     }
 
@@ -151,23 +137,17 @@ public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
 
     @Override
     public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
-        ElasticsearchTableInfo elasticsearchTableInfo = (ElasticsearchTableInfo) targetTableInfo;
-        esTableInfo = elasticsearchTableInfo;
-        clusterName = elasticsearchTableInfo.getClusterName();
-        String address = elasticsearchTableInfo.getAddress();
-        String[] addr = address.split(",");
-        esAddressList = Arrays.asList(addr);
-        index = elasticsearchTableInfo.getIndex();
-        type = elasticsearchTableInfo.getEsType();
-        String id = elasticsearchTableInfo.getId();
-        String[] idField = StringUtils.split(id, ",");
-        idIndexList = new ArrayList<>();
-
-        for (int i = 0; i < idField.length; ++i) {
-            idIndexList.add(Integer.valueOf(idField[i]));
+        esTableInfo = (ElasticsearchTableInfo) targetTableInfo;
+        clusterName = esTableInfo.getClusterName();
+        index = esTableInfo.getIndex();
+        type = esTableInfo.getEsType();
+        columnTypes = esTableInfo.getFieldTypes();
+        esAddressList = Arrays.asList(esTableInfo.getAddress().split(","));
+
+        String id = esTableInfo.getId();
+        if (!StringUtils.isEmpty(id)) {
+            idIndexList = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());
         }
-
-        columnTypes = elasticsearchTableInfo.getFieldTypes();
         return this;
     }
 }
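
The address parsing in createEsSinkFunction was rewritten as a stream; entries without an explicit port now fall back to the new ES_DEFAULT_PORT/ES_DEFAULT_SCHEMA constants (9200, "http"). A standalone sketch of the same parsing is shown below; the address list is made up for illustration.

    import org.apache.http.HttpHost;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Illustrative sketch of the address parsing above (the address list is made up).
    public class HostParseSketch {
        private static final int ES_DEFAULT_PORT = 9200;
        private static final String ES_DEFAULT_SCHEMA = "http";

        public static void main(String[] args) {
            List<String> esAddressList = Arrays.asList("node1:9201", "node2");

            List<HttpHost> transports = esAddressList.stream()
                    .map(address -> address.split(":"))
                    .map(addressArray -> {
                        String host = addressArray[0].trim();
                        int port = addressArray.length > 1 ? Integer.valueOf(addressArray[1].trim()) : ES_DEFAULT_PORT;
                        return new HttpHost(host, port, ES_DEFAULT_SCHEMA);
                    }).collect(Collectors.toList());

            transports.forEach(System.out::println);   // http://node1:9201 and http://node2:9200
        }
    }

One behavioral note from the diff: the old try/catch around new HttpHost and the class logger were dropped, so a malformed port now surfaces directly as a NumberFormatException from Integer.valueOf.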

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 8 additions & 0 deletions
@@ -20,6 +20,10 @@
 
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
+import java.util.Arrays;
 
 /**
  * @author yinxi
@@ -127,6 +131,10 @@ public boolean check() {
         Preconditions.checkNotNull(id, "elasticsearch6 type of id is required");
         Preconditions.checkNotNull(clusterName, "elasticsearch6 type of clusterName is required");
 
+        Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
+            Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
+        });
+
         if (isAuthMesh()) {
             Preconditions.checkNotNull(userName, "elasticsearch6 type of userName is required");
             Preconditions.checkNotNull(password, "elasticsearch6 type of password is required");

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -18,6 +18,7 @@
         <module>mysql</module>
         <module>hbase</module>
         <module>elasticsearch5</module>
+        <module>elasticsearch6</module>
         <module>mongo</module>
         <module>redis5</module>
         <module>launcher</module>
@@ -33,7 +34,6 @@
         <module>impala</module>
         <module>db2</module>
         <module>polardb</module>
-        <module>elasticsearch6</module>
 
     </modules>
 
