Skip to content

Commit 2a70c5f

Browse files
修改
1 parent 87643e5 commit 2a70c5f

File tree

1 file changed

+4
-35
lines changed

1 file changed

+4
-35
lines changed

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public Elasticsearch6AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, Lis
7171
}
7272

7373
@Override
74-
public void open(Configuration parameters) throws Exception{
74+
public void open(Configuration parameters) throws Exception {
7575
super.open(parameters);
7676
Elasticsearch6SideTableInfo tableInfo = (Elasticsearch6SideTableInfo) sideInfo.getSideTableInfo();
7777
rhlClient = Es6Util.getClient(tableInfo.getAddress(), tableInfo.isAuthMesh(), tableInfo.getUserName(), tableInfo.getPassword());
@@ -117,40 +117,9 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
117117
BoolQueryBuilder boolQueryBuilder = Es6Util.setPredicateclause(sideInfo);
118118
boolQueryBuilder = setInputParams(inputParams, boolQueryBuilder);
119119
SearchSourceBuilder searchSourceBuilder = initConfiguration();
120-
121120
searchSourceBuilder.query(boolQueryBuilder);
122-
123121
searchRequest.source(searchSourceBuilder);
124122

125-
126-
// List<Object> cacheContent = Lists.newArrayList();
127-
// List<CRow> rowList = Lists.newArrayList();
128-
// ActionListener<SearchResponse> searchActionListener = new ActionListener<SearchResponse>() {
129-
// @Override
130-
// public void onResponse(SearchResponse searchResponse) {
131-
//
132-
// SearchHit[] searchHits = searchResponse.getHits().getHits();
133-
// while (searchHits != null && searchHits.length > 0){
134-
// loadDataToCache(searchHits, rowList, cacheContent, copyCrow);
135-
// if (searchHits.length < getFetchSize()) {
136-
// break;
137-
// }
138-
//
139-
// Object[] searchAfterParameter = searchHits[searchHits.length - 1].getSortValues();
140-
// searchSourceBuilder.searchAfter(searchAfterParameter);
141-
// searchRequest.source(searchSourceBuilder);
142-
// SearchResponse newSearchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
143-
// searchHits = newSearchResponse.getHits().getHits();
144-
//
145-
// }
146-
// }
147-
//
148-
// @Override
149-
// public void onFailure(Exception e) {
150-
//
151-
// }
152-
// };
153-
154123
// 异步查询数据
155124
rhlClient.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
156125

@@ -209,7 +178,7 @@ public void onFailure(Exception e) {
209178

210179
}
211180

212-
private void loadDataToCache(SearchHit[] searchHits, List<CRow> rowList, List<Object> cacheContent, CRow copyCrow){
181+
private void loadDataToCache(SearchHit[] searchHits, List<CRow> rowList, List<Object> cacheContent, CRow copyCrow) {
213182
List<Object> results = Lists.newArrayList();
214183
for (SearchHit searchHit : searchHits) {
215184
Map<String, Object> object = searchHit.getSourceAsMap();
@@ -289,8 +258,8 @@ private SearchSourceBuilder initConfiguration() {
289258
}
290259

291260

292-
private BoolQueryBuilder setInputParams(List<Object> inputParams, BoolQueryBuilder boolQueryBuilder){
293-
if(boolQueryBuilder == null){
261+
private BoolQueryBuilder setInputParams(List<Object> inputParams, BoolQueryBuilder boolQueryBuilder) {
262+
if (boolQueryBuilder == null) {
294263
boolQueryBuilder = new BoolQueryBuilder();
295264
}
296265

0 commit comments

Comments
 (0)