Skip to content

Commit 76a39d9

Browse files
author
gituser
committed
Merge branch 'feat_1.10_es5Kerberos' into 1.10_release_4.0.x
2 parents 5854b1c + da5c0a3 commit 76a39d9

26 files changed

+3562
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.util;
2020

21+
import com.esotericsoftware.minlog.Log;
2122
import org.apache.hadoop.conf.Configuration;
2223
import org.apache.hadoop.security.UserGroupInformation;
2324
import org.apache.hadoop.security.authentication.util.KerberosName;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>sql.elasticsearch5-xh</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.elasticsearch5-xh</artifactId>
    <name>elasticsearch-xh-sink</name>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.7</version>
        </dependency>

        <!-- Transwarp (TDH) distribution of the Elasticsearch 5.x client. -->
        <dependency>
            <groupId>io.transwarp.elasticsearch</groupId>
            <artifactId>elasticsearch-client</artifactId>
            <version>5.4.1</version>
        </dependency>

        <!-- SASL/Guardian security support required by the Transwarp ES client. -->
        <dependency>
            <groupId>io.transwarp.sasl</groupId>
            <artifactId>guardian-sasl-transwarp</artifactId>
            <version>6.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.15.0</version>
        </dependency>

        <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Shade the sink and its dependencies into a single plugin jar;
                 strip jar signature files so the shaded jar stays loadable. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <!--<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>-->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Copy the shaded jar into the shared sqlplugins directory and
                 rename it to <module-name>-<git-branch>.jar for deployment. -->
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../sqlplugins/elasticsearch-xhsink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar" />
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../sqlplugins/elasticsearch-xhsink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../sqlplugins/elasticsearch-xhsink/${project.name}-${git.branch}.jar" />
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.elasticsearch;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.flink.api.common.functions.RuntimeContext;
25+
import org.apache.flink.api.java.tuple.Tuple2;
26+
import org.apache.flink.metrics.Counter;
27+
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
28+
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
29+
import org.apache.flink.types.Row;
30+
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
import transwarp.org.elasticsearch.action.index.IndexRequest;
34+
import transwarp.org.elasticsearch.client.Requests;
35+
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.stream.Collectors;
39+
40+
/**
41+
* Reason:
42+
* Date: 2017/7/19
43+
* Company: www.dtstack.com
44+
* @author xuchao
45+
*/
46+
47+
public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
48+
49+
private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);
50+
51+
private static final String ID_VALUE_SPLIT = "_";
52+
53+
private String index;
54+
55+
private String type;
56+
57+
private List<Integer> idFieldIndexList;
58+
59+
private List<String> fieldNames;
60+
61+
private List<String> fieldTypes;
62+
63+
public transient Counter outRecords;
64+
65+
public transient Counter outDirtyRecords;
66+
67+
/** 默认分隔符为'_' */
68+
private char sp = '_';
69+
70+
public CustomerSinkFunc(String index, String type, List<String> fieldNames, List<String> fieldTypes, List<Integer> idFieldIndexes){
71+
this.index = index;
72+
this.type = type;
73+
this.fieldNames = fieldNames;
74+
this.fieldTypes = fieldTypes;
75+
this.idFieldIndexList = idFieldIndexes;
76+
}
77+
78+
@Override
79+
public void process(Tuple2 tuple2, RuntimeContext ctx, RequestIndexer indexer) {
80+
try{
81+
Tuple2<Boolean, Row> tupleTrans = tuple2;
82+
Boolean retract = tupleTrans.getField(0);
83+
Row element = tupleTrans.getField(1);
84+
if(!retract){
85+
return;
86+
}
87+
88+
indexer.add(createIndexRequest(element));
89+
outRecords.inc();
90+
}catch (Exception e){
91+
outDirtyRecords.inc();
92+
logger.error("Failed to store source data {}. ", tuple2.getField(1));
93+
logger.error("Failed to create index request exception. ", e);
94+
}
95+
}
96+
97+
public void setOutRecords(Counter outRecords) {
98+
this.outRecords = outRecords;
99+
}
100+
101+
public void setOutDirtyRecords(Counter outDirtyRecords) {
102+
this.outDirtyRecords = outDirtyRecords;
103+
}
104+
105+
private IndexRequest createIndexRequest(Row element) {
106+
String idFieldStr = "";
107+
if (null != idFieldIndexList) {
108+
// index start at 1,
109+
idFieldStr = idFieldIndexList.stream()
110+
.filter(index -> index > 0 && index <= element.getArity())
111+
.map(index -> element.getField(index - 1).toString())
112+
.collect(Collectors.joining(ID_VALUE_SPLIT));
113+
}
114+
115+
Map<String, Object> dataMap = EsUtil.rowToJsonMap(element,fieldNames,fieldTypes);
116+
int length = Math.min(element.getArity(), fieldNames.size());
117+
for(int i=0; i<length; i++){
118+
dataMap.put(fieldNames.get(i), element.getField(i));
119+
}
120+
121+
if (StringUtils.isEmpty(idFieldStr)) {
122+
return Requests.indexRequest()
123+
.index(index)
124+
.type(type)
125+
.source(dataMap);
126+
}
127+
128+
return Requests.indexRequest()
129+
.index(index)
130+
.type(type)
131+
.id(idFieldStr)
132+
.source(dataMap);
133+
}
134+
}

0 commit comments

Comments (0)