@@ -281,11 +281,12 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
response match {
case e: ErrorExecuteResponse =>
val props: util.Map[String, String] = engineCreationContext.getOptions
val aiSqlEnable: String = props.getOrDefault("linkis.ai.sql.enable", "false").toString
val taskRetry: String =
props.getOrDefault("linkis.task.retry.switch", "false").toString
val retryNum: Int =
Integer.valueOf(props.getOrDefault("linkis.ai.retry.num", "0").toString)

if (retryEnable && !props.isEmpty && "true".equals(aiSqlEnable) && retryNum > 0) {
if (retryEnable && !props.isEmpty && "true".equals(taskRetry) && retryNum > 0) {
logger.info(
s"aisql execute failed, with index: ${index} retryNum: ${retryNum}, and will retry",
e.t
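
A minimal Scala sketch of the retry gate in the hunk above, read against the renamed options: the switch moves from linkis.ai.sql.enable to linkis.task.retry.switch and is combined with linkis.ai.retry.num. The helper object, method name and the retryEnable flag are illustrative stand-ins, not the actual ComputationExecutor members.

import java.util

object RetryGateSketch {
  // Stand-in for engineCreationContext.getOptions in the real executor (assumed shape)
  def shouldRetry(props: util.Map[String, String], retryEnable: Boolean): Boolean = {
    val taskRetry: String = props.getOrDefault("linkis.task.retry.switch", "false")
    val retryNum: Int = Integer.valueOf(props.getOrDefault("linkis.ai.retry.num", "0"))
    retryEnable && !props.isEmpty && "true".equals(taskRetry) && retryNum > 0
  }

  def main(args: Array[String]): Unit = {
    val props = new util.HashMap[String, String]()
    props.put("linkis.task.retry.switch", "true")
    props.put("linkis.ai.retry.num", "2")
    println(shouldRetry(props, retryEnable = true)) // prints: true
  }
}
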
@@ -32,7 +32,9 @@ public enum EntranceErrorCode {
JOB_UPDATE_FAILED(20016, "job update failed"),
VARIABLE_NULL_EXCEPTION(20017, "variable is null"),
USER_NULL_EXCEPTION(20018, "User information not obtained"),
USER_IP_EXCEPTION(20019, "User IP address is not configured");
USER_IP_EXCEPTION(20019, "User IP address is not configured"),
METRICS_PARAMS_EXCEPTION(20020, "metricsParams is null"),
YARN_RESOURCE_YARN_PARAMS_EXCEPTION(20021, "yarnResource is null");

private int errCode;
private String desc;
@@ -29,7 +29,9 @@
import org.apache.linkis.governance.common.entity.job.JobRequest;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineType;
import org.apache.linkis.manager.label.entity.entrance.ExecuteOnceLabel;
import org.apache.linkis.manager.label.utils.LabelUtil;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.protocol.utils.TaskUtils;
import org.apache.linkis.scheduler.executer.OutputExecuteResponse;
@@ -154,31 +156,70 @@ public boolean onJobFailed(
}

boolean containsAny = false;
// Text matching
String errorDescArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_DESC();
// Error code matching
String errorCodeArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES();
for (String keyword : errorDescArray.split(",")) {
if (errorDesc.contains(keyword.trim()) || errorCodeArray.contains(errorCode + "")) {
containsAny = true;
break;
}
}

if (!containsAny) {
return false;
}

// Regex matching
String errorDescRegex = EntranceConfiguration.SUPPORTED_RETRY_ERROR_DESC_REGEX();
final EntranceJob entranceJob = (EntranceJob) job;
String engineType = LabelUtil.getEngineType(entranceJob.getJobRequest().getLabels());
AtomicBoolean canRetry = new AtomicBoolean(false);
String aiSqlKey = EntranceConfiguration.AI_SQL_KEY().key();
String retryNumKey = EntranceConfiguration.RETRY_NUM_KEY().key();

final EntranceJob entranceJob = (EntranceJob) job;

// Handle broadcast tables
String dataFrameKey = EntranceConfiguration.SUPPORT_ADD_RETRY_CODE_KEYS();
if (containsAny(errorDesc, dataFrameKey)) {
entranceJob
.getJobRequest()
.setExecutionCode("set spark.sql.autoBroadcastJoinThreshold=-1; " + code);
if (engineType.equals(EngineType.JDBC().toString()) && StringUtils.isNotBlank(errorDescRegex)) {
// Regex matching for JDBC execution
for (String regex : errorDescRegex.split(",")) {
String trimmedRegex = regex.trim();
if (StringUtils.isNotBlank(trimmedRegex)) {
try {
Pattern pattern = Pattern.compile(trimmedRegex, Pattern.CASE_INSENSITIVE);
if (pattern.matcher(errorDesc).find()) {
containsAny = true;
break;
}
} catch (Exception e) {
logger.warn("Invalid regex pattern: {}", trimmedRegex, e);
}
}
}
if (!containsAny) {
return false;
}
JobRequest jobRequest = entranceJob.getJobRequest();
Pattern queryPattern =
Pattern.compile(
"Query timeout. Increase the query_timeout session variable and retry|Query exceeded time limit of (\\S+) seconds",
Pattern.CASE_INSENSITIVE);
Pattern newPlannerPattern =
Pattern.compile("StarRocks planner use long time", Pattern.CASE_INSENSITIVE);
Pattern queryQueuePendingPattern =
Pattern.compile("pending timeout", Pattern.CASE_INSENSITIVE);
if (queryPattern.matcher(errorDesc).find()) {
jobRequest.setExecutionCode("SET query_timeout = 1200;\n" + code);
} else if (newPlannerPattern.matcher(errorDesc).find()) {
jobRequest.setExecutionCode("SET new_planner_optimize_timeout = 300000;\n" + code);
} else if (queryQueuePendingPattern.matcher(errorDesc).find()) {
jobRequest.setExecutionCode("SET query_queue_pending_timeout_second = 3600;\n" + code);
}
} else {
for (String keyword : errorDescArray.split(",")) {
if (errorDesc.contains(keyword.trim()) || errorCodeArray.contains(errorCode + "")) {
containsAny = true;
break;
}
}
if (!containsAny) {
return false;
}
// Handle broadcast tables
String dataFrameKey = EntranceConfiguration.SUPPORT_ADD_RETRY_CODE_KEYS();
if (containsAny(errorDesc, dataFrameKey)
&& engineType.equals(EngineType.SPARK().toString())) {
entranceJob
.getJobRequest()
.setExecutionCode("set spark.sql.autoBroadcastJoinThreshold=-1; " + code);
}
}

Map<String, Object> startupMap = TaskUtils.getStartupMap(props);
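
For orientation, a hedged Scala sketch of the JDBC retry branch above: split the comma-separated regex list, match the error description case-insensitively, and prepend a StarRocks session SET when a known timeout is recognized. The regex list and SET values are copied from this diff; the object and method names are illustrative only.

import java.util.regex.Pattern

object JdbcRetryRewriteSketch {
  // Mirrors linkis.entrance.supported.retry.error.desc.regex (a subset of the default in this diff)
  private val retryRegexes =
    "Query timeout.*,Query exceeded time limit.*,StarRocks planner use long time,pending timeout"

  def matchesRetryRegex(errorDesc: String): Boolean =
    retryRegexes.split(",").map(_.trim).filter(_.nonEmpty).exists { regex =>
      try Pattern.compile(regex, Pattern.CASE_INSENSITIVE).matcher(errorDesc).find()
      catch { case _: Exception => false } // invalid patterns are skipped, as the diff logs and continues
    }

  // Prepend a session variable for the recognized StarRocks timeout (values taken from the diff)
  def rewriteCode(errorDesc: String, code: String): String = {
    val queryTimeout = Pattern.compile(
      "Query timeout. Increase the query_timeout session variable and retry|Query exceeded time limit of (\\S+) seconds",
      Pattern.CASE_INSENSITIVE
    )
    val planner = Pattern.compile("StarRocks planner use long time", Pattern.CASE_INSENSITIVE)
    val pending = Pattern.compile("pending timeout", Pattern.CASE_INSENSITIVE)
    if (queryTimeout.matcher(errorDesc).find()) "SET query_timeout = 1200;\n" + code
    else if (planner.matcher(errorDesc).find()) "SET new_planner_optimize_timeout = 300000;\n" + code
    else if (pending.matcher(errorDesc).find()) "SET query_queue_pending_timeout_second = 3600;\n" + code
    else code
  }
}
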
@@ -17,6 +17,7 @@

package org.apache.linkis.entrance

import org.apache.linkis.common.conf.TimeType
import org.apache.linkis.common.exception.{ErrorException, LinkisException, LinkisRuntimeException}
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
@@ -34,13 +35,16 @@ import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ENGINE_PRIORITY_RUNTIME_KEY, FIFO_QUEUE_STRATEGY, PFIFO_SCHEDULER_STRATEGY}
import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{
ENGINE_PRIORITY_RUNTIME_KEY,
FIFO_QUEUE_STRATEGY,
PFIFO_SCHEDULER_STRATEGY
}
import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
import org.apache.linkis.server.conf.ServerConfiguration

import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.conf.TimeType

import java.{lang, util}
import java.text.MessageFormat
@@ -322,41 +326,58 @@ abstract class EntranceServer extends Logging {
// New task diagnosis check logic
if (EntranceConfiguration.TASK_DIAGNOSIS_ENABLE) {
logger.info("Start to check tasks for diagnosis")
val diagnosisTime = System.currentTimeMillis() - new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT).toLong
val diagnosisTime = System.currentTimeMillis() - new TimeType(
EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT
).toLong
undoneTask
.filter { job =>
val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels)
engineType.contains(
EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE
) && job.createTime < diagnosisTime && !diagnosedJobs.containsKey(job.getId())
) && job.createTime < diagnosisTime && !diagnosedJobs.containsKey(
job.getJobRequest.getId.toString
)
}
.foreach { job =>
try {
// Check and set the diagnosis flag to ensure each task is diagnosed only once
val jobId = job.getJobRequest.getId
diagnosedJobs.putIfAbsent(job.getId(), true)
diagnosedJobs.putIfAbsent(jobId.toString, true)
// Call the Doctoris diagnosis system
logger.info(s"Start to diagnose spark job ${job.getId()}")
logger.info(s"Start to diagnose spark job $jobId")
job match {
case entranceJob: EntranceJob =>
// Call the Doctoris real-time diagnosis API
job.getLogListener.foreach(
_.onLogUpdate(job, LogUtils.generateInfo("Start to diagnose spark job"))
_.onLogUpdate(
job,
LogUtils.generateInfo(
s"Start diagnosing Spark task as task execution time exceeds 5 minutes,jobId:$jobId"
)
)
)
val response =
EntranceUtils.taskRealtimeDiagnose(entranceJob.getJobRequest, null)
logger.info(s"Finished to diagnose job ${job
.getId()}, result: ${response.result}, reason: ${response.reason}")
logger.info(
s"Finished to diagnose job $jobId, result: ${response.result}, reason: ${response.reason}"
)
// Update the diagnosis info
if (response.success) {
// Build the diagnosis update request
JobHistoryHelper.addDiagnosis(job.getJobRequest.getId, response.result)
logger.info(s"Successfully updated diagnosis for job ${job.getId()}")
}
job.getLogListener.foreach(
_.onLogUpdate(
job,
LogUtils.generateInfo(
s"Finished diagnosing task,This decision took ${response.duration} seconds"
)
)
)
case _ =>
logger.warn(s"Job $jobId is not an EntranceJob, skip diagnosis")
}

} catch {
case t: Throwable =>
logger.warn(s"Diagnose job ${job.getId()} failed. ${t.getMessage}", t)
@@ -367,7 +388,7 @@
}
}
// Periodically clean up diagnosedJobs, keeping only entries for unfinished tasks
val undoneJobIds = undoneTask.map(_.getId()).toSet
val undoneJobIds = undoneTask.map(_.getJobRequest.getId.toString()).toSet
val iterator = diagnosedJobs.keySet().iterator()
while (iterator.hasNext) {
val jobId = iterator.next()
@@ -377,7 +398,6 @@
}
logger.info(s"Cleaned diagnosedJobs cache, current size: ${diagnosedJobs.size()}")
}

},
new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT_SCAN).toLong,
new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT_SCAN).toLong,
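
A hedged Scala sketch of the de-duplication change above: the diagnosedJobs cache is now keyed by the JobRequest id (job.getJobRequest.getId) rather than the scheduler job id, and is trimmed back to the ids of still-undone tasks. The task case class and the stubbed diagnosis call are assumptions made for the sketch.

import java.util.concurrent.ConcurrentHashMap

object DiagnosisDedupSketch {
  // jobRequestId -> diagnosed flag, mirroring diagnosedJobs in the diff
  private val diagnosedJobs = new ConcurrentHashMap[String, java.lang.Boolean]()

  final case class UndoneTask(jobRequestId: Long, createTime: Long)

  def scan(undoneTask: Seq[UndoneTask], diagnosisTime: Long): Unit = {
    undoneTask
      .filter(t => t.createTime < diagnosisTime && !diagnosedJobs.containsKey(t.jobRequestId.toString))
      .foreach { t =>
        // ensure each task is diagnosed only once
        diagnosedJobs.putIfAbsent(t.jobRequestId.toString, java.lang.Boolean.TRUE)
        // ... the real code calls EntranceUtils.taskRealtimeDiagnose and JobHistoryHelper.addDiagnosis here ...
      }
    // keep only entries that still belong to undone tasks
    val undoneJobIds = undoneTask.map(_.jobRequestId.toString).toSet
    val iterator = diagnosedJobs.keySet().iterator()
    while (iterator.hasNext) {
      if (!undoneJobIds.contains(iterator.next())) iterator.remove()
    }
  }
}
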
@@ -301,6 +301,12 @@ object EntranceConfiguration {
"Spark application has already stopped,Spark application sc has already stopped,Failed to allocate a page,dataFrame to local exception,org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator"
).getValue

val SUPPORTED_RETRY_ERROR_DESC_REGEX =
CommonVars(
"linkis.entrance.supported.retry.error.desc.regex",
"Query timeout.*,Query exceeded time limit.*,Memory of process exceed limit.*,Backend node not found,Connection reset by peer,StarRocks planner use long time,pending timeout"
).getValue

val SUPPORT_ADD_RETRY_CODE_KEYS =
CommonVars(
"linkis.entrance.supported.add.retry.code.keys",
@@ -433,7 +439,7 @@
CommonVars[String]("linkis.doctor.sensitive.sql.check.whitelist", "").getValue

var DOCTOR_SENSITIVE_SQL_CHECK_ENGINETYPE =
CommonVars[String]("linkis.doctor.sensitive.sql.check.engine.type", "hive,spark").getValue
CommonVars[String]("linkis.doctor.sensitive.sql.check.engine.type", "hive,spark,jdbc").getValue

// Task diagnosis configuration
val TASK_DIAGNOSIS_ENABLE = CommonVars[Boolean]("linkis.task.diagnosis.enable", false).getValue
@@ -29,7 +29,11 @@ import org.apache.linkis.datasource.client.request.{
import org.apache.linkis.datasourcemanager.common.domain.DataSource
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary
import org.apache.linkis.entrance.exception.EntranceRPCException
import org.apache.linkis.entrance.exception.{
EntranceErrorCode,
EntranceErrorException,
EntranceRPCException
}
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.protocol.conf.{DepartmentRequest, DepartmentResponse}
import org.apache.linkis.instance.label.client.InstanceLabelClient
@@ -349,12 +353,18 @@ object EntranceUtils extends Logging {
val params = new util.HashMap[String, AnyRef]()
val metricsParams = job.getMetrics
if (MapUtils.isEmpty(metricsParams)) {
return DoctorResponse(success = false, "Diagnose error, metricsParams is empty!")
throw new EntranceErrorException(
EntranceErrorCode.METRICS_PARAMS_EXCEPTION.getErrCode,
EntranceErrorCode.METRICS_PARAMS_EXCEPTION.getDesc
)
}
val yarnResource =
MapUtils.getMap(metricsParams, "yarnResource", new util.HashMap[String, AnyRef]())
if (MapUtils.isEmpty(yarnResource)) {
DoctorResponse(success = false, "Diagnose error, yarnResource is empty!")
throw new EntranceErrorException(
EntranceErrorCode.YARN_RESOURCE_YARN_PARAMS_EXCEPTION.getErrCode,
EntranceErrorCode.YARN_RESOURCE_YARN_PARAMS_EXCEPTION.getDesc
)
} else {
var response: DoctorResponse = null
yarnResource.keySet().toArray.foreach { application =>
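
A hedged Scala sketch of the fail-fast guards introduced above: empty metrics or a missing yarnResource now raise an exception carrying the new 20020/20021 codes instead of returning a failed DoctorResponse. The exception class below is a local placeholder for org.apache.linkis.entrance.exception.EntranceErrorException, and commons-collections is assumed on the classpath (it is already imported in this file).

import java.util
import org.apache.commons.collections.MapUtils

object DiagnoseGuardSketch {
  // Placeholder for org.apache.linkis.entrance.exception.EntranceErrorException
  final class EntranceErrorException(code: Int, desc: String) extends RuntimeException(s"$code: $desc")

  def yarnResourceOf(metricsParams: util.Map[String, AnyRef]): util.Map[_, _] = {
    if (MapUtils.isEmpty(metricsParams)) {
      // METRICS_PARAMS_EXCEPTION(20020, "metricsParams is null")
      throw new EntranceErrorException(20020, "metricsParams is null")
    }
    val yarnResource =
      MapUtils.getMap(metricsParams, "yarnResource", new util.HashMap[String, AnyRef]())
    if (MapUtils.isEmpty(yarnResource)) {
      // YARN_RESOURCE_YARN_PARAMS_EXCEPTION(20021, "yarnResource is null")
      throw new EntranceErrorException(20021, "yarnResource is null")
    }
    yarnResource
  }
}
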
@@ -18,6 +18,7 @@
package org.apache.linkis.manager.engineplugin.jdbc.executor

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
import org.apache.linkis.engineconn.computation.executor.execute.{
@@ -68,6 +69,7 @@ import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.commons.collections.MapUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils

import org.springframework.util.CollectionUtils

@@ -235,6 +237,11 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
} catch {
case e: Throwable =>
logger.error(s"Cannot run $code", e)
// Push the stack trace to the front end
val errorStr = ExceptionUtils.getStackTrace(e)
engineExecutorContext.appendStdout(
LogUtils.generateERROR(s"execute code failed: $errorStr")
)
return ErrorExecuteResponse(e.getMessage, e)
} finally {
connectionManager.removeStatement(taskId)
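
A small hedged sketch, in Scala, of the error-reporting addition above: the full stack trace is rendered with ExceptionUtils and appended to the task's stdout so the front end sees it. The StdoutSink trait is a stand-in for the real EngineExecutionContext, and plain string formatting replaces LogUtils.generateERROR to keep the sketch dependency-free.

import org.apache.commons.lang3.exception.ExceptionUtils

object StdoutErrorReportSketch {
  // Stand-in for EngineExecutionContext#appendStdout in the real executor
  trait StdoutSink { def appendStdout(log: String): Unit }

  def reportFailure(engineExecutorContext: StdoutSink, e: Throwable): Unit = {
    // push the stack trace to the front end, as in the diff
    val errorStr = ExceptionUtils.getStackTrace(e)
    engineExecutorContext.appendStdout(s"ERROR execute code failed: $errorStr")
  }

  def main(args: Array[String]): Unit = {
    val consoleSink = new StdoutSink { def appendStdout(log: String): Unit = println(log) }
    reportFailure(consoleSink, new RuntimeException("Cannot run SELECT 1"))
  }
}
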
@@ -803,6 +803,9 @@ public Message queryFailedTaskDiagnosis(
if (!QueryUtils.checkNumberValid(taskID)) {
throw new LinkisCommonErrorException(21304, "Invalid taskID : " + taskID);
}
if (StringUtils.isBlank(diagnosisSource)) {
diagnosisSource = "linkis";
}
JobHistory jobHistory = null;
boolean isAdmin = Configuration.isJobHistoryAdmin(username) || Configuration.isAdmin(username);
boolean isDepartmentAdmin = Configuration.isDepartmentAdmin(username);
@@ -838,7 +841,7 @@
JobDiagnosis jobDiagnosis =
jobHistoryDiagnosisService.selectByJobId(Long.valueOf(taskID), diagnosisSource);
if (null == jobDiagnosis) {
if (StringUtils.isNotBlank(diagnosisSource)) {
if (diagnosisSource.equals("doctoris")) {
return Message.ok().data("diagnosisMsg", diagnosisMsg);
}
diagnosisMsg = JobhistoryUtils.getDiagnosisMsg(taskID);
@@ -847,6 +850,7 @@
jobDiagnosis.setDiagnosisContent(diagnosisMsg);
jobDiagnosis.setCreatedTime(new Date());
jobDiagnosis.setUpdatedDate(new Date());
jobDiagnosis.setDiagnosisSource("linkis");
if (TaskStatus.isComplete(TaskStatus.valueOf(jobStatus))) {
jobDiagnosis.setOnlyRead("1");
}
@@ -34,10 +34,9 @@
<if test="diagnosisContent != null">diagnosis_content = #{diagnosisContent},</if>
<if test="createdTime != null">created_time = #{createdTime},</if>
<if test="updatedTime != null">updated_time = #{updatedTime},</if>
<if test="onlyRead != null">only_read = #{onlyRead},</if>
<if test="diagnosisSource != null">diagnosis_source = #{diagnosisSource}</if>
<if test="onlyRead != null">only_read = #{onlyRead}</if>
</trim>
WHERE job_history_id = #{jobHistoryId}
WHERE job_history_id = #{jobHistoryId} and diagnosis_source = #{diagnosisSource}
</update>

<select id="selectByJobIdAndSource" resultType="org.apache.linkis.jobhistory.entity.JobDiagnosis">
@@ -172,8 +172,14 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
val oldStatus: String = jobHistoryMapper.selectJobHistoryStatusForUpdate(jobReq.getId)
val startUpMap: util.Map[String, AnyRef] =
TaskUtils.getStartupMap(jobReqUpdate.jobReq.getParams)
val aiSqlEnable: AnyRef = startUpMap.getOrDefault("linkis.ai.sql.enable", "false")
if (oldStatus != null && !shouldUpdate(oldStatus, jobReq.getStatus, aiSqlEnable.toString)) {
val taskRetrySwitch: AnyRef = startUpMap.getOrDefault("linkis.task.retry.switch", "false")
if (
oldStatus != null && !shouldUpdate(
oldStatus,
jobReq.getStatus,
taskRetrySwitch.toString
)
) {
throw new QueryException(
120001,
s"jobId:${jobReq.getId},oldStatus(在数据库中的task状态为):${oldStatus}," +