Skip to content

Commit 27ed707

Browse files
committed
Merge branch 'feat_esAllowNull' into '1.8_test_3.10.x'
allow es id to be null. See merge request !261
2 parents a6ba9ef + d636b17 commit 27ed707

File tree

7 files changed

+76
-83
lines changed

7 files changed

+76
-83
lines changed
File renamed without changes.

docs/elasticsearch6Sink.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ CREATE TABLE tableName(
3535
|cluster | ES 集群名称 |||
3636
|index | 选择的ES上的index名称|||
3737
|esType | 选择ES上的type名称|||
38-
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
38+
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
3939
| |若id为空字符串或索引都超出范围,则随机生成id值)|||
4040
|authMesh | 是否进行用户名密码认证 || false|
4141
|userName | 用户名 | 否,authMesh='true'时为必填 ||

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.ArrayList;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.stream.Collectors;
3839

3940
/**
4041
* Reason:
@@ -47,6 +48,8 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
4748

4849
private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);
4950

51+
private static final String ID_VALUE_SPLIT = "_";
52+
5053
private String index;
5154

5255
private String type;
@@ -93,27 +96,29 @@ public void setOutRecords(Counter outRecords) {
9396
}
9497

9598
private IndexRequest createIndexRequest(Row element) {
96-
97-
List<String> idFieldList = new ArrayList<>();
98-
for(int index : idFieldIndexList){
99-
if(index >= element.getArity()){
100-
continue;
101-
}
102-
103-
idFieldList.add(element.getField(index).toString());
104-
}
99+
// index start at 1,
100+
String idFieldStr = idFieldIndexList.stream()
101+
.filter(index -> index > 0 && index <= element.getArity())
102+
.map(index -> element.getField(index - 1).toString())
103+
.collect(Collectors.joining(ID_VALUE_SPLIT));
105104

106105
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
107106
int length = Math.min(element.getArity(), fieldNames.size());
108107
for(int i=0; i<length; i++){
109108
dataMap.put(fieldNames.get(i), element.getField(i));
110109
}
111110

112-
String id = StringUtils.join(idFieldList, sp);
111+
if (StringUtils.isEmpty(idFieldStr)) {
112+
return Requests.indexRequest()
113+
.index(index)
114+
.type(type)
115+
.source(dataMap);
116+
}
117+
113118
return Requests.indexRequest()
114119
.index(index)
115120
.type(type)
116-
.id(id)
121+
.id(idFieldStr)
117122
.source(dataMap);
118123
}
119124
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323

2424
import com.dtstack.flink.sql.table.TargetTableInfo;
2525
import com.google.common.base.Preconditions;
26+
import org.apache.commons.lang.StringUtils;
27+
import org.apache.commons.lang3.math.NumberUtils;
28+
29+
import java.util.Arrays;
2630

2731
/**
2832
* @date 2018/09/12
@@ -128,9 +132,14 @@ public boolean check() {
128132
Preconditions.checkNotNull(address, "elasticsearch type of address is required");
129133
Preconditions.checkNotNull(index, "elasticsearch type of index is required");
130134
Preconditions.checkNotNull(esType, "elasticsearch type of type is required");
131-
Preconditions.checkNotNull(id, "elasticsearch type of id is required");
132135
Preconditions.checkNotNull(clusterName, "elasticsearch type of clusterName is required");
133136

137+
if (!StringUtils.isEmpty(id)) {
138+
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
139+
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
140+
});
141+
}
142+
134143
if (isAuthMesh()) {
135144
Preconditions.checkNotNull(userName, "elasticsearch type of userName is required");
136145
Preconditions.checkNotNull(password, "elasticsearch type of password is required");

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

34-
import java.util.ArrayList;
3534
import java.util.List;
3635
import java.util.Map;
36+
import java.util.stream.Collectors;
3737

3838
/**
3939
* @author yinxi
@@ -42,6 +42,8 @@
4242
public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
4343

4444
private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);
45+
/** 用作ID的属性值连接符号 */
46+
private static final String ID_VALUE_SPLIT = "_";
4547

4648
private String index;
4749

@@ -57,10 +59,7 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
5759

5860
private transient Counter outDirtyRecords;
5961

60-
/** 默认分隔符为'_' */
61-
private char sp = '_';
62-
63-
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes){
62+
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes) {
6463
this.index = index;
6564
this.type = type;
6665
this.fieldNames = fieldNames;
@@ -96,31 +95,30 @@ public void setOutDirtyRecords(Counter outDirtyRecords) {
9695
}
9796

9897
private IndexRequest createIndexRequest(Row element) {
98+
// index start at 1,
99+
String idFieldStr = idFieldIndexList.stream()
100+
.filter(index -> index > 0 && index <= element.getArity())
101+
.map(index -> element.getField(index - 1).toString())
102+
.collect(Collectors.joining(ID_VALUE_SPLIT));
99103

100-
List<String> idFieldList = new ArrayList<>();
101-
for(int index : idFieldIndexList){
102-
if(index >= element.getArity()){
103-
continue;
104-
}
105-
106-
idFieldList.add(element.getField(index).toString());
107-
}
108-
109-
Map<String, Object> dataMap = Es6Util.rowToJsonMap(element,fieldNames,fieldTypes);
104+
Map<String, Object> dataMap = Es6Util.rowToJsonMap(element, fieldNames, fieldTypes);
110105
int length = Math.min(element.getArity(), fieldNames.size());
111-
for(int i=0; i<length; i++){
106+
for (int i = 0; i < length; i++) {
112107
dataMap.put(fieldNames.get(i), element.getField(i));
113108
}
114109

115-
if (idFieldList.size() == 0) {
116-
return Requests.indexRequest().index(index).type(type).source(dataMap);
110+
111+
if (StringUtils.isEmpty(idFieldStr)) {
112+
return Requests.indexRequest()
113+
.index(index)
114+
.type(type)
115+
.source(dataMap);
117116
}
118117

119-
String id = StringUtils.join(idFieldList, sp);
120118
return Requests.indexRequest()
121119
.index(index)
122120
.type(type)
123-
.id(id)
121+
.id(idFieldStr)
124122
.source(dataMap);
125123
}
126124
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,19 @@
3535
import com.google.common.collect.Maps;
3636
import org.apache.commons.lang.StringUtils;
3737
import org.apache.http.HttpHost;
38-
import org.slf4j.Logger;
39-
import org.slf4j.LoggerFactory;
40-
41-
import java.util.ArrayList;
4238
import java.util.Arrays;
4339
import java.util.List;
4440
import java.util.Map;
41+
import java.util.stream.Collectors;
4542

4643
/**
4744
* @author yinxi
4845
* @date 2020/1/9 - 15:08
4946
*/
5047
public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSinkGener<ElasticsearchSink> {
5148

52-
private final Logger logger = LoggerFactory.getLogger(ElasticsearchSink.class);
49+
private final int ES_DEFAULT_PORT = 9200;
50+
private final String ES_DEFAULT_SCHEMA = "http";
5351

5452
private String clusterName;
5553

@@ -103,32 +101,20 @@ public TypeInformation<?>[] getFieldTypes() {
103101

104102

105103
private RichSinkFunction createEsSinkFunction() {
106-
107-
108104
Map<String, String> userConfig = Maps.newHashMap();
109105
userConfig.put("cluster.name", clusterName);
110106
// This instructs the sink to emit after every element, otherwise they would be buffered
111107
userConfig.put(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "" + bulkFlushMaxActions);
112-
List<HttpHost> transports = new ArrayList<>();
113-
114-
for (String address : esAddressList) {
115-
String[] infoArray = address.split(":");
116-
int port = 9200;
117-
String host = infoArray[0];
118-
if (infoArray.length > 1) {
119-
port = Integer.valueOf(infoArray[1].trim());
120-
}
121-
122-
try {
123-
transports.add(new HttpHost(host.trim(), port, "http"));
124-
} catch (Exception e) {
125-
logger.error("", e);
126-
throw new RuntimeException(e);
127-
}
128-
}
129108

130-
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
109+
List<HttpHost> transports = esAddressList.stream()
110+
.map(address -> address.split(":"))
111+
.map(addressArray -> {
112+
String host = addressArray[0].trim();
113+
int port = addressArray.length > 1 ? Integer.valueOf(addressArray[1].trim()) : ES_DEFAULT_PORT;
114+
return new HttpHost(host.trim(), port, ES_DEFAULT_SCHEMA);
115+
}).collect(Collectors.toList());
131116

117+
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
132118
return new MetricElasticsearch6Sink(userConfig, transports, customerSinkFunc, esTableInfo);
133119
}
134120

@@ -141,33 +127,19 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
141127
}
142128
}
143129

144-
public void setParallelism(int parallelism) {
145-
this.parallelism = parallelism;
146-
}
147-
148-
public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
149-
this.bulkFlushMaxActions = bulkFlushMaxActions;
150-
}
151-
152130
@Override
153131
public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
154-
ElasticsearchTableInfo elasticsearchTableInfo = (ElasticsearchTableInfo) targetTableInfo;
155-
esTableInfo = elasticsearchTableInfo;
156-
clusterName = elasticsearchTableInfo.getClusterName();
157-
String address = elasticsearchTableInfo.getAddress();
158-
String[] addr = address.split(",");
159-
esAddressList = Arrays.asList(addr);
160-
index = elasticsearchTableInfo.getIndex();
161-
type = elasticsearchTableInfo.getEsType();
162-
String id = elasticsearchTableInfo.getId();
163-
String[] idField = StringUtils.split(id, ",");
164-
idIndexList = new ArrayList<>();
165-
166-
for (int i = 0; i < idField.length; ++i) {
167-
idIndexList.add(Integer.valueOf(idField[i]));
132+
esTableInfo = (ElasticsearchTableInfo) targetTableInfo;
133+
clusterName = esTableInfo.getClusterName();
134+
index = esTableInfo.getIndex();
135+
type = esTableInfo.getEsType();
136+
columnTypes = esTableInfo.getFieldTypes();
137+
esAddressList = Arrays.asList(esTableInfo.getAddress().split(","));
138+
String id = esTableInfo.getId();
139+
140+
if (!StringUtils.isEmpty(id)) {
141+
idIndexList = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());
168142
}
169-
170-
columnTypes = elasticsearchTableInfo.getFieldTypes();
171143
return this;
172144
}
173145
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020

2121
import com.dtstack.flink.sql.table.TargetTableInfo;
2222
import com.google.common.base.Preconditions;
23+
import org.apache.commons.lang.StringUtils;
24+
import org.apache.commons.lang3.math.NumberUtils;
25+
26+
import java.util.Arrays;
2327

2428
/**
2529
* @author yinxi
@@ -124,9 +128,14 @@ public boolean check() {
124128
Preconditions.checkNotNull(address, "elasticsearch6 type of address is required");
125129
Preconditions.checkNotNull(index, "elasticsearch6 type of index is required");
126130
Preconditions.checkNotNull(esType, "elasticsearch6 type of type is required");
127-
Preconditions.checkNotNull(id, "elasticsearch6 type of id is required");
128131
Preconditions.checkNotNull(clusterName, "elasticsearch6 type of clusterName is required");
129132

133+
if (!StringUtils.isEmpty(id)) {
134+
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
135+
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
136+
});
137+
}
138+
130139
if (isAuthMesh()) {
131140
Preconditions.checkNotNull(userName, "elasticsearch6 type of userName is required");
132141
Preconditions.checkNotNull(password, "elasticsearch6 type of password is required");

0 commit comments

Comments
 (0)