Skip to content

Commit ba97be2

Browse files
修改代码规范
1 parent 14fa4a6 commit ba97be2

File tree

16 files changed

+62
-926
lines changed

16 files changed

+62
-926
lines changed

docs/elasticsearch6Side

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
|参数名称|含义|是否必填|默认值|
4242
|----|---|---|----|
43-
type|表明 输出表类型[mysq|hbase|elasticsearch6]|是||
43+
type|表明 输出表类型[elasticsearch6]|是||
4444
|address | 连接ES Transport地址(tcp地址)|是||
4545
|cluster | ES 集群名称 |是||
4646
|index | 选择的ES上的index名称|否||

docs/elasticsearch6Sink.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ CREATE TABLE tableName(
2727
## 4.参数:
2828
|参数名称|含义|是否必填|默认值|
2929
|----|---|---|----|
30-
|type|表明 输出表类型[mysq|hbase|elasticsearch]|||
30+
|type|表明 输出表类型[elasticsearch6]|||
3131
|address | 连接ES Transport地址(tcp地址)|||
3232
|cluster | ES 集群名称 |||
3333
|index | 选择的ES上的index名称|||
34-
|estype | 选择ES上的type名称|||
34+
|esType | 选择ES上的type名称|||
3535
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
3636
| |若id为空字符串或索引都超出范围,则随机生成id值)|||
3737
|authMesh | 是否进行用户名密码认证 || false|

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.slf4j.Logger;
4848
import org.slf4j.LoggerFactory;
4949

50+
import java.io.IOException;
5051
import java.io.Serializable;
5152
import java.sql.Timestamp;
5253
import java.util.List;
@@ -121,6 +122,35 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
121122

122123
searchRequest.source(searchSourceBuilder);
123124

125+
126+
// List<Object> cacheContent = Lists.newArrayList();
127+
// List<CRow> rowList = Lists.newArrayList();
128+
// ActionListener<SearchResponse> searchActionListener = new ActionListener<SearchResponse>() {
129+
// @Override
130+
// public void onResponse(SearchResponse searchResponse) {
131+
//
132+
// SearchHit[] searchHits = searchResponse.getHits().getHits();
133+
// while (searchHits != null && searchHits.length > 0){
134+
// loadDataToCache(searchHits, rowList, cacheContent, copyCrow);
135+
// if (searchHits.length < getFetchSize()) {
136+
// break;
137+
// }
138+
//
139+
// Object[] searchAfterParameter = searchHits[searchHits.length - 1].getSortValues();
140+
// searchSourceBuilder.searchAfter(searchAfterParameter);
141+
// searchRequest.source(searchSourceBuilder);
142+
// SearchResponse newSearchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
143+
// searchHits = newSearchResponse.getHits().getHits();
144+
//
145+
// }
146+
// }
147+
//
148+
// @Override
149+
// public void onFailure(Exception e) {
150+
//
151+
// }
152+
// };
153+
124154
// 异步查询数据
125155
rhlClient.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
126156

@@ -132,36 +162,43 @@ public void onResponse(SearchResponse searchResponse) {
132162
List<CRow> rowList = Lists.newArrayList();
133163
SearchHit[] searchHits = searchResponse.getHits().getHits();
134164
if (searchHits.length > 0) {
135-
try{
136-
loadDataToCache(searchHits, rowList, cacheContent, copyCrow);
165+
Elasticsearch6SideTableInfo tableInfo = null;
166+
RestHighLevelClient tmpRhlClient = null;
167+
try {
137168
while (true) {
169+
loadDataToCache(searchHits, rowList, cacheContent, copyCrow);
138170
if (searchHits.length < getFetchSize()) {
139171
break;
140172
}
141-
BoolQueryBuilder newBoolQueryBuilder = Es6Util.setPredicateclause(sideInfo);
142-
newBoolQueryBuilder = setInputParams(inputParams, newBoolQueryBuilder);
173+
if (tableInfo == null && tmpRhlClient == null) {
174+
tableInfo = (Elasticsearch6SideTableInfo) sideInfo.getSideTableInfo();
175+
tmpRhlClient = Es6Util.getClient(tableInfo.getAddress(), tableInfo.isAuthMesh(), tableInfo.getUserName(), tableInfo.getPassword());
176+
}
143177
Object[] searchAfterParameter = searchHits[searchHits.length - 1].getSortValues();
144-
SearchSourceBuilder newSearchSourceBuilder = initConfiguration();
145-
newSearchSourceBuilder.searchAfter(searchAfterParameter);
146-
newSearchSourceBuilder.query(newBoolQueryBuilder);
147-
searchRequest.source(newSearchSourceBuilder);
148-
searchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
178+
searchSourceBuilder.searchAfter(searchAfterParameter);
179+
searchRequest.source(searchSourceBuilder);
180+
searchResponse = tmpRhlClient.search(searchRequest, RequestOptions.DEFAULT);
149181
searchHits = searchResponse.getHits().getHits();
150-
loadDataToCache(searchHits, rowList, cacheContent, copyCrow);
151182
}
152183
dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
153184
resultFuture.complete(rowList);
154185
} catch (Exception e) {
155186
dealFillDataError(resultFuture, e, copyCrow);
187+
} finally {
188+
if (tmpRhlClient != null) {
189+
try {
190+
tmpRhlClient.close();
191+
} catch (IOException e) {
192+
LOG.warn("Failed to shut down tmpRhlClient.", e);
193+
}
194+
}
156195
}
157196
} else {
158197
dealMissKey(copyCrow, resultFuture);
159198
dealCacheData(key, CacheMissVal.getMissKeyObj());
160199
}
161200
}
162201

163-
164-
165202
// 响应失败处理
166203
@Override
167204
public void onFailure(Exception e) {

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/table/Elasticsearch6SideParser.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class Elasticsearch6SideParser extends AbsSideTableParser {
4646

4747
private static final String KEY_ES6_PASSWORD = "password";
4848

49+
private static final String KEY_TRUE = "true";
50+
4951

5052
@Override
5153
protected boolean fieldNameNeedsUpperCase() {
@@ -64,7 +66,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
6466
elasticsearch6SideTableInfo.setEsType((String) props.get(KEY_ES6_TYPE.toLowerCase()));
6567

6668
String authMeshStr = (String) props.get(KEY_ES6_AUTHMESH.toLowerCase());
67-
if (authMeshStr != null && StringUtils.equalsIgnoreCase("true", authMeshStr)) {
69+
if (authMeshStr != null && StringUtils.equalsIgnoreCase(KEY_TRUE, authMeshStr)) {
6870
elasticsearch6SideTableInfo.setAuthMesh(MathUtil.getBoolean(authMeshStr));
6971
elasticsearch6SideTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES6_USERNAME.toLowerCase())));
7072
elasticsearch6SideTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES6_PASSWORD.toLowerCase())));

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/ClassUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.side.elasticsearch6.util;
2221

@@ -31,6 +30,7 @@
3130
* Reason: TODO ADD REASON(可选)
3231
* Date: 2017年03月10日 下午1:16:37
3332
* Company: www.dtstack.com
33+
*
3434
* @author sishu.yss
3535
*/
3636
public class ClassUtil {
@@ -89,10 +89,10 @@ public static Class<?> stringConvertClass(String str) {
8989
case "decimal":
9090
case "decimalunsigned":
9191
return BigDecimal.class;
92-
92+
default:
93+
throw new RuntimeException("不支持 " + str + " 类型");
9394
}
9495

95-
throw new RuntimeException("不支持 " + str + " 类型");
9696
}
9797

9898
public static Object convertType(Object field, String fromType, String toType) {

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/Es6Util.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.http.client.CredentialsProvider;
2929
import org.apache.http.impl.client.BasicCredentialsProvider;
3030
import org.elasticsearch.action.search.SearchRequest;
31+
import org.elasticsearch.client.RequestOptions;
3132
import org.elasticsearch.client.RestClient;
3233
import org.elasticsearch.client.RestClientBuilder;
3334
import org.elasticsearch.client.RestHighLevelClient;
@@ -80,7 +81,7 @@ public static RestHighLevelClient getClient(String esAddress, Boolean isAuthMesh
8081
}
8182

8283
try {
83-
if (!rhlClient.ping()) {
84+
if (!rhlClient.ping(RequestOptions.DEFAULT)) {
8485
throw new RuntimeException("There are no reachable Elasticsearch nodes!");
8586
}
8687
} catch (IOException e) {

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/util/SwitchUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public static Object getTarget(Object obj, String targetType) {
7777
case "timestamp":
7878
case "datetime":
7979
return MathUtil.getTimestamp(obj);
80+
default:
81+
return obj;
8082
}
81-
return obj;
8283
}
8384
}

elasticsearch6/elasticsearch6-sink/pom.xml

Lines changed: 0 additions & 126 deletions
This file was deleted.

0 commit comments

Comments
 (0)