Skip to content

Commit 4867f51

Browse files
committed
opt
1 parent c432ba0 commit 4867f51

File tree

2 files changed

+26
-45
lines changed

2 files changed

+26
-45
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/factory/YarnClusterClientFactory.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@
2323
import org.apache.flink.configuration.ConfigConstants;
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.core.fs.FileSystem;
26-
import org.apache.flink.runtime.security.SecurityConfiguration;
27-
import org.apache.flink.runtime.security.SecurityUtils;
26+
import org.apache.flink.util.FileUtils;
27+
import org.apache.flink.util.function.FunctionUtils;
2828
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
2929
import org.apache.flink.yarn.YarnClusterDescriptor;
3030
import org.apache.hadoop.yarn.client.api.YarnClient;
3131
import org.apache.hadoop.yarn.conf.YarnConfiguration;
3232

3333
import java.io.File;
34+
import java.io.IOException;
3435
import java.util.Iterator;
3536
import java.util.Map;
3637

@@ -41,6 +42,7 @@
4142
* @author maqi
4243
*/
4344
public class YarnClusterClientFactory extends AbstractClusterClientFactory {
45+
private static final String XML_FILE_EXTENSION = "xml";
4446
@Override
4547
public ClusterDescriptor createClusterDescriptor(String yarnConfDir, Configuration flinkConfig) {
4648

@@ -70,27 +72,12 @@ public ClusterDescriptor createClusterDescriptor(String yarnConfDir, Configurati
7072
}
7173
}
7274

73-
private YarnConfiguration getYarnConf(String yarnConfDir) {
75+
private YarnConfiguration getYarnConf(String yarnConfDir) throws IOException {
7476
YarnConfiguration yarnConf = new YarnConfiguration();
75-
try {
76-
File dir = new File(yarnConfDir);
77-
if (dir.exists() && dir.isDirectory()) {
78-
File[] xmlFileList = new File(yarnConfDir).listFiles((dir1, name) -> {
79-
if (name.endsWith(".xml")) {
80-
return true;
81-
}
82-
return false;
83-
});
84-
if (xmlFileList != null) {
85-
for (File xmlFile : xmlFileList) {
86-
yarnConf.addResource(xmlFile.toURI().toURL());
87-
}
88-
}
89-
}
90-
91-
} catch (Exception e) {
92-
throw new RuntimeException(e);
93-
}
77+
FileUtils.listFilesInDirectory(new File(yarnConfDir).toPath(), this::isXmlFile)
78+
.stream()
79+
.map(FunctionUtils.uncheckedFunction(FileUtils::toURL))
80+
.forEach(yarnConf::addResource);
9481

9582
haYarnConf(yarnConf);
9683
return yarnConf;
@@ -101,8 +88,7 @@ private YarnConfiguration getYarnConf(String yarnConfDir) {
10188
*/
10289
private org.apache.hadoop.conf.Configuration haYarnConf(org.apache.hadoop.conf.Configuration yarnConf) {
10390
Iterator<Map.Entry<String, String>> iterator = yarnConf.iterator();
104-
while (iterator.hasNext()) {
105-
Map.Entry<String, String> entry = iterator.next();
91+
iterator.forEachRemaining((Map.Entry<String, String> entry) -> {
10692
String key = entry.getKey();
10793
String value = entry.getValue();
10894
if (key.startsWith("yarn.resourcemanager.hostname.")) {
@@ -112,7 +98,12 @@ private org.apache.hadoop.conf.Configuration haYarnConf(org.apache.hadoop.conf.C
11298
yarnConf.set(addressKey, value + ":" + YarnConfiguration.DEFAULT_RM_PORT);
11399
}
114100
}
115-
}
101+
});
102+
116103
return yarnConf;
117104
}
105+
106+
private boolean isXmlFile(java.nio.file.Path file) {
107+
return XML_FILE_EXTENSION.equals(org.apache.flink.shaded.guava18.com.google.common.io.Files.getFileExtension(file.toString()));
108+
}
118109
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/utils/JobGraphBuildUtil.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@
2222
import com.dtstack.flink.sql.launcher.entity.JobParamsInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
2424
import com.dtstack.flink.sql.util.PluginUtil;
25-
import com.google.common.collect.Lists;
2625
import org.apache.commons.io.Charsets;
2726
import org.apache.commons.lang.BooleanUtils;
2827
import org.apache.commons.lang.StringUtils;
29-
import org.apache.commons.lang.exception.ExceptionUtils;
3028
import org.apache.flink.api.common.cache.DistributedCache;
3129
import org.apache.flink.client.program.PackagedProgram;
3230
import org.apache.flink.client.program.PackagedProgramUtils;
@@ -35,18 +33,15 @@
3533
import org.apache.flink.core.fs.Path;
3634
import org.apache.flink.runtime.jobgraph.JobGraph;
3735
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
36+
import org.apache.flink.util.function.FunctionUtils;
3837

3938
import java.io.File;
4039
import java.io.UnsupportedEncodingException;
41-
import java.net.MalformedURLException;
4240
import java.net.URL;
4341
import java.net.URLDecoder;
44-
import java.util.ArrayList;
4542
import java.util.Arrays;
46-
import java.util.Collections;
4743
import java.util.List;
4844
import java.util.Map;
49-
import java.util.Optional;
5045
import java.util.Properties;
5146
import java.util.stream.Collectors;
5247

@@ -117,26 +112,21 @@ public static void fillJobGraphClassPath(JobGraph jobGraph) {
117112
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
118113
List<URL> classPath = jobCacheFileConfig.entrySet().stream()
119114
.filter(tmp -> tmp.getKey().startsWith("class_path"))
120-
.map(tmp -> {
121-
try {
122-
return new URL("file:" + tmp.getValue().filePath);
123-
} catch (MalformedURLException e) {
124-
throw new RuntimeException(ExceptionUtils.getFullStackTrace(e));
125-
}
126-
})
115+
.map(FunctionUtils.uncheckedFunction(tmp -> new URL("file:" + tmp.getValue().filePath)))
127116
.collect(Collectors.toList());
117+
128118
jobGraph.getUserArtifacts().clear();
129119
jobGraph.setClasspaths(classPath);
130120
}
131121

132122
public static List<File> getPluginPathToShipFiles(JobGraph jobGraph) {
133-
List<File> shipFiles = new ArrayList<>();
134-
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
135-
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()) {
136-
if (tmp.getKey().startsWith("class_path")) {
137-
shipFiles.add(new File(tmp.getValue().filePath));
138-
}
139-
}
123+
List<File> shipFiles = jobGraph.getUserArtifacts()
124+
.entrySet()
125+
.stream()
126+
.filter(tmp -> tmp.getKey().startsWith("class_path"))
127+
.map(tmp -> new File(tmp.getValue().filePath))
128+
.collect(Collectors.toList());
129+
140130
jobGraph.getUserArtifacts().clear();
141131
return shipFiles;
142132
}

0 commit comments

Comments
 (0)