Skip to content

Commit 18f0c62

Browse files
修改代码规范
1 parent 9e5e3c7 commit 18f0c62

File tree

5 files changed

+19
-12
lines changed

5 files changed

+19
-12
lines changed

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/Elasticsearch6Sink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@
3232
import com.dtstack.flink.sql.sink.IStreamSinkGener;
3333
import com.dtstack.flink.sql.sink.elasticsearch6.table.ElasticsearchTableInfo;
3434
import com.dtstack.flink.sql.table.TargetTableInfo;
35+
import com.google.common.collect.Maps;
3536
import org.apache.commons.lang.StringUtils;
3637
import org.apache.http.HttpHost;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

4041
import java.util.ArrayList;
4142
import java.util.Arrays;
42-
import java.util.HashMap;
4343
import java.util.List;
4444
import java.util.Map;
4545

@@ -105,7 +105,7 @@ public TypeInformation<?>[] getFieldTypes() {
105105
private RichSinkFunction createEsSinkFunction() {
106106

107107

108-
Map<String, String> userConfig = new HashMap<>();
108+
Map<String, String> userConfig = Maps.newHashMap();
109109
userConfig.put("cluster.name", clusterName);
110110
// This instructs the sink to emit after every element, otherwise they would be buffered
111111
userConfig.put(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "" + bulkFlushMaxActions);

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/Es6Util.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.flink.types.Row;
2222
import org.apache.flink.util.Preconditions;
2323

24+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
25+
2426
import com.dtstack.flink.sql.util.DtStringUtil;
2527

2628
import java.util.HashMap;
@@ -35,7 +37,7 @@ public class Es6Util {
3537

3638
public static Map<String, Object> rowToJsonMap(Row row, List<String> fields, List<String> types) {
3739
Preconditions.checkArgument(row.getArity() == fields.size());
38-
Map<String,Object> jsonMap = new HashMap<>();
40+
Map<String,Object> jsonMap = Maps.newHashMap();
3941
int i = 0;
4042
for(; i < fields.size(); ++i) {
4143
String field = fields.get(i);
@@ -44,7 +46,8 @@ public static Map<String, Object> rowToJsonMap(Row row, List<String> fields, Lis
4446
for(int j = 0; j < parts.length - 1; ++j) {
4547
String key = parts[j];
4648
if(currMap.get(key) == null) {
47-
currMap.put(key, new HashMap<String,Object>());
49+
HashMap<String, Object> hashMap = Maps.newHashMap();
50+
currMap.put(key, hashMap);
4851
}
4952
currMap = (Map<String, Object>) currMap.get(key);
5053
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/ExtendES6ApiCallBridge.java renamed to elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/ExtendEs6ApiCallBridge.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.bulk.BackoffPolicy;
3232
import org.elasticsearch.action.bulk.BulkItemResponse;
3333
import org.elasticsearch.action.bulk.BulkProcessor;
34+
import org.elasticsearch.client.RequestOptions;
3435
import org.elasticsearch.client.RestClient;
3536
import org.elasticsearch.client.RestClientBuilder;
3637
import org.elasticsearch.client.RestHighLevelClient;
@@ -48,17 +49,17 @@
4849
* @author yinxi
4950
* @date 2020/1/9 - 15:09
5051
*/
51-
public class ExtendES6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {
52+
public class ExtendEs6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {
5253

53-
private static final Logger LOG = LoggerFactory.getLogger(ExtendES6ApiCallBridge.class);
54+
private static final Logger LOG = LoggerFactory.getLogger(ExtendEs6ApiCallBridge.class);
5455

5556
private final List<HttpHost> HttpAddresses;
5657

5758
protected ElasticsearchTableInfo es6TableInfo;
5859

59-
public ExtendES6ApiCallBridge(List<HttpHost> HttpAddresses, ElasticsearchTableInfo es6TableInfo) {
60-
Preconditions.checkArgument(HttpAddresses != null && !HttpAddresses.isEmpty());
61-
this.HttpAddresses = HttpAddresses;
60+
public ExtendEs6ApiCallBridge(List<HttpHost> httpAddresses, ElasticsearchTableInfo es6TableInfo) {
61+
Preconditions.checkArgument(httpAddresses != null && !httpAddresses.isEmpty());
62+
this.HttpAddresses = httpAddresses;
6263
this.es6TableInfo = es6TableInfo;
6364
}
6465

@@ -81,7 +82,7 @@ public RestHighLevelClient createClient(Map<String, String> clientConfig) {
8182
}
8283

8384
try{
84-
if (!rhlClient.ping()) {
85+
if (!rhlClient.ping(RequestOptions.DEFAULT)) {
8586
throw new RuntimeException("There are no reachable Elasticsearch nodes!");
8687
}
8788
} catch (IOException e){

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/MetricElasticsearch6Sink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class MetricElasticsearch6Sink<T> extends ElasticsearchSinkBase<T, RestHi
4949
public MetricElasticsearch6Sink(Map userConfig, List transportAddresses,
5050
ElasticsearchSinkFunction elasticsearchSinkFunction,
5151
ElasticsearchTableInfo es6TableInfo) {
52-
super(new ExtendES6ApiCallBridge(transportAddresses, es6TableInfo), userConfig, elasticsearchSinkFunction, new NoOpFailureHandler());
52+
super(new ExtendEs6ApiCallBridge(transportAddresses, es6TableInfo), userConfig, elasticsearchSinkFunction, new NoOpFailureHandler());
5353
this.customerSinkFunc = (CustomerSinkFunc) elasticsearchSinkFunction;
5454
this.userConfig = userConfig;
5555
}

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch6/table/ElasticsearchSinkParser.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.table.AbsTableParser;
2222
import com.dtstack.flink.sql.table.TableInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
24+
import org.apache.commons.lang3.StringUtils;
2425

2526
import java.util.Map;
2627

@@ -46,6 +47,8 @@ public class ElasticsearchSinkParser extends AbsTableParser {
4647

4748
private static final String KEY_ES6_PASSWORD = "password";
4849

50+
private static final String KEY_TRUE = "true";
51+
4952
@Override
5053
protected boolean fieldNameNeedsUpperCase() {
5154
return false;
@@ -63,7 +66,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
6366
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES6_TYPE.toLowerCase()));
6467

6568
String authMeshStr = (String) props.get(KEY_ES6_AUTHMESH.toLowerCase());
66-
if (authMeshStr != null && "true".equalsIgnoreCase(authMeshStr)) {
69+
if (authMeshStr != null && StringUtils.equalsIgnoreCase(KEY_TRUE, authMeshStr)) {
6770
elasticsearchTableInfo.setAuthMesh(MathUtil.getBoolean(authMeshStr));
6871
elasticsearchTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES6_USERNAME.toLowerCase())));
6972
elasticsearchTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES6_PASSWORD.toLowerCase())));

0 commit comments

Comments
 (0)