
Commit 4930373

[feat-35353][elasticsearch5-xh][sink]adapt the elasticsearch(tdh) with kerberos enabled.
1 parent d49fdd4 commit 4930373

26 files changed, +3576 −0 lines

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 9 additions & 0 deletions
@@ -18,6 +18,7 @@
 
 package com.dtstack.flink.sql.util;
 
+import com.esotericsoftware.minlog.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
@@ -61,4 +62,12 @@ public static UserGroupInformation loginAndReturnUgi(String principal, String ke
         return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
     }
 
+    public static void loginFromKeytab(String principal, String keytabPath, String krb5confPath) throws IOException {
+        LOG.info("Kerberos login with principal: {} and keytab: {}", principal, keytabPath);
+        System.setProperty(KRB5_CONF_KEY, krb5confPath);
+        Configuration configuration = new Configuration();
+        configuration.set(HADOOP_AUTH_KEY, KRB_STR);
+        UserGroupInformation.setConfiguration(configuration);
+    }
+
 }
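
The new loginFromKeytab helper only prepares the security context: it points the JVM at the given krb5.conf and switches Hadoop authentication to Kerberos. The keytab login itself still goes through UserGroupInformation, as in the existing loginAndReturnUgi. A minimal call-site sketch follows; the principal and file paths are placeholders, not values from this commit:

import com.dtstack.flink.sql.util.KrbUtils;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

public class KerberosLoginSketch {
    public static void main(String[] args) throws IOException {
        // Placeholder values; in the sink these would come from the table's connector properties.
        String principal = "es/host@EXAMPLE.COM";
        String keytab = "/etc/security/keytabs/es.keytab";
        String krb5conf = "/etc/krb5.conf";

        // Point the JVM at krb5.conf (KRB5_CONF_KEY) and set Hadoop auth to Kerberos (HADOOP_AUTH_KEY/KRB_STR).
        KrbUtils.loginFromKeytab(principal, keytab, krb5conf);

        // Perform the actual keytab login via the standard Hadoop API.
        UserGroupInformation ugi =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
        System.out.println("Logged in as " + ugi.getUserName());
    }
}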
Lines changed: 146 additions & 0 deletions
@@ -0,0 +1,146 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.elasticsearch5-xh</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.elasticsearch5-xh</artifactId>
    <name>elasticsearch-xh-sink</name>

    <dependencies>
        <!-- <dependency>-->
        <!--     <groupId>org.elasticsearch.client</groupId>-->
        <!--     <artifactId>transport</artifactId>-->
        <!--     <version>5.3.3</version>-->
        <!-- </dependency>-->

        <!-- <dependency>-->
        <!--     <groupId>org.elasticsearch.client</groupId>-->
        <!--     <artifactId>x-pack-transport</artifactId>-->
        <!--     <version>5.3.3</version>-->
        <!-- </dependency>-->

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.7</version>
        </dependency>

        <dependency>
            <groupId>transwarp.io.elasticsearch</groupId>
            <artifactId>elasricearch-client</artifactId>
            <version>5.4.1</version>
        </dependency>

        <dependency>
            <groupId>guardian.sasl.transwarp</groupId>
            <artifactId>guardian-sasl-transwarp</artifactId>
            <version>6.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.15.0</version>
        </dependency>

        <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>

        <!-- <dependency>-->
        <!--     <groupId>org.apache.flink</groupId>-->
        <!--     <artifactId>flink-connector-elasticsearch5_2.11</artifactId>-->
        <!--     <version>${flink.version}</version>-->
        <!-- </dependency>-->

        <!-- <dependency>-->
        <!--     <groupId>org.apache.flink</groupId>-->
        <!--     <artifactId>flink-connector-elasticsearch-base_2.11</artifactId>-->
        <!--     <version>${flink.version}</version>-->
        <!-- </dependency>-->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <!--<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>-->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../sqlplugins/elasticsearch-xhsink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar" />
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../sqlplugins/elasticsearch-xhsink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../sqlplugins/elasticsearch-xhsink/${project.name}-${git.branch}.jar" />
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.elasticsearch;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.types.Row;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import transwarp.org.elasticsearch.action.index.IndexRequest;
import transwarp.org.elasticsearch.client.Requests;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Reason:
 * Date: 2017/7/19
 * Company: www.dtstack.com
 * @author xuchao
 */
public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {

    private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);

    private static final String ID_VALUE_SPLIT = "_";

    private String index;

    private String type;

    private List<Integer> idFieldIndexList;

    private List<String> fieldNames;

    private List<String> fieldTypes;

    public transient Counter outRecords;

    /** Default separator is '_'. */
    private char sp = '_';

    public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes) {
        this.index = index;
        this.type = type;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.idFieldIndexList = idFieldIndexes;
    }

    @Override
    public void process(Tuple2 tuple2, RuntimeContext ctx, RequestIndexer indexer) {
        try {
            Tuple2<Boolean, Row> tupleTrans = tuple2;
            Boolean retract = tupleTrans.getField(0);
            Row element = tupleTrans.getField(1);
            if (!retract) {
                return;
            }

            indexer.add(createIndexRequest(element));
            outRecords.inc();
        } catch (Throwable e) {
            logger.error("", e);
        }
    }

    public void setOutRecords(Counter outRecords) {
        this.outRecords = outRecords;
    }

    private IndexRequest createIndexRequest(Row element) {
        String idFieldStr = "";
        if (null != idFieldIndexList) {
            // id field indexes are 1-based
            idFieldStr = idFieldIndexList.stream()
                    .filter(index -> index > 0 && index <= element.getArity())
                    .map(index -> element.getField(index - 1).toString())
                    .collect(Collectors.joining(ID_VALUE_SPLIT));
        }

        Map<String, Object> dataMap = EsUtil.rowToJsonMap(element, fieldNames, fieldTypes);
        int length = Math.min(element.getArity(), fieldNames.size());
        for (int i = 0; i < length; i++) {
            dataMap.put(fieldNames.get(i), element.getField(i));
        }

        if (StringUtils.isEmpty(idFieldStr)) {
            return Requests.indexRequest()
                    .index(index)
                    .type(type)
                    .source(dataMap);
        }

        return Requests.indexRequest()
                .index(index)
                .type(type)
                .id(idFieldStr)
                .source(dataMap);
    }
}
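
For orientation, a sketch of how this function might be constructed by the sink follows. The index, type, schema, and id field index are placeholders; the surrounding ElasticsearchSink wiring in this module is not shown in this commit excerpt:

import com.dtstack.flink.sql.sink.elasticsearch.CustomerSinkFunc;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CustomerSinkFuncSketch {
    public static void main(String[] args) {
        // Placeholder schema; in a real job these come from the table definition.
        List<String> fieldNames = Arrays.asList("id", "name");
        List<String> fieldTypes = Arrays.asList("INT", "VARCHAR");

        // Document ids are built from 1-based field indexes joined with '_';
        // here the id comes from the first column only.
        CustomerSinkFunc sinkFunc = new CustomerSinkFunc(
                "user_index", "user_type", fieldNames, fieldTypes,
                Collections.singletonList(1));

        // Upstream elements are Tuple2<Boolean, Row>: f0 == false marks a retract
        // message and is skipped; f0 == true rows are turned into IndexRequests.
    }
}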
