Skip to content

Commit a5353ba

Browse files
增加elasticsearch6-side功能(异步读取数据)
1 parent dc3a29e commit a5353ba

File tree

13 files changed

+1077
-46
lines changed

13 files changed

+1077
-46
lines changed

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,20 @@
3131
<artifactId>sql.side.elasticsearch6.core</artifactId>
3232
<version>${sql.side.elasticsearch6.core.version}</version>
3333
</dependency>
34+
35+
<dependency>
36+
<groupId>junit</groupId>
37+
<artifactId>junit</artifactId>
38+
<version>3.8.1</version>
39+
<scope>test</scope>
40+
</dependency>
41+
<dependency>
42+
<groupId>junit</groupId>
43+
<artifactId>junit</artifactId>
44+
<version>4.12</version>
45+
<scope>test</scope>
46+
</dependency>
47+
3448
</dependencies>
3549

3650
<build>

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearh6AllReqRow.java renamed to elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.google.common.collect.Maps;
1313
import org.apache.calcite.sql.JoinType;
1414
import org.apache.commons.collections.CollectionUtils;
15+
import org.apache.commons.lang3.StringUtils;
1516
import org.apache.http.HttpHost;
1617
import org.apache.http.auth.AuthScope;
1718
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -40,13 +41,13 @@
4041
* @author yinxi
4142
* @date 2020/1/13 - 1:00
4243
*/
43-
public class Elasticsearh6AllReqRow extends AllReqRow {
44+
public class Elasticsearch6AllReqRow extends AllReqRow {
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AllReqRow.class);
4447

45-
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearh6AllReqRow.class);
46-
private static final int CONN_RETRY_NUM = 3;
4748
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
4849

49-
public Elasticsearh6AllReqRow(SideInfo sideInfo) {
50+
public Elasticsearch6AllReqRow(SideInfo sideInfo) {
5051
super(sideInfo);
5152
}
5253

@@ -155,44 +156,55 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
155156
Elasticsearch6SideTableInfo tableInfo = (Elasticsearch6SideTableInfo) sideInfo.getSideTableInfo();
156157
RestHighLevelClient rhlClient = null;
157158

158-
try {
159-
for (int i = 0; i < CONN_RETRY_NUM; i++) {
160-
try {
161-
rhlClient = getClient(tableInfo.getAddress(), tableInfo.isAuthMesh(), tableInfo.getUserName(), tableInfo.getPassword());
162-
break;
163-
} catch (Exception e) {
164-
if (i == CONN_RETRY_NUM - 1) {
165-
throw new RuntimeException("", e);
166-
}
167-
168-
try {
169-
String connInfo = "address: " + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword();
170-
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
171-
Thread.sleep(5 * 1000);
172-
} catch (InterruptedException e1) {
173-
LOG.error("", e1);
174-
}
175-
}
176-
}
159+
try{
160+
rhlClient = getClient(tableInfo.getAddress(), tableInfo.isAuthMesh(), tableInfo.getUserName(), tableInfo.getPassword(), tableInfo.getTimeout());
177161

178162
// load data from tableA
179-
SearchSourceBuilder searchSourceBuilder = Elasticsearch6AllSideInfo.searchSourceBuilder;
163+
SearchSourceBuilder searchSourceBuilder = tableInfo.getSearchSourceBuilder();
180164
searchSourceBuilder.size(getFetchSize());
181-
SearchRequest searchRequest = new SearchRequest(tableInfo.getIndex());
182-
searchRequest.types(tableInfo.getEsType());
165+
SearchRequest searchRequest = new SearchRequest();
166+
167+
// determine existence of index
168+
String index = tableInfo.getIndex().trim();
169+
if(!StringUtils.isEmpty(index)){
170+
// strip leading and trailing spaces from a string
171+
String[] indexes = StringUtils.split(index, ",");
172+
for(int i=0; i < indexes.length; i++ ){
173+
indexes[i] = indexes[i].trim();
174+
}
175+
176+
searchRequest.indices(indexes);
177+
178+
}
179+
180+
// determine existence of type
181+
String type = tableInfo.getEsType().trim();
182+
if(!StringUtils.isEmpty(type)){
183+
// strip leading and trailing spaces from a string
184+
String[] types = StringUtils.split(type, ",");
185+
for(int i=0; i < types.length; i++ ){
186+
types[i] = types[i].trim();
187+
}
188+
189+
searchRequest.types(types);
190+
}
191+
192+
// add query condition
183193
searchRequest.source(searchSourceBuilder);
184194

195+
// get query reults
185196
SearchResponse searchResponse = rhlClient.search(searchRequest);
186197
SearchHit[] searchHits = searchResponse.getHits().getHits();
187-
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
188-
String[] fields = sideInfo.getSideTableInfo().getFieldTypes();
198+
199+
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");
200+
String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes();
189201

190202
Map<String, Object> oneRow = Maps.newHashMap();
191203
for (SearchHit searchHit : searchHits) {
192204
for(String fieldName : sideFieldNames){
193205
Object object = searchHit.getSourceAsMap().get(fieldName.trim());
194206
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
195-
object = SwitchUtil.getTarget(object, fields[fieldIndex]);
207+
object = SwitchUtil.getTarget(object, sideFieldTypes[fieldIndex]);
196208
oneRow.put(fieldName.trim(), object);
197209
}
198210

@@ -212,11 +224,11 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
212224

213225
}
214226

215-
public RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh, String userName, String password) {
227+
public RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh, String userName, String password, Integer timeout) {
216228
List<HttpHost> httpHostList = new ArrayList<>();
217-
String[] address = esAddress.split(",");
229+
String[] address = StringUtils.split(esAddress, ",");
218230
for (String addr : address) {
219-
String[] infoArray = addr.split(":");
231+
String[] infoArray = StringUtils.split(addr, ":");
220232
int port = 9200;
221233
String host = infoArray[0].trim();
222234
if (infoArray.length > 1) {
@@ -226,6 +238,11 @@ public RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh, Strin
226238
}
227239

228240
RestClientBuilder restClientBuilder = RestClient.builder(httpHostList.toArray(new HttpHost[httpHostList.size()]));
241+
242+
if (timeout != null) {
243+
restClientBuilder.setMaxRetryTimeoutMillis(timeout * 1000);
244+
}
245+
229246
if (isAuthMesh) {
230247
// 进行用户和密码认证
231248
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
@@ -236,6 +253,23 @@ public RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh, Strin
236253

237254
RestHighLevelClient rhlClient = new RestHighLevelClient(restClientBuilder);
238255

256+
if (LOG.isInfoEnabled()) {
257+
LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHostList);
258+
}
259+
260+
try{
261+
if (!rhlClient.ping()) {
262+
throw new RuntimeException("There are no reachable Elasticsearch nodes!");
263+
}
264+
} catch (IOException e){
265+
LOG.warn("", e);
266+
}
267+
268+
269+
if (LOG.isInfoEnabled()) {
270+
LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHostList.toString());
271+
}
272+
239273
return rhlClient;
240274

241275
}

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllSideInfo.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2222

2323
import com.dtstack.flink.sql.side.*;
24+
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
2425
import com.dtstack.flink.sql.util.ParseUtils;
2526
import com.google.common.collect.Lists;
2627
import org.apache.calcite.sql.SqlNode;
2728
import org.apache.commons.collections.CollectionUtils;
29+
import org.apache.commons.lang.StringUtils;
2830
import org.elasticsearch.index.query.BoolQueryBuilder;
2931
import org.elasticsearch.index.query.QueryBuilders;
3032
import org.elasticsearch.search.builder.SearchSourceBuilder;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3135

3236
import java.util.List;
3337

@@ -37,7 +41,7 @@
3741
*/
3842
public class Elasticsearch6AllSideInfo extends SideInfo {
3943

40-
public static SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
44+
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AllSideInfo.class);
4145

4246
public Elasticsearch6AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
4347
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
@@ -46,12 +50,14 @@ public Elasticsearch6AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, Lis
4650
@Override
4751
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
4852

49-
getSelectFromStatement(sideTableInfo.getPredicateInfoes());
53+
Elasticsearch6SideTableInfo elasticsearch6SideTableInfo = (Elasticsearch6SideTableInfo) sideTableInfo;
5054

51-
}
55+
elasticsearch6SideTableInfo.setSearchSourceBuilder(getSelectFromStatement(sideTableInfo.getPredicateInfoes()));
5256

53-
private void getSelectFromStatement(List<PredicateInfo> predicateInfoes) {
57+
}
5458

59+
private SearchSourceBuilder getSelectFromStatement(List<PredicateInfo> predicateInfoes) {
60+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
5561
if (predicateInfoes.size() != 0) {
5662
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
5763
for (PredicateInfo info : sideTableInfo.getPredicateInfoes()) {
@@ -61,10 +67,15 @@ private void getSelectFromStatement(List<PredicateInfo> predicateInfoes) {
6167
searchSourceBuilder.query(boolQueryBuilder);
6268
}
6369

70+
return searchSourceBuilder;
6471
}
6572

66-
public BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBuilder, PredicateInfo info){
73+
public BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBuilder, PredicateInfo info) {
6774
switch (info.getOperatorKind()) {
75+
case "IN":
76+
return boolQueryBuilder.must(QueryBuilders.termsQuery(info.getFieldName(), StringUtils.split(info.getCondition().trim(), ",")));
77+
case "NOT_IN":
78+
return boolQueryBuilder.mustNot(QueryBuilders.termsQuery(info.getFieldName(), StringUtils.split(info.getCondition().trim(), ",")));
6879
case "GREATER_THAN_OR_EQUAL":
6980
return boolQueryBuilder.must(QueryBuilders.rangeQuery(info.getFieldName()).gte(info.getCondition()));
7081
case "GREATER_THAN":
@@ -75,15 +86,19 @@ public BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBuilder,
7586
return boolQueryBuilder.must(QueryBuilders.rangeQuery(info.getFieldName()).lt(info.getCondition()));
7687
case "EQUALS":
7788
return boolQueryBuilder.must(QueryBuilders.termQuery(info.getFieldName(), info.getCondition()));
89+
case "NOT_EQUALS":
90+
return boolQueryBuilder.mustNot(QueryBuilders.termQuery(info.getFieldName(), info.getCondition()));
7891
default:
7992
try {
80-
throw new Exception("Predicate does not match!");
93+
throw new Exception("elasticsearch6 does not support this operation: " + info.getOperatorName());
8194
} catch (Exception e) {
95+
8296
e.printStackTrace();
97+
LOG.error(e.getMessage());
8398
}
99+
return boolQueryBuilder;
84100
}
85101

86-
return boolQueryBuilder;
87102
}
88103

89104
@Override
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.elasticsearch6</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.async.elasticsearch6</artifactId>
13+
<name>elasticsearch6-async-side</name>
14+
<packaging>jar</packaging>
15+
16+
<properties>
17+
<sql.side.elasticsearch6.core.version>1.0-SNAPSHOT</sql.side.elasticsearch6.core.version>
18+
<elasticsearch.version>6.8.6</elasticsearch.version>
19+
</properties>
20+
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.elasticsearch.client</groupId>
25+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
26+
<version>${elasticsearch.version}</version>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>com.dtstack.flink</groupId>
31+
<artifactId>sql.side.elasticsearch6.core</artifactId>
32+
<version>${sql.side.elasticsearch6.core.version}</version>
33+
</dependency>
34+
35+
</dependencies>
36+
37+
<build>
38+
<plugins>
39+
<plugin>
40+
<groupId>org.apache.maven.plugins</groupId>
41+
<artifactId>maven-shade-plugin</artifactId>
42+
<version>1.4</version>
43+
<executions>
44+
<execution>
45+
<phase>package</phase>
46+
<goals>
47+
<goal>shade</goal>
48+
</goals>
49+
<configuration>
50+
<artifactSet>
51+
<excludes>
52+
53+
</excludes>
54+
</artifactSet>
55+
<filters>
56+
<filter>
57+
<artifact>*:*</artifact>
58+
<excludes>
59+
<exclude>META-INF/*.SF</exclude>
60+
<exclude>META-INF/*.DSA</exclude>
61+
<exclude>META-INF/*.RSA</exclude>
62+
</excludes>
63+
</filter>
64+
</filters>
65+
</configuration>
66+
</execution>
67+
</executions>
68+
</plugin>
69+
70+
<plugin>
71+
<artifactId>maven-antrun-plugin</artifactId>
72+
<version>1.2</version>
73+
<executions>
74+
<execution>
75+
<id>copy-resources</id>
76+
<!-- here the phase you need -->
77+
<phase>package</phase>
78+
<goals>
79+
<goal>run</goal>
80+
</goals>
81+
<configuration>
82+
<tasks>
83+
<copy todir="${basedir}/../../../plugins/elasticsearch6asyncside">
84+
<fileset dir="target/">
85+
<include name="${project.artifactId}-${project.version}.jar"/>
86+
</fileset>
87+
</copy>
88+
89+
<move file="${basedir}/../../../plugins/elasticsearch6asyncside/${project.artifactId}-${project.version}.jar"
90+
tofile="${basedir}/../../../plugins/elasticsearch6asyncside/${project.name}-${git.branch}.jar"/>
91+
</tasks>
92+
</configuration>
93+
</execution>
94+
</executions>
95+
</plugin>
96+
</plugins>
97+
</build>
98+
99+
100+
101+
</project>

0 commit comments

Comments
 (0)