
Commit f6a1ce6

Resolve conflicts between the elasticsearch-side branch and v1.8.0_dev
2 parents 9742e74 + be19965 commit f6a1ce6


54 files changed: +2029 additions, -468 deletions


cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 13 additions & 11 deletions
@@ -18,6 +18,12 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -33,15 +39,11 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
-import org.apache.calcite.sql.JoinType;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
+import org.apache.calcite.sql.JoinType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -222,9 +224,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }
 
         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -278,7 +280,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
         //load data from table
         String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
         ResultSet resultSet = session.execute(sql);
-        String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
+        String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
         for (com.datastax.driver.core.Row row : resultSet) {
             Map<String, Object> oneRow = Maps.newHashMap();
             for (String fieldName : sideFieldNames) {
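
These hunks (and the matching ones in the async side and the sink below) replace the regex-based String.split with commons-lang3 StringUtils.split when parsing the contact-point list and the select-field list. A minimal sketch of the behavioral difference, using hypothetical input values rather than anything from the commit:

import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;

public class SplitDemo {
    public static void main(String[] args) {
        String address = "cassandra-1:9042,cassandra-2:9042,";   // hypothetical address list

        // String.split compiles the separator as a regex on every call
        System.out.println(Arrays.toString(address.split(",")));
        // -> [cassandra-1:9042, cassandra-2:9042]

        // StringUtils.split treats the separator literally, skips empty tokens,
        // and returns null instead of throwing if the input itself is null
        System.out.println(Arrays.toString(StringUtils.split(address, ",")));
        // -> [cassandra-1:9042, cassandra-2:9042]

        System.out.println(Arrays.toString("a,,b".split(",")));              // -> [a, , b]
        System.out.println(Arrays.toString(StringUtils.split("a,,b", ",")));  // -> [a, b]
        System.out.println(StringUtils.split(null, ","));                     // -> null
    }
}

The practical effect is mainly defensive: a double comma in the address string yields an empty token with String.split, which would then fail when indexing [1] for the port, whereas StringUtils.split simply drops it.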

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 12 additions & 11 deletions
@@ -19,6 +19,13 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -38,25 +45,19 @@
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.vertx.core.json.JsonArray;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -134,9 +135,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }
 
         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 11 additions & 8 deletions
@@ -38,6 +38,12 @@
 
 package com.dtstack.flink.sql.sink.cassandra;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -49,13 +55,10 @@
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.sql.DriverManager;
@@ -145,9 +148,9 @@ public void open(int taskNumber, int numTasks) {
         // retry policy
         RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
 
-        for (String server : address.split(",")) {
-            cassandraPort = Integer.parseInt(server.split(":")[1]);
-            serversList.add(InetAddress.getByName(server.split(":")[0]));
+        for (String server : StringUtils.split(address, ",")) {
+            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
+            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
         }
 
         if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
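
All three files feed the parsed host list, port, retry policy, and optional credentials into a Datastax 3.x Cluster. The commit only touches the parsing lines, so the builder chain below is an illustrative sketch of how those pieces typically fit together, not the project's exact code; the connect helper and its signature are hypothetical.

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import org.apache.commons.lang3.StringUtils;

public class CassandraConnSketch {

    // Hypothetical helper mirroring the parsing shown in the diff:
    // address has the form "host1:9042,host2:9042".
    public static Session connect(String address, String userName, String password) throws Exception {
        List<InetAddress> serversList = new ArrayList<>();
        int cassandraPort = 9042;

        for (String server : StringUtils.split(address, ",")) {
            cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
            serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
        }

        RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

        Cluster.Builder builder = Cluster.builder()
                .addContactPoints(serversList)
                .withPort(cassandraPort)
                .withRetryPolicy(retryPolicy);

        // Credentials are optional, mirroring the null/empty check in the diff
        if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) {
            builder.withCredentials(userName, password);
        }

        return builder.build().connect();
    }
}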
GetPlan.java (new file)

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql;
+
+import com.dtstack.flink.sql.exec.ApiResult;
+import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
+import com.dtstack.flink.sql.exec.ParamsInfo;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Get the execution plan of a SQL job in local mode
+ * Date: 2020/2/17
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class GetPlan {
+
+    public static String getExecutionPlan(String[] args) {
+        try {
+            long start = System.currentTimeMillis();
+            ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+            StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
+            String executionPlan = env.getExecutionPlan();
+            long end = System.currentTimeMillis();
+            return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
+        } catch (Exception e) {
+            return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+}
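
GetPlan is a small local-mode entry point: it parses the usual launcher arguments, builds the StreamExecutionEnvironment, and returns Flink's JSON execution plan wrapped in an ApiResult string (or the full stack trace on failure). A minimal caller sketch; GetPlanDemo is hypothetical, and the accepted argument options are defined by ExecuteProcessHelper.parseParams, not by this commit:

public class GetPlanDemo {
    public static void main(String[] args) {
        // Forward whatever launcher arguments a normal job submission would use
        // (SQL text, plugin paths, etc.; exact option names depend on
        // ExecuteProcessHelper.parseParams).
        String resultJson = GetPlan.getExecutionPlan(args);

        // Success: JSON containing the execution plan and elapsed milliseconds.
        // Failure: JSON containing the full stack trace.
        System.out.println(resultJson);
    }
}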
