@@ -28,26 +28,23 @@ import org.apache.linkis.entrance.execute.EntranceJob
import org.apache.linkis.entrance.log.LogReader
import org.apache.linkis.entrance.timeout.JobTimeoutManager
import org.apache.linkis.entrance.utils.{EntranceUtils, JobHistoryHelper}
import java.util.concurrent.ConcurrentHashMap
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.utils.LoggerUtils
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{
ENGINE_PRIORITY_RUNTIME_KEY,
FIFO_QUEUE_STRATEGY,
PFIFO_SCHEDULER_STRATEGY
}
import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ENGINE_PRIORITY_RUNTIME_KEY, FIFO_QUEUE_STRATEGY, PFIFO_SCHEDULER_STRATEGY}
import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
import org.apache.linkis.server.conf.ServerConfiguration

import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.conf.TimeType

import java.{lang, util}
import java.text.MessageFormat
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

abstract class EntranceServer extends Logging {
@@ -302,79 +299,88 @@ abstract class EntranceServer extends Logging {
val timeoutType = EntranceConfiguration.ENTRANCE_TASK_TIMEOUT.getHotValue()
logger.info(s"Start to check timeout Job, timeout is ${timeoutType}")
val timeoutTime = System.currentTimeMillis() - timeoutType.toLong
val undoneTask = getAllUndoneTask(null, null)
undoneTask.filter(job => job.createTime < timeoutTime).foreach { job =>
job.onFailure(s"Job has run for longer than the maximum time $timeoutType", null)
getAllUndoneTask(null, null).filter(job => job.createTime < timeoutTime).foreach {
job =>
job.onFailure(s"Job has run for longer than the maximum time $timeoutType", null)
}
logger.info(s"Finished to check timeout Job, timeout is ${timeoutType}")
} { case t: Throwable =>
logger.warn(s"TimeoutDetective Job failed. ${t.getMessage}", t)
}
}

// New task diagnosis check logic
if (EntranceConfiguration.TASK_DIAGNOSIS_ENABLE) {
logger.info("Start to check tasks for diagnosis")
val diagnosisTimeout = EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT
val diagnosisTime = System.currentTimeMillis() - diagnosisTimeout
undoneTask
.filter { job =>
val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels)
engineType.contains(
EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE
) && job.createTime < diagnosisTime && !diagnosedJobs.containsKey(job.getId())
}
.foreach { job =>
// Trigger the diagnosis asynchronously
Utils.defaultScheduler.execute(new Runnable() {
override def run(): Unit = {
try {
// Check and set the diagnosis flag so each task is diagnosed only once
if (diagnosedJobs.putIfAbsent(job.getId(), true) == null) {
// Call the Doctoris diagnosis system
logger.info(s"Start to diagnose spark job ${job.getId()}")
job match {
case entranceJob: EntranceJob =>
// Call the doctoris realtime diagnosis API
val response = EntranceUtils.taskRealtimeDiagnose(entranceJob.getJobRequest, null)
logger.info(s"Finished to diagnose job ${job
.getId()}, result: ${response.result}, reason: ${response.reason}")
// Update the diagnosis info
if (response.success) {
// Build the diagnosis update request
JobHistoryHelper.addDiagnosis(job.getId(), response.result)
logger.info(s"Successfully updated diagnosis for job ${job.getId()}")
}
case _ =>
logger.warn(s"Job ${job.getId()} is not an EntranceJob, skip diagnosis")
}
}
} catch {
case t: Throwable =>
logger.warn(s"Diagnose job ${job.getId()} failed. ${t.getMessage}", t)
// If the diagnosis fails, remove the flag to allow a retry
diagnosedJobs.remove(job.getId())
},
EntranceConfiguration.ENTRANCE_TASK_TIMEOUT_SCAN.getValue.toLong,
EntranceConfiguration.ENTRANCE_TASK_TIMEOUT_SCAN.getValue.toLong,
TimeUnit.MILLISECONDS
)

Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable() {
override def run(): Unit = {
val undoneTask = getAllUndoneTask(null, null)
// New task diagnosis check logic
if (EntranceConfiguration.TASK_DIAGNOSIS_ENABLE) {
logger.info("Start to check tasks for diagnosis")
val diagnosisTime = System.currentTimeMillis() - new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT).toLong
undoneTask
.filter { job =>
val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels)
engineType.contains(
EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE
) && job.createTime < diagnosisTime && !diagnosedJobs.containsKey(job.getId())
}
.foreach { job =>
try {
// Check and set the diagnosis flag so each task is diagnosed only once
val jobId = job.getJobRequest.getId
diagnosedJobs.putIfAbsent(job.getId(), true)
// Call the Doctoris diagnosis system
logger.info(s"Start to diagnose spark job ${job.getId()}")
job match {
case entranceJob: EntranceJob =>
// Call the doctoris realtime diagnosis API
job.getLogListener.foreach(
_.onLogUpdate(job, LogUtils.generateInfo("Start to diagnose spark job"))
)
val response =
EntranceUtils.taskRealtimeDiagnose(entranceJob.getJobRequest, null)
logger.info(s"Finished to diagnose job ${job
.getId()}, result: ${response.result}, reason: ${response.reason}")
// Update the diagnosis info
if (response.success) {
// Build the diagnosis update request
JobHistoryHelper.addDiagnosis(job.getJobRequest.getId, response.result)
logger.info(s"Successfully updated diagnosis for job ${job.getId()}")
}
}
})
case _ =>
logger.warn(s"Job $jobId is not an EntranceJob, skip diagnosis")
}

} catch {
case t: Throwable =>
logger.warn(s"Diagnose job ${job.getId()} failed. ${t.getMessage}", t)
// If the diagnosis fails, remove the flag to allow a retry
diagnosedJobs.remove(job.getId())
}
logger.info("Finished to check Spark tasks for diagnosis")
}

// Periodically clean diagnosedJobs, keeping only entries for unfinished tasks
val undoneJobIds = undoneTask.map(_.getId()).toSet
val iterator = diagnosedJobs.keySet().iterator()
while (iterator.hasNext) {
val jobId = iterator.next()
if (!undoneJobIds.contains(jobId)) {
iterator.remove()
logger.info("Finished to check Spark tasks for diagnosis")
}
}
// Periodically clean diagnosedJobs, keeping only entries for unfinished tasks
val undoneJobIds = undoneTask.map(_.getId()).toSet
val iterator = diagnosedJobs.keySet().iterator()
while (iterator.hasNext) {
val jobId = iterator.next()
if (!undoneJobIds.contains(jobId)) {
iterator.remove()
}
logger.info(s"Cleaned diagnosedJobs cache, current size: ${diagnosedJobs.size()}")
} { case t: Throwable =>
logger.warn(s"TimeoutDetective Job failed. ${t.getMessage}", t)
}
logger.info(s"Cleaned diagnosedJobs cache, current size: ${diagnosedJobs.size()}")
}

},
EntranceConfiguration.ENTRANCE_TASK_TIMEOUT_SCAN.getValue.toLong,
EntranceConfiguration.ENTRANCE_TASK_TIMEOUT_SCAN.getValue.toLong,
new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT_SCAN).toLong,
new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT_SCAN).toLong,
TimeUnit.MILLISECONDS
)
}
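The scan above guards against duplicate diagnosis with a ConcurrentHashMap flag and trims that map on every pass. Below is a minimal, self-contained sketch of the guard-and-cleanup pattern, assuming a placeholder Job model and a stub diagnose() call in place of the real EntranceJob and EntranceUtils APIs:

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object DiagnosisScanSketch {
  // jobId -> diagnosed flag, so each undone job is diagnosed at most once
  private val diagnosedJobs = new ConcurrentHashMap[String, java.lang.Boolean]()
  private val scanner = Executors.newSingleThreadScheduledExecutor()

  // Placeholder job model and hooks; the real code works on EntranceJob and EntranceUtils
  final case class Job(id: String, createTime: Long)
  def undoneJobs(): Seq[Job] = Seq.empty // stand-in for getAllUndoneTask(null, null)
  def diagnose(job: Job): Boolean = true // stand-in for EntranceUtils.taskRealtimeDiagnose

  def start(timeoutMs: Long, scanIntervalMs: Long): Unit = {
    scanner.scheduleAtFixedRate(
      new Runnable {
        override def run(): Unit = {
          val jobs = undoneJobs()
          val deadline = System.currentTimeMillis() - timeoutMs
          jobs
            .filter(j => j.createTime < deadline && !diagnosedJobs.containsKey(j.id))
            .foreach { j =>
              // putIfAbsent returns null only for the first caller, so each job is diagnosed once
              if (diagnosedJobs.putIfAbsent(j.id, true) == null) {
                if (!diagnose(j)) diagnosedJobs.remove(j.id) // drop the flag on failure to allow a retry
              }
            }
          // Keep only flags for jobs that are still undone so the map cannot grow without bound
          val undoneIds = jobs.map(_.id).toSet
          val it = diagnosedJobs.keySet().iterator()
          while (it.hasNext) {
            if (!undoneIds.contains(it.next())) it.remove()
          }
        }
      },
      scanIntervalMs,
      scanIntervalMs,
      TimeUnit.MILLISECONDS
    )
  }
}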
@@ -376,7 +376,7 @@ object EntranceConfiguration
val AI_SQL_DYNAMIC_ENGINE_SWITCH =
CommonVars("linkis.aisql.dynamic.engine.type.switch", false).getValue

val DOCTOR_REQUEST_TIMEOUT = CommonVars("linkis.aisql.doctor.http.timeout", 30000).getValue
val DOCTOR_REQUEST_TIMEOUT = CommonVars("linkis.aisql.doctor.http.timeout", 300000).getValue

val DOCTOR_HTTP_MAX_CONNECT = CommonVars("linkis.aisql.doctor.http.max.connect", 20).getValue

@@ -411,8 +411,8 @@ object EntranceConfiguration {

var SPARK3_PYTHON_VERSION = CommonVars.apply("spark.python.version", "python3");

var SPARK_DYNAMIC_CONF_USER_ENABLED =
CommonVars.apply("spark.dynamic.conf.user.enabled", false).getValue
var SPARK_DYNAMIC_ALLOCATION_ENABLED =
CommonVars.apply("spark.dynamic.allocation.enabled", false).getValue

var SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS =
CommonVars.apply("spark.dynamic.allocation.additional.confs", "").getValue
@@ -441,6 +441,9 @@ object EntranceConfiguration {
val TASK_DIAGNOSIS_ENGINE_TYPE =
CommonVars[String]("linkis.task.diagnosis.engine.type", "spark").getValue

val TASK_DIAGNOSIS_TIMEOUT = CommonVars[Long]("linkis.task.diagnosis.timeout", 300000L).getValue
val TASK_DIAGNOSIS_TIMEOUT = CommonVars[String]("linkis.task.diagnosis.timeout", "5m").getValue

val TASK_DIAGNOSIS_TIMEOUT_SCAN =
CommonVars("linkis.task.diagnosis.timeout.scan", "1m").getValue

}
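TASK_DIAGNOSIS_TIMEOUT and TASK_DIAGNOSIS_TIMEOUT_SCAN are now duration strings rather than raw millisecond Longs, and the scan loop converts them with new TimeType(...).toLong. A small sketch of that conversion, using hypothetical keys instead of the real linkis.task.diagnosis.* ones; the toLong result is assumed to be milliseconds, matching the TimeUnit.MILLISECONDS passed to scheduleAtFixedRate above:

import org.apache.linkis.common.conf.{CommonVars, TimeType}

object TimeoutConfSketch {
  // Hypothetical keys for illustration only
  val EXAMPLE_TIMEOUT: String = CommonVars[String]("example.task.diagnosis.timeout", "5m").getValue
  val EXAMPLE_SCAN: String = CommonVars("example.task.diagnosis.timeout.scan", "1m").getValue

  // Convert the duration strings once, at the point of use
  val timeoutMs: Long = new TimeType(EXAMPLE_TIMEOUT).toLong // assumed milliseconds, e.g. "5m"
  val scanMs: Long = new TimeType(EXAMPLE_SCAN).toLong
}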
@@ -16,6 +16,7 @@
*/

package org.apache.linkis.entrance.execute.simple

import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext

object SimpleExecuteBusContext {
@@ -20,7 +20,12 @@ package org.apache.linkis.entrance.interceptor.impl
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.CodeAndRunTypeUtils.LANGUAGE_TYPE_AI_SQL
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.entrance.conf.EntranceConfiguration.{AI_SQL_CREATORS, AI_SQL_KEY, TASK_RETRY_CODE_TYPE, TASK_RETRY_SWITCH}
import org.apache.linkis.entrance.conf.EntranceConfiguration.{
AI_SQL_CREATORS,
AI_SQL_KEY,
TASK_RETRY_CODE_TYPE,
TASK_RETRY_SWITCH
}
import org.apache.linkis.entrance.interceptor.EntranceInterceptor
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.manager.label.entity.Label
@@ -237,23 +237,21 @@ object EntranceUtils extends Logging {
// deal with spark3 dynamic allocation conf
// 1. Only Spark 3 needs the dynamic allocation parameters handled. 2. If the user does not specify a template name, set defaults consistent with the underlying Spark configuration; otherwise use the parameters from the user's template
val properties = new util.HashMap[String, AnyRef]()
val sparkDynamicAllocationEnabled: Boolean =
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_ENABLED
val isSpark3 = LabelUtil.isTargetEngine(
jobRequest.getLabels,
EngineType.SPARK.toString,
LabelCommonConfig.SPARK3_ENGINE_VERSION.getValue
)
try {
if (isSpark3) {
if (isSpark3 && sparkDynamicAllocationEnabled) {
logger.info(s"Task :${jobRequest.getId} using dynamic conf ")
if (EntranceConfiguration.SPARK_DYNAMIC_CONF_USER_ENABLED) {
// If dynamic allocation is disabled, only set python version
properties.put(
EntranceConfiguration.SPARK3_PYTHON_VERSION.key,
EntranceConfiguration.SPARK3_PYTHON_VERSION.getValue
)
} else {
setSparkDynamicAllocationDefaultConfs(properties, logAppender)
}
// If dynamic allocation is disabled, only set python version
properties.put(
EntranceConfiguration.SPARK3_PYTHON_VERSION.key,
EntranceConfiguration.SPARK3_PYTHON_VERSION.getValue
)
}
} catch {
case e: Exception =>
@@ -271,9 +269,9 @@
* Set spark dynamic allocation default confs
*/
private def setSparkDynamicAllocationDefaultConfs(
properties: util.HashMap[String, AnyRef],
logAppender: lang.StringBuilder
): Unit = {
properties: util.HashMap[String, AnyRef],
logAppender: lang.StringBuilder
): Unit = {
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_CORES.key,
EntranceConfiguration.SPARK_EXECUTOR_CORES.getValue
@@ -351,21 +349,21 @@
val params = new util.HashMap[String, AnyRef]()
val metricsParams = job.getMetrics
if (MapUtils.isEmpty(metricsParams)) {
return DoctorResponse(success = false, "")
return DoctorResponse(success = false, "Diagnose error, metricsParams is empty!")
}
val yarnResource =
MapUtils.getMap(metricsParams, "yarnResource", new util.HashMap[String, AnyRef]())
if (MapUtils.isEmpty(yarnResource)) {
DoctorResponse(success = false, "")
DoctorResponse(success = false, "Diagnose error, yarnResource is empty!")
} else {
var response: DoctorResponse = null
for (application <- yarnResource.keySet().asInstanceOf[Set[String]]) {
yarnResource.keySet().toArray.foreach { application =>
params.put("taskId", application)
params.put("engineType", LabelUtil.getEngineType(job.getLabels))
params.put("userId", job.getExecuteUser)
val msg = s"Task execution time exceeds 5m, performing task diagnosis"
params.put("triggerReason", msg)
params.put("sparkConfig", "")
params.put("sparkConfig", new util.HashMap[String, AnyRef]())
params.put("taskName", "")
params.put("linkisTaskUrl", "")
val request = DoctorRequest(
@@ -532,14 +530,8 @@ object EntranceUtils extends Logging {
DoctorResponse(success = true, result = engineType, reason = reason, duration = duration)
} else if (request.apiUrl.contains("realtime")) {
// Realtime diagnosis API
val success = dataMap.get("success").toString.toBoolean
val result = if (dataMap.containsKey("result")) dataMap.get("result").toString else ""
val reason = if (dataMap.containsKey("reason")) dataMap.get("reason").toString else ""
logInfo(
s"${request.successMessage}: $success, Result: $result, Reason: $reason, This decision took $duration seconds",
logAppender
)
DoctorResponse(success = success, result = result, reason = reason, duration = duration)
val resultJson = BDPJettyServerHelper.gson.toJson(responseMapJson)
DoctorResponse(success = true, result = resultJson, reason = null, duration = duration)
} else {
// Default handling
val result = dataMap.toString
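The realtime branch above now keeps the whole Doctoris payload: instead of picking success/result/reason fields out of dataMap, it serializes the full response map to JSON and stores that as the diagnosis result. A hedged sketch of that step, with DoctorResponse stubbed locally and a plain Gson instance standing in for BDPJettyServerHelper.gson:

import java.util
import com.google.gson.Gson

object RealtimeResponseSketch {
  // Local stand-in for the DoctorResponse used in EntranceUtils
  final case class DoctorResponse(success: Boolean, result: String, reason: String = null, duration: Long = 0L)

  // Serialize the raw response map so the complete diagnosis payload survives downstream
  def toDoctorResponse(responseMapJson: util.Map[String, AnyRef], durationSeconds: Long): DoctorResponse = {
    val resultJson = new Gson().toJson(responseMapJson)
    DoctorResponse(success = true, result = resultJson, duration = durationSeconds)
  }
}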
@@ -365,12 +365,12 @@ object JobHistoryHelper extends Logging {
}
}

def addDiagnosis(jobid: String, diagnosis: String): Unit = {
def addDiagnosis(jobid: Long, diagnosis: String): Unit = {
val jobDiagnosisRequest = new JobDiagnosisRequest()
jobDiagnosisRequest.setJobHistoryId(jobid.toLong)
jobDiagnosisRequest.setJobHistoryId(jobid)
jobDiagnosisRequest.setDiagnosisContent(diagnosis)
jobDiagnosisRequest.setDiagnosisContent("doctoris")
JobDiagnosisReqInsert(jobDiagnosisRequest)
jobDiagnosisRequest.setDiagnosisSource("doctoris")
sender.ask(JobDiagnosisReqInsert(jobDiagnosisRequest))
}

}
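With the Long signature and the sender.ask call, addDiagnosis now performs a synchronous RPC to the jobhistory module. A hedged sketch of the expected call site on the entrance side, mirroring the scan logic shown earlier; the surrounding wiring is assumed:

import org.apache.linkis.entrance.execute.EntranceJob
import org.apache.linkis.entrance.utils.{EntranceUtils, JobHistoryHelper}

object AddDiagnosisUsageSketch {
  // Persist the Doctoris result once a realtime diagnosis succeeds
  def persistDiagnosis(entranceJob: EntranceJob): Unit = {
    val response = EntranceUtils.taskRealtimeDiagnose(entranceJob.getJobRequest, null)
    if (response.success) {
      // getId on the JobRequest is the Long job-history id expected by the new addDiagnosis signature
      JobHistoryHelper.addDiagnosis(entranceJob.getJobRequest.getId, response.result)
    }
  }
}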
@@ -19,12 +19,15 @@

import org.apache.linkis.jobhistory.entity.JobDiagnosis;

import org.apache.ibatis.annotations.Param;

public interface JobDiagnosisMapper {
void insert(JobDiagnosis jobDiagnosis);

void deleteById(Long id);

void update(JobDiagnosis jobDiagnosis);

JobDiagnosis selectByJobIdAndSource(Long jobHistoryId, String diagnosisSource);
JobDiagnosis selectByJobIdAndSource(
@Param("id") Long jobHistoryId, @Param("diagnosisSource") String diagnosisSource);
}
@@ -837,14 +837,10 @@ public Message queryFailedTaskDiagnosis(
String jobStatus = jobHistory.getStatus();
JobDiagnosis jobDiagnosis =
jobHistoryDiagnosisService.selectByJobId(Long.valueOf(taskID), diagnosisSource);
if (StringUtils.isNotBlank(diagnosisSource)) {
if (StringUtils.isNotBlank(jobDiagnosis.getDiagnosisContent())) {
return Message.ok().data("diagnosisMsg", jobDiagnosis.getDiagnosisContent());
} else {
if (null == jobDiagnosis) {
if (StringUtils.isNotBlank(diagnosisSource)) {
return Message.ok().data("diagnosisMsg", diagnosisMsg);
}
}
if (null == jobDiagnosis) {
diagnosisMsg = JobhistoryUtils.getDiagnosisMsg(taskID);
jobDiagnosis = new JobDiagnosis();
jobDiagnosis.setJobHistoryId(Long.valueOf(taskID));
@@ -43,7 +43,7 @@
<select id="selectByJobIdAndSource" resultType="org.apache.linkis.jobhistory.entity.JobDiagnosis">
SELECT id, job_history_id, diagnosis_content, created_time, updated_time, only_read, diagnosis_source
FROM linkis_ps_job_history_diagnosis
WHERE job_history_id = #{0}
WHERE job_history_id = #{id}
<if test="diagnosisSource != null">
AND diagnosis_source = #{diagnosisSource}
</if>