
Commit 24bf610

Merge branch 'feat_1.8_elasticsearch6-sink' into 'v1.8.0_dev'
Elasticsearch6 result-set sink. See merge request !223
2 parents: fe6f2e6 + 1b4c632


11 files changed (+1010 −0 lines)


docs/elasticsearch6Sink.md

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
## 1. Format

```
CREATE TABLE tableName(
    colName colType,
    bb INT
)WITH(
    type ='elasticsearch6',
    address ='ip:port[,ip:port]',
    cluster='clusterName',
    esType ='esType',
    index ='index',
    id ='num[,num]',
    authMesh = 'true',
    userName = 'userName',
    password = 'password',
    parallelism ='1'
)
```

## 2. Supported version

elasticsearch 6.8.6

## 3. Table schema definition

|Parameter|Description|
|----|---|
|tableName|Name used in SQL, i.e. the name registered to flink-table-env|
|colName|Column name|
|colType|Column type; see [supported column types](colType.md)|

## 4. Parameters

|Parameter|Description|Required|Default|
|----|---|---|----|
|type|Output table type, here `elasticsearch6`|||
|address|ES transport address(es) to connect to (TCP address)|||
|cluster|ES cluster name|||
|index|Target index name on ES|||
|esType|Target type name on ES|||
|id|Rule for building the document id: the field values at the listed positions are read and concatenated into the id; if `id` is an empty string or all positions are out of range, a random id is generated (see the sketch below)|||
|authMesh|Whether to authenticate with username/password||false|
|userName|Username|No; required when authMesh='true'||
|password|Password|No; required when authMesh='true'||
|parallelism|Parallelism||1|
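
As a rough illustration of the id rule above (a standalone sketch, not part of the plugin code; the class and variable names are invented for the example), the snippet below mimics how a document id is built from `id ='0,1'` for a two-column row:

```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IdRuleExample {
    public static void main(String[] args) {
        List<Integer> idFieldIndexes = Arrays.asList(0, 1);  // from id ='0,1'
        Object[] row = {12, 34};                             // aa INT, bb INT

        List<String> parts = new ArrayList<>();
        for (int idx : idFieldIndexes) {
            if (idx >= row.length) {
                continue;                                    // out-of-range positions are skipped
            }
            parts.add(row[idx].toString());
        }

        // Prints "12_34"; if parts were empty, Elasticsearch would assign a random id instead.
        System.out.println(String.join("_", parts));
    }
}
```
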
## 5. Example

```
CREATE TABLE MyResult(
    aa INT,
    bb INT
)WITH(
    type ='elasticsearch6',
    address ='172.16.10.47:9500',
    cluster='es_47_menghan',
    esType ='type1',
    index ='xc_es_test',
    authMesh = 'true',
    userName = 'elastic',
    password = 'abc123',
    id ='0,1',
    parallelism ='1'
)
```
Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.elasticsearch6</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.elasticsearch6</artifactId>
    <name>elasticsearch6-sink</name>

    <properties>
        <elasticsearch.version>6.8.6</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.7</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch-base_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <!--<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>-->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../plugins/elasticsearch6sink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar"/>
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../plugins/elasticsearch6sink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../plugins/elasticsearch6sink/${project.name}-${git.branch}.jar"/>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.elasticsearch6;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.types.Row;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author yinxi
 * @date 2020/1/9 - 15:10
 */
public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {

    private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);

    private String index;

    private String type;

    private List<Integer> idFieldIndexList;

    private List<String> fieldNames;

    private List<String> fieldTypes;

    public transient Counter outRecords;

    /** Default id separator is '_'. */
    private char sp = '_';

    public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes) {
        this.index = index;
        this.type = type;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.idFieldIndexList = idFieldIndexes;
    }

    @Override
    public void process(Tuple2 tuple2, RuntimeContext ctx, RequestIndexer indexer) {
        try {
            Tuple2<Boolean, Row> tupleTrans = tuple2;
            Boolean retract = tupleTrans.getField(0);
            Row element = tupleTrans.getField(1);
            // Only emit accumulate messages; skip retract messages.
            if (!retract) {
                return;
            }

            indexer.add(createIndexRequest(element));
            outRecords.inc();
        } catch (Throwable e) {
            logger.error("Failed to store source data {}. ", tuple2.getField(1));
            logger.error("Failed to create index request exception. ", e);
        }
    }

    public void setOutRecords(Counter outRecords) {
        this.outRecords = outRecords;
    }

    private IndexRequest createIndexRequest(Row element) {

        // Collect the field values at the configured id positions; out-of-range positions are skipped.
        List<String> idFieldList = new ArrayList<>();
        for (int index : idFieldIndexList) {
            if (index >= element.getArity()) {
                continue;
            }

            idFieldList.add(element.getField(index).toString());
        }

        Map<String, Object> dataMap = Es6Util.rowToJsonMap(element, fieldNames, fieldTypes);
        int length = Math.min(element.getArity(), fieldNames.size());
        for (int i = 0; i < length; i++) {
            dataMap.put(fieldNames.get(i), element.getField(i));
        }

        // No usable id fields: let Elasticsearch generate a random id.
        if (idFieldList.size() == 0) {
            return Requests.indexRequest().index(index).type(type).source(dataMap);
        }

        // Join the id fields with the separator to build the document id.
        String id = StringUtils.join(idFieldList, sp);
        return Requests.indexRequest()
                .index(index)
                .type(type)
                .id(id)
                .source(dataMap);
    }
}
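
For context, here is a minimal wiring sketch (not part of this commit) showing how CustomerSinkFunc could be attached to a DataStream through the flink-connector-elasticsearch6 ElasticsearchSink.Builder; the host, index, type, field lists and flush setting are placeholder assumptions:

```
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;

public class Elasticsearch6SinkWiring {

    public static void attach(DataStream<Tuple2> retractStream) {
        // Placeholder REST endpoint; the real plugin derives its connection from the 'address' option.
        List<HttpHost> hosts = Arrays.asList(new HttpHost("127.0.0.1", 9200, "http"));

        CustomerSinkFunc sinkFunc = new CustomerSinkFunc(
                "xc_es_test",                 // index
                "type1",                      // esType
                Arrays.asList("aa", "bb"),    // field names
                Arrays.asList("INT", "INT"),  // field types
                Arrays.asList(0, 1));         // id field positions, as in id ='0,1'
        // outRecords is transient, so a distributed job would set it at runtime (e.g. in open());
        // a SimpleCounter keeps this sketch self-contained.
        sinkFunc.setOutRecords(new SimpleCounter());

        ElasticsearchSink.Builder<Tuple2> builder =
                new ElasticsearchSink.Builder<>(hosts, sinkFunc);
        builder.setBulkFlushMaxActions(1);    // flush every request; tune for throughput

        retractStream.addSink(builder.build());
    }
}
```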
