Skip to content

Commit 2e7b0a6

Browse files
authored
Add retry mechanism for patch operations to handle resource version conflicts (#207)
fixes: aws-controllers-k8s/community#2267 ACK controllers were experiencing race conditions when multiple controllers attempted to patch the same Kubernetes resource simultaneously. This occurred because Kubernetes uses optimistic concurrency control through resource versions - each resource has a version that must match when performing updates. When multiple controllers read a resource and then attempt to patch it, only the first patch succeeds while subsequent patches fail with HTTP 409 conflict errors due to stale resource versions. Description of changes: A retry mechanism using Kubernetes client-go's standard `retry.RetryOnConflict` function with `retry.DefaultBackoff` configuration. When a patch operation encounters a resource version conflict, the mechanism automatically refreshes the resource version by fetching the latest state from the API server, then retries the patch operation with the updated version. This process repeats up to 5 times with exponential backoff (starting at 100ms, doubling each time, capped at 1 second), ensuring that temporary conflicts are resolved automatically while avoiding excessive API server load. Ref: - https://github.com/kubernetes-sigs/kro/blob/c1bc05c5384245d3ef2a5104198459732552b148/pkg/controller/resourcegraphdefinition/controller_status.go#L34-L75 - https://github.com/prometheus-operator/prometheus-operator/blob/3ff38ebe6216c28da344abd6e4f698831309b959/pkg/k8sutil/k8sutil.go#L239-L376 - https://github.com/prometheus-operator/prometheus-operator/blob/3ff38ebe6216c28da344abd6e4f698831309b959/pkg/k8sutil/k8sutil.go#L513-L522 By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent 2ab09c6 commit 2e7b0a6

File tree

4 files changed

+111
-36
lines changed

4 files changed

+111
-36
lines changed

pkg/runtime/adoption_reconciler.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ func (r *adoptionReconciler) patchAdoptedCondition(
375375
adoptedCondition.Status = corev1.ConditionTrue
376376
}
377377

378-
return r.patchStatus(ctx, res, base)
378+
return r.patchResourceStatus(ctx, res, base)
379379
}
380380

381381
// isAdopted returns true if the AdoptedResource is in a terminal adoption state
@@ -411,7 +411,7 @@ func (r *adoptionReconciler) markManaged(
411411
) error {
412412
base := res.DeepCopy()
413413
k8sctrlutil.AddFinalizer(res, adoptionFinalizerString)
414-
return r.patchMetadataAndSpec(ctx, res, base)
414+
return r.patchResourceMetadataAndSpec(ctx, res, base)
415415
}
416416

417417
// markUnmanaged removes the supplied resource from management by ACK.
@@ -423,7 +423,7 @@ func (r *adoptionReconciler) markUnmanaged(
423423
) error {
424424
base := res.DeepCopy()
425425
k8sctrlutil.RemoveFinalizer(res, adoptionFinalizerString)
426-
return r.patchMetadataAndSpec(ctx, res, base)
426+
return r.patchResourceMetadataAndSpec(ctx, res, base)
427427
}
428428

429429
// handleReconcileError will handle errors from reconcile handlers, which
@@ -561,12 +561,12 @@ func (r *adoptionReconciler) getRegion(
561561
return ackv1alpha1.AWSRegion(r.cfg.Region)
562562
}
563563

564-
// patchMetadataAndSpec patches the Metadata and Spec for AdoptedResource into
564+
// patchResourceMetadataAndSpec patches the Metadata and Spec for AdoptedResource into
565565
// k8s. The adopted resource 'res' also gets updated with content returned from
566566
// apiserver.
567-
// TODO(vijtrip2@): Refactor this and use single 'patchMetadataAndSpec' method
567+
// TODO(vijtrip2@): Refactor this and use single 'patchResourceMetadataAndSpec' method
568568
// for reconciler and adoptionReconciler
569-
func (r *adoptionReconciler) patchMetadataAndSpec(
569+
func (r *adoptionReconciler) patchResourceMetadataAndSpec(
570570
ctx context.Context,
571571
res *ackv1alpha1.AdoptedResource,
572572
base *ackv1alpha1.AdoptedResource,
@@ -576,21 +576,22 @@ func (r *adoptionReconciler) patchMetadataAndSpec(
576576
// Keep a copy of status field to reset the status of 'res' after patch call
577577
resStatusCopy := res.DeepCopy().Status
578578

579-
err := patchWithoutCancel(ctx, r.kc, res, client.MergeFrom(base))
579+
err := patchMetadataAndSpec(ctx, r.kc, res, client.MergeFrom(base))
580580
res.Status = resStatusCopy
581581
return err
582582
}
583583

584-
// patchStatus patches the Status for AdoptedResource into k8s. The adopted
584+
// patchResourceStatus patches the Status for AdoptedResource into k8s. The adopted
585585
// resource 'res' also gets updated with the content returned from apiserver.
586-
// TODO(vijtrip2): Refactor this and use single 'patchStatus' method
586+
// TODO(vijtrip2): Refactor this and use single 'patchResourceStatus' method
587587
// for reconciler and adoptionReconciler
588-
func (r *adoptionReconciler) patchStatus(
588+
func (r *adoptionReconciler) patchResourceStatus(
589589
ctx context.Context,
590590
res *ackv1alpha1.AdoptedResource,
591591
base *ackv1alpha1.AdoptedResource,
592592
) error {
593-
return patchStatusWithoutCancel(ctx, r.kc, res, client.MergeFrom(base))
593+
rlog := ackrtlog.FromContext(ctx)
594+
return patchStatus(ctx, r.kc, r.apiReader, res, client.MergeFrom(base), rlog)
594595
}
595596

596597
// NewAdoptionReconciler returns a new adoptionReconciler object

pkg/runtime/field_export_reconciler.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (r *fieldExportReconciler) Sync(
150150

151151
r.resetConditions(ctx, &desired)
152152
defer func() {
153-
r.patchStatus(ctx, &desired, latest)
153+
r.patchResourceStatus(ctx, &desired, latest)
154154
}()
155155

156156
// Get the field from the resource
@@ -324,7 +324,7 @@ func (r *fieldExportReconciler) writeToConfigMap(
324324
cm.Data[key] = sourceValue
325325

326326
ackrtlog.DebugFieldExport(r.log, desired, "patching target config map")
327-
err = patchWithoutCancel(ctx, r.kc, cm, patch)
327+
err = patchMetadataAndSpec(ctx, r.kc, cm, patch)
328328
if err != nil {
329329
return err
330330
}
@@ -371,7 +371,7 @@ func (r *fieldExportReconciler) writeToSecret(
371371
secret.Data[key] = []byte(sourceValue)
372372

373373
ackrtlog.DebugFieldExport(r.log, desired, "patching target secret")
374-
err = patchWithoutCancel(ctx, r.kc, secret, patch)
374+
err = patchMetadataAndSpec(ctx, r.kc, secret, patch)
375375
if err != nil {
376376
return err
377377
}
@@ -519,16 +519,17 @@ func (r *fieldExportReconciler) patchTerminalCondition(
519519
return nil
520520
}
521521

522-
// patchStatus patches the Status for FieldExport into k8s. The field export
522+
// patchResourceStatus patches the Status for FieldExport into k8s. The field export
523523
// 'res' also gets updated with the content returned from apiserver.
524-
// TODO(vijtrip2): Refactor this and use single 'patchStatus' method
524+
// TODO(vijtrip2): Refactor this and use single 'patchResourceStatus' method
525525
// for all reconcilers
526-
func (r *fieldExportReconciler) patchStatus(
526+
func (r *fieldExportReconciler) patchResourceStatus(
527527
ctx context.Context,
528528
res *ackv1alpha1.FieldExport,
529529
base *ackv1alpha1.FieldExport,
530530
) error {
531-
return patchStatusWithoutCancel(ctx, r.kc, res, client.MergeFrom(base))
531+
rlog := ackrtlog.FromContext(ctx)
532+
return patchStatus(ctx, r.kc, r.apiReader, res, client.MergeFrom(base), rlog)
532533
}
533534

534535
// markManaged places the supplied resource under the management of ACK.
@@ -541,7 +542,7 @@ func (r *fieldExportReconciler) markManaged(
541542
if !k8sctrlutil.ContainsFinalizer(res, fieldExportFinalizerString) {
542543
base := res.DeepCopy()
543544
k8sctrlutil.AddFinalizer(res, fieldExportFinalizerString)
544-
return r.patchMetadataAndSpec(ctx, res, base)
545+
return r.patchResourceMetadataAndSpec(ctx, res, base)
545546
}
546547
return nil
547548
}
@@ -556,17 +557,17 @@ func (r *fieldExportReconciler) markUnmanaged(
556557
if k8sctrlutil.ContainsFinalizer(res, fieldExportFinalizerString) {
557558
base := res.DeepCopy()
558559
k8sctrlutil.RemoveFinalizer(res, fieldExportFinalizerString)
559-
return r.patchMetadataAndSpec(ctx, res, base)
560+
return r.patchResourceMetadataAndSpec(ctx, res, base)
560561
}
561562
return nil
562563
}
563564

564-
// patchMetadataAndSpec patches the Metadata and Spec for FieldExport into
565+
// patchResourceMetadataAndSpec patches the Metadata and Spec for FieldExport into
565566
// k8s. The field export 'res' also gets updated with content returned from
566567
// apiserver.
567-
// TODO(vijtrip2@): Refactor this and use single 'patchMetadataAndSpec' method
568+
// TODO(vijtrip2@): Refactor this and use single 'patchResourceMetadataAndSpec' method
568569
// for all reconcilers
569-
func (r *fieldExportReconciler) patchMetadataAndSpec(
570+
func (r *fieldExportReconciler) patchResourceMetadataAndSpec(
570571
ctx context.Context,
571572
res *ackv1alpha1.FieldExport,
572573
base *ackv1alpha1.FieldExport,
@@ -576,7 +577,7 @@ func (r *fieldExportReconciler) patchMetadataAndSpec(
576577
// Keep a copy of status field to reset the status of 'res' after patch call
577578
resStatusCopy := res.DeepCopy().Status
578579

579-
err := patchWithoutCancel(ctx, r.kc, res, client.MergeFrom(base))
580+
err := patchMetadataAndSpec(ctx, r.kc, res, client.MergeFrom(base))
580581
res.Status = resStatusCopy
581582
return err
582583
}

pkg/runtime/reconciler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ func (r *resourceReconciler) patchResourceMetadataAndSpec(
892892
lorig := latestCleaned.DeepCopy()
893893
patch := client.MergeFrom(desiredCleaned.RuntimeObject())
894894

895-
err = patchWithoutCancel(ctx, r.kc, latestCleaned.RuntimeObject(), patch)
895+
err = patchMetadataAndSpec(ctx, r.kc, latestCleaned.RuntimeObject(), patch)
896896

897897
if err == nil {
898898
if rlog.IsDebugEnabled() {
@@ -930,7 +930,7 @@ func (r *resourceReconciler) patchResourceStatus(
930930
lobj := latest.DeepCopy().RuntimeObject()
931931
patch := client.MergeFrom(dobj)
932932

933-
err = patchStatusWithoutCancel(ctx, r.kc, lobj, patch)
933+
err = patchStatus(ctx, r.kc, r.apiReader, lobj, patch, rlog)
934934

935935
if err == nil {
936936
if rlog.IsDebugEnabled() {

pkg/runtime/util.go

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strings"
2121

2222
corev1 "k8s.io/api/core/v1"
23+
"k8s.io/client-go/util/retry"
2324
"sigs.k8s.io/controller-runtime/pkg/client"
2425

2526
ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1"
@@ -41,6 +42,10 @@ const (
4142
AdoptionPolicy_Adopt AdoptionPolicy = "adopt"
4243
// AdoptPolicy is ...
4344
AdoptionPolicy_AdoptOrCreate AdoptionPolicy = "adopt-or-create"
45+
46+
// Operation types for patch operations
47+
OperationType_MetadataSpec = "metadata+spec"
48+
OperationType_Status = "status"
4449
)
4550

4651
// IsAdopted returns true if the supplied AWSResource was created with a
@@ -144,34 +149,102 @@ func getAdoptionFields(res acktypes.AWSResource) string {
144149
return ""
145150
}
146151

147-
// patchWithoutCancel performs a patch operation using context.WithoutCancel to prevent
152+
// patchObject performs a patch operation using context.WithoutCancel to prevent
148153
// patch operations from being cancelled while preserving context values.
154+
// It automatically determines whether to patch spec/metadata or status based on operationType.
149155
//
150156
// NOTE(rushmash91): The 30s SIGTERM grace period acts as the effective timeout -
151157
// no additional timeout needed to avoid interfering with normal Kubernetes client
152158
// timeout/retry strategy.
153-
func patchWithoutCancel(
159+
func patchObject(
154160
ctx context.Context,
155161
kc client.Client,
156162
obj client.Object,
157163
patch client.Patch,
164+
operationType string,
158165
) error {
159166
patchCtx := context.WithoutCancel(ctx)
167+
if operationType == OperationType_Status {
168+
return kc.Status().Patch(patchCtx, obj, patch)
169+
}
160170
return kc.Patch(patchCtx, obj, patch)
161171
}
162172

163-
// patchStatusWithoutCancel performs a status patch operation using context.WithoutCancel
164-
// to prevent patch operations from being cancelled while preserving context values.
173+
// patchWithRetry performs a patch operation with retry on conflicts using client-go's standard retry mechanism.
174+
// This helps handle race conditions where multiple controllers try to update the same resource.
165175
//
166-
// NOTE(rushmash91): The 30s SIGTERM grace period acts as the effective timeout -
167-
// no additional timeout needed to avoid interfering with normal Kubernetes client
168-
// timeout/retry strategy.
169-
func patchStatusWithoutCancel(
176+
// When a conflict occurs (HTTP 409), it refreshes the resource version and retries the patch operation.
177+
func patchWithRetry(
170178
ctx context.Context,
171179
kc client.Client,
180+
apiReader client.Reader,
172181
obj client.Object,
173182
patch client.Patch,
183+
logger acktypes.Logger,
184+
operationType string,
174185
) error {
175-
patchCtx := context.WithoutCancel(ctx)
176-
return kc.Status().Patch(patchCtx, obj, patch)
186+
attempt := 0
187+
188+
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
189+
attempt++
190+
191+
// For retry attempts (after first attempt), refresh the object from the API server
192+
if attempt > 1 {
193+
logger.Debug(fmt.Sprintf("%s patch conflict detected, refreshing resource version", operationType),
194+
"attempt", attempt,
195+
"object", client.ObjectKeyFromObject(obj))
196+
197+
key := client.ObjectKeyFromObject(obj)
198+
freshObject := obj.DeepCopyObject().(client.Object)
199+
200+
err := apiReader.Get(ctx, key, freshObject)
201+
if err != nil {
202+
logger.Info(fmt.Sprintf("failed to refresh resource version during %s patch retry", operationType),
203+
"attempt", attempt,
204+
"object", key,
205+
"error", err.Error())
206+
return err
207+
}
208+
209+
// Update the resource version on our object
210+
obj.SetResourceVersion(freshObject.GetResourceVersion())
211+
}
212+
213+
err := patchObject(ctx, kc, obj, patch, operationType)
214+
if err == nil && attempt > 1 {
215+
logger.Debug(fmt.Sprintf("%s patch succeeded after retry", operationType),
216+
"attempts", attempt,
217+
"object", client.ObjectKeyFromObject(obj))
218+
}
219+
220+
if err != nil && attempt == 1 {
221+
logger.Debug(fmt.Sprintf("%s patch failed on first attempt", operationType),
222+
"object", client.ObjectKeyFromObject(obj),
223+
"error", err.Error())
224+
}
225+
226+
return err
227+
})
228+
}
229+
230+
// patchMetadataAndSpec performs a patch operation using client-go's standard retry mechanism on conflicts.
231+
func patchMetadataAndSpec(
232+
ctx context.Context,
233+
kc client.Client,
234+
obj client.Object,
235+
patch client.Patch,
236+
) error {
237+
return patchObject(ctx, kc, obj, patch, OperationType_MetadataSpec)
238+
}
239+
240+
// patchStatus performs a status patch operation using client-go's standard retry mechanism on conflicts.
241+
func patchStatus(
242+
ctx context.Context,
243+
kc client.Client,
244+
apiReader client.Reader,
245+
obj client.Object,
246+
patch client.Patch,
247+
logger acktypes.Logger,
248+
) error {
249+
return patchWithRetry(ctx, kc, apiReader, obj, patch, logger, OperationType_Status)
177250
}

0 commit comments

Comments
 (0)