diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index efbd0df9..c8794c7f 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -63,6 +63,7 @@ type ControllableTask struct { taskBase rpc *executorcmd.RpcClient pendingFinalTaskStateCh chan mesos.TaskState + taskDoneCh chan error knownPid int } @@ -159,6 +160,12 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. t.initTaskStdLogging(stdoutIn, stderrIn) + // We start to Wait() for the result already, so we have access to ProcessState on an early failure + t.taskDoneCh = make(chan error, 1) + go func() { + t.taskDoneCh <- taskCmd.Wait() + }() + log.WithFields(defaultLogFields). WithFields(logrus.Fields{ "controlPort": t.Tci.ControlPort, @@ -186,12 +193,20 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. WithFields(defaultLogFields), ) if t.rpc == nil { - err = errors.New("rpc client is nil") - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithError(err). - WithField(infologger.Level, infologger.IL_Devel). - Error("could not start gRPC client") + // Check if the task is still running by checking ProcessState + if taskCmd.ProcessState != nil { + err = errors.New("AliECS executor could not connect to task, likely crashed on startup") + } else { + err = errors.New("AliECS executor could not connect to task, likely took too long to start") + } + + taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) + log.WithFields(logrus.Fields{ + "task": utils.TrimJitPrefix(taskClassName), + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + infologger.Level: infologger.IL_Ops, + }).Error(err.Error()) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) @@ -262,7 +277,7 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. t.processEventsFromTask(esc) }() - err = taskCmd.Wait() + err = <-t.taskDoneCh // ^ when this unblocks, the task is done log.WithFields(defaultLogFields). WithField("command", truncatedCmd). @@ -330,7 +345,8 @@ func (t *ControllableTask) cleanupFailedTask(taskCmd *exec.Cmd) { _ = t.doTermIntKill(-taskCmd.Process.Pid) - err := taskCmd.Wait() + // Wait for task to finish and report the error + err := <-t.taskDoneCh if err != nil { log.WithFields(defaultLogFields). WithField(infologger.Level, infologger.IL_Support). diff --git a/executor/executorcmd/client.go b/executor/executorcmd/client.go index 738a7e52..bc9a2e7a 100644 --- a/executor/executorcmd/client.go +++ b/executor/executorcmd/client.go @@ -80,12 +80,8 @@ func NewClient( log.WithField("error", err.Error()). WithField("endpoint", endpoint). WithField("transport", controlTransportS). - WithField("level", infologger.IL_Trace). + WithField("level", infologger.IL_Devel). Error("gRPC client can't dial") - log.WithField("error", err.Error()). - WithField("endpoint", endpoint). - WithField("level", infologger.IL_Ops). - Error("AliECS executor could not connect to task, possible crash on startup") cancel() if conn != nil {