Skip to content

Commit 6c6addc

Browse files
authored
Add configurable max workers for route controller instances (#672)
* Added environment variable to control max workers for route controllers. Target GC now uses RW lock to accommodate parallel Deploy operations.
1 parent 3a3809b commit 6c6addc

File tree

8 files changed

+59
-20
lines changed

8 files changed

+59
-20
lines changed

docs/guides/environment.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,13 @@ When set as "true", the controller will not use the [AWS Resource Groups Tagging
102102
The Resource Groups Tagging API is only available on the public internet and customers using private clusters will need to enable this feature. When enabled, the controller will use VPC Lattice APIs to lookup tags which are not as performant and requires more API calls.
103103

104104
The Helm chart sets this value to "false" by default.
105+
106+
---
107+
108+
#### `ROUTE_MAX_CONCURRENT_RECONCILES`
109+
110+
**Type:** *int*
111+
112+
**Default:** 1
113+
114+
Maximum number of concurrently running reconcile loops per route type (HTTP, GRPC, TLS)

helm/templates/deployment.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ spec:
9797
value: {{ .Values.webhookEnabled | quote }}
9898
- name: DISABLE_TAGGING_SERVICE_API
9999
value: {{ .Values.disableTaggingServiceApi | quote }}
100+
- name: ROUTE_MAX_CONCURRENT_RECONCILES
101+
value: {{ .Values.routeMaxConcurrentReconciles | quote }}
102+
100103
terminationGracePeriodSeconds: 10
101104
volumes:
102105
- name: webhook-cert

helm/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ defaultServiceNetwork:
8585
latticeEndpoint:
8686
webhookEnabled: true
8787
disableTaggingServiceApi: false
88+
routeMaxConcurrentReconciles:
8889

8990
# TLS cert/key for the webhook. If specified, values must be base64 encoded
9091
webhookTLS:

pkg/config/controller_config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"os"
7+
"strconv"
78

89
"strings"
910

@@ -28,6 +29,7 @@ const (
2829
AWS_ACCOUNT_ID = "AWS_ACCOUNT_ID"
2930
DEV_MODE = "DEV_MODE"
3031
WEBHOOK_ENABLED = "WEBHOOK_ENABLED"
32+
ROUTE_MAX_CONCURRENT_RECONCILES = "ROUTE_MAX_CONCURRENT_RECONCILES"
3133
)
3234

3335
var VpcID = ""
@@ -40,6 +42,7 @@ var WebhookEnabled = ""
4042

4143
var DisableTaggingServiceAPI = false
4244
var ServiceNetworkOverrideMode = false
45+
var RouteMaxConcurrentReconciles = 1
4346

4447
func ConfigInit() error {
4548
sess, _ := session.NewSession()
@@ -95,6 +98,15 @@ func configInit(sess *session.Session, metadata EC2Metadata) error {
9598
return fmt.Errorf("cannot get cluster name: %s", err)
9699
}
97100

101+
routeMaxConcurrentReconciles := os.Getenv(ROUTE_MAX_CONCURRENT_RECONCILES)
102+
if routeMaxConcurrentReconciles != "" {
103+
routeMaxConcurrentReconcilesInt, err := strconv.Atoi(routeMaxConcurrentReconciles)
104+
if err != nil {
105+
return fmt.Errorf("invalid value for ROUTE_MAX_CONCURRENT_RECONCILES: %s", err)
106+
}
107+
RouteMaxConcurrentReconciles = routeMaxConcurrentReconcilesInt
108+
}
109+
98110
return nil
99111
}
100112

pkg/config/controller_config_test.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func Test_config_init_no_env_var(t *testing.T) {
4646
os.Unsetenv(CLUSTER_VPC_ID)
4747
os.Unsetenv(DEFAULT_SERVICE_NETWORK)
4848
os.Unsetenv(AWS_ACCOUNT_ID)
49+
os.Unsetenv(ROUTE_MAX_CONCURRENT_RECONCILES)
4950
err := configInit(nil, ec2MetadataUnavailable())
5051
assert.NotNil(t, err)
5152

@@ -58,16 +59,30 @@ func Test_config_init_with_all_env_var(t *testing.T) {
5859
testClusterLocalGateway := "default"
5960
testAwsAccountId := "12345678"
6061
testClusterName := "cluster-name"
62+
testMaxRouteReconciles := "5"
63+
testMaxRouteReconcilesInt := 5
6164

6265
os.Setenv(REGION, testRegion)
6366
os.Setenv(CLUSTER_VPC_ID, testClusterVpcId)
6467
os.Setenv(DEFAULT_SERVICE_NETWORK, testClusterLocalGateway)
6568
os.Setenv(AWS_ACCOUNT_ID, testAwsAccountId)
6669
os.Setenv(CLUSTER_NAME, testClusterName)
67-
configInit(nil, ec2MetadataUnavailable())
68-
assert.Equal(t, Region, testRegion)
69-
assert.Equal(t, VpcID, testClusterVpcId)
70-
assert.Equal(t, AccountID, testAwsAccountId)
71-
assert.Equal(t, DefaultServiceNetwork, testClusterLocalGateway)
70+
os.Setenv(ROUTE_MAX_CONCURRENT_RECONCILES, testMaxRouteReconciles)
71+
err := configInit(nil, ec2MetadataUnavailable())
72+
assert.Nil(t, err)
73+
assert.Equal(t, testRegion, Region)
74+
assert.Equal(t, testClusterVpcId, VpcID)
75+
assert.Equal(t, testAwsAccountId, AccountID)
76+
assert.Equal(t, testClusterLocalGateway, DefaultServiceNetwork)
7277
assert.Equal(t, testClusterName, ClusterName)
78+
assert.Equal(t, testMaxRouteReconcilesInt, RouteMaxConcurrentReconciles)
79+
}
80+
81+
func Test_bad_reconcile_value(t *testing.T) {
82+
// Test variable
83+
maxReconciles := "FOO"
84+
85+
os.Setenv(ROUTE_MAX_CONCURRENT_RECONCILES, maxReconciles)
86+
err := configInit(nil, ec2MetadataUnavailable())
87+
assert.NotNil(t, err)
7388
}

pkg/controllers/route_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controllers
1919
import (
2020
"context"
2121
"fmt"
22+
"sigs.k8s.io/controller-runtime/pkg/controller"
2223

2324
"github.com/pkg/errors"
2425
corev1 "k8s.io/api/core/v1"
@@ -119,7 +120,10 @@ func RegisterAllRouteControllers(
119120
Watches(&gwv1beta1.Gateway{}, gwEventHandler).
120121
Watches(&corev1.Service{}, svcEventHandler.MapToRoute(routeInfo.routeType)).
121122
Watches(&anv1alpha1.ServiceImport{}, svcImportEventHandler.MapToRoute(routeInfo.routeType)).
122-
Watches(&discoveryv1.EndpointSlice{}, svcEventHandler.MapToRoute(routeInfo.routeType))
123+
Watches(&discoveryv1.EndpointSlice{}, svcEventHandler.MapToRoute(routeInfo.routeType)).
124+
WithOptions(controller.Options{
125+
MaxConcurrentReconciles: config.RouteMaxConcurrentReconciles,
126+
})
123127

124128
if ok, err := k8s.IsGVKSupported(mgr, anv1alpha1.GroupVersion.String(), anv1alpha1.TargetGroupPolicyKind); ok {
125129
builder.Watches(&anv1alpha1.TargetGroupPolicy{}, svcEventHandler.MapToRoute(routeInfo.routeType))

pkg/deploy/stack_deployer.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func NewLatticeServiceStackDeploy(
8181
tgGcSynth := lattice.NewTargetGroupSynthesizer(log, cloud, k8sClient, tgMgr, tgSvcExpBuilder, svcBuilder, nil)
8282
tgGcFn := NewTgGcFn(tgGcSynth)
8383
tgGc = &TgGc{
84-
lock: sync.Mutex{},
84+
lock: sync.RWMutex{},
8585
log: log.Named("tg-gc"),
8686
ctx: context.TODO(),
8787
isDone: atomic.Bool{},
@@ -130,7 +130,7 @@ func NewTgGcFn(tgSynth *lattice.TargetGroupSynthesizer) TgGcCycleFn {
130130
}
131131

132132
type TgGc struct {
133-
lock sync.Mutex
133+
lock sync.RWMutex
134134
log gwlog.Logger
135135
ctx context.Context
136136
isDone atomic.Bool
@@ -189,16 +189,10 @@ func (d *latticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Sta
189189
listenerSynthesizer := lattice.NewListenerSynthesizer(d.log, d.listenerManager, d.targetGroupManager, stack)
190190
ruleSynthesizer := lattice.NewRuleSynthesizer(d.log, d.ruleManager, d.targetGroupManager, stack)
191191

192-
// We need to block GC when we deploy stack. Stack deployer first creates TG and then
193-
// associate TG with Service. If GC will run in between it can delete newly created TG
194-
// before association since it's dangling TG. This lock also prevents concurrent
195-
// deployments, only one deployment can run at the time.
196-
//
197-
// TODO: This place can become a contention. May be debug log with lock waiting time?
198192
defer func() {
199-
tgGc.lock.Unlock()
193+
tgGc.lock.RUnlock()
200194
}()
201-
tgGc.lock.Lock()
195+
tgGc.lock.RLock()
202196

203197
//Handle targetGroups creation request
204198
if err := targetGroupSynthesizer.SynthesizeCreate(ctx); err != nil {
@@ -267,9 +261,9 @@ func NewTargetGroupStackDeploy(
267261

268262
func (d *latticeTargetGroupStackDeployer) Deploy(ctx context.Context, stack core.Stack) error {
269263
defer func() {
270-
tgGc.lock.Unlock()
264+
tgGc.lock.RUnlock()
271265
}()
272-
tgGc.lock.Lock()
266+
tgGc.lock.RLock()
273267

274268
synthesizers := []ResourceSynthesizer{
275269
lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sclient, d.targetGroupManager, d.svcExportTgBuilder, d.svcBuilder, stack),

pkg/utils/gwlog/actions.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
package gwlog
22

3-
const ReconcileStart = "RECONCILE_START_MARKER"
4-
const ReconcileEnd = "RECONCILE_END_MARKER"
3+
const ReconcileStart = "reconcile_start"
4+
const ReconcileEnd = "reconcile_end"

0 commit comments

Comments
 (0)