Skip to content

Commit 70a3986

Browse files
author
Roger Torrentsgenerós
authored
feat: emit pod events on drain (#703)
* feat: emit pod events on drain * fix: satisfy PR requested change * chore: use recorderBufferSize const
1 parent 0a16f7d commit 70a3986

File tree

5 files changed

+84
-10
lines changed

5 files changed

+84
-10
lines changed

cmd/node-termination-handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio
407407
}
408408

409409
func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error {
410-
err := node.CordonAndDrain(nodeName, drainEvent.Description)
410+
err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder.EventRecorder)
411411
if err != nil {
412412
if errors.IsNotFound(err) {
413413
log.Err(err).Msgf("node '%s' not found in the cluster", nodeName)

pkg/node/node.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/apimachinery/pkg/api/errors"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/apimachinery/pkg/labels"
32+
"k8s.io/apimachinery/pkg/runtime"
3233
"k8s.io/apimachinery/pkg/types"
3334
"k8s.io/client-go/kubernetes"
3435
"k8s.io/client-go/rest"
@@ -64,6 +65,13 @@ const (
6465
maxTaintValueLength = 63
6566
)
6667

68+
const (
69+
// PodEvictReason is the event reason emitted for Pod evictions during node drain
70+
PodEvictReason = "PodEviction"
71+
// PodEvictMsgFmt is the format string of the event message emitted for Pod evictions during node drain; its single %s verb takes the node name
72+
PodEvictMsgFmt = "Pod evicted due to node drain (node %s)"
73+
)
74+
6775
var (
6876
maxRetryDeadline time.Duration = 5 * time.Second
6977
conflictRetryInterval time.Duration = 750 * time.Millisecond
@@ -95,7 +103,7 @@ func NewWithValues(nthConfig config.Config, drainHelper *drain.Helper, uptime up
95103
}
96104

97105
// CordonAndDrain will cordon the node and evict pods based on the config
98-
func (n Node) CordonAndDrain(nodeName string, reason string) error {
106+
func (n Node) CordonAndDrain(nodeName string, reason string, recorder recorderInterface) error {
99107
if n.nthConfig.DryRun {
100108
log.Info().Str("node_name", nodeName).Str("reason", reason).Msg("Node would have been cordoned and drained, but dry-run flag was set.")
101109
return nil
@@ -114,6 +122,25 @@ func (n Node) CordonAndDrain(nodeName string, reason string) error {
114122
if err != nil {
115123
return err
116124
}
125+
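// Best-effort: before draining, emit an eviction event for every pod fetched for this node; if the pods cannot be fetched, no events are emitted and the drain still proceeds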
126+
if recorder != nil {
127+
pods, err := n.fetchAllPods(nodeName)
128+
if err == nil {
129+
for _, pod := range pods.Items {
130+
podRef := &corev1.ObjectReference{
131+
Kind: "Pod",
132+
Name: pod.Name,
133+
Namespace: pod.Namespace,
134+
}
135+
annotations := make(map[string]string)
136+
annotations["node"] = nodeName
137+
for k, v := range pod.GetLabels() {
138+
annotations[k] = v
139+
}
140+
recorder.AnnotatedEventf(podRef, annotations, corev1.EventTypeNormal, PodEvictReason, PodEvictMsgFmt, nodeName)
141+
}
142+
}
143+
}
117144
err = drain.RunNodeDrain(n.drainHelper, node.Name)
118145
if err != nil {
119146
return err
@@ -800,3 +827,7 @@ func filterPodForDeletion(podName string) func(pod corev1.Pod) drain.PodDeleteSt
800827
return drain.MakePodDeleteStatusOkay()
801828
}
802829
}
830+
831+
type recorderInterface interface {
832+
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{})
833+
}

pkg/node/node_internal_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ package node
1515

1616
import (
1717
"context"
18-
"io/ioutil"
1918
"os"
2019
"strconv"
2120
"testing"
@@ -86,7 +85,7 @@ func TestUncordonIfRebootedFileReadError(t *testing.T) {
8685

8786
func TestUncordonIfRebootedSystemNotRestarted(t *testing.T) {
8887
d1 := []byte("350735.47 234388.90")
89-
err := ioutil.WriteFile(testFile, d1, 0644)
88+
err := os.WriteFile(testFile, d1, 0644)
9089
h.Ok(t, err)
9190

9291
client := fake.NewSimpleClientset()
@@ -110,7 +109,7 @@ func TestUncordonIfRebootedSystemNotRestarted(t *testing.T) {
110109

111110
func TestUncordonIfRebootedFailureToRemoveLabel(t *testing.T) {
112111
d1 := []byte("0 234388.90")
113-
err := ioutil.WriteFile(testFile, d1, 0644)
112+
err := os.WriteFile(testFile, d1, 0644)
114113
h.Ok(t, err)
115114

116115
client := fake.NewSimpleClientset()
@@ -134,7 +133,7 @@ func TestUncordonIfRebootedFailureToRemoveLabel(t *testing.T) {
134133

135134
func TestUncordonIfRebootedFailureSuccess(t *testing.T) {
136135
d1 := []byte("0 234388.90")
137-
err := ioutil.WriteFile(testFile, d1, 0644)
136+
err := os.WriteFile(testFile, d1, 0644)
138137
h.Ok(t, err)
139138

140139
client := fake.NewSimpleClientset()

pkg/node/node_test.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package node_test
1616
import (
1717
"context"
1818
"strconv"
19+
"strings"
1920
"testing"
2021
"time"
2122

@@ -28,9 +29,13 @@ import (
2829
v1 "k8s.io/api/core/v1"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/client-go/kubernetes/fake"
32+
"k8s.io/client-go/tools/record"
3133
"k8s.io/kubectl/pkg/drain"
3234
)
3335

36+
// Size of the fakeRecorder buffer
37+
const recorderBufferSize = 10
38+
3439
var nodeName = "NAME"
3540

3641
func getDrainHelper(client *fake.Clientset) *drain.Helper {
@@ -61,7 +66,11 @@ func TestDryRun(t *testing.T) {
6166
tNode, err := node.New(config.Config{DryRun: true})
6267
h.Ok(t, err)
6368

64-
err = tNode.CordonAndDrain(nodeName, "cordonReason")
69+
fakeRecorder := record.NewFakeRecorder(recorderBufferSize)
70+
defer close(fakeRecorder.Events)
71+
72+
err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
73+
6574
h.Ok(t, err)
6675

6776
err = tNode.Cordon(nodeName, "cordonReason")
@@ -98,6 +107,7 @@ func TestNewFailure(t *testing.T) {
98107
}
99108

100109
func TestDrainSuccess(t *testing.T) {
110+
isOwnerController := true
101111
client := fake.NewSimpleClientset()
102112
_, err := client.CoreV1().Nodes().Create(
103113
context.Background(),
@@ -106,14 +116,48 @@ func TestDrainSuccess(t *testing.T) {
106116
},
107117
metav1.CreateOptions{})
108118
h.Ok(t, err)
119+
120+
_, err = client.CoreV1().Pods("default").Create(
121+
context.Background(),
122+
&v1.Pod{
123+
ObjectMeta: metav1.ObjectMeta{
124+
GenerateName: "cool-app-pod-",
125+
OwnerReferences: []metav1.OwnerReference{
126+
{
127+
APIVersion: "apps/v1",
128+
Name: "cool-app",
129+
Kind: "ReplicaSet",
130+
Controller: &isOwnerController,
131+
},
132+
},
133+
},
134+
Spec: v1.PodSpec{
135+
NodeName: nodeName,
136+
},
137+
},
138+
metav1.CreateOptions{})
139+
h.Ok(t, err)
140+
141+
fakeRecorder := record.NewFakeRecorder(recorderBufferSize)
142+
109143
tNode := getNode(t, getDrainHelper(client))
110-
err = tNode.CordonAndDrain(nodeName, "cordonReason")
144+
err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
145+
close(fakeRecorder.Events)
111146
h.Ok(t, err)
147+
expectedEventArrived := false
148+
for event := range fakeRecorder.Events {
149+
if strings.Contains(event, "Normal PodEviction Pod evicted due to node drain") {
150+
expectedEventArrived = true
151+
}
152+
}
153+
h.Assert(t, expectedEventArrived, "PodEvicted event was not emitted")
112154
}
113155

114156
func TestDrainCordonNodeFailure(t *testing.T) {
157+
fakeRecorder := record.NewFakeRecorder(recorderBufferSize)
158+
defer close(fakeRecorder.Events)
115159
tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset()))
116-
err := tNode.CordonAndDrain(nodeName, "cordonReason")
160+
err := tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
117161
h.Assert(t, true, "Failed to return error on CordonAndDrain failing to cordon node", err != nil)
118162
}
119163

pkg/observability/k8s-events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func InitK8sEventRecorder(enabled bool, nodeName string, sqsMode bool, nodeMetad
113113
}
114114

115115
broadcaster := record.NewBroadcaster()
116-
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("default")})
116+
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
117117

118118
return K8sEventRecorder{
119119
annotations: annotations,

0 commit comments

Comments
 (0)