From 6c6167b0ee4d37cbef1a0e307086f1878c1c8331 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Wed, 10 Dec 2025 14:12:49 +0100 Subject: [PATCH 1/2] refactor logging in controllabletask we reduce boiler-plate by having a common set of log fields that are defined once. infologger.level constant is used instead of "level". "cmd" and "command" log fields were unified to "command". --- executor/executable/controllabletask.go | 481 ++++++++---------------- 1 file changed, 162 insertions(+), 319 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index ff8d162f..6f085655 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -71,48 +71,39 @@ type CommitResponse struct { } func (t *ControllableTask) Launch() error { - log.WithFields(logrus.Fields{ + defaultLogFields := logrus.Fields{ "taskId": t.ti.TaskID.GetValue(), "taskName": t.ti.Name, - "level": infologger.IL_Devel, "partition": t.knownEnvironmentId.String(), "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch begin") + } + + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch begin") launchStartTime := time.Now() defer utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch", - log.WithFields(logrus.Fields{ - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - })) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) t.pendingFinalTaskStateCh = make(chan mesos.TaskState, 1) // we use this to receive a pending status update if the task was killed taskCmd, err := prepareTaskCmd(t.Tci) if err != nil { msg := "cannot build task command" - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). + log.WithFields(defaultLogFields). + WithError(err). Error(msg) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, msg+": "+err.Error()) return err } - log.WithField("payload", string(t.ti.GetData()[:])). - WithField("task", t.ti.Name). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). + WithField("payload", string(t.ti.GetData()[:])). + WithField(infologger.Level, infologger.IL_Devel). Debug("starting task asynchronously") // We fork out into a goroutine for the actual process management. @@ -121,14 +112,10 @@ func (t *ControllableTask) Launch() error { // via channels. go func() { truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - log.WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch.async begin") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async begin") // Set up pipes for controlled process var errStdout, errStderr error @@ -137,36 +124,22 @@ func (t *ControllableTask) Launch() error { err = taskCmd.Start() if err != nil { - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). Error("failed to run task") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) _ = t.doTermIntKill(-taskCmd.Process.Pid) return } - log.WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - Debug("task launched") + log.WithFields(defaultLogFields).Debug("task launched") utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) if t.Tci.Stdout == nil { none := "none" @@ -181,10 +154,8 @@ func (t *ControllableTask) Launch() error { case "stdout": go func() { entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). WithField("nohooks", true) writer := &logger.SafeLogrusWriter{ Entry: entry, @@ -196,10 +167,8 @@ func (t *ControllableTask) Launch() error { case "all": go func() { entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) writer := &logger.SafeLogrusWriter{ Entry: entry, PrintFunc: entry.Debug, @@ -217,10 +186,8 @@ func (t *ControllableTask) Launch() error { case "stdout": go func() { entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). WithField("nohooks", true) writer := &logger.SafeLogrusWriter{ Entry: entry, @@ -232,10 +199,8 @@ func (t *ControllableTask) Launch() error { case "all": go func() { entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) writer := &logger.SafeLogrusWriter{ Entry: entry, PrintFunc: entry.Warn, @@ -249,19 +214,15 @@ func (t *ControllableTask) Launch() error { }() } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "controlPort": t.Tci.ControlPort, - "controlMode": t.Tci.ControlMode.String(), - "task": t.ti.Name, - "id": t.ti.TaskID.Value, - "path": taskCmd.Path, - "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", - "argc": len(taskCmd.Args), - "level": infologger.IL_Devel, - }). - Debug("starting gRPC client") + log.WithFields(defaultLogFields). + WithFields(logrus.Fields{ + "controlPort": t.Tci.ControlPort, + "controlMode": t.Tci.ControlMode.String(), + "path": taskCmd.Path, + "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", + "argc": len(taskCmd.Args), + infologger.Level: infologger.IL_Devel, + }).Debug("starting gRPC client") controlTransport := executorcmd.ProtobufTransport for _, v := range taskCmd.Args { @@ -278,26 +239,15 @@ func (t *ControllableTask) Launch() error { t.Tci.ControlMode, controlTransport, log.WithPrefix("executorcmd"). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - }, - ), + WithFields(defaultLogFields). + WithField("command", truncatedCmd), ) if t.rpc == nil { err = errors.New("rpc client is nil") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - }). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + WithField(infologger.Level, infologger.IL_Devel). Error("could not start gRPC client") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) @@ -308,60 +258,37 @@ func (t *ControllableTask) Launch() error { utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) utils.TimeTrack(rpcDialStartTime, "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) statePollingStartTime := time.Now() elapsed := 0 * time.Second for { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "elapsed": elapsed.String(), - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField("elapsed", elapsed.String()). + WithField(infologger.Level, infologger.IL_Devel). Debug("polling task for IDLE state reached") response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err != nil { log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - }). + WithFields(defaultLogFields). + WithField("state", response.GetState()). + WithField("command", truncatedCmd). Info("cannot query task status") } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("state", response.GetState()). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). Debug("task status queried") t.knownPid = int(response.GetPid()) } @@ -369,12 +296,9 @@ func (t *ControllableTask) Launch() error { reachedState := t.rpc.FromDeviceState(response.GetState()) if reachedState == "STANDBY" && err == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). + log.WithFields(defaultLogFields). WithField("command", truncatedCmd). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Debug("task running and ready for control input") break } else if reachedState == "DONE" || reachedState == "ERROR" { @@ -385,24 +309,20 @@ func (t *ControllableTask) Launch() error { // of this process, so we must rely on the PGID of the containing shell pid = -t.rpc.TaskCmd.Process.Pid } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.Name). + log.WithFields(defaultLogFields). Debug("sending SIGKILL (9) to task") _ = syscall.Kill(pid, syscall.SIGKILL) _ = stdoutIn.Close() _ = stderrIn.Close() - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Debug("task killed") + log.WithFields(defaultLogFields). + Debug("task killed") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, "task reached wrong state on startup") return } else if elapsed >= startupTimeout { err = errors.New("timeout while waiting for task startup") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Error(err.Error()) + log.WithFields(defaultLogFields). + Error(err.Error()) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) _ = t.rpc.Close() t.rpc = nil @@ -412,9 +332,7 @@ func (t *ControllableTask) Launch() error { return } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). + log.WithFields(defaultLogFields). WithField("command", truncatedCmd). Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) time.Sleep(startupPollingInterval) @@ -424,33 +342,21 @@ func (t *ControllableTask) Launch() error { utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) utils.TimeTrack(statePollingStartTime, "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) // Set up event stream from task esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) if err != nil { - log.WithField("task", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). Error("cannot set up event stream from task") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) _ = t.rpc.Close() @@ -465,22 +371,15 @@ func (t *ControllableTask) Launch() error { jsonEvent, err := json.Marshal(taskMessage) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). Warning("error marshaling message") } else { t.sendMessage(jsonEvent) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - }).Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") } // Process events from task in yet another goroutine @@ -492,32 +391,23 @@ func (t *ControllableTask) Launch() error { } for { if t.rpc == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). Debug("event stream done") break } esr, err := esc.Recv() if err == io.EOF { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). Debug("event stream EOF") break } if err != nil { - log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithField("errorType", reflect.TypeOf(err)). - WithField("level", infologger.IL_Devel). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). Warning("error receiving event") if status.Code(err) == codes.Unavailable { break @@ -528,29 +418,18 @@ func (t *ControllableTask) Launch() error { deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) if deviceEvent == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") break } else { - taskId := deo.TaskId.Value - if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithField("taskPid", t.knownPid). Debug("END_OF_STREAM DeviceEvent received - notifying environment") } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithField("taskPid", t.knownPid). - WithField("level", infologger.IL_Support). + WithField(infologger.Level, infologger.IL_Support). Warningf("task transitioned to ERROR on its own - notifying environment") } } @@ -562,31 +441,21 @@ func (t *ControllableTask) Launch() error { err = taskCmd.Wait() // ^ when this unblocks, the task is done - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }).Debug("task done (taskCmd.Wait unblocks), preparing final update") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task done (taskCmd.Wait unblocks), preparing final update") pendingState := mesos.TASK_FINISHED if err != nil { taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("level", infologger.IL_Ops). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Ops). Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "error": err.Error(), - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). Error("task terminated with error (details):") pendingState = mesos.TASK_FAILED } @@ -599,42 +468,27 @@ func (t *ControllableTask) Launch() error { if t.rpc != nil { _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). Debug("rpc client closed") t.rpc = nil - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). Debug("rpc client removed") } if errStdout != nil || errStderr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "errStderr": errStderr, - "errStdout": errStdout, - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("errStderr", errStderr). + WithField("errStdout", errStdout). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). Warning("failed to capture stdout or stderr of task") } t.sendStatus(t.knownEnvironmentId, pendingState, "") }() - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "task": t.ti.Name, - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") return nil } @@ -662,6 +516,13 @@ func (t *ControllableTask) Transition(cmd *executorcmd.ExecutorCommand_Transitio } func (t *ControllableTask) Kill() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + var ( pid = 0 reachedState = "UNKNOWN" // FIXME: should be LAUNCHING or similar @@ -670,11 +531,9 @@ func (t *ControllableTask) Kill() error { defer cancel() response, err := t.rpc.GetState(cxt, &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err == nil { // we successfully got the state from the task - log.WithField("nativeState", response.GetState()). - WithField("taskId", t.ti.GetTaskID()). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). + WithField("nativeState", response.GetState()). + WithField(infologger.Level, infologger.IL_Devel). Debug("task status queried for upcoming soft kill") // NOTE: we acquire the transitioner-dependent STANDBY equivalent state @@ -717,14 +576,13 @@ func (t *ControllableTask) Kill() error { for reachedState != "DONE" { cmd := nextTransition(reachedState) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithFields(logrus.Fields{ - "evt": cmd.Event, - "src": cmd.Source, - "dst": cmd.Destination, - "targetList": cmd.TargetList, - "level": infologger.IL_Devel, + "evt": cmd.Event, + "src": cmd.Source, + "dst": cmd.Destination, + "targetList": cmd.TargetList, + infologger.Level: infologger.IL_Devel, }). Debug("state DONE not reached, about to commit transition") @@ -741,10 +599,8 @@ func (t *ControllableTask) Kill() error { select { case commitResponse = <-commitDone: case <-time.After(KILL_TRANSITION_TIMEOUT): - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence timed out") } // timeout we should break @@ -752,29 +608,23 @@ func (t *ControllableTask) Kill() error { break } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithField("newState", commitResponse.newState). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Debug("transition committed") if commitResponse.transitionError != nil || len(cmd.Event) == 0 { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence error") break } reachedState = commitResponse.newState } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Debug("teardown transition sequence done") pid = int(response.GetPid()) if pid == 0 { @@ -785,10 +635,8 @@ func (t *ControllableTask) Kill() error { // If GetState didn't succeed during this Kill code path, but might still have // at some earlier point during the lifetime of this task. // Either way, we might or might not have the true PID. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warn("cannot query task status for graceful process termination") pid = t.knownPid if pid == 0 { @@ -802,9 +650,8 @@ func (t *ControllableTask) Kill() error { // terminate the shell that is wrapping the command, so we avoid using // negative PID is all other cases in order to allow FairMQ cleanup to // run. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithError(err).WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). + WithError(err). Warn("task PID not known from task, using containing shell PGID") } } @@ -813,16 +660,12 @@ func (t *ControllableTask) Kill() error { t.rpc = nil if reachedState == "DONE" { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). Debugf("task reached DONE, will wait %.1fs before terminating it", DONE_TIMEOUT.Seconds()) t.pendingFinalTaskStateCh <- mesos.TASK_FINISHED time.Sleep(DONE_TIMEOUT) } else { // something went wrong - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). Debug("task died already or will be killed soon") t.pendingFinalTaskStateCh <- mesos.TASK_KILLED } @@ -830,25 +673,26 @@ func (t *ControllableTask) Kill() error { if pidExists(pid) { return t.doTermIntKill(pid) } else { - log.WithField("taskId", t.ti.GetTaskID()). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). Debugf("task terminated on its own") return nil } } func (t *ControllableTask) doKill9(pid int) error { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + log.WithFields(defaultLogFields). Debug("sending SIGKILL (9) to task") killErr := syscall.Kill(pid, syscall.SIGKILL) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGKILL failed") } @@ -856,18 +700,21 @@ func (t *ControllableTask) doKill9(pid int) error { } func (t *ControllableTask) doTermIntKill(pid int) error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + killErrCh := make(chan error) go func() { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGTERM (15) to task") err := syscall.Kill(pid, syscall.SIGTERM) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGTERM failed") } killErrCh <- err @@ -882,16 +729,12 @@ func (t *ControllableTask) doTermIntKill(pid int) error { if pidExists(pid) { // SIGINT for the "Waiting for graceful device shutdown. // Hit Ctrl-C again to abort immediately" message. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGINT (2) to task") killErr = syscall.Kill(pid, syscall.SIGINT) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGINT failed") } time.Sleep(SIGINT_TIMEOUT) From 16a0e5d5bf55bd94d437faf62d2f47790064c7ac Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 11 Dec 2025 12:43:33 +0100 Subject: [PATCH 2/2] Refactor controllable task Launch Launch is broken into smaller pieces for better readability. Some minor error handling bugs are automatically fixed on the occasion and some more are marked with fixme and will be taken care of in separate commits. --- executor/executable/controllabletask.go | 671 +++++++++++++----------- 1 file changed, 351 insertions(+), 320 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index 6f085655..72bbf737 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -29,6 +29,7 @@ import ( "encoding/json" "errors" "io" + "os/exec" "reflect" "strings" "syscall" @@ -111,108 +112,54 @@ func (t *ControllableTask) Launch() error { // Anything in the following goroutine must not touch *internalState, except // via channels. go func() { - truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel). - Debug("executor.ControllableTask.Launch.async begin") + t.doLaunchTask(taskCmd, launchStartTime) + }() - // Set up pipes for controlled process - var errStdout, errStderr error - stdoutIn, _ := taskCmd.StdoutPipe() - stderrIn, _ := taskCmd.StderrPipe() + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") + return nil +} - err = taskCmd.Start() - if err != nil { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithError(err). - Error("failed to run task") +func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time.Time) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return - } - log.WithFields(defaultLogFields).Debug("task launched") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async begin") - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + // Set up pipes for controlled process. They have to be retrieved before starting the task. + stdoutIn, _ := taskCmd.StdoutPipe() + stderrIn, _ := taskCmd.StderrPipe() - if t.Tci.Stdout == nil { - none := "none" - t.Tci.Stdout = &none - } - if t.Tci.Stderr == nil { - none := "none" - t.Tci.Stderr = &none - } + err := taskCmd.Start() + if err != nil { + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + Error("failed to run task") - switch *t.Tci.Stdout { - case "stdout": - go func() { - entry := log.WithPrefix("task-stdout"). - WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Support). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() - case "all": - go func() { - entry := log.WithPrefix("task-stdout"). - WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Support) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() - default: - go func() { - _, errStdout = io.Copy(io.Discard, stdoutIn) - }() - } + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + // fixme: i confirmed on staging, that's a nil access! taskCmd.Process is not set if Start() fails + _ = t.doTermIntKill(-taskCmd.Process.Pid) + // fixme: shouldn't we also close pipes, as we do in some other error cases later? + return + } + log.WithFields(defaultLogFields).Debug("task launched") - switch *t.Tci.Stderr { - case "stdout": - go func() { - entry := log.WithPrefix("task-stderr"). - WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Support). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() - case "all": - go func() { - entry := log.WithPrefix("task-stderr"). - WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Support) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() - default: - go func() { - _, errStderr = io.Copy(io.Discard, stderrIn) - }() - } + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + t.initTaskStdLogging(stdoutIn, stderrIn) log.WithFields(defaultLogFields). WithFields(logrus.Fields{ @@ -224,273 +171,357 @@ func (t *ControllableTask) Launch() error { infologger.Level: infologger.IL_Devel, }).Debug("starting gRPC client") - controlTransport := executorcmd.ProtobufTransport - for _, v := range taskCmd.Args { - if strings.Contains(v, "-P OCClite") { - controlTransport = executorcmd.JsonTransport - break - } + controlTransport := executorcmd.ProtobufTransport + for _, v := range taskCmd.Args { + if strings.Contains(v, "-P OCClite") { + controlTransport = executorcmd.JsonTransport + break } + } - rpcDialStartTime := time.Now() + rpcDialStartTime := time.Now() + t.rpc = executorcmd.NewClient( + t.Tci.ControlPort, + t.Tci.ControlMode, + controlTransport, + log.WithPrefix("executorcmd"). + WithFields(defaultLogFields), + ) + if t.rpc == nil { + err = errors.New("rpc client is nil") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + WithField(infologger.Level, infologger.IL_Devel). + Error("could not start gRPC client") - t.rpc = executorcmd.NewClient( - t.Tci.ControlPort, - t.Tci.ControlMode, - controlTransport, - log.WithPrefix("executorcmd"). - WithFields(defaultLogFields). - WithField("command", truncatedCmd), - ) - if t.rpc == nil { - err = errors.New("rpc client is nil") - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithError(err). - WithField(infologger.Level, infologger.IL_Devel). - Error("could not start gRPC client") + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + _ = t.doTermIntKill(-taskCmd.Process.Pid) + return + } + t.rpc.TaskCmd = taskCmd - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + utils.TimeTrack(rpcDialStartTime, + "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + err = t.pollTaskForStandbyState() + if err != nil { + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + + _ = t.rpc.Close() + t.rpc = nil + + pid := t.knownPid + if pid == 0 { + // The pid was never known through a successful `GetState` in the lifetime + // of this process, so we must rely on the PGID of the containing shell + pid = -taskCmd.Process.Pid } - t.rpc.TaskCmd = taskCmd + log.WithFields(defaultLogFields). + Debug("sending SIGKILL (9) to task") + _ = syscall.Kill(pid, syscall.SIGKILL) // fixme: not sure why we do it differently than elsewhere (doTermIntKill) + _ = stdoutIn.Close() + _ = stderrIn.Close() - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + log.WithFields(defaultLogFields). + Debug("task killed") + return + } - utils.TimeTrack(rpcDialStartTime, - "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) - statePollingStartTime := time.Now() - elapsed := 0 * time.Second - for { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField("elapsed", elapsed.String()). - WithField(infologger.Level, infologger.IL_Devel). - Debug("polling task for IDLE state reached") - - response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithError(err). - WithFields(defaultLogFields). - WithField("state", response.GetState()). - WithField("command", truncatedCmd). - Info("cannot query task status") - } else { - log.WithFields(defaultLogFields). - WithField("state", response.GetState()). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel). - Debug("task status queried") - t.knownPid = int(response.GetPid()) - } - // NOTE: we acquire the transitioner-dependent STANDBY equivalent state - reachedState := t.rpc.FromDeviceState(response.GetState()) + // Set up event stream from task + esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Error("cannot set up event stream from task") + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + _ = t.rpc.Close() + t.rpc = nil + // fixme: why don't we kill the task in this error case, but we do in others? + return + } - if reachedState == "STANDBY" && err == nil { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel). - Debug("task running and ready for control input") - break - } else if reachedState == "DONE" || reachedState == "ERROR" { - // something went wrong, the device moved to DONE or ERROR on startup - pid := t.knownPid - if pid == 0 { - // The pid was never known through a successful `GetState` in the lifetime - // of this process, so we must rely on the PGID of the containing shell - pid = -t.rpc.TaskCmd.Process.Pid - } - log.WithFields(defaultLogFields). - Debug("sending SIGKILL (9) to task") - _ = syscall.Kill(pid, syscall.SIGKILL) - _ = stdoutIn.Close() - _ = stderrIn.Close() + // send RUNNING + t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") + taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) + taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - log.WithFields(defaultLogFields). - Debug("task killed") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, "task reached wrong state on startup") - return - } else if elapsed >= startupTimeout { - err = errors.New("timeout while waiting for task startup") - log.WithFields(defaultLogFields). - Error(err.Error()) - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil + jsonEvent, err := json.Marshal(taskMessage) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Warning("error marshaling message") + } else { + t.sendMessage(jsonEvent) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + } + + // Process events from task in yet another goroutine + go func() { + t.processEventsFromTask(esc) + }() - _ = stdoutIn.Close() - _ = stderrIn.Close() + err = taskCmd.Wait() + // ^ when this unblocks, the task is done + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task done (taskCmd.Wait unblocks), preparing final update") - return - } else { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) - time.Sleep(startupPollingInterval) - elapsed += startupPollingInterval - } - } + pendingState := mesos.TASK_FINISHED + if err != nil { + taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Ops). + Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). + Error("task terminated with error (details):") + pendingState = mesos.TASK_FAILED + } - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + select { + case pending := <-t.pendingFinalTaskStateCh: + pendingState = pending + default: + } - utils.TimeTrack(statePollingStartTime, - "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + if t.rpc != nil { + _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much + log.WithFields(defaultLogFields). + Debug("rpc client closed") + t.rpc = nil + log.WithFields(defaultLogFields). + Debug("rpc client removed") + } - // Set up event stream from task - esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithFields(defaultLogFields). - WithError(err). - Error("cannot set up event stream from task") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil - return - } + t.sendStatus(t.knownEnvironmentId, pendingState, "") + return +} - // send RUNNING - t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") - taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) - taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) +func (t *ControllableTask) initTaskStdLogging(stdoutIn io.ReadCloser, stderrIn io.ReadCloser) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } - jsonEvent, err := json.Marshal(taskMessage) - if err != nil { - log.WithFields(defaultLogFields). - WithError(err). - Warning("error marshaling message") - } else { - t.sendMessage(jsonEvent) + if t.Tci.Stdout == nil { + none := "none" + t.Tci.Stdout = &none + } + if t.Tci.Stderr == nil { + none := "none" + t.Tci.Stderr = &none + } + + go func() { + var errStdout error + switch *t.Tci.Stdout { + case "stdout": + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() + case "all": + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() + default: + _, errStdout = io.Copy(io.Discard, stdoutIn) + } + if errStdout != nil { log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). + WithError(errStdout). WithField(infologger.Level, infologger.IL_Devel). - Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + Warning("failed to capture stdout of task") } + }() - // Process events from task in yet another goroutine - go func() { - deo := event.DeviceEventOrigin{ - AgentId: t.ti.AgentID, - ExecutorId: t.ti.GetExecutor().ExecutorID, - TaskId: t.ti.TaskID, + go func() { + var errStderr error + switch *t.Tci.Stderr { + case "stdout": + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, } - for { - if t.rpc == nil { - log.WithFields(defaultLogFields). - WithError(err). - Debug("event stream done") - break - } - esr, err := esc.Recv() - if err == io.EOF { - log.WithFields(defaultLogFields). - WithError(err). - Debug("event stream EOF") - break - } - if err != nil { - log.WithFields(defaultLogFields). - WithField("errorType", reflect.TypeOf(err)). - WithField(infologger.Level, infologger.IL_Devel). - WithError(err). - Warning("error receiving event") - if status.Code(err) == codes.Unavailable { - break - } - continue - } - ev := esr.GetEvent() - - deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) - if deviceEvent == nil { - log.WithFields(defaultLogFields). - Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") - break - } else { - if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { - log.WithFields(defaultLogFields). - WithField("taskPid", t.knownPid). - Debug("END_OF_STREAM DeviceEvent received - notifying environment") - } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { - log.WithFields(defaultLogFields). - WithField("taskPid", t.knownPid). - WithField(infologger.Level, infologger.IL_Support). - Warningf("task transitioned to ERROR on its own - notifying environment") - } - } - deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - - t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() + case "all": + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, } - }() + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() + default: + _, errStderr = io.Copy(io.Discard, stderrIn) + } + if errStderr != nil { + log.WithFields(defaultLogFields). + WithError(errStderr). + WithField(infologger.Level, infologger.IL_Devel). + Warning("failed to capture stderr of task") + } + }() +} - err = taskCmd.Wait() - // ^ when this unblocks, the task is done +func (t *ControllableTask) pollTaskForStandbyState() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + statePollingStartTime := time.Now() + elapsed := 0 * time.Second + for { log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). + WithField("elapsed", elapsed.String()). WithField(infologger.Level, infologger.IL_Devel). - Debug("task done (taskCmd.Wait unblocks), preparing final update") + Debug("polling task for STANDBY state reached") - pendingState := mesos.TASK_FINISHED + response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err != nil { - taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) - log.WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Ops). - Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) + log.WithError(err). + WithFields(defaultLogFields). + WithField("state", response.GetState()). + Info("cannot query task status") + } else { log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). + WithField("state", response.GetState()). WithField(infologger.Level, infologger.IL_Devel). - WithError(err). - Error("task terminated with error (details):") - pendingState = mesos.TASK_FAILED + Debug("task status queried") + t.knownPid = int(response.GetPid()) } + // NOTE: we acquire the transitioner-dependent STANDBY equivalent state + // fixme: that's a possible nil access there, because we do not "continue" on error + reachedState := t.rpc.FromDeviceState(response.GetState()) - select { - case pending := <-t.pendingFinalTaskStateCh: - pendingState = pending - default: + if reachedState == "STANDBY" && err == nil { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task running and ready for control input") + break + } else if reachedState == "DONE" || reachedState == "ERROR" { + // something went wrong, the device moved to DONE or ERROR on startup + return errors.New("task reached wrong state on startup") + } else if elapsed >= startupTimeout { + return errors.New("timeout while trying to poll task") + } else { + log.WithFields(defaultLogFields). + Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) + time.Sleep(startupPollingInterval) + elapsed += startupPollingInterval } + } - if t.rpc != nil { - _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much + utils.TimeTrack(statePollingStartTime, + "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + return nil +} + +func (t *ControllableTask) processEventsFromTask(esc pb.Occ_EventStreamClient) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + deo := event.DeviceEventOrigin{ + AgentId: t.ti.AgentID, + ExecutorId: t.ti.GetExecutor().ExecutorID, + TaskId: t.ti.TaskID, + } + + for { + if t.rpc == nil { log.WithFields(defaultLogFields). - Debug("rpc client closed") - t.rpc = nil + Debug("event stream done") + break + } + esr, err := esc.Recv() + if err == io.EOF { log.WithFields(defaultLogFields). - Debug("rpc client removed") + WithError(err). + Debug("event stream EOF") + break } - - if errStdout != nil || errStderr != nil { + if err != nil { log.WithFields(defaultLogFields). - WithField("errStderr", errStderr). - WithField("errStdout", errStdout). - WithField("command", truncatedCmd). + WithField("errorType", reflect.TypeOf(err)). WithField(infologger.Level, infologger.IL_Devel). - Warning("failed to capture stdout or stderr of task") + WithError(err). + Warning("error receiving event") + if status.Code(err) == codes.Unavailable { + break + } + // fixme: we also get codes.Canceled sometimes, it's probably OK and we should not complain + continue } + ev := esr.GetEvent() - t.sendStatus(t.knownEnvironmentId, pendingState, "") - }() + deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) + if deviceEvent == nil { + log.WithFields(defaultLogFields). + Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") + break + } else { + if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). + Debug("END_OF_STREAM DeviceEvent received - notifying environment") + } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). + WithField(infologger.Level, infologger.IL_Support). + Warningf("task transitioned to ERROR on its own - notifying environment") + } + } + deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - log.WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Devel). - Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") - return nil + t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + } } func (t *ControllableTask) UnmarshalTransition(data []byte) (cmd *executorcmd.ExecutorCommand_Transition, err error) {