@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.integrationtest.docker
1919import java .io .{File , PrintWriter }
2020import java .net .URI
2121import java .nio .file .Paths
22+ import java .util .UUID
2223
2324import com .google .common .base .Charsets
2425import com .google .common .io .Files
@@ -33,10 +34,10 @@ import scala.collection.JavaConverters._
3334import org .apache .spark .deploy .k8s .integrationtest .constants ._
3435import org .apache .spark .deploy .k8s .integrationtest .KubernetesSuite
3536import org .apache .spark .deploy .k8s .integrationtest .Logging
36- import org .apache .spark .deploy .k8s .integrationtest .Utils .{ RedirectThread , tryWithResource }
37+ import org .apache .spark .deploy .k8s .integrationtest .Utils .tryWithResource
3738
3839private [spark] class KubernetesSuiteDockerManager (
39- dockerEnv : Map [String , String ], dockerTag : String ) extends Logging {
40+ dockerEnv : Map [String , String ], userProvidedDockerImageTag : Option [ String ] ) extends Logging {
4041
4142 private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH
4243 // Dockerfile paths must be relative to the build path.
@@ -47,9 +48,11 @@ private[spark] class KubernetesSuiteDockerManager(
4748 private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + " init-container/Dockerfile"
4849 private val TIMEOUT = PatienceConfiguration .Timeout (Span (2 , Minutes ))
4950 private val INTERVAL = PatienceConfiguration .Interval (Span (2 , Seconds ))
51+
52+ private val resolvedDockerImageTag =
53+ userProvidedDockerImageTag.getOrElse(UUID .randomUUID().toString.replaceAll(" -" , " " ))
5054 private val dockerHost = dockerEnv.getOrElse(" DOCKER_HOST" ,
5155 throw new IllegalStateException (" DOCKER_HOST env not found." ))
52-
5356 private val originalDockerUri = URI .create(dockerHost)
5457 private val httpsDockerUri = new URIBuilder ()
5558 .setHost(originalDockerUri.getHost)
@@ -69,31 +72,39 @@ private[spark] class KubernetesSuiteDockerManager(
6972 .build()
7073
7174 def buildSparkDockerImages (): Unit = {
72- Eventually .eventually(TIMEOUT , INTERVAL ) { dockerClient.ping() }
73- buildImage(" spark-base" , BASE_DOCKER_FILE )
74- buildImage(" spark-driver" , DRIVER_DOCKER_FILE )
75- buildImage(" spark-executor" , EXECUTOR_DOCKER_FILE )
76- buildImage(" spark-init" , INIT_CONTAINER_DOCKER_FILE )
75+ if (userProvidedDockerImageTag.isEmpty) {
76+ Eventually .eventually(TIMEOUT , INTERVAL ) {
77+ dockerClient.ping()
78+ }
79+ buildImage(" spark-base" , BASE_DOCKER_FILE )
80+ buildImage(" spark-driver" , DRIVER_DOCKER_FILE )
81+ buildImage(" spark-executor" , EXECUTOR_DOCKER_FILE )
82+ buildImage(" spark-init" , INIT_CONTAINER_DOCKER_FILE )
83+ }
7784 }
7885
7986 def deleteImages (): Unit = {
80- removeRunningContainers()
81- deleteImage(" spark-base" )
82- deleteImage(" spark-driver" )
83- deleteImage(" spark-executor" )
84- deleteImage(" spark-init" )
87+ if (userProvidedDockerImageTag.isEmpty) {
88+ removeRunningContainers()
89+ deleteImage(" spark-base" )
90+ deleteImage(" spark-driver" )
91+ deleteImage(" spark-executor" )
92+ deleteImage(" spark-init" )
93+ }
8594 }
8695
96+ def dockerImageTag (): String = resolvedDockerImageTag
97+
8798 private def buildImage (name : String , dockerFile : String ): Unit = {
88- logInfo(s " Building Docker image - $name: $dockerTag " )
99+ logInfo(s " Building Docker image - $name: $resolvedDockerImageTag " )
89100 val dockerFileWithBaseTag = new File (DOCKER_BUILD_PATH .resolve(
90- s " $dockerFile- $dockerTag " ).toAbsolutePath.toString)
101+ s " $dockerFile- $resolvedDockerImageTag " ).toAbsolutePath.toString)
91102 dockerFileWithBaseTag.deleteOnExit()
92103 try {
93104 val originalDockerFileText = Files .readLines(
94105 DOCKER_BUILD_PATH .resolve(dockerFile).toFile, Charsets .UTF_8 ).asScala
95106 val dockerFileTextWithProperBaseImage = originalDockerFileText.map(
96- _.replace(" FROM spark-base" , s " FROM spark-base: $dockerTag " ))
107+ _.replace(" FROM spark-base" , s " FROM spark-base: $resolvedDockerImageTag " ))
97108 tryWithResource(Files .newWriter(dockerFileWithBaseTag, Charsets .UTF_8 )) { fileWriter =>
98109 tryWithResource(new PrintWriter (fileWriter)) { printWriter =>
99110 for (line <- dockerFileTextWithProperBaseImage) {
@@ -105,8 +116,8 @@ private[spark] class KubernetesSuiteDockerManager(
105116 }
106117 dockerClient.build(
107118 DOCKER_BUILD_PATH ,
108- s " $name: $dockerTag " ,
109- s " $dockerFile- $dockerTag " ,
119+ s " $name: $resolvedDockerImageTag " ,
120+ s " $dockerFile- $resolvedDockerImageTag " ,
110121 new LoggingBuildHandler ())
111122 } finally {
112123 dockerFileWithBaseTag.delete()
@@ -119,15 +130,15 @@ private[spark] class KubernetesSuiteDockerManager(
119130 private def removeRunningContainers (): Unit = {
120131 val imageIds = dockerClient.listImages(ListImagesParam .allImages())
121132 .asScala
122- .filter(image => image.repoTags().asScala.exists(_.endsWith(s " : $dockerTag " )))
133+ .filter(image => image.repoTags().asScala.exists(_.endsWith(s " : $resolvedDockerImageTag " )))
123134 .map(_.id())
124135 .toSet
125136 Eventually .eventually(KubernetesSuite .TIMEOUT , KubernetesSuite .INTERVAL ) {
126137 val runningContainersWithImageTag = stopRunningContainers(imageIds)
127138 require(
128139 runningContainersWithImageTag.isEmpty,
129140 s " ${runningContainersWithImageTag.size} containers found still running " +
130- s " with the image tag $dockerTag " )
141+ s " with the image tag $resolvedDockerImageTag " )
131142 }
132143 dockerClient.listContainers(ListContainersParam .allContainers())
133144 .asScala
@@ -139,7 +150,7 @@ private[spark] class KubernetesSuiteDockerManager(
139150 .asScala
140151 .filter(container => imageIds.contains(container.imageId()))
141152 require(containersWithImageTag.isEmpty, s " ${containersWithImageTag.size} containers still " +
142- s " found with image tag $dockerTag . " )
153+ s " found with image tag $resolvedDockerImageTag . " )
143154 }
144155
145156 }
@@ -148,7 +159,7 @@ private[spark] class KubernetesSuiteDockerManager(
148159 val runningContainersWithImageTag = getRunningContainersWithImageIds(imageIds)
149160 if (runningContainersWithImageTag.nonEmpty) {
150161 logInfo(s " Found ${runningContainersWithImageTag.size} containers running with " +
151- s " an image with the tag $dockerTag . Attempting to remove these containers, " +
162+ s " an image with the tag $resolvedDockerImageTag . Attempting to remove these containers, " +
152163 s " and then will stall for 2 seconds. " )
153164 runningContainersWithImageTag.foreach { container =>
154165 dockerClient.stopContainer(container.id(), 5 )
@@ -168,10 +179,10 @@ private[spark] class KubernetesSuiteDockerManager(
168179
169180 private def deleteImage (name : String ): Unit = {
170181 try {
171- dockerClient.removeImage(s " $name: $dockerTag " )
182+ dockerClient.removeImage(s " $name: $resolvedDockerImageTag " )
172183 } catch {
173184 case e : RuntimeException =>
174- logWarning(s " Failed to delete image $name: $dockerTag . There may be images leaking in the " +
185+ logWarning(s " Failed to delete image $name: $resolvedDockerImageTag . There may be images leaking in the " +
175186 s " docker environment which are now stale and unused. " , e)
176187 }
177188 }
0 commit comments