Skip to content

Commit ac0979d

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_31672' into 1.10_test_4.0.x
2 parents 313b5dc + a58ddd5 commit ac0979d

File tree

12 files changed

+132
-582
lines changed

12 files changed

+132
-582
lines changed

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.dtstack.flink.sql.side.JoinInfo;
2525
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
2626
import com.dtstack.flink.sql.side.elasticsearch6.util.Es6Util;
27-
import com.dtstack.flink.sql.side.elasticsearch6.util.SwitchUtil;
2827
import com.google.common.collect.Lists;
2928
import com.google.common.collect.Maps;
3029
import org.apache.commons.collections.CollectionUtils;
@@ -235,7 +234,7 @@ private void loadToCache(SearchHit[] searchHits, Map<String, List<Map<String, Ob
235234
for (String fieldName : sideFieldNames) {
236235
Object object = searchHit.getSourceAsMap().get(fieldName.trim());
237236
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
238-
object = SwitchUtil.getTarget(object, sideFieldTypes[fieldIndex]);
237+
object = Es6Util.getTarget(object, sideFieldTypes[fieldIndex]);
239238
oneRow.put(fieldName.trim(), object);
240239
}
241240

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.side.elasticsearch6;
2020

21+
import com.dtstack.flink.sql.side.elasticsearch6.util.Es6Util;
2122
import com.dtstack.flink.sql.util.RowDataComplete;
2223
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2324
import org.apache.flink.configuration.Configuration;
@@ -30,8 +31,6 @@
3031
import com.dtstack.flink.sql.side.*;
3132
import com.dtstack.flink.sql.side.cache.CacheObj;
3233
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
33-
import com.dtstack.flink.sql.side.elasticsearch6.util.Es6Util;
34-
import com.dtstack.flink.sql.side.elasticsearch6.util.SwitchUtil;
3534
import com.dtstack.flink.sql.util.ParseUtils;
3635
import com.google.common.collect.Lists;
3736
import org.apache.calcite.sql.SqlNode;
@@ -110,7 +109,7 @@ public void onResponse(SearchResponse searchResponse) {
110109
if (searchHits.length < getFetchSize()) {
111110
break;
112111
}
113-
if (tableInfo == null && tmpRhlClient == null) {
112+
if (tableInfo == null) {
114113
// create new connection to fetch data
115114
tableInfo = (Elasticsearch6SideTableInfo) sideInfo.getSideTableInfo();
116115
tmpRhlClient = Es6Util.getClient(tableInfo.getAddress(), tableInfo.isAuthMesh(), tableInfo.getUserName(), tableInfo.getPassword());
@@ -201,7 +200,7 @@ public Row fillData(Row input, Object line) {
201200
if (cacheInfo == null) {
202201
row.setField(entry.getKey(), null);
203202
} else {
204-
Object object = SwitchUtil.getTarget(cacheInfo.get(sideInfo.getSideFieldNameIndex().get(entry.getKey())), fields[entry.getValue()]);
203+
Object object = Es6Util.getTarget(cacheInfo.get(sideInfo.getSideFieldNameIndex().get(entry.getKey())), fields[entry.getValue()]);
205204
row.setField(entry.getKey(), object);
206205
}
207206
}

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/table/Elasticsearch6SideParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
package com.dtstack.flink.sql.side.elasticsearch6.table;
2020

21-
import com.dtstack.flink.sql.side.elasticsearch6.util.ClassUtil;
2221
import com.dtstack.flink.sql.table.AbstractSideTableParser;
2322
import com.dtstack.flink.sql.table.AbstractTableInfo;
23+
import com.dtstack.flink.sql.util.ClassUtil;
2424
import com.dtstack.flink.sql.util.MathUtil;
2525
import org.apache.commons.lang3.StringUtils;
2626

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/ClassUtil.java

Lines changed: 0 additions & 202 deletions
This file was deleted.

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/Es6Util.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.side.BaseSideInfo;
2222
import com.dtstack.flink.sql.side.PredicateInfo;
2323
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
24+
import com.dtstack.flink.sql.util.MathUtil;
2425
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.http.HttpHost;
2627
import org.apache.http.auth.AuthScope;
@@ -40,6 +41,7 @@
4041
import java.io.IOException;
4142
import java.util.ArrayList;
4243
import java.util.List;
44+
import java.util.Locale;
4345

4446
/**
4547
* @author yinxi
@@ -49,7 +51,7 @@ public class Es6Util {
4951

5052
private static final Logger LOG = LoggerFactory.getLogger(Es6Util.class);
5153
private static final String KEY_WORD_TYPE = ".keyword";
52-
private static final String APOSTROPHE = "\'";
54+
private static final String APOSTROPHE = "'";
5355

5456
// connect to the elasticsearch
5557
public static RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh, String userName, String password) {
@@ -60,12 +62,12 @@ public static RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh
6062
int port = 9200;
6163
String host = infoArray[0].trim();
6264
if (infoArray.length > 1) {
63-
port = Integer.valueOf(infoArray[1].trim());
65+
port = Integer.parseInt(infoArray[1].trim());
6466
}
6567
httpHostList.add(new HttpHost(host, port, "http"));
6668
}
6769

68-
RestClientBuilder restClientBuilder = RestClient.builder(httpHostList.toArray(new HttpHost[httpHostList.size()]));
70+
RestClientBuilder restClientBuilder = RestClient.builder(httpHostList.toArray(new HttpHost[0]));
6971

7072
if (isAuthMesh) {
7173
// 进行用户和密码认证
@@ -171,6 +173,9 @@ public static BoolQueryBuilder buildFilterCondition(BoolQueryBuilder boolQueryBu
171173
return boolQueryBuilder.must(QueryBuilders.existsQuery(info.getFieldName()));
172174
case "=":
173175
case "EQUALS":
176+
if (StringUtils.isBlank(info.getCondition())) {
177+
return boolQueryBuilder;
178+
}
174179
return boolQueryBuilder.must(QueryBuilders.termQuery(textConvertToKeyword(info.getFieldName(), sideInfo), removeSpaceAndApostrophe(info.getCondition())[0]));
175180
case "<>":
176181
case "NOT_EQUALS":
@@ -199,7 +204,7 @@ public static String textConvertToKeyword(String fieldName, BaseSideInfo sideInf
199204
String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes();
200205
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
201206
String fieldType = sideFieldTypes[fieldIndex];
202-
switch (fieldType.toLowerCase()) {
207+
switch (fieldType.toLowerCase(Locale.ENGLISH)) {
203208
case "varchar":
204209
case "char":
205210
case "text":
@@ -208,4 +213,59 @@ public static String textConvertToKeyword(String fieldName, BaseSideInfo sideInf
208213
return fieldName;
209214
}
210215
}
216+
217+
public static Object getTarget(Object obj, String targetType) {
218+
switch (targetType.toLowerCase(Locale.ENGLISH)) {
219+
220+
case "smallint":
221+
case "smallintunsigned":
222+
case "tinyint":
223+
case "tinyintunsigned":
224+
case "mediumint":
225+
case "mediumintunsigned":
226+
case "integer":
227+
case "int":
228+
return MathUtil.getIntegerVal(obj);
229+
230+
case "bigint":
231+
case "bigintunsigned":
232+
case "intunsigned":
233+
case "integerunsigned":
234+
return MathUtil.getLongVal(obj);
235+
236+
case "boolean":
237+
return MathUtil.getBoolean(obj);
238+
239+
case "blob":
240+
return MathUtil.getByte(obj);
241+
242+
case "varchar":
243+
case "char":
244+
case "text":
245+
return MathUtil.getString(obj);
246+
247+
case "real":
248+
case "float":
249+
case "realunsigned":
250+
case "floatunsigned":
251+
return MathUtil.getFloatVal(obj);
252+
253+
case "double":
254+
case "doubleunsigned":
255+
return MathUtil.getDoubleVal(obj);
256+
257+
case "decimal":
258+
case "decimalunsigned":
259+
return MathUtil.getBigDecimal(obj);
260+
261+
case "date":
262+
return MathUtil.getDate(obj);
263+
264+
case "timestamp":
265+
case "datetime":
266+
return MathUtil.getTimestamp(obj);
267+
default:
268+
}
269+
return obj;
270+
}
211271
}

0 commit comments

Comments
 (0)