|
26 | 26 | import org.apache.flink.configuration.Configuration; |
27 | 27 | import com.google.common.base.Strings; |
28 | 28 | import org.apache.flink.runtime.jobgraph.JobGraph; |
| 29 | +import org.apache.flink.runtime.security.SecurityConfiguration; |
| 30 | +import org.apache.flink.runtime.security.SecurityUtils; |
29 | 31 | import org.apache.flink.yarn.AbstractYarnClusterDescriptor; |
30 | 32 | import org.apache.flink.yarn.YarnClusterDescriptor; |
31 | 33 | import org.apache.hadoop.fs.Path; |
@@ -55,45 +57,40 @@ public class PerJobClusterClientBuilder { |
55 | 57 |
|
56 | 58 | private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterClientBuilder.class); |
57 | 59 |
|
58 | | - private static String KEYTAB = "security.kerberos.login.keytab"; |
59 | | - |
60 | | - private static String PRINCIPAL = "security.kerberos.login.principal"; |
| 60 | + private static final String DEFAULT_CONF_DIR = "./"; |
61 | 61 |
|
62 | 62 | private YarnClient yarnClient; |
63 | 63 |
|
64 | 64 | private YarnConfiguration yarnConf; |
65 | 65 |
|
66 | | - public void init(String yarnConfDir, Properties conf) throws IOException { |
| 66 | + private Configuration flinkConfig; |
| 67 | + |
| 68 | + public void init(String yarnConfDir, Configuration flinkConfig, Properties userConf) throws Exception { |
67 | 69 |
|
68 | 70 | if(Strings.isNullOrEmpty(yarnConfDir)) { |
69 | 71 | throw new RuntimeException("parameters of yarn is required"); |
70 | 72 | } |
| 73 | + userConf.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString())); |
| 74 | + this.flinkConfig = flinkConfig; |
| 75 | + SecurityUtils.install(new SecurityConfiguration(flinkConfig)); |
71 | 76 |
|
72 | 77 | yarnConf = YarnConfLoader.getYarnConf(yarnConfDir); |
73 | | - |
74 | | - if (isKerberos(conf)){ |
75 | | - String keytab = (String) conf.get(KEYTAB); |
76 | | - String principal = (String) conf.get(PRINCIPAL); |
77 | | - login(yarnConf, keytab, principal); |
78 | | - } |
79 | | - |
80 | 78 | yarnClient = YarnClient.createYarnClient(); |
81 | 79 | yarnClient.init(yarnConf); |
82 | 80 | yarnClient.start(); |
83 | 81 |
|
84 | 82 | System.out.println("----init yarn success ----"); |
85 | 83 | } |
86 | 84 |
|
87 | | - public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig) |
| 85 | + public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph) |
88 | 86 | throws MalformedURLException { |
89 | 87 |
|
90 | | - confProp.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString())); |
91 | | - String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? "" : launcherOptions.getFlinkconf(); |
| 88 | + String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? DEFAULT_CONF_DIR : launcherOptions.getFlinkconf(); |
92 | 89 | AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, flinkConf); |
93 | 90 |
|
94 | 91 | if (StringUtils.isNotBlank(flinkJarPath)) { |
95 | 92 | if (!new File(flinkJarPath).exists()) { |
96 | | - throw new RuntimeException("The Flink jar path is not exist"); |
| 93 | + throw new RuntimeException("The param '-flinkJarPath' ref dir is not exist"); |
97 | 94 | } |
98 | 95 | } |
99 | 96 |
|
@@ -163,22 +160,4 @@ private AbstractYarnClusterDescriptor getClusterDescriptor( |
163 | 160 | false); |
164 | 161 | } |
165 | 162 |
|
166 | | - private boolean isKerberos(Properties conf){ |
167 | | - String keytab = (String) conf.get(KEYTAB); |
168 | | - if (StringUtils.isNotBlank(keytab)){ |
169 | | - return true; |
170 | | - } else { |
171 | | - return false; |
172 | | - } |
173 | | - } |
174 | | - |
175 | | - private void login(org.apache.hadoop.conf.Configuration conf, String keytab, String principal) throws IOException { |
176 | | - if (StringUtils.isEmpty(principal)){ |
177 | | - throw new RuntimeException(PRINCIPAL + " must not be null!"); |
178 | | - } |
179 | | - UserGroupInformation.setConfiguration(conf); |
180 | | - UserGroupInformation.loginUserFromKeytab(principal, keytab); |
181 | | - LOG.info("login successfully! keytab: " + keytab + "principal: " + principal); |
182 | | - LOG.info("UGI: " + UserGroupInformation.getCurrentUser()); |
183 | | - } |
184 | 163 | } |
0 commit comments