From 0e7a4ed200b4a1842c4a5b5fab24c0b76cecfd30 Mon Sep 17 00:00:00 2001 From: ozb Date: Wed, 17 Jan 2018 10:42:58 -0500 Subject: [PATCH 01/10] add test for spark thrift server on kubernetes --- pom.xml | 45 +++++++++++++++++++ .../k8s/integrationtest/KubernetesSuite.scala | 41 +++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/pom.xml b/pom.xml index cdc987a..f9a064f 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,8 @@ 1.4.0 18.0 + org.spark-project.hive + 1.2.1.spark2 1.3.9 3.0.0 1.2.17 @@ -89,6 +91,49 @@ commons-lang3 ${commons-lang3.version} + + ${hive.group} + hive-jdbc + ${hive.version} + + + org.apache.curator + curator-framework + + + org.apache.thrift + libthrift + + + org.apache.thrift + libfb303 + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + commons-logging + commons-logging + + + org.codehaus.groovy + groovy-all + + + org.scala-lang scala-library diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 7802b97..b5c0f45 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -21,6 +21,8 @@ import java.net.URI import java.nio.file.{Path, Paths} import java.util.UUID import java.util.regex.Pattern +import java.sql.DriverManager +import org.apache.hive.jdbc.HiveDriver import scala.collection.JavaConverters._ import com.google.common.io.PatternFilenameFilter @@ -103,6 +105,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit runSparkPiAndVerifyCompletion(appArgs = Array("5")) } + test("Run Spark ThriftServer") { + runThriftServerAndVerifyQuery() + } + test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") { sparkAppConf .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi") @@ -206,6 +212,41 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit executorPodChecker) } + private def runThriftServerAndVerifyQuery( + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + appArgs: Array[String] = Array.empty[String]): Unit = { + val appArguments = SparkAppArguments( + mainAppResource = "", + mainClass = "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", + appArgs = appArgs) + SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) + val driverPod = kubernetesTestComponents.kubernetesClient + .pods + .withLabel("spark-app-locator", APP_LOCATOR_LABEL) + .withLabel("spark-role", "driver") + .list() + .getItems + .get(0) + driverPodChecker(driverPod) + + val driverPodResource = kubernetesTestComponents.kubernetesClient + .pods + .withName(driverPod.getMetadata.getName) + driverPodResource.portForward(10000,10000) + val jdbcUri = s"jdbc:hive2://localhost:10000/" + val connection = DriverManager.getConnection(jdbcUri, "user", "pass") + val statement = connection.createStatement() + try { + val resultSet = statement.executeQuery("select 42") + resultSet.next() + assert(resultSet.getInt(1) == 42) + } finally { + statement.close() + connection.close() + driverPodResource.delete() + } + } + private def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, From b43d137ebec62129b8560fcf3fbd63f04ae37bb3 Mon Sep 17 00:00:00 2001 From: Oz Ben-Ami Date: Thu, 18 Jan 2018 17:11:46 -0500 Subject: [PATCH 02/10] fixed some dependency issues --- pom.xml | 42 ++---------------------------------------- 1 file changed, 2 insertions(+), 40 deletions(-) diff --git a/pom.xml b/pom.xml index f9a064f..a1fa365 100644 --- a/pom.xml +++ b/pom.xml @@ -30,8 +30,8 @@ 1.4.0 18.0 - org.spark-project.hive - 1.2.1.spark2 + org.apache.hive + 2.3.0 1.3.9 3.0.0 1.2.17 @@ -95,44 +95,6 @@ ${hive.group} hive-jdbc ${hive.version} - - - org.apache.curator - curator-framework - - - org.apache.thrift - libthrift - - - org.apache.thrift - libfb303 - - - org.apache.zookeeper - zookeeper - - - org.slf4j - slf4j-api - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - commons-logging - commons-logging - - - org.codehaus.groovy - groovy-all - - org.scala-lang From 66b1771448666bf283733dd2cb6e969f2b8031a2 Mon Sep 17 00:00:00 2001 From: ozb Date: Thu, 18 Jan 2018 17:16:19 -0500 Subject: [PATCH 03/10] dynamic port --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index b5c0f45..5cca6fe 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -232,8 +232,8 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit val driverPodResource = kubernetesTestComponents.kubernetesClient .pods .withName(driverPod.getMetadata.getName) - driverPodResource.portForward(10000,10000) - val jdbcUri = s"jdbc:hive2://localhost:10000/" + val localPort = driverPodResource.portForward(10000).getLocalPort + val jdbcUri = s"jdbc:hive2://localhost:$localPort/" val connection = DriverManager.getConnection(jdbcUri, "user", "pass") val statement = connection.createStatement() try { From befc2cc008eeba99409aca42f7ed91f44fa53d0d Mon Sep 17 00:00:00 2001 From: ozb Date: Thu, 18 Jan 2018 17:54:03 -0500 Subject: [PATCH 04/10] eventually --- .../k8s/integrationtest/KubernetesSuite.scala | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 5cca6fe..601fe59 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -228,22 +228,23 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit .getItems .get(0) driverPodChecker(driverPod) - val driverPodResource = kubernetesTestComponents.kubernetesClient - .pods - .withName(driverPod.getMetadata.getName) - val localPort = driverPodResource.portForward(10000).getLocalPort - val jdbcUri = s"jdbc:hive2://localhost:$localPort/" - val connection = DriverManager.getConnection(jdbcUri, "user", "pass") - val statement = connection.createStatement() - try { - val resultSet = statement.executeQuery("select 42") - resultSet.next() - assert(resultSet.getInt(1) == 42) - } finally { - statement.close() - connection.close() - driverPodResource.delete() + .pods + .withName(driverPod.getMetadata.getName) + + Eventually.eventually(TIMEOUT, INTERVAL) { + val localPort = driverPodResource.portForward(10000).getLocalPort + val jdbcUri = s"jdbc:hive2://localhost:$localPort/" + val connection = DriverManager.getConnection(jdbcUri, "user", "pass") + val statement = connection.createStatement() + try { + val resultSet = statement.executeQuery("select 42") + resultSet.next() + assert(resultSet.getInt(1) == 42) + } finally { + statement.close() + connection.close() + } } } From 02363058d51220479e5e97632a38e306fb5a5945 Mon Sep 17 00:00:00 2001 From: Oz Ben-Ami Date: Thu, 18 Jan 2018 18:54:28 -0500 Subject: [PATCH 05/10] fix dependencies, again --- pom.xml | 15 +++++++++++++-- .../k8s/integrationtest/KubernetesSuite.scala | 2 +- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index a1fa365..1ff26c0 100644 --- a/pom.xml +++ b/pom.xml @@ -30,8 +30,9 @@ 1.4.0 18.0 - org.apache.hive - 2.3.0 + 2.6.5 + org.spark-project.hive + 1.2.1.spark2 1.3.9 3.0.0 1.2.17 @@ -86,6 +87,11 @@ log4j ${log4j.version} + + org.apache.hadoop + hadoop-common + ${hadoop.version} + org.apache.commons commons-lang3 @@ -96,6 +102,11 @@ hive-jdbc ${hive.version} + + ${hive.group} + hive-jdbc + ${hive.version} + org.scala-lang scala-library diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 601fe59..7b31289 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -105,7 +105,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit runSparkPiAndVerifyCompletion(appArgs = Array("5")) } - test("Run Spark ThriftServer") { + test("Run Spark Thrift Server") { runThriftServerAndVerifyQuery() } From 95b6b5555cc93ea6079162c063957c4c473ab926 Mon Sep 17 00:00:00 2001 From: Oz Ben-Ami Date: Thu, 18 Jan 2018 18:55:53 -0500 Subject: [PATCH 06/10] build spark with hive and thrift --- dev/dev-run-integration-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dev-run-integration-tests.sh b/dev/dev-run-integration-tests.sh index 2eb8327..a25eecd 100755 --- a/dev/dev-run-integration-tests.sh +++ b/dev/dev-run-integration-tests.sh @@ -75,7 +75,7 @@ then fi cd $SPARK_REPO_LOCAL_DIR git checkout -B $BRANCH origin/$branch - ./dev/make-distribution.sh --tgz -Phadoop-2.7 -Pkubernetes -DskipTests; + ./dev/make-distribution.sh --tgz -Phadoop-2.7 -Pkubernetes -Phive -Phive-thriftserver -DskipTests; SPARK_TGZ=$(find $SPARK_REPO_LOCAL_DIR -name spark-*.tgz) echo "Built Spark TGZ at $SPARK_TGZ". cd - From 72d79e4632d56054689b23f11096a7f6a068c8c8 Mon Sep 17 00:00:00 2001 From: Oz Ben-Ami Date: Thu, 18 Jan 2018 19:04:23 -0500 Subject: [PATCH 07/10] removed duplicate dep --- pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pom.xml b/pom.xml index d301e99..95166b4 100644 --- a/pom.xml +++ b/pom.xml @@ -102,11 +102,6 @@ hive-jdbc ${hive.version} - - ${hive.group} - hive-jdbc - ${hive.version} - org.scala-lang scala-library From 0b5f09e1ede8483993273549872c0f34794fd75a Mon Sep 17 00:00:00 2001 From: Oz Ben-Ami Date: Thu, 18 Jan 2018 19:25:15 -0500 Subject: [PATCH 08/10] changed remaining PP_LOCATOR_LABEL --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 7f1d4f5..14aad8e 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -247,7 +247,8 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit private def runThriftServerAndVerifyQuery( driverPodChecker: Pod => Unit = doBasicDriverPodCheck, - appArgs: Array[String] = Array.empty[String]): Unit = { + appArgs: Array[String] = Array.empty[String], + appLocator: String = appLocator): Unit = { val appArguments = SparkAppArguments( mainAppResource = "", mainClass = "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", @@ -255,7 +256,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) val driverPod = kubernetesTestComponents.kubernetesClient .pods - .withLabel("spark-app-locator", APP_LOCATOR_LABEL) + .withLabel("spark-app-locator", appLocator) .withLabel("spark-role", "driver") .list() .getItems From 4c8fe2f266050524d7ff478af15001eeaf60041d Mon Sep 17 00:00:00 2001 From: Oz Ben-Ami Date: Thu, 18 Jan 2018 21:31:27 -0500 Subject: [PATCH 09/10] addressed comments --- .../k8s/integrationtest/KubernetesSuite.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 14aad8e..9898ff4 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -21,11 +21,11 @@ import java.nio.file.{Path, Paths} import java.util.UUID import java.util.regex.Pattern import java.sql.DriverManager -import org.apache.hive.jdbc.HiveDriver import scala.collection.JavaConverters._ import com.google.common.io.PatternFilenameFilter import io.fabric8.kubernetes.api.model.{Container, Pod} +import org.apache.hive.jdbc.HiveDriver import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} @@ -246,9 +246,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } private def runThriftServerAndVerifyQuery( - driverPodChecker: Pod => Unit = doBasicDriverPodCheck, - appArgs: Array[String] = Array.empty[String], - appLocator: String = appLocator): Unit = { + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + appArgs: Array[String] = Array.empty[String], + appLocator: String = appLocator): Unit = { val appArguments = SparkAppArguments( mainAppResource = "", mainClass = "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", @@ -276,8 +276,11 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit resultSet.next() assert(resultSet.getInt(1) == 42) } finally { - statement.close() - connection.close() + try { + statement.close() + } finally { + connection.close() + } } } } From 36dee227ffbda346c8045cdb25cd7ccc21829457 Mon Sep 17 00:00:00 2001 From: Oz Ben-Ami Date: Fri, 19 Jan 2018 06:27:10 -0500 Subject: [PATCH 10/10] spacing --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 9898ff4..d352827 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -246,9 +246,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } private def runThriftServerAndVerifyQuery( - driverPodChecker: Pod => Unit = doBasicDriverPodCheck, - appArgs: Array[String] = Array.empty[String], - appLocator: String = appLocator): Unit = { + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + appArgs: Array[String] = Array.empty[String], + appLocator: String = appLocator): Unit = { val appArguments = SparkAppArguments( mainAppResource = "", mainClass = "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",