Skip to content

Commit 905a779

Browse files
committed
Merge branch 'v1.8.0_dev_feature_es' into 'v1.8.0_dev'
es结果表支持用户名和密码配置 es结果表支持用户名和密码配置 See merge request !137
2 parents 286a21b + 53d2c4e commit 905a779

File tree

7 files changed

+260
-10
lines changed

7 files changed

+260
-10
lines changed

docs/elasticsearchSink.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ CREATE TABLE tableName(
3333
|index | 选择的ES上的index名称|||
3434
|estype | 选择ES上的type名称|||
3535
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id)|||
36+
|authMesh | 是否进行用户名密码认证 || false|
37+
|userName | 用户名 | 否,authMesh='true'时为必填 ||
38+
|password | 密码 | 否,authMesh='true'时为必填 ||
3639
|parallelism | 并行度设置||1|
3740

3841
## 5.样例:

elasticsearch5/elasticsearch5-sink/pom.xml

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
<dependencies>
1616
<dependency>
17-
<groupId>org.apache.flink</groupId>
18-
<artifactId>flink-connector-elasticsearch5_2.11</artifactId>
19-
<version>${flink.version}</version>
17+
<groupId>org.elasticsearch.client</groupId>
18+
<artifactId>transport</artifactId>
19+
<version>5.3.3</version>
2020
</dependency>
2121

2222
<dependency>
@@ -30,6 +30,30 @@
3030
<artifactId>logback-classic</artifactId>
3131
<version>1.1.7</version>
3232
</dependency>
33+
34+
<dependency>
35+
<groupId>org.elasticsearch.client</groupId>
36+
<artifactId>x-pack-transport</artifactId>
37+
<version>5.3.3</version>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>org.apache.logging.log4j</groupId>
42+
<artifactId>log4j-to-slf4j</artifactId>
43+
<version>2.7</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.apache.flink</groupId>
48+
<artifactId>flink-connector-elasticsearch5_2.11</artifactId>
49+
<version>${flink.version}</version>
50+
</dependency>
51+
52+
<dependency>
53+
<groupId>org.apache.flink</groupId>
54+
<artifactId>flink-connector-elasticsearch-base_2.11</artifactId>
55+
<version>${flink.version}</version>
56+
</dependency>
3357
</dependencies>
3458

3559
<build>
@@ -47,7 +71,7 @@
4771
<configuration>
4872
<artifactSet>
4973
<excludes>
50-
<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>
74+
<!--<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>-->
5175
</excludes>
5276
</artifactSet>
5377
<filters>

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@
2727
import org.apache.flink.api.java.tuple.Tuple2;
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2929
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
30-
import org.apache.flink.metrics.Counter;
3130
import org.apache.flink.streaming.api.datastream.DataStream;
3231
import org.apache.flink.streaming.api.datastream.DataStreamSink;
3332
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
34-
import org.apache.flink.table.sinks.AppendStreamTableSink;
3533
import org.apache.flink.table.sinks.RetractStreamTableSink;
3634
import org.apache.flink.table.sinks.TableSink;
3735
import org.apache.flink.types.Row;
@@ -76,6 +74,8 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
7674

7775
private int parallelism = -1;
7876

77+
private ElasticsearchTableInfo esTableInfo;
78+
7979

8080
@Override
8181
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
@@ -130,9 +130,17 @@ private RichSinkFunction createEsSinkFunction(){
130130
}
131131
}
132132

133+
boolean authMesh = esTableInfo.isAuthMesh();
134+
if (authMesh) {
135+
String username = esTableInfo.getUserName();
136+
String password = esTableInfo.getPassword();
137+
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
138+
userConfig.put("xpack.security.user", authPassword);
139+
}
140+
133141
CustomerSinkFunc customerSinkFunc = new CustomerSinkFunc(index, type, Arrays.asList(fieldNames), Arrays.asList(columnTypes), idIndexList);
134142

135-
return new MetricElasticsearchSink(userConfig, transports, customerSinkFunc);
143+
return new MetricElasticsearchSink(userConfig, transports, customerSinkFunc, esTableInfo);
136144
}
137145

138146
@Override
@@ -155,6 +163,7 @@ public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
155163
@Override
156164
public ElasticsearchSink genStreamSink(TargetTableInfo targetTableInfo) {
157165
ElasticsearchTableInfo elasticsearchTableInfo = (ElasticsearchTableInfo) targetTableInfo;
166+
esTableInfo = elasticsearchTableInfo;
158167
clusterName = elasticsearchTableInfo.getClusterName();
159168
String address = elasticsearchTableInfo.getAddress();
160169
String[] addr = address.split(",");
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.dtstack.flink.sql.sink.elasticsearch;
19+
20+
import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
21+
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
22+
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
23+
import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
24+
import org.apache.flink.util.IOUtils;
25+
import org.apache.flink.util.Preconditions;
26+
import org.elasticsearch.action.bulk.BackoffPolicy;
27+
import org.elasticsearch.action.bulk.BulkItemResponse;
28+
import org.elasticsearch.action.bulk.BulkProcessor;
29+
import org.elasticsearch.client.transport.TransportClient;
30+
import org.elasticsearch.common.network.NetworkModule;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.transport.TransportAddress;
33+
import org.elasticsearch.common.unit.TimeValue;
34+
import org.elasticsearch.transport.Netty3Plugin;
35+
import org.elasticsearch.transport.client.PreBuiltTransportClient;
36+
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import javax.annotation.Nullable;
41+
import java.net.InetSocketAddress;
42+
import java.util.List;
43+
import java.util.Map;
44+
45+
/**
46+
* @date 2019/11/16
47+
* @author xiuzhu
48+
* @Company: www.dtstack.com
49+
*/
50+
51+
public class ExtendES5ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
52+
private static final long serialVersionUID = -5222683870097809633L;
53+
54+
private static final Logger LOG = LoggerFactory.getLogger(ExtendES5ApiCallBridge.class);
55+
56+
private final List<InetSocketAddress> transportAddresses;
57+
58+
protected ElasticsearchTableInfo esTableInfo;
59+
60+
public ExtendES5ApiCallBridge(List<InetSocketAddress> transportAddresses, ElasticsearchTableInfo esTableInfo) {
61+
Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
62+
this.transportAddresses = transportAddresses;
63+
this.esTableInfo = esTableInfo;
64+
}
65+
66+
@Override
67+
public TransportClient createClient(Map<String, String> clientConfig) {
68+
Settings settings = Settings.builder().put(clientConfig)
69+
//.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
70+
//.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
71+
.build();
72+
73+
TransportClient transportClient;
74+
if (esTableInfo.isAuthMesh()) {
75+
transportClient = new PreBuiltXPackTransportClient(settings);
76+
}else {
77+
transportClient = new PreBuiltTransportClient(settings);
78+
}
79+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
80+
transportClient.addTransportAddress(transport);
81+
}
82+
83+
// verify that we actually are connected to a cluster
84+
if (transportClient.connectedNodes().isEmpty()) {
85+
86+
// close the transportClient here
87+
IOUtils.closeQuietly(transportClient);
88+
89+
throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
90+
}
91+
92+
if (LOG.isInfoEnabled()) {
93+
LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
94+
}
95+
96+
return transportClient;
97+
}
98+
99+
@Override
100+
public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
101+
return BulkProcessor.builder(client, listener);
102+
}
103+
104+
@Override
105+
public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
106+
if (!bulkItemResponse.isFailed()) {
107+
return null;
108+
} else {
109+
return bulkItemResponse.getFailure().getCause();
110+
}
111+
}
112+
113+
@Override
114+
public void configureBulkProcessorBackoff(
115+
BulkProcessor.Builder builder,
116+
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
117+
118+
BackoffPolicy backoffPolicy;
119+
if (flushBackoffPolicy != null) {
120+
switch (flushBackoffPolicy.getBackoffType()) {
121+
case CONSTANT:
122+
backoffPolicy = BackoffPolicy.constantBackoff(
123+
new TimeValue(flushBackoffPolicy.getDelayMillis()),
124+
flushBackoffPolicy.getMaxRetryCount());
125+
break;
126+
case EXPONENTIAL:
127+
default:
128+
backoffPolicy = BackoffPolicy.exponentialBackoff(
129+
new TimeValue(flushBackoffPolicy.getDelayMillis()),
130+
flushBackoffPolicy.getMaxRetryCount());
131+
}
132+
} else {
133+
backoffPolicy = BackoffPolicy.noBackoff();
134+
}
135+
136+
builder.setBackoffPolicy(backoffPolicy);
137+
}
138+
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/MetricElasticsearchSink.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,39 @@
1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

2121
import com.dtstack.flink.sql.metric.MetricConstant;
22+
import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
2223
import org.apache.flink.configuration.Configuration;
2324
import org.apache.flink.metrics.Counter;
2425
import org.apache.flink.metrics.Meter;
2526
import org.apache.flink.metrics.MeterView;
2627
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
28+
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
2729

30+
import java.net.InetSocketAddress;
2831
import java.util.List;
2932
import java.util.Map;
3033

3134
/**
3235
* @Auther: jiangjunjie
3336
* @Date: 2018/11/29 14:15
3437
* @Description:
38+
*
3539
*/
36-
public class MetricElasticsearchSink extends org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink {
40+
public class MetricElasticsearchSink extends org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase {
3741

3842
protected CustomerSinkFunc customerSinkFunc;
3943

4044
protected transient Meter outRecordsRate;
4145

42-
public MetricElasticsearchSink(Map userConfig, List transportAddresses, ElasticsearchSinkFunction elasticsearchSinkFunction) {
43-
super(userConfig, transportAddresses, elasticsearchSinkFunction);
46+
protected Map userConfig;
47+
48+
49+
public MetricElasticsearchSink(Map userConfig, List transportAddresses,
50+
ElasticsearchSinkFunction elasticsearchSinkFunction,
51+
ElasticsearchTableInfo esTableInfo) {
52+
super(new ExtendES5ApiCallBridge(transportAddresses, esTableInfo), userConfig, elasticsearchSinkFunction, new NoOpFailureHandler());
4453
this.customerSinkFunc = (CustomerSinkFunc) elasticsearchSinkFunction;
54+
this.userConfig = userConfig;
4555
}
4656

4757
@Override
@@ -50,6 +60,20 @@ public void open(Configuration parameters) throws Exception {
5060
initMetric();
5161
}
5262

63+
/*public void setXPackTransportClient() throws Exception {
64+
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
65+
Settings settings = Settings.builder().put(userConfig).put("xpack.security.user", authPassword).build();
66+
Class clz = Class.forName("org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase");
67+
Field clientField = clz.getDeclaredField("client");
68+
clientField.setAccessible(true);
69+
PreBuiltXPackTransportClient transportClient = new PreBuiltXPackTransportClient(settings);
70+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
71+
transportClient.addTransportAddress(transport);
72+
}
73+
74+
clientField.set(this, transportClient);
75+
}*/
76+
5377
public void initMetric() {
5478
Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
5579
customerSinkFunc.setOutRecords(counter);

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
import com.dtstack.flink.sql.table.AbsTableParser;
2525
import com.dtstack.flink.sql.table.TableInfo;
26+
import com.dtstack.flink.sql.util.MathUtil;
27+
2628
import java.util.Map;
2729

2830
/**
@@ -42,6 +44,12 @@ public class ElasticsearchSinkParser extends AbsTableParser {
4244

4345
private static final String KEY_ES_ID_FIELD_INDEX_LIST = "id";
4446

47+
private static final String KEY_ES_AUTHMESH = "authMesh";
48+
49+
private static final String KEY_ES_USERNAME = "userName";
50+
51+
private static final String KEY_ES_PASSWORD = "password";
52+
4553
@Override
4654
protected boolean fieldNameNeedsUpperCase() {
4755
return false;
@@ -57,6 +65,14 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
5765
elasticsearchTableInfo.setId((String) props.get(KEY_ES_ID_FIELD_INDEX_LIST.toLowerCase()));
5866
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES_INDEX.toLowerCase()));
5967
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES_TYPE.toLowerCase()));
68+
69+
String authMeshStr = (String)props.get(KEY_ES_AUTHMESH.toLowerCase());
70+
if (authMeshStr != null & "true".equals(authMeshStr)) {
71+
elasticsearchTableInfo.setAuthMesh(MathUtil.getBoolean(authMeshStr));
72+
elasticsearchTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES_USERNAME.toLowerCase())));
73+
elasticsearchTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES_PASSWORD.toLowerCase())));
74+
}
75+
elasticsearchTableInfo.check();
6076
return elasticsearchTableInfo;
6177
}
6278
}

0 commit comments

Comments
 (0)