Skip to content

Commit af85629

Browse files
committed
remote plugin path not required
1 parent 77a3ff3 commit af85629

File tree

5 files changed

+39
-11
lines changed

5 files changed

+39
-11
lines changed

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ public class GetPlan {
3434

3535
public static String getExecutionPlan(String[] args) {
3636
try {
37+
long start = System.currentTimeMillis();
3738
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
3839
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
3940
String executionPlan = env.getExecutionPlan();
40-
return ApiResult.createSuccessResultJsonStr(executionPlan);
41+
long end = System.currentTimeMillis();
42+
return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
4143
} catch (Exception e) {
4244
return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
4345
}

core/src/main/java/com/dtstack/flink/sql/exec/ApiResult.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,25 @@
1515
public class ApiResult {
1616

1717
private static final Logger LOG = LoggerFactory.getLogger(ApiResult.class);
18-
private static final ObjectMapper objectMapper = new ObjectMapper();
18+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
1919

2020
public static final Integer FAIL = 0;
2121
public static final Integer SUCCESS = 1;
2222

2323
private int code;
24+
private long space;
2425
private Object data;
2526
private String errorMsg;
2627

2728
public ApiResult() {
2829
}
2930

30-
public static String createSuccessResultJsonStr(String message) {
31+
public static String createSuccessResultJsonStr(String message,long space) {
3132
ApiResult apiResult = createSuccessResult(SUCCESS, message);
33+
apiResult.setSpace(space);
3234
String result;
3335
try {
34-
result = objectMapper.writeValueAsString(apiResult);
36+
result = OBJECT_MAPPER.writeValueAsString(apiResult);
3537
} catch (Exception e) {
3638
LOG.error("", e);
3739
result = "code:" + SUCCESS + ",message:" + message;
@@ -50,7 +52,7 @@ public static String createErrorResultJsonStr(String message) {
5052
ApiResult apiResult = createErrorResult(message, FAIL);
5153
String result;
5254
try {
53-
result = objectMapper.writeValueAsString(apiResult);
55+
result = OBJECT_MAPPER.writeValueAsString(apiResult);
5456
} catch (Exception e) {
5557
LOG.error("", e);
5658
result = "code:" + FAIL + ",message:" + message;
@@ -88,4 +90,12 @@ public String getErrorMsg() {
8890
public void setErrorMsg(String errorMsg) {
8991
this.errorMsg = errorMsg;
9092
}
93+
94+
public long getSpace() {
95+
return space;
96+
}
97+
98+
public void setSpace(long space) {
99+
this.space = space;
100+
}
91101
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flink.sql.config.CalciteConfig;
2323
import com.dtstack.flink.sql.enums.ClusterMode;
2424
import com.dtstack.flink.sql.enums.ECacheType;
25+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2526
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
2627
import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
2728
import com.dtstack.flink.sql.function.FunctionManager;
@@ -43,13 +44,15 @@
4344
import com.dtstack.flink.sql.util.PluginUtil;
4445
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
4546
import com.fasterxml.jackson.databind.ObjectMapper;
47+
import com.google.common.base.Preconditions;
4648
import com.google.common.base.Strings;
4749
import com.google.common.collect.Lists;
4850
import com.google.common.collect.Maps;
4951
import com.google.common.collect.Sets;
5052
import org.apache.calcite.sql.SqlInsert;
5153
import org.apache.calcite.sql.SqlNode;
5254
import org.apache.commons.io.Charsets;
55+
import org.apache.commons.lang3.StringUtils;
5356
import org.apache.flink.api.common.typeinfo.TypeInformation;
5457
import org.apache.flink.api.java.tuple.Tuple2;
5558
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -106,6 +109,8 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
106109
String pluginLoadMode = options.getPluginLoadMode();
107110
String deployMode = options.getMode();
108111

112+
Preconditions.checkArgument(checkRemoteSqlPluginPath(remoteSqlPluginPath, deployMode, pluginLoadMode),
113+
"Non-local mode or shipfile deployment mode, remoteSqlPluginPath is required");
109114
String confProp = URLDecoder.decode(options.getConfProp(), Charsets.UTF_8.toString());
110115
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
111116

@@ -124,6 +129,22 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
124129

125130
}
126131

132+
/**
133+
* 非local模式或者shipfile部署模式,remoteSqlPluginPath必填
134+
* @param remoteSqlPluginPath
135+
* @param deployMode
136+
* @param pluginLoadMode
137+
* @return
138+
*/
139+
public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, String deployMode, String pluginLoadMode) {
140+
if (StringUtils.isEmpty(remoteSqlPluginPath)) {
141+
return StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.SHIPFILE.name())
142+
|| StringUtils.equalsIgnoreCase(deployMode, ClusterMode.local.name());
143+
}
144+
return true;
145+
}
146+
147+
127148
public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception {
128149
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
129150
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class Options {
4848
@OptionRequired(required = true,description = "Sql local plugin root")
4949
private String localSqlPluginPath;
5050

51-
@OptionRequired(required = true,description = "Sql remote plugin root")
51+
@OptionRequired(required = false,description = "Sql remote plugin root")
5252
private String remoteSqlPluginPath ;
5353

5454
@OptionRequired(description = "sql ext jar,eg udf jar")

core/src/test/java/com/dtstack/flink/sql/TestGetPlan.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,10 @@ public class TestGetPlan {
4646
@Test
4747
public void testGetExecutionPlan() throws Exception {
4848
List<URL> urls = new ArrayList<URL>();
49-
urls.addAll(getJarUrl("/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins/"));
5049
urls.addAll(getJarUrl("/Users/maqi/tmp/flink/flink-1.8.1/lib"));
5150

5251
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
5352
DtClassLoader childClassLoader = new DtClassLoader(urls.toArray(new URL[urls.size()]), parentClassLoader);
54-
55-
Thread.currentThread().setContextClassLoader(childClassLoader);
5653
Class<?> aClass = childClassLoader.loadClass("com.dtstack.flink.sql.GetPlan");
5754

5855
String[] params = {
@@ -62,8 +59,6 @@ public void testGetExecutionPlan() throws Exception {
6259
"%0d%0aCREATE+TABLE+MyTable(%0d%0a++++id+INT%2c%0d%0a++++channel+VARCHAR%2c%0d%0a++++pv+varchar%2c%0d%0a++++xctime+varchar%2c%0d%0a++++name+varchar%0d%0a+)WITH(%0d%0a++++type+%3d%27kafka11%27%2c%0d%0a++++bootstrapServers+%3d%27172.16.8.107%3a9092%27%2c%0d%0a++++zookeeperQuorum+%3d%27172.16.8.107%3a2181%2fkafka%27%2c%0d%0a++++offsetReset+%3d%27latest%27%2c%0d%0a++++topic+%3d%27mqTest02%27%2c%0d%0a++++timezone%3d%27Asia%2fShanghai%27%2c%0d%0a++++topicIsPattern+%3d%27false%27%2c%0d%0a++++parallelism+%3d%271%27%0d%0a+)%3b%0d%0a%0d%0a%0d%0aCREATE+TABLE+MyTable2(%0d%0a++++id2+INT%2c%0d%0a++++channel2+VARCHAR%2c%0d%0a++++pv2+varchar%2c%0d%0a++++xctime2+varchar%2c%0d%0a++++name2+varchar%0d%0a+)WITH(%0d%0a++++type+%3d%27kafka11%27%2c%0d%0a++++bootstrapServers+%3d%27172.16.8.107%3a9092%27%2c%0d%0a++++zookeeperQuorum+%3d%27172.16.8.107%3a2181%2fkafka%27%2c%0d%0a++++offsetReset+%3d%27latest%27%2c%0d%0a++++topic+%3d%27mqTest03%27%2c%0d%0a++++timezone%3d%27Asia%2fShanghai%27%2c%0d%0a++++topicIsPattern+%3d%27false%27%2c%0d%0a++++parallelism+%3d%271%27%0d%0a+)%3b%0d%0a%0d%0a%0d%0aCREATE+TABLE+sideTableA(%0d%0a++++id1+INT%2c%0d%0a++++channel1+varchar%2c%0d%0a++++time_info+varchar%2c%0d%0a++++name1+varchar%2c%0d%0a++++PRIMARY+KEY(channel1)+%2c%0d%0a++++PERIOD+FOR+SYSTEM_TIME%0d%0a+)WITH(%0d%0a++++type+%3d%27mysql%27%2c%0d%0a++++url+%3d%27jdbc%3amysql%3a%2f%2f172.16.8.109%3a3306%2ftest%27%2c%0d%0a++++userName+%3d%27dtstack%27%2c%0d%0a++++password+%3d%27abc123%27%2c%0d%0a++++tableName+%3d%27dimA%27%2c%0d%0a++++parallelism+%3d%271%27%2c%0d%0a++++cache+%3d+%27LRU%27%0d%0a%0d%0a+)%3b%0d%0a%0d%0a+CREATE+TABLE+sideTableB(%0d%0a++++id+INT%2c%0d%0a++++channel1+varchar%2c%0d%0a++++address+varchar%2c%0d%0a++++PRIMARY+KEY(channel1)%2c%0d%0a++++PERIOD+FOR+SYSTEM_TIME%0d%0a+)WITH(%0d%0a++++type+%3d%27mysql%27%2c%0d%0a++++url+%3d%27jdbc%3amysql%3a%2f%2f172.16.8.109%3a3306%2ftest%27%2c%0d%0a++++userName+%3d%27dtstack%27%2c%0d%0a++++password+%3d%27abc123%27%2c%0d%0a++++tableName+%3d%27dimB%27%2c%0d%0a++++parallelism+%3d%271%27%2c%0d%0a++++cache+%3d+%27LRU%27%2c%0d%0a++++asyncTimeout+%3d+%271000000%27%0d%0a+)%3b%0d%0a%0d%0aCREATE+TABLE+MyResult(%0d%0a++++xctime+VARCHAR%2c%0d%0a++++name+VARCHAR%2c%0d%0a++++time_info+VARCHAR%2c%0d%0a++++address+VARCHAR%0d%0a+)WITH(%0d%0a+++++type+%3d%27mysql%27%2c%0d%0a++++url+%3d%27jdbc%3amysql%3a%2f%2f172.16.8.109%3a3306%2ftest%27%2c%0d%0a++++userName+%3d%27dtstack%27%2c%0d%0a++++password+%3d%27abc123%27%2c%0d%0a++++tableName+%3d%27dimC%27%2c%0d%0a++++parallelism+%3d%271%27%2c%0d%0a+)%3b%0d%0a%0d%0ainsert+%0d%0ainto%0d%0a++MyResult%0d%0a++++select%0d%0a++++++++t1.xctime+as+xctime%2c%0d%0a++++++++t1.name+as+name%2c%0d%0a++++++++t3.channel1+as+time_info%2c%0d%0a++++++++t3.address+as+address+++++%0d%0a++++from%0d%0a++++++++MyTable+as+t1%0d%0a++++left+join%0d%0a++++++++--MyTable2+m2+%0d%0a++++++++sideTableA+m2+++++++++++++++++++++++++++%0d%0a++++++++on++t1.channel+%3d+m2.channel1%0d%0a++++left+join+sideTableB+t3%0d%0a++++++++on+t1.channel+%3d+t3.channel1%0d%0a%0d%0a%0d%0a",
6360
"-localSqlPluginPath",
6461
"/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins",
65-
"-remoteSqlPluginPath",
66-
"/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins",
6762
"-name","test"};
6863

6964
Method getExecutionPlan = aClass.getMethod("getExecutionPlan", String[].class);

0 commit comments

Comments
 (0)