Skip to content

Commit 9742e74

Browse files
增加一些注释
1 parent 97ce51b commit 9742e74

File tree

3 files changed

+14
-3
lines changed

3 files changed

+14
-3
lines changed

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ protected void initCache() throws SQLException {
159159
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
160160
cacheRef.set(newCache);
161161
try {
162+
// create search request and build where cause
162163
searchRequest = Es6Util.setSearchRequest(sideInfo);
163164
boolQueryBuilder = Es6Util.setPredicateclause(sideInfo);
164165
loadData(newCache);
@@ -217,6 +218,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
217218
}
218219
}
219220

221+
// initialize searchSourceBuilder
220222
private SearchSourceBuilder initConfiguration(BoolQueryBuilder boolQueryBuilder){
221223
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
222224
if (boolQueryBuilder != null) {
@@ -225,6 +227,8 @@ private SearchSourceBuilder initConfiguration(BoolQueryBuilder boolQueryBuilder)
225227

226228
searchSourceBuilder.size(getFetchSize());
227229
searchSourceBuilder.sort("_id", SortOrder.DESC);
230+
231+
// fields included in the source data
228232
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");
229233
searchSourceBuilder.fetchSource(sideFieldNames, null);
230234
return searchSourceBuilder;
@@ -240,6 +244,7 @@ private void searchData(SearchSourceBuilder searchSourceBuilder, Map<String, Lis
240244
while (true) {
241245
try {
242246
if (searchAfterParameter != null) {
247+
// set search mark
243248
searchSourceBuilder.searchAfter(searchAfterParameter);
244249
}
245250
searchRequest.source(searchSourceBuilder);
@@ -258,6 +263,7 @@ private void searchData(SearchSourceBuilder searchSourceBuilder, Map<String, Lis
258263
}
259264
}
260265

266+
// data load to cache
261267
private void loadToCache(SearchHit[] searchHits, Map<String, List<Map<String, Object>>> tmpCache) {
262268
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");
263269
String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes();

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,12 @@ public void onResponse(SearchResponse searchResponse) {
136136
try {
137137
while (true) {
138138
loadDataToCache(searchHits, rowList, cacheContent, copyCrow);
139+
// determine if all results haven been ferched
139140
if (searchHits.length < getFetchSize()) {
140141
break;
141142
}
142143
if (tableInfo == null && tmpRhlClient == null) {
144+
// create new connection to fetch data
143145
tableInfo = (Elasticsearch6SideTableInfo) sideInfo.getSideTableInfo();
144146
tmpRhlClient = Es6Util.getClient(tableInfo.getAddress(), tableInfo.isAuthMesh(), tableInfo.getUserName(), tableInfo.getPassword());
145147
}
@@ -187,7 +189,6 @@ private void loadDataToCache(SearchHit[] searchHits, List<CRow> rowList, List<Ob
187189
rowList.addAll(getRows(copyCrow, cacheContent, results));
188190
}
189191

190-
191192
protected List<CRow> getRows(CRow inputRow, List<Object> cacheContent, List<Object> results) {
192193
List<CRow> rowList = Lists.newArrayList();
193194
for (Object line : results) {
@@ -257,7 +258,6 @@ private SearchSourceBuilder initConfiguration() {
257258
return searchSourceBuilder;
258259
}
259260

260-
261261
private BoolQueryBuilder setInputParams(List<Object> inputParams, BoolQueryBuilder boolQueryBuilder) {
262262
if (boolQueryBuilder == null) {
263263
boolQueryBuilder = new BoolQueryBuilder();
@@ -273,7 +273,6 @@ private BoolQueryBuilder setInputParams(List<Object> inputParams, BoolQueryBuild
273273
return boolQueryBuilder;
274274
}
275275

276-
277276
public int getFetchSize() {
278277
return 1000;
279278
}

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/Es6Util.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class Es6Util {
5151
private static final String KEY_WORD_TYPE = ".keyword";
5252
private static final String APOSTROPHE = "\'";
5353

54+
// connect to the elasticsearch
5455
public static RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh, String userName, String password) {
5556
List<HttpHost> httpHostList = new ArrayList<>();
5657
String[] address = StringUtils.split(esAddress, ",");
@@ -96,6 +97,7 @@ public static RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh
9697
return rhlClient;
9798
}
9899

100+
// add index and type to search request
99101
public static SearchRequest setSearchRequest(SideInfo sideInfo) {
100102
SearchRequest searchRequest = new SearchRequest();
101103
Elasticsearch6SideTableInfo tableInfo = (Elasticsearch6SideTableInfo) sideInfo.getSideTableInfo();
@@ -126,6 +128,7 @@ public static SearchRequest setSearchRequest(SideInfo sideInfo) {
126128
return searchRequest;
127129
}
128130

131+
// build where cause
129132
public static BoolQueryBuilder setPredicateclause(SideInfo sideInfo) {
130133

131134
BoolQueryBuilder boolQueryBuilder = null;
@@ -140,6 +143,7 @@ public static BoolQueryBuilder setPredicateclause(SideInfo sideInfo) {
140143
return boolQueryBuilder;
141144
}
142145

146+
// build filter condition
143147
public static BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBuilder, PredicateInfo info, SideInfo sideInfo) {
144148
switch (info.getOperatorKind()) {
145149
case "IN":
@@ -184,6 +188,7 @@ public static BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBu
184188

185189
}
186190

191+
// remove extra spaces and apostrophes
187192
public static String[] removeSpaceAndApostrophe(String str) {
188193
String[] split = StringUtils.split(str, ",");
189194
for (int i = 0; i < split.length; i++) {
@@ -196,6 +201,7 @@ public static String[] removeSpaceAndApostrophe(String str) {
196201
return split;
197202
}
198203

204+
// prevent word segmentation
199205
public static String textConvertToKeyword(String fieldName, SideInfo sideInfo) {
200206
String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes();
201207
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());

0 commit comments

Comments
 (0)