2626import org .apache .flink .client .deployment .ClusterSpecification ;
2727import org .apache .flink .client .program .ClusterClientProvider ;
2828import org .apache .flink .configuration .Configuration ;
29+ import org .apache .flink .configuration .SecurityOptions ;
2930import org .apache .flink .configuration .TaskManagerOptions ;
3031import org .apache .flink .runtime .jobgraph .JobGraph ;
3132import org .apache .flink .yarn .YarnClusterDescriptor ;
4243import java .util .Arrays ;
4344import java .util .List ;
4445import java .util .Optional ;
46+ import java .util .stream .Stream ;
4547
4648
4749/**
@@ -83,6 +85,8 @@ public void exec() throws Exception {
8385 }
8486 }
8587
88+ dumpSameKeytab (flinkConfiguration , shipFiles );
89+
8690 clusterDescriptor .addShipFiles (shipFiles );
8791
8892 ClusterSpecification clusterSpecification = YarnClusterClientFactory .INSTANCE .getClusterSpecification (flinkConfiguration );
@@ -94,6 +98,14 @@ public void exec() throws Exception {
9498 LOG .info (String .format ("deploy per_job with appId: %s, jobId: %s" , applicationId , flinkJobId ));
9599 }
96100
101+ private void dumpSameKeytab (Configuration flinkConfiguration , List <File > shipFiles ) {
102+ String flinkKeytab = Stream
103+ .of (flinkConfiguration .getString (SecurityOptions .KERBEROS_LOGIN_KEYTAB ).split (File .separator ))
104+ .reduce ((a , b ) -> b )
105+ .get ();
106+ shipFiles .removeIf (f -> f .getName ().equals (flinkKeytab ));
107+ }
108+
97109 private void appendApplicationConfig (Configuration flinkConfig , JobParamsInfo jobParamsInfo ) {
98110 if (!StringUtils .isEmpty (jobParamsInfo .getName ())) {
99111 flinkConfig .setString (YarnConfigOptions .APPLICATION_NAME , jobParamsInfo .getName ());
0 commit comments