2424import org .apache .flink .types .Row ;
2525import org .apache .flink .util .Collector ;
2626
27- import com .dtstack .flink .sql .side .*;
27+ import com .dtstack .flink .sql .side .AllReqRow ;
28+ import com .dtstack .flink .sql .side .FieldInfo ;
29+ import com .dtstack .flink .sql .side .JoinInfo ;
30+ import com .dtstack .flink .sql .side .SideTableInfo ;
2831import com .dtstack .flink .sql .side .elasticsearch6 .table .Elasticsearch6SideTableInfo ;
32+ import com .dtstack .flink .sql .side .elasticsearch6 .util .Es6Util ;
2933import com .dtstack .flink .sql .side .elasticsearch6 .util .SwitchUtil ;
3034import com .google .common .collect .Lists ;
3135import com .google .common .collect .Maps ;
3236import org .apache .calcite .sql .JoinType ;
3337import org .apache .commons .collections .CollectionUtils ;
3438import org .apache .commons .lang3 .StringUtils ;
35- import org .apache .http .HttpHost ;
36- import org .apache .http .auth .AuthScope ;
37- import org .apache .http .auth .UsernamePasswordCredentials ;
38- import org .apache .http .client .CredentialsProvider ;
39- import org .apache .http .impl .client .BasicCredentialsProvider ;
40- import org .elasticsearch .action .search .*;
39+ import org .elasticsearch .action .search .SearchRequest ;
40+ import org .elasticsearch .action .search .SearchResponse ;
4141import org .elasticsearch .client .RequestOptions ;
42- import org .elasticsearch .client .RestClient ;
43- import org .elasticsearch .client .RestClientBuilder ;
4442import org .elasticsearch .client .RestHighLevelClient ;
45- import org .elasticsearch .common .unit .TimeValue ;
4643import org .elasticsearch .index .query .BoolQueryBuilder ;
47- import org .elasticsearch .index .query .QueryBuilders ;
48- import org .elasticsearch .search .Scroll ;
4944import org .elasticsearch .search .SearchHit ;
5045import org .elasticsearch .search .builder .SearchSourceBuilder ;
46+ import org .elasticsearch .search .sort .SortOrder ;
5147import org .slf4j .Logger ;
5248import org .slf4j .LoggerFactory ;
5349
5450import java .io .IOException ;
5551import java .io .Serializable ;
5652import java .sql .SQLException ;
5753import java .sql .Timestamp ;
58- import java .util .*;
54+ import java .util .Calendar ;
55+ import java .util .List ;
56+ import java .util .Map ;
5957import java .util .concurrent .atomic .AtomicReference ;
60- import java .util .stream .Collectors ;
6158
6259/**
6360 * @author yinxi
@@ -67,19 +64,16 @@ public class Elasticsearch6AllReqRow extends AllReqRow implements Serializable {
6764
6865 private static final Logger LOG = LoggerFactory .getLogger (Elasticsearch6AllReqRow .class );
6966
70- private static final Integer SCROLL_TIME = 1 ;
7167 private static final int CONN_RETRY_NUM = 3 ;
72- private static final String KEY_WORD_TYPE = ".keyword" ;
7368 private AtomicReference <Map <String , List <Map <String , Object >>>> cacheRef = new AtomicReference <>();
74- private String scrollId ;
75- private Scroll scroll ;
7669 private transient RestHighLevelClient rhlClient ;
70+ private SearchRequest searchRequest ;
71+ private BoolQueryBuilder boolQueryBuilder ;
7772
7873 public Elasticsearch6AllReqRow (RowTypeInfo rowTypeInfo , JoinInfo joinInfo , List <FieldInfo > outFieldInfoList , SideTableInfo sideTableInfo ) {
7974 super (new Elasticsearch6AllSideInfo (rowTypeInfo , joinInfo , outFieldInfoList , sideTableInfo ));
8075 }
8176
82-
8377 @ Override
8478 public void flatMap (CRow value , Collector <CRow > out ) throws Exception {
8579 List <Object > inputParams = Lists .newArrayList ();
@@ -165,6 +159,8 @@ protected void initCache() throws SQLException {
165159 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
166160 cacheRef .set (newCache );
167161 try {
162+ searchRequest = Es6Util .setSearchRequest (sideInfo );
163+ boolQueryBuilder = Es6Util .setPredicateclause (sideInfo );
168164 loadData (newCache );
169165 } catch (Exception e ) {
170166 LOG .error ("" , e );
@@ -191,7 +187,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
191187 try {
192188 for (int i = 0 ; i < CONN_RETRY_NUM ; i ++) {
193189 try {
194- rhlClient = getClient (tableInfo .getAddress (), tableInfo .isAuthMesh (), tableInfo .getUserName (), tableInfo .getPassword ());
190+ rhlClient = Es6Util . getClient (tableInfo .getAddress (), tableInfo .isAuthMesh (), tableInfo .getUserName (), tableInfo .getPassword ());
195191 break ;
196192 } catch (Exception e ) {
197193 if (i == CONN_RETRY_NUM - 1 ) {
@@ -208,81 +204,56 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
208204 }
209205
210206 }
211-
212-
213- // load data from tableA
214- SearchSourceBuilder searchSourceBuilder = getSelectFromStatement (sideInfo .getSideTableInfo ().getPredicateInfoes ());
215- searchSourceBuilder .size (getFetchSize ());
216- SearchRequest searchRequest = new SearchRequest ();
217- scroll = new Scroll (TimeValue .timeValueMinutes (SCROLL_TIME ));
218- searchRequest .scroll (scroll );
219-
220- // determine existence of index
221- String index = tableInfo .getIndex ().trim ();
222- if (!StringUtils .isEmpty (index )) {
223- // strip leading and trailing spaces from a string
224- String [] indexes = StringUtils .split (index , "," );
225- for (int i = 0 ; i < indexes .length ; i ++) {
226- indexes [i ] = indexes [i ].trim ();
227- }
228-
229- searchRequest .indices (indexes );
230-
231- }
232-
233- // determine existence of type
234- String type = tableInfo .getEsType ().trim ();
235- if (!StringUtils .isEmpty (type )) {
236- // strip leading and trailing spaces from a string
237- String [] types = StringUtils .split (type , "," );
238- for (int i = 0 ; i < types .length ; i ++) {
239- types [i ] = types [i ].trim ();
240- }
241-
242- searchRequest .types (types );
243- }
244-
245- // add query condition
246- searchRequest .source (searchSourceBuilder );
247-
248- // get query reults
249- searchScroll (searchRequest , tmpCache );
250-
207+ SearchSourceBuilder searchSourceBuilder = initConfiguration (boolQueryBuilder );
208+ searchData (searchSourceBuilder , tmpCache );
251209
252210 } catch (Exception e ) {
253211 LOG .error ("" , e );
254212 } finally {
255- if (!StringUtils .isEmpty (scrollId )) {
256- clearScroll ();
257- }
258213
259214 if (rhlClient != null ) {
260215 rhlClient .close ();
261216 }
262217 }
218+ }
219+
220+ private SearchSourceBuilder initConfiguration (BoolQueryBuilder boolQueryBuilder ){
221+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder ();
222+ if (boolQueryBuilder != null ) {
223+ searchSourceBuilder .query (boolQueryBuilder );
224+ }
263225
226+ searchSourceBuilder .size (getFetchSize ());
227+ searchSourceBuilder .sort ("_id" , SortOrder .DESC );
228+ String [] sideFieldNames = StringUtils .split (sideInfo .getSideSelectFields ().trim (), "," );
229+ searchSourceBuilder .fetchSource (sideFieldNames , null );
230+ return searchSourceBuilder ;
264231 }
265232
266- public void searchScroll (SearchRequest searchRequest , Map <String , List <Map <String , Object >>> tmpCache ) throws IOException {
267- SearchResponse searchResponse = rhlClient .search (searchRequest , RequestOptions .DEFAULT );
268- scrollId = searchResponse .getScrollId ();
269- SearchHit [] searchHits = searchResponse .getHits ().getHits ();
270- loadToCache (searchHits , tmpCache );
271-
272- if (!StringUtils .isEmpty (scrollId )) {
273- SearchScrollRequest scrollRequest = new SearchScrollRequest (scrollId );
274- scrollRequest .scroll (scroll );
275- while (true ) {
276- SearchResponse scrollResponse = rhlClient .scroll (scrollRequest , RequestOptions .DEFAULT );
277- if (scrollResponse .getHits ().getHits () == null || scrollResponse .getHits ().getHits ().length < 1 ) {
278- break ;
233+
234+ private void searchData (SearchSourceBuilder searchSourceBuilder , Map <String , List <Map <String , Object >>> tmpCache ) {
235+
236+ Object [] searchAfterParameter = null ;
237+ SearchResponse searchResponse = null ;
238+ SearchHit [] searchHits = null ;
239+
240+ while (true ) {
241+ try {
242+ if (searchAfterParameter != null ) {
243+ searchSourceBuilder .searchAfter (searchAfterParameter );
279244 }
245+ searchRequest .source (searchSourceBuilder );
246+ searchResponse = rhlClient .search (searchRequest , RequestOptions .DEFAULT );
280247 searchHits = searchResponse .getHits ().getHits ();
281248 loadToCache (searchHits , tmpCache );
249+
250+ if (searchHits .length < getFetchSize ()) {
251+ break ;
252+ }
282253
283- scrollId = scrollResponse . getScrollId ();
284- scrollRequest . scrollId ( scrollId );
285-
254+ searchAfterParameter = searchHits [ searchHits . length - 1 ]. getSortValues ();
255+ } catch ( IOException e ) {
256+ LOG . error ( "Query failed!" , e );
286257 }
287258 }
288259 }
@@ -305,138 +276,7 @@ private void loadToCache(SearchHit[] searchHits, Map<String, List<Map<String, Ob
305276 }
306277 }
307278
308- private void clearScroll () throws IOException {
309- ClearScrollRequest clearScrollRequest = new ClearScrollRequest ();
310- clearScrollRequest .addScrollId (scrollId );
311- ClearScrollResponse clearScrollResponse = rhlClient .clearScroll (clearScrollRequest , RequestOptions .DEFAULT );
312- boolean succeeded = clearScrollResponse .isSucceeded ();
313- LOG .info ("Clear scroll response:{}" , succeeded );
314- }
315-
316- public RestHighLevelClient getClient (String esAddress , Boolean isAuthMesh , String userName , String password ) {
317- List <HttpHost > httpHostList = new ArrayList <>();
318- String [] address = StringUtils .split (esAddress , "," );
319- for (String addr : address ) {
320- String [] infoArray = StringUtils .split (addr , ":" );
321- int port = 9200 ;
322- String host = infoArray [0 ].trim ();
323- if (infoArray .length > 1 ) {
324- port = Integer .valueOf (infoArray [1 ].trim ());
325- }
326- httpHostList .add (new HttpHost (host , port , "http" ));
327- }
328-
329- RestClientBuilder restClientBuilder = RestClient .builder (httpHostList .toArray (new HttpHost [httpHostList .size ()]));
330-
331- if (isAuthMesh ) {
332- // 进行用户和密码认证
333- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider ();
334- credentialsProvider .setCredentials (AuthScope .ANY , new UsernamePasswordCredentials (userName .trim (), password .trim ()));
335- restClientBuilder .setHttpClientConfigCallback (httpAsyncClientBuilder ->
336- httpAsyncClientBuilder .setDefaultCredentialsProvider (credentialsProvider ));
337- }
338-
339- RestHighLevelClient rhlClient = new RestHighLevelClient (restClientBuilder );
340-
341- if (LOG .isInfoEnabled ()) {
342- LOG .info ("Pinging Elasticsearch cluster via hosts {} ..." , httpHostList );
343- }
344-
345- try {
346- if (!rhlClient .ping ()) {
347- throw new RuntimeException ("There are no reachable Elasticsearch nodes!" );
348- }
349- } catch (IOException e ) {
350- LOG .warn ("" , e );
351- }
352-
353-
354- if (LOG .isInfoEnabled ()) {
355- LOG .info ("Created Elasticsearch RestHighLevelClient connected to {}" , httpHostList .toString ());
356- }
357-
358- return rhlClient ;
359-
360- }
361-
362279 public int getFetchSize () {
363280 return 1000 ;
364281 }
365-
366- private SearchSourceBuilder getSelectFromStatement (List <PredicateInfo > predicateInfoes ) {
367- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder ();
368-
369- if (predicateInfoes .size () > 0 ) {
370- BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder ();
371- for (PredicateInfo info : predicateInfoes ) {
372- boolQueryBuilder = buildFilterCondition (boolQueryBuilder , info );
373- }
374-
375- searchSourceBuilder .query (boolQueryBuilder );
376- }
377-
378- return searchSourceBuilder ;
379- }
380-
381- public BoolQueryBuilder buildFilterCondition (BoolQueryBuilder boolQueryBuilder , PredicateInfo info ) {
382- switch (info .getOperatorKind ()) {
383- case "IN" :
384- return boolQueryBuilder .must (QueryBuilders .termsQuery (textConvertToKeyword (info .getFieldName ()), removeSpaces (info .getCondition ())));
385- case "NOT_IN" :
386- return boolQueryBuilder .mustNot (QueryBuilders .termsQuery (textConvertToKeyword (info .getFieldName ()), removeSpaces (info .getCondition ())));
387- case "GREATER_THAN_OR_EQUAL" :
388- return boolQueryBuilder .must (QueryBuilders .rangeQuery (info .getFieldName ()).gte (info .getCondition ()));
389- case "GREATER_THAN" :
390- return boolQueryBuilder .must (QueryBuilders .rangeQuery (info .getFieldName ()).gt (info .getCondition ()));
391- case "LESS_THAN_OR_EQUAL" :
392- return boolQueryBuilder .must (QueryBuilders .rangeQuery (info .getFieldName ()).lte (info .getCondition ()));
393- case "LESS_THAN" :
394- return boolQueryBuilder .must (QueryBuilders .rangeQuery (info .getFieldName ()).lt (info .getCondition ()));
395- case "BETWEEN" :
396- return boolQueryBuilder .must (QueryBuilders .rangeQuery (info .getFieldName ()).gte (StringUtils .split (info .getCondition ().toUpperCase (), "AND" )[0 ].trim ())
397- .lte (StringUtils .split (info .getCondition ().toUpperCase (), "AND" )[1 ].trim ()));
398- case "IS_NULL" :
399- return boolQueryBuilder .mustNot (QueryBuilders .existsQuery (info .getFieldName ()));
400- case "IS_NOT_NULL" :
401- return boolQueryBuilder .must (QueryBuilders .existsQuery (info .getFieldName ()));
402- case "EQUALS" :
403- return boolQueryBuilder .must (QueryBuilders .termQuery (textConvertToKeyword (info .getFieldName ()), info .getCondition ()));
404- case "NOT_EQUALS" :
405- return boolQueryBuilder .mustNot (QueryBuilders .termQuery (textConvertToKeyword (info .getFieldName ()), info .getCondition ()));
406- default :
407- try {
408- throw new Exception ("elasticsearch6 does not support this operation: " + info .getOperatorName ());
409- } catch (Exception e ) {
410-
411- e .printStackTrace ();
412- LOG .error (e .getMessage ());
413- }
414- return boolQueryBuilder ;
415- }
416-
417- }
418-
419- public String [] removeSpaces (String str ) {
420- String [] split = StringUtils .split (str , "," );
421- String [] result = new String [split .length ];
422- Arrays .asList (split ).stream ().map (f -> f .trim ()).collect (Collectors .toList ()).toArray (result );
423- return result ;
424- }
425-
426- public String textConvertToKeyword (String fieldName ) {
427- String [] sideFieldNames = StringUtils .split (sideInfo .getSideSelectFields ().trim (), "," );
428- String [] sideFieldTypes = sideInfo .getSideTableInfo ().getFieldTypes ();
429- int fieldIndex = sideInfo .getSideTableInfo ().getFieldList ().indexOf (fieldName .trim ());
430- String fieldType = sideFieldTypes [fieldIndex ];
431- switch (fieldType .toLowerCase ()) {
432- case "varchar" :
433- case "char" :
434- case "text" :
435- return fieldName + KEY_WORD_TYPE ;
436- default :
437- return fieldName ;
438- }
439- }
440-
441-
442282}
0 commit comments