diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
index ab2011c74be..1e02babe937 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
@@ -281,11 +281,12 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
         response match {
           case e: ErrorExecuteResponse =>
             val props: util.Map[String, String] = engineCreationContext.getOptions
-            val aiSqlEnable: String = props.getOrDefault("linkis.ai.sql.enable", "false").toString
+            val taskRetry: String =
+              props.getOrDefault("linkis.task.retry.switch", "false").toString
             val retryNum: Int =
               Integer.valueOf(props.getOrDefault("linkis.ai.retry.num", "0").toString)
-            if (retryEnable && !props.isEmpty && "true".equals(aiSqlEnable) && retryNum > 0) {
+            if (retryEnable && !props.isEmpty && "true".equals(taskRetry) && retryNum > 0) {
               logger.info(
                 s"aisql execute failed, with index: ${index} retryNum: ${retryNum}, and will retry",
                 e.t
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/exception/EntranceErrorCode.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/exception/EntranceErrorCode.java
index 065f436e0a9..d1835f2621f 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/exception/EntranceErrorCode.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/exception/EntranceErrorCode.java
@@ -32,7 +32,9 @@ public enum EntranceErrorCode {
   JOB_UPDATE_FAILED(20016, "job update failed"),
   VARIABLE_NULL_EXCEPTION(20017, "variable is null"),
   USER_NULL_EXCEPTION(20018, "User information not obtained"),
-  USER_IP_EXCEPTION(20019, "User IP address is not configured");
+  USER_IP_EXCEPTION(20019, "User IP address is not configured"),
+  METRICS_PARAMS_EXCEPTION(20020, "metricsParams is null"),
+  YARN_RESOURCE_YARN_PARAMS_EXCEPTION(20021, "yarnResource is null");

   private int errCode;
   private String desc;
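Note: the gate for task-level retry moves off the AI-SQL flag (`linkis.ai.sql.enable`) onto a dedicated switch (`linkis.task.retry.switch`), while the retry count still comes from `linkis.ai.retry.num`. Below is a minimal Scala sketch of the resulting predicate; `RetryGateSketch` and `shouldRetry` are illustrative names, not part of the patch.

```scala
import java.util

object RetryGateSketch {

  // Hypothetical helper re-stating the retry gate above: retries fire only
  // when the executor-level retryEnable flag, the per-task switch, and a
  // positive retry count all agree.
  def shouldRetry(retryEnable: Boolean, props: util.Map[String, String]): Boolean = {
    val taskRetry = props.getOrDefault("linkis.task.retry.switch", "false")
    val retryNum = Integer.valueOf(props.getOrDefault("linkis.ai.retry.num", "0"))
    retryEnable && !props.isEmpty && "true".equals(taskRetry) && retryNum > 0
  }

}
```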
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index 306e5bbf2d2..39b3f58c71b 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -29,7 +29,9 @@
 import org.apache.linkis.governance.common.entity.job.JobRequest;
 import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
 import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineType;
 import org.apache.linkis.manager.label.entity.entrance.ExecuteOnceLabel;
+import org.apache.linkis.manager.label.utils.LabelUtil;
 import org.apache.linkis.protocol.engine.JobProgressInfo;
 import org.apache.linkis.protocol.utils.TaskUtils;
 import org.apache.linkis.scheduler.executer.OutputExecuteResponse;
@@ -154,31 +156,70 @@ public boolean onJobFailed(
     }
     boolean containsAny = false;
+    // Text matching
     String errorDescArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_DESC();
+    // Error code matching
     String errorCodeArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES();
-    for (String keyword : errorDescArray.split(",")) {
-      if (errorDesc.contains(keyword.trim()) || errorCodeArray.contains(errorCode + "")) {
-        containsAny = true;
-        break;
-      }
-    }
-
-    if (!containsAny) {
-      return false;
-    }
-
+    // Regex matching
+    String errorDescRegex = EntranceConfiguration.SUPPORTED_RETRY_ERROR_DESC_REGEX();
+    final EntranceJob entranceJob = (EntranceJob) job;
+    String engineType = LabelUtil.getEngineType(entranceJob.getJobRequest().getLabels());
     AtomicBoolean canRetry = new AtomicBoolean(false);
-    String aiSqlKey = EntranceConfiguration.AI_SQL_KEY().key();
     String retryNumKey = EntranceConfiguration.RETRY_NUM_KEY().key();
-    final EntranceJob entranceJob = (EntranceJob) job;
-
-    // Handle broadcast tables
-    String dataFrameKey = EntranceConfiguration.SUPPORT_ADD_RETRY_CODE_KEYS();
-    if (containsAny(errorDesc, dataFrameKey)) {
-      entranceJob
-          .getJobRequest()
-          .setExecutionCode("set spark.sql.autoBroadcastJoinThreshold=-1; " + code);
+    if (engineType.equals(EngineType.JDBC().toString()) && StringUtils.isNotBlank(errorDescRegex)) {
+      // For JDBC, match the error description against the regex list
+      for (String regex : errorDescRegex.split(",")) {
+        String trimmedRegex = regex.trim();
+        if (StringUtils.isNotBlank(trimmedRegex)) {
+          try {
+            Pattern pattern = Pattern.compile(trimmedRegex, Pattern.CASE_INSENSITIVE);
+            if (pattern.matcher(errorDesc).find()) {
+              containsAny = true;
+              break;
+            }
+          } catch (Exception e) {
+            logger.warn("Invalid regex pattern: {}", trimmedRegex, e);
+          }
+        }
+      }
+      if (!containsAny) {
+        return false;
+      }
+      JobRequest jobRequest = entranceJob.getJobRequest();
+      Pattern queryPattern =
+          Pattern.compile(
+              "Query timeout. Increase the query_timeout session variable and retry|Query exceeded time limit of (\\S+) seconds",
+              Pattern.CASE_INSENSITIVE);
+      Pattern newPlannerPattern =
+          Pattern.compile("StarRocks planner use long time", Pattern.CASE_INSENSITIVE);
+      Pattern queryQueuePendingPattern =
+          Pattern.compile("pending timeout", Pattern.CASE_INSENSITIVE);
+      if (queryPattern.matcher(errorDesc).find()) {
+        jobRequest.setExecutionCode("SET query_timeout = 1200;\n" + code);
+      } else if (newPlannerPattern.matcher(errorDesc).find()) {
+        jobRequest.setExecutionCode("SET new_planner_optimize_timeout = 300000;\n" + code);
+      } else if (queryQueuePendingPattern.matcher(errorDesc).find()) {
+        jobRequest.setExecutionCode("SET query_queue_pending_timeout_second = 3600;\n" + code);
+      }
+    } else {
+      for (String keyword : errorDescArray.split(",")) {
+        if (errorDesc.contains(keyword.trim()) || errorCodeArray.contains(errorCode + "")) {
+          containsAny = true;
+          break;
+        }
+      }
+      if (!containsAny) {
+        return false;
+      }
+      // Handle broadcast tables
+      String dataFrameKey = EntranceConfiguration.SUPPORT_ADD_RETRY_CODE_KEYS();
+      if (containsAny(errorDesc, dataFrameKey)
+          && engineType.equals(EngineType.SPARK().toString())) {
+        entranceJob
+            .getJobRequest()
+            .setExecutionCode("set spark.sql.autoBroadcastJoinThreshold=-1; " + code);
+      }
     }

     Map startupMap = TaskUtils.getStartupMap(props);
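Note: for JDBC engines the retry decision now works in two stages: a configurable, case-insensitive regex list decides whether the failure is retryable at all, and a second set of hard-coded patterns picks a StarRocks session `SET` statement to prepend before the code is re-run. A hedged Scala re-expression of that second stage; `JdbcRetrySketch` and `remediate` are illustrative names, not part of the patch.

```scala
import java.util.regex.Pattern

object JdbcRetrySketch {

  // Each known failure signature maps to the SET statement that raises the
  // corresponding StarRocks session limit, mirroring the patterns above.
  private val remediations: Seq[(Pattern, String)] = Seq(
    Pattern.compile(
      "Query timeout. Increase the query_timeout session variable and retry|Query exceeded time limit of (\\S+) seconds",
      Pattern.CASE_INSENSITIVE
    ) -> "SET query_timeout = 1200;",
    Pattern.compile("StarRocks planner use long time", Pattern.CASE_INSENSITIVE)
      -> "SET new_planner_optimize_timeout = 300000;",
    Pattern.compile("pending timeout", Pattern.CASE_INSENSITIVE)
      -> "SET query_queue_pending_timeout_second = 3600;"
  )

  // Returns the SET statement to prepend before retrying, or None when the
  // error description matches no known remediation.
  def remediate(errorDesc: String): Option[String] =
    remediations.collectFirst {
      case (pattern, setStatement) if pattern.matcher(errorDesc).find() => setStatement
    }

}
```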
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 4afb1b15b59..4547a2a0096 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -17,6 +17,7 @@

 package org.apache.linkis.entrance

+import org.apache.linkis.common.conf.TimeType
 import org.apache.linkis.common.exception.{ErrorException, LinkisException, LinkisRuntimeException}
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
@@ -34,13 +35,16 @@ import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.protocol.constants.TaskConstant
 import org.apache.linkis.protocol.utils.TaskUtils
 import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ENGINE_PRIORITY_RUNTIME_KEY, FIFO_QUEUE_STRATEGY, PFIFO_SCHEDULER_STRATEGY}
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{
+  ENGINE_PRIORITY_RUNTIME_KEY,
+  FIFO_QUEUE_STRATEGY,
+  PFIFO_SCHEDULER_STRATEGY
+}
 import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
 import org.apache.linkis.server.conf.ServerConfiguration

 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.linkis.common.conf.TimeType

 import java.{lang, util}
 import java.text.MessageFormat
@@ -322,41 +326,58 @@ abstract class EntranceServer extends Logging {
         // Newly added task diagnosis detection logic
         if (EntranceConfiguration.TASK_DIAGNOSIS_ENABLE) {
           logger.info("Start to check tasks for diagnosis")
-          val diagnosisTime = System.currentTimeMillis() - new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT).toLong
+          val diagnosisTime = System.currentTimeMillis() - new TimeType(
+            EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT
+          ).toLong
           undoneTask
             .filter { job =>
               val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels)
               engineType.contains(
                 EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE
-              ) && job.createTime < diagnosisTime && !diagnosedJobs.containsKey(job.getId())
+              ) && job.createTime < diagnosisTime && !diagnosedJobs.containsKey(
+                job.getJobRequest.getId.toString
+              )
             }
             .foreach { job =>
               try {
                 // Check and set the diagnosis flag so that each task is diagnosed only once
                 val jobId = job.getJobRequest.getId
-                diagnosedJobs.putIfAbsent(job.getId(), true)
+                diagnosedJobs.putIfAbsent(jobId.toString, true)
                 // Invoke the Doctoris diagnosis system
-                logger.info(s"Start to diagnose spark job ${job.getId()}")
+                logger.info(s"Start to diagnose spark job $jobId")
                 job match {
                   case entranceJob: EntranceJob =>
                     // Call the Doctoris realtime diagnosis API
                     job.getLogListener.foreach(
-                      _.onLogUpdate(job, LogUtils.generateInfo("Start to diagnose spark job"))
+                      _.onLogUpdate(
+                        job,
+                        LogUtils.generateInfo(
+                          s"Start diagnosing Spark task as task execution time exceeds 5 minutes, jobId: $jobId"
+                        )
+                      )
                     )
                     val response = EntranceUtils.taskRealtimeDiagnose(entranceJob.getJobRequest, null)
-                    logger.info(s"Finished to diagnose job ${job
-                        .getId()}, result: ${response.result}, reason: ${response.reason}")
+                    logger.info(
+                      s"Finished to diagnose job $jobId, result: ${response.result}, reason: ${response.reason}"
+                    )
                     // Update the diagnosis info
                     if (response.success) {
                       // Build the diagnosis update request
                       JobHistoryHelper.addDiagnosis(job.getJobRequest.getId, response.result)
                       logger.info(s"Successfully updated diagnosis for job ${job.getId()}")
                     }
+                    job.getLogListener.foreach(
+                      _.onLogUpdate(
+                        job,
+                        LogUtils.generateInfo(
+                          s"Finished diagnosing task, this diagnosis took ${response.duration} seconds"
+                        )
+                      )
+                    )
                   case _ =>
                     logger.warn(s"Job $jobId is not an EntranceJob, skip diagnosis")
                 }
-
               } catch {
                 case t: Throwable =>
                   logger.warn(s"Diagnose job ${job.getId()} failed. ${t.getMessage}", t)
@@ -367,7 +388,7 @@ abstract class EntranceServer extends Logging {
             }
           }
           // Periodically clean diagnosedJobs, keeping only records of unfinished tasks
-          val undoneJobIds = undoneTask.map(_.getId()).toSet
+          val undoneJobIds = undoneTask.map(_.getJobRequest.getId.toString()).toSet
          val iterator = diagnosedJobs.keySet().iterator()
           while (iterator.hasNext) {
             val jobId = iterator.next()
@@ -377,7 +398,6 @@ abstract class EntranceServer extends Logging {
           }
           logger.info(s"Cleaned diagnosedJobs cache, current size: ${diagnosedJobs.size()}")
         }
-
       },
       new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT_SCAN).toLong,
       new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT_SCAN).toLong,
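Note: the diagnosis dedup key switches from the scheduler execution id (`job.getId()`) to the persistent `JobRequest` id, so the marker, the `containsKey` filter, and the cleanup sweep all operate on one key space. A minimal Scala sketch of the once-only pattern, assuming the persistent id as key; unlike the patch, it uses `putIfAbsent`'s return value, which makes the check-and-mark atomic.

```scala
import java.util.concurrent.ConcurrentHashMap

object DiagnoseOnceSketch {

  private val diagnosedJobs = new ConcurrentHashMap[String, java.lang.Boolean]()

  // putIfAbsent returns null only for the first caller, so the diagnosis
  // body runs once per job even if the scan thread sees it on several sweeps.
  def diagnoseOnce(jobId: Long)(diagnose: => Unit): Unit = {
    if (diagnosedJobs.putIfAbsent(jobId.toString, java.lang.Boolean.TRUE) == null) {
      diagnose
    }
  }

  // Drop markers for jobs that are no longer undone, mirroring the periodic
  // cleanup above; undoneJobIds must use the same persistent-id key space.
  def cleanFinished(undoneJobIds: Set[String]): Unit = {
    val iterator = diagnosedJobs.keySet().iterator()
    while (iterator.hasNext) {
      if (!undoneJobIds.contains(iterator.next())) iterator.remove()
    }
  }

}
```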
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 5c226a4968a..825fd0e8d02 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -301,6 +301,12 @@ object EntranceConfiguration {
       "Spark application has already stopped,Spark application sc has already stopped,Failed to allocate a page,dataFrame to local exception,org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator"
     ).getValue

+  val SUPPORTED_RETRY_ERROR_DESC_REGEX =
+    CommonVars(
+      "linkis.entrance.supported.retry.error.desc.regex",
+      "Query timeout.*,Query exceeded time limit.*,Memory of process exceed limit.*,Backend node not found,Connection reset by peer,StarRocks planner use long time,pending timeout"
+    ).getValue
+
   val SUPPORT_ADD_RETRY_CODE_KEYS =
     CommonVars(
       "linkis.entrance.supported.add.retry.code.keys",
@@ -433,7 +439,7 @@ object EntranceConfiguration {
     CommonVars[String]("linkis.doctor.sensitive.sql.check.whitelist", "").getValue

   var DOCTOR_SENSITIVE_SQL_CHECK_ENGINETYPE =
-    CommonVars[String]("linkis.doctor.sensitive.sql.check.engine.type", "hive,spark").getValue
+    CommonVars[String]("linkis.doctor.sensitive.sql.check.engine.type", "hive,spark,jdbc").getValue

   // Task diagnosis configuration
   val TASK_DIAGNOSIS_ENABLE = CommonVars[Boolean]("linkis.task.diagnosis.enable", false).getValue
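Note: the new regex list is itself comma-separated, so no individual pattern can contain a literal comma; entries that fail to compile are skipped rather than aborting the whole check, mirroring the `try/catch` in QueryPersistenceManager. A defensive parse in Scala, illustrative only.

```scala
import java.util.regex.Pattern

import scala.util.Try

object RetryRegexConfSketch {

  // Split on the configured separator, trim, drop blanks, and silently skip
  // entries that do not compile as regexes. Patterns are compiled
  // case-insensitively, as in the patch.
  def parsePatterns(conf: String): Seq[Pattern] =
    conf
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .flatMap(regex => Try(Pattern.compile(regex, Pattern.CASE_INSENSITIVE)).toOption)
      .toSeq

}
```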
"linkis.entrance.supported.add.retry.code.keys", @@ -433,7 +439,7 @@ object EntranceConfiguration { CommonVars[String]("linkis.doctor.sensitive.sql.check.whitelist", "").getValue var DOCTOR_SENSITIVE_SQL_CHECK_ENGINETYPE = - CommonVars[String]("linkis.doctor.sensitive.sql.check.engine.type", "hive,spark").getValue + CommonVars[String]("linkis.doctor.sensitive.sql.check.engine.type", "hive,spark,jdbc").getValue // 任务诊断配置 val TASK_DIAGNOSIS_ENABLE = CommonVars[Boolean]("linkis.task.diagnosis.enable", false).getValue diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/EntranceUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/EntranceUtils.scala index 33448795f1f..24f80685e15 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/EntranceUtils.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/EntranceUtils.scala @@ -29,7 +29,11 @@ import org.apache.linkis.datasource.client.request.{ import org.apache.linkis.datasourcemanager.common.domain.DataSource import org.apache.linkis.entrance.conf.EntranceConfiguration import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary -import org.apache.linkis.entrance.exception.EntranceRPCException +import org.apache.linkis.entrance.exception.{ + EntranceErrorCode, + EntranceErrorException, + EntranceRPCException +} import org.apache.linkis.governance.common.entity.job.JobRequest import org.apache.linkis.governance.common.protocol.conf.{DepartmentRequest, DepartmentResponse} import org.apache.linkis.instance.label.client.InstanceLabelClient @@ -349,12 +353,18 @@ object EntranceUtils extends Logging { val params = new util.HashMap[String, AnyRef]() val metricsParams = job.getMetrics if (MapUtils.isEmpty(metricsParams)) { - return DoctorResponse(success = false, "Diagnose error, metricsParams is empty!") + throw new EntranceErrorException( + EntranceErrorCode.METRICS_PARAMS_EXCEPTION.getErrCode, + EntranceErrorCode.METRICS_PARAMS_EXCEPTION.getDesc + ) } val yarnResource = MapUtils.getMap(metricsParams, "yarnResource", new util.HashMap[String, AnyRef]()) if (MapUtils.isEmpty(yarnResource)) { - DoctorResponse(success = false, "Diagnose error, yarnResource is empty!") + throw new EntranceErrorException( + EntranceErrorCode.YARN_RESOURCE_YARN_PARAMS_EXCEPTION.getErrCode, + EntranceErrorCode.YARN_RESOURCE_YARN_PARAMS_EXCEPTION.getDesc + ) } else { var response: DoctorResponse = null yarnResource.keySet().toArray.foreach { application => diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala index afcea3142eb..9d05100f3dd 100644 --- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala @@ -18,6 +18,7 @@ package org.apache.linkis.manager.engineplugin.jdbc.executor import org.apache.linkis.common.conf.Configuration +import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{OverloadUtils, Utils} import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask import 
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
index afcea3142eb..9d05100f3dd 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
@@ -18,6 +18,7 @@
 package org.apache.linkis.manager.engineplugin.jdbc.executor

 import org.apache.linkis.common.conf.Configuration
+import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{OverloadUtils, Utils}
 import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
 import org.apache.linkis.engineconn.computation.executor.execute.{
@@ -68,6 +69,7 @@ import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
 import org.apache.commons.collections.MapUtils
 import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils

 import org.springframework.util.CollectionUtils
@@ -235,6 +237,11 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
     } catch {
       case e: Throwable =>
         logger.error(s"Cannot run $code", e)
+        // Push the stack trace to the front end
+        val errorStr = ExceptionUtils.getStackTrace(e)
+        engineExecutorContext.appendStdout(
+          LogUtils.generateERROR(s"execute code failed!: $errorStr")
+        )
         return ErrorExecuteResponse(e.getMessage, e)
     } finally {
       connectionManager.removeStatement(taskId)
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
index 4b6a339a08c..8bbfb3f6f20 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
@@ -803,6 +803,9 @@ public Message queryFailedTaskDiagnosis(
     if (!QueryUtils.checkNumberValid(taskID)) {
       throw new LinkisCommonErrorException(21304, "Invalid taskID : " + taskID);
     }
+    if (StringUtils.isBlank(diagnosisSource)) {
+      diagnosisSource = "linkis";
+    }
     JobHistory jobHistory = null;
     boolean isAdmin = Configuration.isJobHistoryAdmin(username) || Configuration.isAdmin(username);
     boolean isDepartmentAdmin = Configuration.isDepartmentAdmin(username);
@@ -838,7 +841,7 @@ public Message queryFailedTaskDiagnosis(
     JobDiagnosis jobDiagnosis =
         jobHistoryDiagnosisService.selectByJobId(Long.valueOf(taskID), diagnosisSource);
     if (null == jobDiagnosis) {
-      if (StringUtils.isNotBlank(diagnosisSource)) {
+      if (diagnosisSource.equals("doctoris")) {
         return Message.ok().data("diagnosisMsg", diagnosisMsg);
       }
       diagnosisMsg = JobhistoryUtils.getDiagnosisMsg(taskID);
@@ -847,6 +850,7 @@ public Message queryFailedTaskDiagnosis(
       jobDiagnosis.setDiagnosisContent(diagnosisMsg);
       jobDiagnosis.setCreatedTime(new Date());
       jobDiagnosis.setUpdatedDate(new Date());
+      jobDiagnosis.setDiagnosisSource("linkis");
       if (TaskStatus.isComplete(TaskStatus.valueOf(jobStatus))) {
         jobDiagnosis.setOnlyRead("1");
       }
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobDiagnosisMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobDiagnosisMapper.xml
index 61e3f62b349..3241a21c1fb 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobDiagnosisMapper.xml
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobDiagnosisMapper.xml
@@ -34,10 +34,9 @@
         diagnosis_content = #{diagnosisContent},
         created_time = #{createdTime},
         updated_time = #{updatedTime},
-        only_read = #{onlyRead},
-        diagnosis_source = #{diagnosisSource}
+        only_read = #{onlyRead}
-        WHERE job_history_id = #{jobHistoryId}
+        WHERE job_history_id = #{jobHistoryId} and diagnosis_source = #{diagnosisSource}
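Note: the last two changes scope diagnoses by source: a blank `diagnosisSource` now defaults to "linkis", only an explicit "doctoris" request returns early when nothing is stored, and the mapper's UPDATE is keyed by `(job_history_id, diagnosis_source)` so a Doctoris row and a Linkis row for the same job no longer overwrite each other. A tiny sketch of the defaulting rule; names are illustrative, not the actual service API.

```scala
object DiagnosisSourceSketch {

  // Blank or null sources fall back to "linkis"; any explicit value
  // (e.g. "doctoris") is passed through and selects its own stored row.
  def normalizeSource(diagnosisSource: String): String =
    if (diagnosisSource == null || diagnosisSource.trim.isEmpty) "linkis"
    else diagnosisSource

}
```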