From 79d7a084c086a1c710101adfe1c1d721d51401fa Mon Sep 17 00:00:00 2001
From: zhennzhang
Date: Tue, 26 Dec 2023 14:43:23 +0800
Subject: [PATCH 1/3] KYLIN-5748 Skip build flat table for index build job if
 all newly created indexes can be built from an existing parent index

---
 .../metadata/cube/model/NBatchConstants.java  |  1 +
 .../engine/spark/job/NSparkCubingJob.java     |  7 +++++++
 .../engine/spark/job/SegmentBuildJob.java     | 21 +++++++++++++++++++
 .../engine/spark/job/stage/BuildParam.scala   |  7 +++++++
 .../spark/job/stage/build/BuildDict.scala     |  4 ++++
 .../build/partition/PartitionBuildDict.scala  |  5 ++++-
 .../spark/application/JobWorkSpace.scala      |  2 +-
 7 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
index bcca5491827..cd5f0585b21 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
@@ -42,6 +42,7 @@ public interface NBatchConstants {
     String P_PARTITION_IDS = "partitionIds";
     String P_BUCKETS = "buckets";
+    String P_IS_INDEX_BUILD = "isIndexBuild";
     String P_INCREMENTAL_BUILD = "incrementalBuild";
     String P_SELECTED_PARTITION_COL = "selectedPartitionCol";
     String P_SELECTED_PARTITION_VALUE = "selectedPartition";
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index 18d52bf851a..807336fa27c 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -183,6 +183,9 @@ private static NSparkCubingJob innerCreate(JobFactory.JobBuildParams params) {
         job.setParam(NBatchConstants.P_SEGMENT_IDS, String.join(",", job.getTargetSegments()));
         job.setParam(NBatchConstants.P_DATA_RANGE_START, String.valueOf(startTime));
         job.setParam(NBatchConstants.P_DATA_RANGE_END, String.valueOf(endTime));
+        if (isIndexBuildJob(jobType)) {
+            job.setParam(NBatchConstants.P_IS_INDEX_BUILD, String.valueOf(true));
+        }
         if (CollectionUtils.isNotEmpty(ignoredSnapshotTables)) {
             job.setParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES, String.join(",", ignoredSnapshotTables));
         }
@@ -323,6 +326,10 @@ public SparkCleanupTransactionalTableStep getCleanIntermediateTableStep() {
         return getTask(SparkCleanupTransactionalTableStep.class);
     }
 
+    private static boolean isIndexBuildJob(JobTypeEnum jobType) {
+        return JobTypeEnum.INDEX_BUILD.equals(jobType);
+    }
+
     @Override
     public void cancelJob() {
         NDataflowManager nDataflowManager = NDataflowManager.getInstance(getConfig(), getProject());
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
index 05e0f553277..16a8a42a982 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
@@ -33,6 +33,7 @@ import org.apache.kylin.guava30.shaded.common.base.Throwables;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
@@ -62,6 +63,7 @@ public class SegmentBuildJob extends SegmentJob {
 
     private boolean usePlanner = false;
+    private boolean isIndexBuild = false;
 
     public static void main(String[] args) {
         SegmentBuildJob segmentBuildJob = new SegmentBuildJob();
@@ -75,6 +77,10 @@ protected final void extraInit() {
         if (enablePlanner != null && Boolean.valueOf(enablePlanner)) {
             usePlanner = true;
         }
+        String isIndexBuildJob = getParam(NBatchConstants.P_IS_INDEX_BUILD);
+        if (isIndexBuildJob != null && Boolean.valueOf(isIndexBuildJob)) {
+            isIndexBuild = true;
+        }
     }
 
     @Override
@@ -145,6 +151,21 @@ protected void build() throws IOException {
             val buildParam = new BuildParam();
 
             MATERIALIZED_FACT_TABLE.createStage(this, seg, buildParam, exec);
+            if (isIndexBuild) {
+                if (Objects.isNull(buildParam.getBuildFlatTable())) {
+                    val spanTree = new AdaptiveSpanningTree(config,
+                            new AdaptiveSpanningTree.AdaptiveTreeBuilder(seg, this.getReadOnlyLayouts()));
+                    buildParam.setSpanningTree(spanTree);
+                }
+                if (!buildParam.getSpanningTree().fromFlatTable()) {
+                    log.info("This is an index build job for segment "
+                            + seg.getId()
+                            + "; all newly created indexes will be built from a parent index, "
+                            + "so the dict build and flat table generation stages will be skipped.");
+                    buildParam.setSkipBuildDict(true);
+                    buildParam.setSkipGenerateFlatTable(true);
+                }
+            }
             BUILD_DICT.createStage(this, seg, buildParam, exec);
             GENERATE_FLAT_TABLE.createStage(this, seg, buildParam, exec);
             // enable cost based planner according to the parameter
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
index 98782dec82b..89ef25e57ce 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
@@ -49,6 +49,7 @@ class BuildParam {
   private var cachedPartitionStats: Map[Long, Statistics] =
     immutable.Map.newBuilder[Long, Statistics].result()
 
+  private var skipBuildDict: Boolean = _
   private var skipGenerateFlatTable: Boolean = _
   private var skipMaterializedFactTableView: Boolean = _
 
@@ -73,6 +74,12 @@ class BuildParam {
     this.skipMaterializedFactTableView = skipMaterializedFactTableView
   }
 
+  def isSkipBuildDict: Boolean = skipBuildDict;
+
+  def setSkipBuildDict(skipBuildDict: Boolean): Unit = {
+    this.skipBuildDict = skipBuildDict
+  }
+
   def isSkipGenerateFlatTable: Boolean = skipGenerateFlatTable
 
   def setSkipGenerateFlatTable(skipGenerateFlatTable: Boolean): Unit = {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
index ca4aaa29985..e9f2f86e4d6 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
@@ -32,5 +32,9 @@ class BuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: B
     buildParam.setDict(dict)
   }
 
+  if (buildParam.isSkipBuildDict) {
+    onStageSkipped()
+  }
+
   override def getStageName: String = "BuildDict"
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
index 23e834f085b..32b6a343e43 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
@@ -29,7 +29,10 @@ class PartitionBuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buil
   override def execute(): Unit = {
     val dict: Dataset[Row] = buildDictIfNeed()
     buildParam.setDict(dict)
-  }
+    if (buildParam.isSkipBuildDict) {
+      onStageSkipped()
+    }
+  }
 
   override def getStageName: String = "PartitionBuildDict"
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index 44c13b575c7..e60d8bb1338 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -99,7 +99,7 @@ class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: Jo
 
   def fail(jf: JobFailed): Unit = {
     try {
-      logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
+      logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable.getCause)
       KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
       worker.getApplication.updateJobErrorInfo(jf)
       stop()

From 6720fd34a1bbb840cc8781d7742ca5c1adbfa3ea Mon Sep 17 00:00:00 2001
From: zhennzhang
Date: Tue, 26 Dec 2023 14:46:09 +0800
Subject: [PATCH 2/3] KYLIN-5748 Skip build flat table for index build job if
 all newly created indexes can be built from an existing parent index

---
 .../org/apache/kylin/engine/spark/job/stage/BuildParam.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
index 89ef25e57ce..59c3da42d30 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
@@ -74,7 +74,7 @@ class BuildParam {
     this.skipMaterializedFactTableView = skipMaterializedFactTableView
   }
 
-  def isSkipBuildDict: Boolean = skipBuildDict;
+  def isSkipBuildDict: Boolean = skipBuildDict
 
   def setSkipBuildDict(skipBuildDict: Boolean): Unit = {
     this.skipBuildDict = skipBuildDict

From 5f52f1a9dcdcd02f8bdc5262de9f65b97bae24d9 Mon Sep 17 00:00:00 2001
From: zhennzhang
Date: Tue, 26 Dec 2023 14:52:29 +0800
Subject: [PATCH 3/3] KYLIN-5748 Skip build flat table for index build job if
 all newly created indexes can be built from an existing parent index

---
 .../main/scala/org/apache/spark/application/JobWorkSpace.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index e60d8bb1338..44c13b575c7 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -99,7 +99,7 @@ class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: Jo
 
   def fail(jf: JobFailed): Unit = {
     try {
-      logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable.getCause)
+      logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
       KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
       worker.getApplication.updateJobErrorInfo(jf)
       stop()
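
Note for reviewers: the core of this series is the gating branch in SegmentBuildJob#build. Below is a minimal, self-contained Java sketch of that decision for local experimentation; SpanningTree and BuildParam here are simplified stand-ins rather than the real Kylin classes, and applySkipRules is a hypothetical helper, not an API from this patch.

// Reviewer sketch only -- simplified stand-ins, not the actual Kylin classes.
public class SkipStageSketch {

    // Stand-in for AdaptiveSpanningTree: true if any new layout must be
    // built from the flat table instead of an existing parent layout.
    interface SpanningTree {
        boolean fromFlatTable();
    }

    // Stand-in for the skip flags this series adds to BuildParam.
    static class BuildParam {
        boolean skipBuildDict;
        boolean skipGenerateFlatTable;
    }

    // Hypothetical helper mirroring the new branch in SegmentBuildJob#build:
    // only an INDEX_BUILD job whose spanning tree needs nothing from the
    // flat table may skip the dict-build and flat-table stages.
    static void applySkipRules(boolean isIndexBuild, SpanningTree tree, BuildParam param) {
        if (isIndexBuild && !tree.fromFlatTable()) {
            param.skipBuildDict = true;
            param.skipGenerateFlatTable = true;
        }
    }

    public static void main(String[] args) {
        BuildParam param = new BuildParam();
        // An index build where every new layout has a parent layout to build from:
        applySkipRules(true, () -> false, param);
        System.out.println("skipBuildDict=" + param.skipBuildDict
                + ", skipGenerateFlatTable=" + param.skipGenerateFlatTable); // both true
    }
}

Under these rules an INDEX_BUILD job skips the dict-build and flat-table stages only when no layout in its spanning tree has to be built from the flat table; any other job type, or any layout lacking a buildable parent, leaves both stages in place.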