Skip to content

Commit dc3a29e

Browse files
增加elasticsearch6-side功能(ALL能读取数据)
1 parent 01ff006 commit dc3a29e

File tree

4 files changed

+395
-69
lines changed

4 files changed

+395
-69
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,89 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.dtstack.flink.sql.side.elasticsearch6;
220

321
import org.apache.flink.api.java.typeutils.RowTypeInfo;
422

523
import com.dtstack.flink.sql.side.*;
6-
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
724
import com.dtstack.flink.sql.util.ParseUtils;
825
import com.google.common.collect.Lists;
926
import org.apache.calcite.sql.SqlNode;
1027
import org.apache.commons.collections.CollectionUtils;
11-
import org.apache.commons.lang3.StringUtils;
28+
import org.elasticsearch.index.query.BoolQueryBuilder;
29+
import org.elasticsearch.index.query.QueryBuilders;
1230
import org.elasticsearch.search.builder.SearchSourceBuilder;
1331

14-
import java.util.Arrays;
1532
import java.util.List;
16-
import java.util.stream.Collectors;
1733

1834
/**
1935
* @author yinxi
2036
* @date 2020/1/13 - 1:01
2137
*/
2238
public class Elasticsearch6AllSideInfo extends SideInfo {
2339

40+
public static SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
41+
2442
public Elasticsearch6AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
2543
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
2644
}
2745

2846
@Override
2947
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
30-
Elasticsearch6SideTableInfo es6SideTableInfo = (Elasticsearch6SideTableInfo) sideTableInfo;
31-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
3248

49+
getSelectFromStatement(sideTableInfo.getPredicateInfoes());
3350

34-
sqlCondition = getSelectFromStatement(getEstype(es6SideTableInfo), Arrays.asList(sideSelectFields.split(",")), sideTableInfo.getPredicateInfoes());
35-
System.out.println("-------- all side sql query-------\n" + sqlCondition);
3651
}
3752

38-
//基于rdb开发side,但是那些between,in,not in之类的不知道怎么处理
53+
private void getSelectFromStatement(List<PredicateInfo> predicateInfoes) {
3954

40-
public String getAdditionalWhereClause() {
41-
return "";
42-
}
55+
if (predicateInfoes.size() != 0) {
56+
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
57+
for (PredicateInfo info : sideTableInfo.getPredicateInfoes()) {
58+
boolQueryBuilder = buildFilterCondition(boolQueryBuilder, info);
59+
}
60+
61+
searchSourceBuilder.query(boolQueryBuilder);
62+
}
4363

44-
private String getSelectFromStatement(String tableName, List<String> selectFields, List<PredicateInfo> predicateInfoes) {
45-
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
46-
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
47-
String whereClause = buildWhereClause(predicateClause);
48-
String sql = "SELECT " + fromClause + " FROM " + tableName + whereClause;
49-
return sql;
5064
}
5165

52-
private String buildWhereClause(String predicateClause) {
53-
String additionalWhereClause = getAdditionalWhereClause();
54-
String whereClause = (!StringUtils.isEmpty(predicateClause) || !StringUtils.isEmpty(additionalWhereClause) ? " WHERE " + predicateClause : "");
55-
whereClause += (StringUtils.isEmpty(predicateClause)) ? additionalWhereClause.replaceFirst("AND", "") : additionalWhereClause;
56-
return whereClause;
66+
public BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBuilder, PredicateInfo info){
67+
switch (info.getOperatorKind()) {
68+
case "GREATER_THAN_OR_EQUAL":
69+
return boolQueryBuilder.must(QueryBuilders.rangeQuery(info.getFieldName()).gte(info.getCondition()));
70+
case "GREATER_THAN":
71+
return boolQueryBuilder.must(QueryBuilders.rangeQuery(info.getFieldName()).gt(info.getCondition()));
72+
case "LESS_THAN_OR_EQUAL":
73+
return boolQueryBuilder.must(QueryBuilders.rangeQuery(info.getFieldName()).lte(info.getCondition()));
74+
case "LESS_THAN":
75+
return boolQueryBuilder.must(QueryBuilders.rangeQuery(info.getFieldName()).lt(info.getCondition()));
76+
case "EQUALS":
77+
return boolQueryBuilder.must(QueryBuilders.termQuery(info.getFieldName(), info.getCondition()));
78+
default:
79+
try {
80+
throw new Exception("Predicate does not match!");
81+
} catch (Exception e) {
82+
e.printStackTrace();
83+
}
84+
}
85+
86+
return boolQueryBuilder;
5787
}
5888

5989
@Override
@@ -108,28 +138,4 @@ public void parseSelectFields(JoinInfo joinInfo) {
108138
sideSelectFields = String.join(",", fields);
109139
}
110140

111-
public String buildFilterCondition(PredicateInfo info) {
112-
switch (info.getOperatorKind()) {
113-
case "IN":
114-
case "NOT_IN":
115-
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName() + " ( " + info.getCondition() + " )";
116-
case "NOT_EQUALS":
117-
return quoteIdentifier(info.getFieldName()) + " != " + info.getCondition();
118-
case "BETWEEN":
119-
return quoteIdentifier(info.getFieldName()) + " BETWEEN " + info.getCondition();
120-
case "IS_NOT_NULL":
121-
case "IS_NULL":
122-
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName();
123-
default:
124-
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName() + " " + info.getCondition();
125-
}
126-
}
127-
128-
public String getEstype(Elasticsearch6SideTableInfo es6SdideTableInfo) {
129-
return es6SdideTableInfo.getEsType();
130-
}
131-
132-
public String quoteIdentifier(String identifier) {
133-
return " " + identifier + " ";
134-
}
135141
}

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearh6AllReqRow.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
import com.dtstack.flink.sql.side.AllReqRow;
88
import com.dtstack.flink.sql.side.SideInfo;
99
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
10+
import com.dtstack.flink.sql.side.elasticsearch6.util.SwitchUtil;
1011
import com.google.common.collect.Lists;
1112
import com.google.common.collect.Maps;
1213
import org.apache.calcite.sql.JoinType;
1314
import org.apache.commons.collections.CollectionUtils;
14-
import org.apache.commons.lang.StringUtils;
1515
import org.apache.http.HttpHost;
1616
import org.apache.http.auth.AuthScope;
1717
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -22,10 +22,6 @@
2222
import org.elasticsearch.client.RestClient;
2323
import org.elasticsearch.client.RestClientBuilder;
2424
import org.elasticsearch.client.RestHighLevelClient;
25-
import org.elasticsearch.common.unit.TimeValue;
26-
import org.elasticsearch.index.query.QueryBuilders;
27-
import org.elasticsearch.index.query.QueryStringQueryBuilder;
28-
import org.elasticsearch.search.Scroll;
2925
import org.elasticsearch.search.SearchHit;
3026
import org.elasticsearch.search.builder.SearchSourceBuilder;
3127
import org.slf4j.Logger;
@@ -179,24 +175,11 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
179175
}
180176
}
181177

182-
// load data from table
183-
String sql = sideInfo.getSqlCondition();
184-
185-
Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
186-
187-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
188-
searchSourceBuilder.size(getBathSize());
189-
if(StringUtils.isNotEmpty(sql)){
190-
searchSourceBuilder.query(QueryBuilders.wrapperQuery(sql));
191-
}
192-
193-
// if(genericInputSplit.getTotalNumberOfSplits() > 1){
194-
// searchSourceBuilder.slice(new SliceBuilder(genericInputSplit.getSplitNumber(), genericInputSplit.getTotalNumberOfSplits()));
195-
// }
196-
178+
// load data from tableA
179+
SearchSourceBuilder searchSourceBuilder = Elasticsearch6AllSideInfo.searchSourceBuilder;
180+
searchSourceBuilder.size(getFetchSize());
197181
SearchRequest searchRequest = new SearchRequest(tableInfo.getIndex());
198182
searchRequest.types(tableInfo.getEsType());
199-
searchRequest.scroll(scroll);
200183
searchRequest.source(searchSourceBuilder);
201184

202185
SearchResponse searchResponse = rhlClient.search(searchRequest);
@@ -257,7 +240,7 @@ public RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh, Strin
257240

258241
}
259242

260-
public int getBathSize() {
243+
public int getFetchSize() {
261244
return 1000;
262245
}
263246
}

0 commit comments

Comments
 (0)