Skip to content

Commit 9ef9410

Browse files
author
Wei
authored
Merge pull request #130 from peng19940915/main
feat: fix the exception when scale 100+ or more ecs.
2 parents 1f7a016 + 8398981 commit 9ef9410

File tree

8 files changed

+152
-13
lines changed

8 files changed

+152
-13
lines changed

charts/karpenter/crds/karpenter.k8s.alibabacloud_ecsnodeclasses.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,10 @@ spec:
212212
evictionSoft
213213
rule: has(self.evictionSoftGracePeriod) ? self.evictionSoftGracePeriod.all(e,
214214
(e in self.evictionSoft)):true
215+
resourceGroupId:
216+
description: ResourceGroupID is the resource group id in ECS
217+
pattern: rg-[0-9a-z]+
218+
type: string
215219
securityGroupSelectorTerms:
216220
description: SecurityGroupSelectorTerms is a list of or security group
217221
selector terms. The terms are ORed.
@@ -266,6 +270,7 @@ spec:
266270
type: string
267271
type: array
268272
performanceLevel:
273+
default: PL0
269274
description: |-
270275
The performance level of the ESSD to use as the system disk. Default value: PL0.
271276
Valid values:
@@ -311,6 +316,13 @@ spec:
311316
rule: self.all(k, k !='karpenter.sh/nodeclaim')
312317
- message: tag contains a restricted tag matching karpenter.k8s.alibabacloud/ecsnodeclass
313318
rule: self.all(k, k !='karpenter.k8s.alibabacloud/ecsnodeclass')
319+
vSwitchSelectionPolicy:
320+
default: cheapest
321+
description: VSwitchSelectionPolicy is the policy to select the vSwitch.
322+
enum:
323+
- balanced
324+
- cheapest
325+
type: string
314326
vSwitchSelectorTerms:
315327
description: VSwitchSelectorTerms is a list of or vSwitch selector
316328
terms. The terms are ORed.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ require (
9999
golang.org/x/sys v0.23.0 // indirect
100100
golang.org/x/term v0.23.0 // indirect
101101
golang.org/x/text v0.17.0 // indirect
102-
golang.org/x/time v0.6.0 // indirect
102+
golang.org/x/time v0.6.0
103103
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
104104
google.golang.org/api v0.146.0 // indirect
105105
google.golang.org/genproto/googleapis/api v0.0.0-20231009173412-8bfb1ae86b6c // indirect

pkg/apis/v1alpha1/ecsnodeclass.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ import (
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
)
2626

27+
const (
28+
VSwitchSelectionPolicyBalanced = "balanced"
29+
)
30+
2731
// ECSNodeClassSpec is the top level specification for the AlibabaCloud Karpenter Provider.
2832
// This will contain the configuration necessary to launch instances in AlibabaCloud.
2933
type ECSNodeClassSpec struct {
@@ -34,6 +38,10 @@ type ECSNodeClassSpec struct {
3438
// +kubebuilder:validation:MaxItems:=30
3539
// +required
3640
VSwitchSelectorTerms []VSwitchSelectorTerm `json:"vSwitchSelectorTerms" hash:"ignore"`
41+
// VSwitchSelectionPolicy is the policy to select the vSwitch.
42+
// +kubebuilder:validation:Enum:=balanced;cheapest
43+
// +kubebuilder:default:=cheapest
44+
VSwitchSelectionPolicy string `json:"vSwitchSelectionPolicy,omitempty"`
3745
// SecurityGroupSelectorTerms is a list of or security group selector terms. The terms are ORed.
3846
// +kubebuilder:validation:XValidation:message="securityGroupSelectorTerms cannot be empty",rule="self.size() != 0"
3947
// +kubebuilder:validation:XValidation:message="expected at least one, got none, ['tags', 'id', 'name']",rule="self.all(x, has(x.tags) || has(x.id) || has(x.name))"

pkg/controllers/controllers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/interruption"
3333
nodeclaimgarbagecollection "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclaim/garbagecollection"
3434
nodeclaimtagging "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclaim/tagging"
35+
nodeclaimunregisteredtaint "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclaim/unregisteredtaint"
3536
nodeclasshash "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclass/hash"
3637
nodeclaasstatus "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclass/status"
3738
nodeclasstermination "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/controllers/nodeclass/termination"
@@ -62,6 +63,7 @@ func NewControllers(ctx context.Context, mgr manager.Manager, clk clock.Clock, r
6263
nodeclasstermination.NewController(kubeClient, recorder),
6364
controllerspricing.NewController(pricingProvider),
6465
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider),
66+
nodeclaimunregisteredtaint.NewController(kubeClient),
6567
nodeclaimtagging.NewController(kubeClient, instanceProvider),
6668
providersinstancetype.NewController(instanceTypeProvider),
6769
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package unregisteredtaint
2+
3+
/*
4+
Copyright 2024 The CloudPilot AI Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
import (
19+
"context"
20+
"time"
21+
22+
"github.com/awslabs/operatorpkg/singleton"
23+
"github.com/samber/lo"
24+
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/client-go/util/retry"
26+
controllerruntime "sigs.k8s.io/controller-runtime"
27+
"sigs.k8s.io/controller-runtime/pkg/client"
28+
"sigs.k8s.io/controller-runtime/pkg/log"
29+
"sigs.k8s.io/controller-runtime/pkg/manager"
30+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
31+
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
32+
)
33+
34+
// Controller is used to delete the unregistered taint when the node is ready
35+
type Controller struct {
36+
kubeClient client.Client
37+
}
38+
39+
func NewController(kubeClient client.Client) *Controller {
40+
return &Controller{
41+
kubeClient: kubeClient,
42+
}
43+
}
44+
45+
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
46+
logger := log.FromContext(ctx)
47+
// get all nodes
48+
nodeList := &corev1.NodeList{}
49+
if err := c.kubeClient.List(ctx, nodeList, client.HasLabels{v1.NodeRegisteredLabelKey}); err != nil {
50+
return reconcile.Result{}, err
51+
}
52+
for i := range nodeList.Items {
53+
node := &nodeList.Items[i]
54+
if !hasUnregisteredTaint(node) || !isNodeReady(node) {
55+
continue
56+
}
57+
58+
nodeCopy := node.DeepCopy()
59+
// remove the unregistered taint
60+
nodeCopy.Spec.Taints = lo.Reject(nodeCopy.Spec.Taints, func(item corev1.Taint, index int) bool {
61+
return item.MatchTaint(&v1.UnregisteredNoExecuteTaint)
62+
})
63+
64+
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
65+
return c.kubeClient.Patch(ctx, nodeCopy, client.MergeFromWithOptions(node, client.MergeFromWithOptimisticLock{}))
66+
}); err != nil {
67+
logger.Error(err, "failed to remove unregistered taint", "node", node.Name)
68+
continue
69+
}
70+
logger.Info("removed unregistered taint from node", "node", node.Name)
71+
}
72+
73+
// check again every minute
74+
return reconcile.Result{RequeueAfter: time.Minute}, nil
75+
}
76+
77+
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
78+
return controllerruntime.NewControllerManagedBy(m).
79+
Named("node.unregisteredtaint").
80+
WatchesRawSource(singleton.Source()).
81+
Complete(singleton.AsReconciler(c))
82+
}
83+
84+
func hasUnregisteredTaint(node *corev1.Node) bool {
85+
_, has := lo.Find(node.Spec.Taints, func(item corev1.Taint) bool {
86+
return item.Key == v1.UnregisteredTaintKey
87+
})
88+
return has
89+
}
90+
91+
func isNodeReady(node *corev1.Node) bool {
92+
_, ready := lo.Find(node.Status.Conditions, func(item corev1.NodeCondition) bool {
93+
return item.Type == corev1.NodeReady &&
94+
item.Status == corev1.ConditionTrue &&
95+
time.Since(item.LastTransitionTime.Time) > time.Second*15
96+
})
97+
98+
return ready
99+
}

pkg/operator/operator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
9393
ackProvider := ack.NewDefaultProvider(clusterID, ackClient, cache.New(alicache.ClusterAttachScriptTTL, alicache.DefaultCleanupInterval))
9494

9595
instanceProvider := instance.NewDefaultProvider(
96+
ctx,
9697
region,
9798
ecsClient,
9899
imageResolver,

pkg/operator/options/options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Options struct {
4040
VMMemoryOverheadPercent float64
4141
Interruption bool
4242
TelemetryShare bool
43+
APGCreationQPS int
4344
}
4445

4546
func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
@@ -48,6 +49,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
4849
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.065), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
4950
fs.BoolVar(&o.Interruption, "interruption", env.WithDefaultBool("INTERRUPTION", true), "Enable interruption handling.")
5051
fs.BoolVar(&o.TelemetryShare, "telemetry-share", env.WithDefaultBool("TELEMETRY_SHARE", true), "Enable telemetry sharing.")
52+
fs.IntVar(&o.APGCreationQPS, "apg-qps", int(env.WithDefaultInt64("APG_CREATION_QPS", 100)), "The QPS limit for creating AutoProvisionGroup.")
5153
}
5254

5355
func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {

pkg/providers/instance/instance.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ package instance
1818

1919
import (
2020
"context"
21+
"crypto/rand"
2122
"errors"
2223
"fmt"
2324
"math"
25+
"math/big"
2426
"net/http"
2527
"strings"
2628
"time"
@@ -31,6 +33,7 @@ import (
3133
"github.com/patrickmn/go-cache"
3234
"github.com/samber/lo"
3335
"go.uber.org/multierr"
36+
"golang.org/x/time/rate"
3437
corev1 "k8s.io/api/core/v1"
3538
"sigs.k8s.io/controller-runtime/pkg/log"
3639
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -69,16 +72,17 @@ type DefaultProvider struct {
6972
imageFamilyResolver imagefamily.Resolver
7073
vSwitchProvider vswitch.Provider
7174
ackProvider ack.Provider
75+
createLimiter *rate.Limiter
7276
}
7377

74-
func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
78+
func NewDefaultProvider(ctx context.Context, region string, ecsClient *ecsclient.Client,
7579
imageFamilyResolver imagefamily.Resolver, vSwitchProvider vswitch.Provider,
7680
ackProvider ack.Provider) *DefaultProvider {
7781
p := &DefaultProvider{
78-
ecsClient: ecsClient,
79-
region: region,
80-
instanceCache: cache.New(instanceCacheExpiration, instanceCacheExpiration),
81-
82+
ecsClient: ecsClient,
83+
region: region,
84+
instanceCache: cache.New(instanceCacheExpiration, instanceCacheExpiration),
85+
createLimiter: rate.NewLimiter(rate.Limit(1), options.FromContext(ctx).APGCreationQPS),
8286
imageFamilyResolver: imageFamilyResolver,
8387
vSwitchProvider: vSwitchProvider,
8488
ackProvider: ackProvider,
@@ -90,6 +94,11 @@ func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
9094
func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass, nodeClaim *karpv1.NodeClaim,
9195
instanceTypes []*cloudprovider.InstanceType,
9296
) (*Instance, error) {
97+
// Wait for rate limiter
98+
if err := p.createLimiter.Wait(ctx); err != nil {
99+
log.FromContext(ctx).Error(err, "rate limit exceeded")
100+
return nil, fmt.Errorf("rate limit exceeded: %w", err)
101+
}
93102
schedulingRequirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...)
94103
// Only filter the instances if there are no minValues in the requirement.
95104
if !schedulingRequirements.HasMinValues() {
@@ -107,7 +116,6 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNod
107116

108117
return NewInstanceFromProvisioningGroup(launchInstance, createAutoProvisioningGroupRequest, p.region), nil
109118
}
110-
111119
func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) {
112120
if instance, ok := p.instanceCache.Get(id); ok {
113121
return instance.(*Instance), nil
@@ -370,6 +378,7 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1alpha
370378
}
371379

372380
runtime := &util.RuntimeOptions{}
381+
373382
resp, err := p.ecsClient.CreateAutoProvisioningGroupWithOptions(createAutoProvisioningGroupRequest, runtime)
374383
if err != nil {
375384
return nil, nil, fmt.Errorf("creating auto provisioning group, %w", err)
@@ -470,7 +479,7 @@ func (p *DefaultProvider) getProvisioningGroup(ctx context.Context, nodeClass *v
470479
break
471480
}
472481

473-
vSwitchID := p.getVSwitchID(instanceType, zonalVSwitchs, requirements, capacityType)
482+
vSwitchID := p.getVSwitchID(instanceType, zonalVSwitchs, requirements, capacityType, nodeClass.Spec.VSwitchSelectionPolicy)
474483
if vSwitchID == "" {
475484
return nil, errors.New("vSwitchID not found")
476485
}
@@ -570,23 +579,29 @@ func (p *DefaultProvider) checkODFallback(nodeClaim *karpv1.NodeClaim, instanceT
570579
}
571580

572581
func (p *DefaultProvider) getVSwitchID(instanceType *cloudprovider.InstanceType,
573-
zonalVSwitchs map[string]*vswitch.VSwitch, reqs scheduling.Requirements, capacityType string) string {
582+
zonalVSwitchs map[string]*vswitch.VSwitch, reqs scheduling.Requirements, capacityType string, vSwitchSelectionPolicy string) string {
574583
cheapestVSwitchID := ""
575584
cheapestPrice := math.MaxFloat64
576585

586+
if capacityType == karpv1.CapacityTypeOnDemand || vSwitchSelectionPolicy == v1alpha1.VSwitchSelectionPolicyBalanced {
587+
// For on-demand, randomly select a zone's vswitch
588+
zoneIDs := lo.Keys(zonalVSwitchs)
589+
if len(zoneIDs) > 0 {
590+
randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(zoneIDs))))
591+
return zonalVSwitchs[zoneIDs[randomIndex.Int64()]].ID
592+
}
593+
}
594+
577595
// For different AZ, the spot price may differ. So we need to get the cheapest vSwitch in the zone
578596
for i := range instanceType.Offerings {
579597
if reqs.Compatible(instanceType.Offerings[i].Requirements, scheduling.AllowUndefinedWellKnownLabels) != nil {
580598
continue
581599
}
600+
582601
vswitch, ok := zonalVSwitchs[instanceType.Offerings[i].Requirements.Get(corev1.LabelTopologyZone).Any()]
583602
if !ok {
584603
continue
585604
}
586-
if capacityType == karpv1.CapacityTypeOnDemand {
587-
return vswitch.ID
588-
}
589-
590605
if instanceType.Offerings[i].Price < cheapestPrice {
591606
cheapestVSwitchID = vswitch.ID
592607
cheapestPrice = instanceType.Offerings[i].Price

0 commit comments

Comments
 (0)