Skip to content

Commit 77a3ff3

Browse files
committed
rename
1 parent bfa6ac0 commit 77a3ff3

File tree

6 files changed

+115
-33
lines changed

6 files changed

+115
-33
lines changed

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818

1919
package com.dtstack.flink.sql;
2020

21-
import com.dtstack.flink.sql.exec.BuildProcess;
21+
import com.dtstack.flink.sql.exec.ApiResult;
22+
import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
2223
import com.dtstack.flink.sql.exec.ParamsInfo;
23-
import com.fasterxml.jackson.databind.ObjectMapper;
24-
import com.fasterxml.jackson.databind.node.ObjectNode;
2524
import org.apache.commons.lang.exception.ExceptionUtils;
2625
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2726

@@ -32,28 +31,15 @@
3231
* @author maqi
3332
*/
3433
public class GetPlan {
35-
public static final String STATUS_KEY = "status";
36-
public static final String MSG_KEY = "msg";
37-
public static final Integer FAIL = 0;
38-
public static final Integer SUCCESS = 1;
39-
40-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
4134

4235
public static String getExecutionPlan(String[] args) {
4336
try {
44-
ParamsInfo paramsInfo = BuildProcess.parseParams(args);
45-
StreamExecutionEnvironment env = BuildProcess.getStreamExecution(paramsInfo);
37+
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
38+
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
4639
String executionPlan = env.getExecutionPlan();
47-
return getJsonStr(SUCCESS, executionPlan);
40+
return ApiResult.createSuccessResultJsonStr(executionPlan);
4841
} catch (Exception e) {
49-
return getJsonStr(FAIL, ExceptionUtils.getFullStackTrace(e));
42+
return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
5043
}
5144
}
52-
53-
public static String getJsonStr(int status, String msg) {
54-
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
55-
objectNode.put(STATUS_KEY, status);
56-
objectNode.put(MSG_KEY, msg);
57-
return objectNode.toString();
58-
}
5945
}

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
package com.dtstack.flink.sql;
2121

2222

23-
import com.dtstack.flink.sql.exec.BuildProcess;
23+
import com.dtstack.flink.sql.exec.ExecuteProcessHelper;
2424
import com.dtstack.flink.sql.exec.ParamsInfo;
2525
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729

2830
/**
@@ -32,10 +34,12 @@
3234
*/
3335

3436
public class Main {
37+
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
3538

3639
public static void main(String[] args) throws Exception {
37-
ParamsInfo paramsInfo = BuildProcess.parseParams(args);
38-
StreamExecutionEnvironment env = BuildProcess.getStreamExecution(paramsInfo);
40+
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
41+
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
3942
env.execute(paramsInfo.getName());
43+
LOG.info("program {} execution success", paramsInfo.getName());
4044
}
4145
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.dtstack.flink.sql.exec;
2+
3+
import org.codehaus.jackson.map.ObjectMapper;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
import java.util.UUID;
8+
9+
/**
10+
* API调用结果返回
11+
* Date: 2020/2/24
12+
* Company: www.dtstack.com
13+
* @author maqi
14+
*/
15+
public class ApiResult {
16+
17+
private static final Logger LOG = LoggerFactory.getLogger(ApiResult.class);
18+
private static final ObjectMapper objectMapper = new ObjectMapper();
19+
20+
public static final Integer FAIL = 0;
21+
public static final Integer SUCCESS = 1;
22+
23+
private int code;
24+
private Object data;
25+
private String errorMsg;
26+
27+
public ApiResult() {
28+
}
29+
30+
public static String createSuccessResultJsonStr(String message) {
31+
ApiResult apiResult = createSuccessResult(SUCCESS, message);
32+
String result;
33+
try {
34+
result = objectMapper.writeValueAsString(apiResult);
35+
} catch (Exception e) {
36+
LOG.error("", e);
37+
result = "code:" + SUCCESS + ",message:" + message;
38+
}
39+
return result;
40+
}
41+
42+
public static ApiResult createSuccessResult(int code, String message) {
43+
ApiResult apiResult = new ApiResult();
44+
apiResult.setCode(code);
45+
apiResult.setData(message);
46+
return apiResult;
47+
}
48+
49+
public static String createErrorResultJsonStr(String message) {
50+
ApiResult apiResult = createErrorResult(message, FAIL);
51+
String result;
52+
try {
53+
result = objectMapper.writeValueAsString(apiResult);
54+
} catch (Exception e) {
55+
LOG.error("", e);
56+
result = "code:" + FAIL + ",message:" + message;
57+
}
58+
return result;
59+
}
60+
61+
public static ApiResult createErrorResult(String errMsg, int code) {
62+
ApiResult apiResult = new ApiResult();
63+
apiResult.setCode(code);
64+
apiResult.setErrorMsg(errMsg);
65+
return apiResult;
66+
}
67+
68+
public int getCode() {
69+
return code;
70+
}
71+
72+
public void setCode(int code) {
73+
this.code = code;
74+
}
75+
76+
public Object getData() {
77+
return data;
78+
}
79+
80+
public void setData(Object data) {
81+
this.data = data;
82+
}
83+
84+
public String getErrorMsg() {
85+
return errorMsg;
86+
}
87+
88+
public void setErrorMsg(String errorMsg) {
89+
this.errorMsg = errorMsg;
90+
}
91+
}

core/src/main/java/com/dtstack/flink/sql/exec/BuildProcess.java renamed to core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,15 @@
7676
import java.util.Set;
7777

7878
/**
79-
* 提取任务执行时共同的流程方法
79+
* 任务执行时的流程方法
8080
* Date: 2020/2/17
8181
* Company: www.dtstack.com
8282
* @author maqi
8383
*/
84-
public class BuildProcess {
84+
public class ExecuteProcessHelper {
8585

8686
private static final String CLASS_FILE_NAME_FMT = "class_path_%d";
87-
private static final Logger LOG = LoggerFactory.getLogger(BuildProcess.class);
87+
private static final Logger LOG = LoggerFactory.getLogger(ExecuteProcessHelper.class);
8888
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
8989

9090

@@ -125,7 +125,7 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
125125
}
126126

127127
public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception {
128-
StreamExecutionEnvironment env = BuildProcess.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
128+
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
129129
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
130130
StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, paramsInfo.getConfProp());
131131

@@ -136,14 +136,14 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
136136
Map<String, Table> registerTableCache = Maps.newHashMap();
137137

138138
//register udf
139-
BuildProcess.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
139+
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
140140
//register table schema
141-
Set<URL> classPathSets = BuildProcess.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
141+
Set<URL> classPathSets = ExecuteProcessHelper.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
142142
paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
143143
// cache classPathSets
144-
BuildProcess.registerPluginUrlToCachedFile(env, classPathSets);
144+
ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
145145

146-
BuildProcess.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
146+
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
147147

148148
if (env instanceof MyLocalStreamEnvironment) {
149149
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public String toString() {
9999
}
100100

101101
public String convertJarUrlListToString(List<URL> jarUrlList) {
102-
return jarUrlList.stream().map(URL::toString).reduce((pre, last) -> pre + last).get();
102+
return jarUrlList.stream().map(URL::toString).reduce((pre, last) -> pre + last).orElse("");
103103
}
104104

105105
public static ParamsInfo.Builder builder() {

core/src/test/java/com/dtstack/flink/sql/TestGetPlan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql;
2020

2121
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.exec.ApiResult;
2223
import com.fasterxml.jackson.databind.ObjectMapper;
2324
import com.fasterxml.jackson.databind.node.ObjectNode;
2425
import org.junit.Assert;
@@ -69,7 +70,7 @@ public void testGetExecutionPlan() throws Exception {
6970
String jsonStr = (String) getExecutionPlan.invoke(aClass.newInstance(), (Object)params);
7071

7172
ObjectNode jsonNodes = OBJECT_MAPPER.readValue(jsonStr, ObjectNode.class);
72-
Assert.assertEquals(jsonNodes.get(GetPlan.STATUS_KEY).asLong(), GetPlan.SUCCESS.longValue());
73+
Assert.assertEquals(jsonNodes.get("code").asLong(), ApiResult.SUCCESS.longValue());
7374
}
7475

7576

0 commit comments

Comments
 (0)