Skip to content

Commit 53f4f40

Browse files
committed
添加log (add logging)
1 parent 1fc0d53 commit 53f4f40

File tree

5 files changed

+56
-44
lines changed

5 files changed

+56
-44
lines changed

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.lang.StringUtils;
2727
import java.lang.reflect.InvocationTargetException;
2828
import java.lang.reflect.Field;
29+
import java.nio.charset.StandardCharsets;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.io.File;
@@ -104,8 +105,8 @@ public List<String> getProgramExeArgList() throws Exception {
104105
continue;
105106
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
106107
File file = new File(value.toString());
107-
String content = FileUtils.readFile(file, "UTF-8");
108-
value = URLEncoder.encode(content, Charsets.UTF_8.name());
108+
String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
109+
value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
109110
}
110111
args.add("-" + key);
111112
args.add(value.toString());

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
9696
YarnClient yarnClient = YarnClient.createYarnClient();
9797
yarnClient.init(yarnConf);
9898
yarnClient.start();
99-
ApplicationId applicationId = null;
99+
ApplicationId applicationId;
100100

101101
String yarnSessionConf = launcherOptions.getYarnSessionConf();
102102
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.launcher;
2221

23-
import com.aiweiergou.tool.logger.api.ChangeLogLevelProcess;
22+
//import com.aiweiergou.tool.logger.api.ChangeLogLevelProcess;
23+
2424
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2525
import com.google.common.collect.Lists;
2626
import com.alibaba.fastjson.JSON;
@@ -41,13 +41,16 @@
4141
import org.apache.flink.configuration.GlobalConfiguration;
4242
import org.apache.flink.runtime.jobgraph.JobGraph;
4343
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
4446

4547
import java.io.BufferedReader;
4648
import java.io.File;
4749
import java.io.FileInputStream;
4850
import java.io.IOException;
4951
import java.io.InputStreamReader;
5052
import java.net.URLDecoder;
53+
import java.nio.charset.StandardCharsets;
5154
import java.util.LinkedList;
5255
import java.util.List;
5356
import java.util.Map;
@@ -56,6 +59,7 @@
5659
/**
5760
* Date: 2017/2/20
5861
* Company: www.dtstack.com
62+
*
5963
* @author xuchao
6064
*/
6165

@@ -64,15 +68,16 @@ public class LauncherMain {
6468

6569
private static String SP = File.separator;
6670

71+
private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);
72+
6773

6874
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
6975
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
70-
String corePath = localSqlRootJar + SP + jarPath;
71-
return corePath;
76+
return localSqlRootJar + SP + jarPath;
7277
}
7378

7479
public static void main(String[] args) throws Exception {
75-
if (args.length == 1 && args[0].endsWith(".json")){
80+
if (args.length == 1 && args[0].endsWith(".json")) {
7681
args = parseJson(args);
7782
}
7883
OptionParser optionParser = new OptionParser(args);
@@ -84,27 +89,31 @@ public static void main(String[] args) throws Exception {
8489
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
8590
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
8691

87-
if(mode.equals(ClusterMode.local.name())) {
88-
String[] localArgs = argList.toArray(new String[argList.size()]);
92+
// LOG.info("current mode is {}", mode);
93+
System.out.println("current mode is " + mode);
94+
95+
if (mode.equals(ClusterMode.local.name())) {
96+
String[] localArgs = argList.toArray(new String[0]);
8997
Main.main(localArgs);
9098
return;
9199
}
92100

93101
String pluginRoot = launcherOptions.getLocalSqlPluginPath();
94102
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
95-
String[] remoteArgs = argList.toArray(new String[argList.size()]);
103+
String[] remoteArgs = argList.toArray(new String[0]);
96104
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
97105

98106
String savePointPath = confProperties.getProperty(ConfigConstrant.SAVE_POINT_PATH_KEY);
99-
if(StringUtils.isNotBlank(savePointPath)){
107+
if (StringUtils.isNotBlank(savePointPath)) {
100108
String allowNonRestoredState = confProperties.getOrDefault(ConfigConstrant.ALLOW_NON_RESTORED_STATE_KEY, "false").toString();
101109
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savePointPath, BooleanUtils.toBoolean(allowNonRestoredState)));
102110
}
103111

104-
if(mode.equals(ClusterMode.yarnPer.name())){
112+
if (mode.equals(ClusterMode.yarnPer.name())) {
105113
String flinkConfDir = launcherOptions.getFlinkconf();
106114
Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
107115
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1);
116+
LOG.info("current jobID is {}", jobGraph.getJobID());
108117
PerJobSubmitter.submit(launcherOptions, jobGraph, config);
109118
} else {
110119
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
@@ -116,35 +125,35 @@ public static void main(String[] args) throws Exception {
116125

117126
private static String[] parseJson(String[] args) {
118127
BufferedReader reader = null;
119-
String lastStr = "";
120-
try{
128+
StringBuilder lastStr = new StringBuilder();
129+
try {
121130
FileInputStream fileInputStream = new FileInputStream(args[0]);
122-
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");
131+
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, StandardCharsets.UTF_8);
123132
reader = new BufferedReader(inputStreamReader);
124-
String tempString = null;
125-
while((tempString = reader.readLine()) != null){
126-
lastStr += tempString;
133+
String tempString;
134+
while ((tempString = reader.readLine()) != null) {
135+
lastStr.append(tempString);
127136
}
128137
reader.close();
129-
}catch(IOException e){
138+
} catch (IOException e) {
130139
e.printStackTrace();
131-
}finally{
132-
if(reader != null){
140+
} finally {
141+
if (reader != null) {
133142
try {
134143
reader.close();
135144
} catch (IOException e) {
136145
e.printStackTrace();
137146
}
138147
}
139148
}
140-
Map<String, Object> map = JSON.parseObject(lastStr, new TypeReference<Map<String, Object>>(){} );
149+
Map<String, Object> map = JSON.parseObject(lastStr.toString(), new TypeReference<Map<String, Object>>() {
150+
});
141151
List<String> list = new LinkedList<>();
142152

143153
for (Map.Entry<String, Object> entry : map.entrySet()) {
144154
list.add("-" + entry.getKey());
145155
list.add(entry.getValue().toString());
146156
}
147-
String[] array = list.toArray(new String[list.size()]);
148-
return array;
157+
return list.toArray(new String[0]);
149158
}
150159
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2222
import com.dtstack.flink.sql.launcher.YarnConfLoader;
2323
import com.dtstack.flink.sql.option.Options;
24-
import com.esotericsoftware.minlog.Log;
2524
import org.apache.commons.lang3.StringUtils;
2625
import org.apache.flink.api.common.cache.DistributedCache;
2726
import org.apache.flink.configuration.Configuration;
@@ -80,7 +79,8 @@ public void init(String yarnConfDir, Configuration flinkConfig, Properties userC
8079
yarnClient.init(yarnConf);
8180
yarnClient.start();
8281

83-
Log.info("----init yarn success ----");
82+
System.out.println("----init yarn success ----");
83+
// LOG.info("----init yarn success ----");
8484
}
8585

8686
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph)

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.yarn.api.records.ApplicationId;
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
35+
3536
import java.net.URLDecoder;
3637
import java.util.Arrays;
3738
import java.util.List;
@@ -41,6 +42,7 @@
4142
* per job mode submitter
4243
* Date: 2018/11/17
4344
* Company: www.dtstack.com
45+
*
4446
* @author xuchao
4547
*/
4648

@@ -49,15 +51,15 @@ public class PerJobSubmitter {
4951
private static final Logger LOG = LoggerFactory.getLogger(PerJobSubmitter.class);
5052

5153
public static String submit(Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig) throws Exception {
52-
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
53-
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
54-
List<String> paths = getJarPaths(addjarPath);
55-
paths.forEach( path -> {
56-
jobGraph.addJar(new Path("file://" + path));
57-
});
58-
}
59-
60-
String confProp = launcherOptions.getConfProp();
54+
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
55+
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
56+
List<String> paths = getJarPaths(addjarPath);
57+
paths.forEach(path -> {
58+
jobGraph.addJar(new Path("file://" + path));
59+
});
60+
}
61+
62+
String confProp = launcherOptions.getConfProp();
6163
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
6264
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
6365
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
@@ -67,23 +69,23 @@ public static String submit(Options launcherOptions, JobGraph jobGraph, Configur
6769

6870
String flinkJarPath = launcherOptions.getFlinkJarPath();
6971
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(flinkJarPath, launcherOptions, jobGraph);
70-
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);
72+
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
7173

7274
String applicationId = clusterClient.getClusterId().toString();
7375
String flinkJobId = jobGraph.getJobID().toString();
7476

7577
String tips = String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId);
78+
System.out.println(tips);
7679
LOG.info(tips);
7780

7881
return applicationId;
7982
}
8083

81-
private static List<String> getJarPaths(String addjarPath) {
82-
if (addjarPath.length() > 2) {
83-
addjarPath = addjarPath.substring(1,addjarPath.length()-1).replace("\"","");
84-
}
85-
List<String> paths = Arrays.asList(StringUtils.split(addjarPath, ","));
86-
return paths;
87-
}
84+
private static List<String> getJarPaths(String addjarPath) {
85+
if (addjarPath.length() > 2) {
86+
addjarPath = addjarPath.substring(1, addjarPath.length() - 1).replace("\"", "");
87+
}
88+
return Arrays.asList(StringUtils.split(addjarPath, ","));
89+
}
8890

8991
}

0 commit comments

Comments (0)