Skip to content

Commit d02cb08

Browse files
committed
[merge] resolve conflict from merge.
2 parents 4121a31 + 7616174 commit d02cb08

File tree

22 files changed

+167
-95
lines changed

22 files changed

+167
-95
lines changed

docs/plugin/elasticsearch6Sink.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CREATE TABLE tableName(
99
cluster='clusterName',
1010
esType ='esType',
1111
index ='index',
12-
id ='num[,num]',
12+
id ='num[,num]' | 'field[,field]',
1313
authMesh = 'true',
1414
userName = 'userName',
1515
password = 'password',
@@ -35,7 +35,7 @@ CREATE TABLE tableName(
3535
|cluster | ES 集群名称 |||
3636
|index | 选择的ES上的index名称|||
3737
|esType | 选择ES上的type名称|||
38-
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
38+
|id | 生成id的规则(当前是根据指定的字段名称(或者字段position)获取字段信息,拼接生成id)|||
3939
| |若id为空字符串或索引都超出范围,则随机生成id值)|||
4040
|authMesh | 是否进行用户名密码认证 || false|
4141
|userName | 用户名 | 否,authMesh='true'时为必填 ||
@@ -73,7 +73,7 @@ CREATE TABLE MyResult(
7373
estype ='external',
7474
cluster ='docker-cluster',
7575
index ='myresult',
76-
id ='1',
76+
id ='pv',
7777
updateMode ='append',
7878
parallelism ='1'
7979
);

docs/plugin/elasticsearchSink.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CREATE TABLE tableName(
99
cluster='clusterName',
1010
estype ='esType',
1111
index ='index',
12-
id ='num[,num]',
12+
id ='num[,num]' | 'field[,field]',
1313
parallelism ='1'
1414
)
1515
```
@@ -32,7 +32,7 @@ CREATE TABLE tableName(
3232
|cluster | ES 集群名称 |||
3333
|index | 选择的ES上的index名称|||
3434
|estype | 选择ES上的type名称|||
35-
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id)|||
35+
|id | 生成id的规则(当前是根据指定的字段名称(或者字段position)获取字段信息,拼接生成id)|||
3636
|authMesh | 是否进行用户名密码认证 || false|
3737
|userName | 用户名 | 否,authMesh='true'时为必填 ||
3838
|password | 密码 | 否,authMesh='true'时为必填 ||
@@ -68,7 +68,7 @@ CREATE TABLE MyResult(
6868
estype ='external',
6969
cluster ='docker-cluster',
7070
index ='myresult',
71-
id ='1',
71+
id ='pv',
7272
updateMode ='append',
7373
parallelism ='1'
7474
);

elasticsearch5/elasticsearch5-sink/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
<dependency>
1717
<groupId>org.elasticsearch.client</groupId>
1818
<artifactId>transport</artifactId>
19-
<version>5.3.3</version>
19+
<version>${elasticsearch.version}</version>
2020
</dependency>
2121

2222
<dependency>
2323
<groupId>org.elasticsearch.client</groupId>
2424
<artifactId>x-pack-transport</artifactId>
25-
<version>5.3.3</version>
25+
<version>${elasticsearch.version}</version>
2626
</dependency>
2727

2828
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,24 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
5353

5454
private String type;
5555

56-
private List<Integer> idFieldIndexList;
56+
private List<Object> ids;
57+
58+
// true means generation doc's id by position "1[,1]"
59+
private boolean usePosition;
5760

5861
private List<String> fieldNames;
5962

6063
private List<String> fieldTypes;
6164

6265
public transient Counter outRecords;
6366

64-
/** 默认分隔符为'_' */
65-
private char sp = '_';
66-
67-
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes){
67+
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Object> ids, boolean usePosition) {
6868
this.index = index;
6969
this.type = type;
7070
this.fieldNames = fieldNames;
7171
this.fieldTypes = fieldTypes;
72-
this.idFieldIndexList = idFieldIndexes;
72+
this.ids = ids;
73+
this.usePosition = usePosition;
7374
}
7475

7576
@Override
@@ -95,21 +96,31 @@ public void setOutRecords(Counter outRecords) {
9596
}
9697

9798
private IndexRequest createIndexRequest(Row element) {
98-
String idFieldStr = "";
99-
if (null != idFieldIndexList) {
100-
// index start at 1,
101-
idFieldStr = idFieldIndexList.stream()
102-
.filter(index -> index > 0 && index <= element.getArity())
103-
.map(index -> element.getField(index - 1).toString())
104-
.collect(Collectors.joining(ID_VALUE_SPLIT));
105-
}
10699

107100
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
108101
int length = Math.min(element.getArity(), fieldNames.size());
109102
for(int i=0; i<length; i++){
110103
dataMap.put(fieldNames.get(i), element.getField(i));
111104
}
112105

106+
String idFieldStr = "";
107+
if (null != ids) {
108+
if (!usePosition) {
109+
idFieldStr = ids.stream()
110+
.map(filedName -> (String) filedName)
111+
.map(filedName -> dataMap.get(filedName).toString())
112+
.collect(Collectors.joining(ID_VALUE_SPLIT));
113+
} else {
114+
// compatible old version of generate doc's id
115+
// index start at 1,
116+
idFieldStr = ids.stream()
117+
.map(index -> (Integer) index)
118+
.filter(index -> index > 0 && index <= element.getArity())
119+
.map(index -> element.getField( index - 1).toString())
120+
.collect(Collectors.joining(ID_VALUE_SPLIT));
121+
}
122+
}
123+
113124
if (StringUtils.isEmpty(idFieldStr)) {
114125
return Requests.indexRequest()
115126
.index(index)

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

2323
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
24+
import com.google.common.base.Preconditions;
2425
import org.apache.flink.api.common.typeinfo.TypeInformation;
2526
import org.apache.flink.api.java.tuple.Tuple2;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -46,6 +47,7 @@
4647
import java.util.List;
4748
import java.util.Map;
4849
import java.util.Objects;
50+
import java.util.stream.Collectors;
4951

5052
/**
5153
* table output elastic5plugin
@@ -68,7 +70,10 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6870

6971
private String type = "";
7072

71-
private List<Integer> idIndexList;
73+
private List<Object> ids;
74+
75+
// true means generation doc's id by position "1[,1]"
76+
private boolean usePosition = false;
7277

7378
protected String[] fieldNames;
7479

@@ -113,6 +118,14 @@ public TypeInformation<?>[] getFieldTypes() {
113118

114119
private RichSinkFunction createEsSinkFunction(){
115120

121+
// check whether id fields is exists in columns if not use position to generate doc's id
122+
if (!usePosition
123+
&& ids != null
124+
&& ids.size() != 0) {
125+
List<String> filedNamesLists = Arrays.asList(fieldNames);
126+
Preconditions.checkState(filedNamesLists.containsAll(ids), "elasticsearch6 type of id %s is should be exists in columns %s.", ids, filedNamesLists);
127+
}
128+
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), ids, usePosition);
116129

117130
Map<String, String> userConfig = new HashMap<>();
118131
userConfig.put("cluster.name", clusterName);
@@ -142,8 +155,6 @@ private RichSinkFunction createEsSinkFunction(){
142155
userConfig.put("xpack.security.user", authPassword);
143156
}
144157

145-
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
146-
147158
return new MetricElasticsearchSink(userConfig, transports, customerSinkFunc, esTableInfo);
148159
}
149160

@@ -179,15 +190,21 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
179190
type = elasticsearchTableInfo.getEsType();
180191
String id = elasticsearchTableInfo.getId();
181192
String[] idField = StringUtils.split(id, ",");
182-
idIndexList = new ArrayList<>();
183193
registerTableName = elasticsearchTableInfo.getName();
184194
parallelism = Objects.isNull(elasticsearchTableInfo.getParallelism()) ?
185195
parallelism : elasticsearchTableInfo.getParallelism();
186196

187197
for(int i = 0; i < idField.length; ++i) {
188-
idIndexList.add(Integer.valueOf(idField[i]));
198+
if (!EsUtil.checkWhetherUsePosition(id)) {
199+
ids = Arrays.stream(org.apache.commons.lang.StringUtils.split(id, ",")).map(String::valueOf).collect(Collectors.toList());
200+
} else {
201+
//compatible old version of generate doc' id
202+
usePosition = true;
203+
ids = Arrays.stream(org.apache.commons.lang.StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());
204+
}
189205
}
190206

207+
191208
columnTypes = elasticsearchTableInfo.getFieldTypes();
192209

193210
return this;

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/EsUtil.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.commons.lang3.math.NumberUtils;
2123
import org.apache.flink.types.Row;
2224
import org.apache.flink.util.Preconditions;
2325

@@ -62,5 +64,23 @@ public static Map<String, Object> rowToJsonMap(Row row, List<String> fields, Lis
6264
return jsonMap;
6365
}
6466

67+
/**
68+
* check whether use position to generation doc's id
69+
* eg : |1,2,3 -> true
70+
* |id,name,addr -> false
71+
* @param ids
72+
* @return
73+
*/
74+
public static boolean checkWhetherUsePosition(String ids) {
75+
boolean flag = true;
76+
for( String id : StringUtils.split(ids, ",")) {
77+
if (!NumberUtils.isNumber(id)) {
78+
flag= false;
79+
break;
80+
}
81+
}
82+
return flag;
83+
}
84+
6585

6686
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/MetricElasticsearchSink.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,6 @@ public void open(Configuration parameters) throws Exception {
6060
initMetric();
6161
}
6262

63-
/*public void setXPackTransportClient() throws Exception {
64-
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
65-
Settings settings = Settings.builder().put(userConfig).put("xpack.security.user", authPassword).build();
66-
Class clz = Class.forName("org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase");
67-
Field clientField = clz.getDeclaredField("client");
68-
clientField.setAccessible(true);
69-
PreBuiltXPackTransportClient transportClient = new PreBuiltXPackTransportClient(settings);
70-
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
71-
transportClient.addTransportAddress(transport);
72-
}
73-
74-
clientField.set(this, transportClient);
75-
}*/
76-
7763
public void initMetric() {
7864
Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
7965
customerSinkFunc.setOutRecords(counter);

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,6 @@ public boolean check() {
134134
Preconditions.checkNotNull(esType, "elasticsearch type of type is required");
135135
Preconditions.checkNotNull(clusterName, "elasticsearch type of clusterName is required");
136136

137-
if (!StringUtils.isEmpty(id)) {
138-
Arrays.stream(StringUtils.split(id, ",")).forEach(number -> {
139-
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
140-
});
141-
}
142-
143137
if (isAuthMesh()) {
144138
Preconditions.checkNotNull(userName, "elasticsearch type of userName is required");
145139
Preconditions.checkNotNull(password, "elasticsearch type of password is required");

elasticsearch5/elasticsearch5-sink/src/test/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFuncTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ public class CustomerSinkFuncTest {
3232
String type;
3333
List<String> fieldNames = Collections.singletonList("name.pv");
3434
List<String> fieldTypes = Arrays.asList("varchar", "varchar");
35-
List<Integer> idFieldIndexes = Collections.singletonList(1);
35+
List<Object> idFieldIndexes = Collections.singletonList(1);
3636
Tuple2<Boolean, Row> tuple2 = new Tuple2<>();
3737

3838
@Before
3939
public void setUp() {
4040
MockitoAnnotations.initMocks(this);
41-
customerSinkFunc = new CustomerSinkFunc(index, type, fieldNames, fieldTypes, idFieldIndexes);
41+
customerSinkFunc = new CustomerSinkFunc(index, type, fieldNames, fieldTypes, idFieldIndexes, true);
4242
customerSinkFunc.setOutRecords(outRecords);
4343
tuple2.setField(true, 0);
4444
Row row = new Row(1);

elasticsearch5/elasticsearch5-sink/src/test/java/com/dtstack/flink/sql/sink/elasticsearch/MetricElasticsearchSinkTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.junit.Test;
1212
import org.mockito.MockitoAnnotations;
1313

14-
import java.util.Arrays;
1514
import java.util.Collections;
1615
import java.util.HashMap;
1716
import java.util.List;

0 commit comments

Comments
 (0)