Skip to content

Commit 9474b14

Browse files
committed
[feat-35353][elasticsearch-xh][sink] add features that dirty data record and kerberos renew.
1 parent 4930373 commit 9474b14

File tree

10 files changed

+73
-88
lines changed

10 files changed

+73
-88
lines changed

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,4 @@ public static UserGroupInformation loginAndReturnUgi(String principal, String ke
6262
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
6363
}
6464

65-
public static void loginFromKeytab(String principal, String keytabPath, String krb5confPath) throws IOException{
66-
LOG.info("Kerberos login with principal: {} and keytab: {}", principal, keytabPath);
67-
System.setProperty(KRB5_CONF_KEY, krb5confPath);
68-
Configuration configuration = new Configuration();
69-
configuration.set(HADOOP_AUTH_KEY, KRB_STR);
70-
UserGroupInformation.setConfiguration(configuration);
71-
}
72-
7365
}

elasticsearch5-xh/elasticsearch5-xh-sink/pom.xml

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,6 @@
1313
<name>elasticsearch-xh-sink</name>
1414

1515
<dependencies>
16-
<!-- <dependency>-->
17-
<!-- <groupId>org.elasticsearch.client</groupId>-->
18-
<!-- <artifactId>transport</artifactId>-->
19-
<!-- <version>5.3.3</version>-->
20-
<!-- </dependency>-->
21-
22-
<!-- <dependency>-->
23-
<!-- <groupId>org.elasticsearch.client</groupId>-->
24-
<!-- <artifactId>x-pack-transport</artifactId>-->
25-
<!-- <version>5.3.3</version>-->
26-
<!-- </dependency>-->
2716

2817
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
2918
<dependency>
@@ -39,13 +28,13 @@
3928
</dependency>
4029

4130
<dependency>
42-
<groupId>transwarp.io.elasticsearch</groupId>
43-
<artifactId>elasricearch-client</artifactId>
31+
<groupId>io.transwarp.elasticsearch</groupId>
32+
<artifactId>elasticsearch-client</artifactId>
4433
<version>5.4.1</version>
4534
</dependency>
4635

4736
<dependency>
48-
<groupId>guardian.sasl.transwarp</groupId>
37+
<groupId>io.transwarp.sasl</groupId>
4938
<artifactId>guardian-sasl-transwarp</artifactId>
5039
<version>6.2.1</version>
5140
</dependency>
@@ -68,17 +57,6 @@
6857
</dependency>
6958

7059

71-
<!-- <dependency>-->
72-
<!-- <groupId>org.apache.flink</groupId>-->
73-
<!-- <artifactId>flink-connector-elasticsearch5_2.11</artifactId>-->
74-
<!-- <version>${flink.version}</version>-->
75-
<!-- </dependency>-->
76-
77-
<!-- <dependency>-->
78-
<!-- <groupId>org.apache.flink</groupId>-->
79-
<!-- <artifactId>flink-connector-elasticsearch-base_2.11</artifactId>-->
80-
<!-- <version>${flink.version}</version>-->
81-
<!-- </dependency>-->
8260
</dependencies>
8361

8462
<build>

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
6262

6363
public transient Counter outRecords;
6464

65+
public transient Counter outDirtyRecords;
66+
6567
/** 默认分隔符为'_' */
6668
private char sp = '_';
6769

@@ -85,15 +87,21 @@ public void process(Tuple2 tuple2, RuntimeContext ctx, RequestIndexer indexer) {
8587

8688
indexer.add(createIndexRequest(element));
8789
outRecords.inc();
88-
}catch (Throwable e){
89-
logger.error("", e);
90+
}catch (Exception e){
91+
outDirtyRecords.inc();
92+
logger.error("Failed to store source data {}. ", tuple2.getField(1));
93+
logger.error("Failed to create index request exception. ", e);
9094
}
9195
}
9296

9397
public void setOutRecords(Counter outRecords) {
9498
this.outRecords = outRecords;
9599
}
96100

101+
public void setOutDirtyRecords(Counter outDirtyRecords) {
102+
this.outDirtyRecords = outDirtyRecords;
103+
}
104+
97105
private IndexRequest createIndexRequest(Row element) {
98106
String idFieldStr = "";
99107
if (null != idFieldIndexList) {

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,6 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
9090

9191
// es cluster name
9292
private static final String CLUSTER_NAME = "cluster.name";
93-
// es cluster whether enable security
94-
private static final String SECURITY_ENABLE = "security.enable";
95-
9693

9794
@Override
9895
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ExtendES5ApiCallBridge.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,13 @@ public TransportClient createClient(Map<String, String> clientConfig) throws IOE
7878

7979
//2. set transwarp attributes
8080
Settings settings = Settings.builder().put(clientConfig)
81+
.put("client.transport.sniff", true)
8182
.put("security.enable", true)
8283
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
8384
.build();
8485

8586
//3. build transport client with transwarp plugins
86-
TransportClient transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
87+
TransportClient transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
8788
TransportClient tmpClient = new PreBuiltTransportClient(settings,
8889
Collections.singletonList(DoorKeeperClientPlugin.class));
8990
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
@@ -92,17 +93,6 @@ public TransportClient createClient(Map<String, String> clientConfig) throws IOE
9293
return tmpClient;
9394
});
9495

95-
// verify that we actually are connected to a cluster
96-
if (transportClient.connectedNodes().isEmpty()) {
97-
// close the transportClient here
98-
IOUtils.closeQuietly(transportClient);
99-
throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
100-
}
101-
102-
if (LOG.isInfoEnabled()) {
103-
LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
104-
}
105-
10696
return transportClient;
10797
}
10898

@@ -148,7 +138,27 @@ public void configureBulkProcessorBackoff(
148138
}
149139

150140
@Override
151-
public void verifyClientConnection(TransportClient client) throws IOException {
141+
public boolean verifyClientConnection(TransportClient client) throws IOException {
142+
143+
//1. login kdc with keytab and krb5 conf
144+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
145+
esTableInfo.getPrincipal(),
146+
esTableInfo.getKeytab(),
147+
esTableInfo.getKrb5conf());
148+
149+
//2. refresh availableNodes.
150+
boolean verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
151+
LOG.info("Refresh client available nodes.");
152+
client.refreshAvailableNodes();
153+
return client.connectedNodes().isEmpty();
154+
});
155+
156+
if (!verifyResult) {
157+
return true;
158+
}
152159

160+
// close the transportClient here
161+
IOUtils.closeQuietly(client);
162+
throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
153163
}
154164
}

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/MetricElasticsearchSink.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ public void open(Configuration parameters) throws Exception {
6262

6363
public void initMetric() {
6464
Counter counter = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
65+
Counter outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
66+
6567
customerSinkFunc.setOutRecords(counter);
68+
customerSinkFunc.setOutDirtyRecords(outDirtyRecords);
6669
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(counter, 20));
6770
}
6871
}

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,6 @@ public class ElasticsearchSinkParser extends AbstractTableParser {
4545

4646
private static final String KEY_ES_ID_FIELD_INDEX_LIST = "id";
4747

48-
private static final String KEY_ES_AUTHMESH = "authMesh";
49-
50-
private static final String KEY_ES_USERNAME = "userName";
51-
52-
private static final String KEY_ES_PASSWORD = "password";
53-
5448
private static final String KEY_ES_PARALLELISM = "parallelism";
5549

5650
@Override

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.dtstack.flink.sql.krb.KerberosTable;
2525
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
2626
import com.google.common.base.Preconditions;
27+
import com.google.common.base.Strings;
2728
import org.apache.commons.lang.StringUtils;
2829
import org.apache.commons.lang3.math.NumberUtils;
2930

@@ -48,11 +49,6 @@ public class ElasticsearchTableInfo extends AbstractTargetTableInfo implements K
4849

4950
private String esType;
5051

51-
private boolean authMesh = false;
52-
53-
private String userName;
54-
55-
private String password;
5652

5753
/**
5854
* kerberos
@@ -148,30 +144,6 @@ public void setClusterName(String clusterName) {
148144
this.clusterName = clusterName;
149145
}
150146

151-
public boolean isAuthMesh() {
152-
return authMesh;
153-
}
154-
155-
public void setAuthMesh(boolean authMesh) {
156-
this.authMesh = authMesh;
157-
}
158-
159-
public String getUserName() {
160-
return userName;
161-
}
162-
163-
public void setUserName(String userName) {
164-
this.userName = userName;
165-
}
166-
167-
public String getPassword() {
168-
return password;
169-
}
170-
171-
public void setPassword(String password) {
172-
this.password = password;
173-
}
174-
175147
public ElasticsearchTableInfo() {
176148
setType(CURR_TYPE);
177149
}
@@ -189,6 +161,13 @@ public boolean check() {
189161
});
190162
}
191163

164+
boolean allNotSet =
165+
Strings.isNullOrEmpty(principal) &&
166+
Strings.isNullOrEmpty(keytab) &&
167+
Strings.isNullOrEmpty(krb5conf);
168+
169+
Preconditions.checkState(allNotSet, "xh's elasticsearch type of kerberos file is required");
170+
192171
return true;
193172
}
194173

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ void configureBulkProcessorBackoff(
8686
*
8787
* @param client the Elasticsearch client.
8888
*/
89-
void verifyClientConnection(C client) throws IOException;
89+
boolean verifyClientConnection(C client) throws IOException;
9090

9191
/**
9292
* Creates a {@link RequestIndexer} that is able to work with {@link BulkProcessor} binary compatible.

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import transwarp.org.elasticsearch.common.unit.TimeValue;
4444
import transwarp.org.elasticsearch.rest.RestStatus;
4545

46+
import java.io.IOException;
4647
import java.io.Serializable;
4748
import java.util.HashMap;
4849
import java.util.Map;
@@ -391,8 +392,31 @@ private void checkAsyncErrorsAndRequests() {
391392
}
392393

393394
private class BulkProcessorListener implements BulkProcessor.Listener {
395+
396+
397+
private long lastVerifyTime = 0L;
398+
399+
/**
400+
* The interval time which check client whether available.
401+
*/
402+
private static final long verifyInterval = 22 * 60 * 60 * 1000L;
403+
394404
@Override
395-
public void beforeBulk(long executionId, BulkRequest request) { }
405+
public void beforeBulk(long executionId, BulkRequest request) {
406+
407+
// verify client connection.
408+
if ((System.currentTimeMillis() - lastVerifyTime) >= verifyInterval) {
409+
try {
410+
long currentTimeMillis = System.currentTimeMillis();
411+
callBridge.verifyClientConnection(client);
412+
lastVerifyTime = currentTimeMillis;
413+
} catch (IOException e) {
414+
LOG.error("Verify and Refresh ClientConnection failed. Reason: {}", e.getMessage());
415+
failureThrowable.compareAndSet(null, e);
416+
}
417+
}
418+
419+
}
396420

397421
@Override
398422
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {

0 commit comments

Comments
 (0)