
Commit 98a9110

Merge pull request #282 from runkecheng/feature_update_node_without_restart
*: update replicas without restart
2 parents: 42f4fcc + f161f8a

File tree

12 files changed: +120 -199 lines

api/v1alpha1/mysqlcluster_types.go

Lines changed: 9 additions & 1 deletion
@@ -247,14 +247,18 @@ type Persistence struct {
 type ClusterState string
 
 const (
-    // ClusterInitState indicates whether the cluster is initializing.
+    // ClusterInitState indicates whether the cluster is initializing.
     ClusterInitState ClusterState = "Initializing"
     // ClusterUpdateState indicates whether the cluster is being updated.
     ClusterUpdateState ClusterState = "Updating"
     // ClusterReadyState indicates whether all containers in the pod are ready.
     ClusterReadyState ClusterState = "Ready"
     // ClusterCloseState indicates whether the cluster is closed.
     ClusterCloseState ClusterState = "Closed"
+    // ClusterScaleInState indicates whether the cluster replicas are decreasing.
+    ClusterScaleInState ClusterState = "ScaleIn"
+    // ClusterScaleOutState indicates whether the cluster replicas are increasing.
+    ClusterScaleOutState ClusterState = "ScaleOut"
 )
 
 // ClusterConditionType defines type for cluster condition type.
@@ -271,6 +275,10 @@ const (
     ConditionClose ClusterConditionType = "Closed"
     // ConditionError indicates whether there is an error in the cluster.
     ConditionError ClusterConditionType = "Error"
+    // ConditionScaleIn indicates whether the cluster replicas are decreasing.
+    ConditionScaleIn ClusterConditionType = "ScaleIn"
+    // ConditionScaleOut indicates whether the cluster replicas are increasing.
+    ConditionScaleOut ClusterConditionType = "ScaleOut"
 )
 
 // ClusterCondition defines type for cluster conditions.
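
The new ScaleIn/ScaleOut states and conditions let the status syncer (mysqlcluster/syncer/status.go, below) report a replica-count change as an explicit scale operation instead of folding it into the generic Updating state.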

internal/xenon_executor.go

Lines changed: 41 additions & 0 deletions
@@ -33,7 +33,10 @@ type XenonExecutor interface {
     GetRootPassword() string
     SetRootPassword(rootPassword string)
     RaftStatus(host string) (*apiv1alpha1.RaftStatus, error)
+    XenonPing(host string) error
     RaftTryToLeader(host string) error
+    ClusterAdd(host string, toAdd string) error
+    ClusterRemove(host string, toRemove string) error
 }
 
 func NewXenonExecutor() XenonExecutor {
@@ -86,3 +89,41 @@ func (executor *xenonExecutor) RaftTryToLeader(host string) error {
     }
     return nil
 }
+
+func (executor *xenonExecutor) XenonPing(host string) error {
+    req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.XenonPing, nil))
+    if err != nil {
+        return err
+    }
+    _, err = executor.httpExecutor.Execute(req)
+    if err != nil {
+        return fmt.Errorf("failed to ping host[%s], err: %s", req.Req.URL, err)
+    }
+    return nil
+}
+
+func (executor *xenonExecutor) ClusterAdd(host string, toAdd string) error {
+    addHost := fmt.Sprintf("{\"address\": \"%s\"}", toAdd)
+    req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterAdd, addHost))
+    if err != nil {
+        return err
+    }
+    _, err = executor.httpExecutor.Execute(req)
+    if err != nil {
+        return fmt.Errorf("failed to add host[%s] to host[%s], err: %s", addHost, req.Req.URL, err)
+    }
+    return nil
+}
+
+func (executor *xenonExecutor) ClusterRemove(host string, toRemove string) error {
+    removeHost := fmt.Sprintf("{\"address\": \"%s\"}", toRemove)
+    req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterRemove, removeHost))
+    if err != nil {
+        return err
+    }
+    _, err = executor.httpExecutor.Execute(req)
+    if err != nil {
+        return fmt.Errorf("failed to remove host[%s] from host[%s], err: %s", removeHost, req.Req.URL, err)
+    }
+    return nil
+}
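
The request bodies above are assembled with fmt.Sprintf, which is fine for the plain host:port strings the syncer passes in but does not escape quotes or backslashes. Below is a minimal sketch of building the same {"address": ...} body with encoding/json instead; the helper name and example address are illustrative, not part of this commit:

package main

import (
    "encoding/json"
    "fmt"
)

// clusterPayload builds the {"address": "..."} body that ClusterAdd and
// ClusterRemove assemble above, letting json.Marshal handle any escaping.
func clusterPayload(address string) (string, error) {
    body, err := json.Marshal(map[string]string{"address": address})
    if err != nil {
        return "", err
    }
    return string(body), nil
}

func main() {
    // Hypothetical peer address in the host:port form that
    // getExpectXenonNodes (status.go, below) produces.
    payload, _ := clusterPayload("sample-mysql-2.sample-mysql:8801")
    fmt.Println(payload) // {"address":"sample-mysql-2.sample-mysql:8801"}
}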

mysqlcluster/container/backup.go

Lines changed: 0 additions & 6 deletions
@@ -17,8 +17,6 @@ limitations under the License.
 package container
 
 import (
-    "fmt"
-
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/util/intstr"
 
@@ -59,10 +57,6 @@ func (c *backupSidecar) getEnvVars() []corev1.EnvVar {
             Name:  "SERVICE_NAME",
             Value: c.GetNameForResource(utils.HeadlessSVC),
         },
-        {
-            Name:  "REPLICAS",
-            Value: fmt.Sprintf("%d", *c.Spec.Replicas),
-        },
         {
             Name:  "MYSQL_ROOT_PASSWORD",
             Value: c.Spec.MysqlOpts.RootPassword,
mysqlcluster/container/init_sidecar.go

Lines changed: 0 additions & 5 deletions
@@ -17,7 +17,6 @@ limitations under the License.
 package container
 
 import (
-    "fmt"
     "strconv"
 
     corev1 "k8s.io/api/core/v1"
@@ -79,10 +78,6 @@ func (c *initSidecar) getEnvVars() []corev1.EnvVar {
             Name:  "STATEFULSET_NAME",
             Value: c.GetNameForResource(utils.StatefulSet),
         },
-        {
-            Name:  "REPLICAS",
-            Value: fmt.Sprintf("%d", *c.Spec.Replicas),
-        },
         {
             Name:  "ADMIT_DEFEAT_HEARBEAT_COUNT",
             Value: strconv.Itoa(int(*c.Spec.XenonOpts.AdmitDefeatHearbeatCount)),

mysqlcluster/container/init_sidecar_test.go

Lines changed: 0 additions & 5 deletions
@@ -17,7 +17,6 @@ limitations under the License.
 package container
 
 import (
-    "fmt"
     "strconv"
     "testing"
 
@@ -92,10 +91,6 @@ var (
             Name:  "STATEFULSET_NAME",
             Value: "sample-mysql",
         },
-        {
-            Name:  "REPLICAS",
-            Value: fmt.Sprintf("%d", *testInitSidecarCluster.Spec.Replicas),
-        },
         {
             Name:  "ADMIT_DEFEAT_HEARBEAT_COUNT",
             Value: strconv.Itoa(int(*testInitSidecarCluster.Spec.XenonOpts.AdmitDefeatHearbeatCount)),

mysqlcluster/container/xenon.go

Lines changed: 1 addition & 12 deletions
@@ -53,18 +53,7 @@ func (c *xenon) getEnvVars() []corev1.EnvVar {
 
 // getLifecycle gets the container lifecycle.
 func (c *xenon) getLifecycle() *corev1.Lifecycle {
-    return &corev1.Lifecycle{
-        PostStart: &corev1.Handler{
-            Exec: &corev1.ExecAction{
-                Command: []string{"sh", "-c", "/scripts/post-start.sh"},
-            },
-        },
-        PreStop: &corev1.Handler{
-            Exec: &corev1.ExecAction{
-                Command: []string{"sh", "-c", "/scripts/pre-stop.sh"},
-            },
-        },
-    }
+    return nil
 }
 
 // getResources gets the container resources.
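
With membership now reconciled through the Xenon HTTP executor (ClusterAdd/ClusterRemove above, driven by the status syncer below), the xenon container presumably no longer needs its post-start/pre-stop scripts to join and leave the raft group, so getLifecycle returns nil and the tests below assert exactly that.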

mysqlcluster/container/xenon_test.go

Lines changed: 2 additions & 14 deletions
@@ -28,7 +28,7 @@ import (
 )
 
 var (
-    xenonReplicas int32 = 1
+    xenonReplicas int32 = 3
     xenonMysqlCluster = mysqlv1alpha1.MysqlCluster{
         ObjectMeta: metav1.ObjectMeta{
             Name: "sample",
@@ -70,19 +70,7 @@ func TestGetXenonEnvVar(t *testing.T) {
 }
 
 func TestGetXenonLifecycle(t *testing.T) {
-    lifecycle := &corev1.Lifecycle{
-        PostStart: &corev1.Handler{
-            Exec: &corev1.ExecAction{
-                Command: []string{"sh", "-c", "/scripts/post-start.sh"},
-            },
-        },
-        PreStop: &corev1.Handler{
-            Exec: &corev1.ExecAction{
-                Command: []string{"sh", "-c", "/scripts/pre-stop.sh"},
-            },
-        },
-    }
-    assert.Equal(t, lifecycle, xenonCase.Lifecycle)
+    assert.Nil(t, xenonCase.Lifecycle)
 }
 
 func TestGetXenonResources(t *testing.T) {

mysqlcluster/syncer/status.go

Lines changed: 61 additions & 5 deletions
@@ -119,8 +119,13 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
 
     s.Status.ReadyNodes = len(readyNodes)
     if s.Status.ReadyNodes == int(*s.Spec.Replicas) && int(*s.Spec.Replicas) != 0 {
-        s.Status.State = apiv1alpha1.ClusterReadyState
-        clusterCondition.Type = apiv1alpha1.ConditionReady
+        if err := s.reconcileXenon(s.Status.ReadyNodes); err != nil {
+            clusterCondition.Message = fmt.Sprintf("%s", err)
+            clusterCondition.Type = apiv1alpha1.ConditionError
+        } else {
+            s.Status.State = apiv1alpha1.ClusterReadyState
+            clusterCondition.Type = apiv1alpha1.ConditionReady
+        }
     }
 
     if len(s.Status.Conditions) == 0 {
@@ -163,9 +168,13 @@ func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition {
     // When the cluster is ready or closed, a change in the number of replicas
     // indicates that the cluster is scaling its nodes.
     if oldState == apiv1alpha1.ClusterReadyState || oldState == apiv1alpha1.ClusterCloseState {
-        if int(*s.Spec.Replicas) != s.Status.ReadyNodes {
-            clusterCondition.Type = apiv1alpha1.ConditionUpdate
-            s.Status.State = apiv1alpha1.ClusterUpdateState
+        if int(*s.Spec.Replicas) > s.Status.ReadyNodes {
+            clusterCondition.Type = apiv1alpha1.ConditionScaleOut
+            s.Status.State = apiv1alpha1.ClusterScaleOutState
+            return clusterCondition
+        } else if int(*s.Spec.Replicas) < s.Status.ReadyNodes {
+            clusterCondition.Type = apiv1alpha1.ConditionScaleIn
+            s.Status.State = apiv1alpha1.ClusterScaleInState
             return clusterCondition
         }
     }
@@ -302,6 +311,53 @@ func (s *StatusSyncer) updateNodeRaftStatus(node *apiv1alpha1.NodeStatus) error
     return nil
 }
 
+func (s *StatusSyncer) reconcileXenon(readyNodes int) error {
+    expectXenonNodes := s.getExpectXenonNodes(readyNodes)
+    for _, nodeStatus := range s.Status.Nodes {
+        toRemove := utils.StringDiffIn(nodeStatus.RaftStatus.Nodes, expectXenonNodes)
+        if err := s.removeNodesFromXenon(nodeStatus.Name, toRemove); err != nil {
+            return err
+        }
+        toAdd := utils.StringDiffIn(expectXenonNodes, nodeStatus.RaftStatus.Nodes)
+        if err := s.addNodesInXenon(nodeStatus.Name, toAdd); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (s *StatusSyncer) getExpectXenonNodes(readyNodes int) []string {
+    expectXenonNodes := []string{}
+    for i := 0; i < readyNodes; i++ {
+        expectXenonNodes = append(expectXenonNodes, fmt.Sprintf("%s:%d", s.GetPodHostName(i), utils.XenonPort))
+    }
+    return expectXenonNodes
+}
+
+func (s *StatusSyncer) removeNodesFromXenon(host string, toRemove []string) error {
+    if err := s.XenonExecutor.XenonPing(host); err != nil {
+        return err
+    }
+    for _, removeHost := range toRemove {
+        if err := s.XenonExecutor.ClusterRemove(host, removeHost); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (s *StatusSyncer) addNodesInXenon(host string, toAdd []string) error {
+    if err := s.XenonExecutor.XenonPing(host); err != nil {
+        return err
+    }
+    for _, addHost := range toAdd {
+        if err := s.XenonExecutor.ClusterAdd(host, addHost); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
 // setPodHealthy sets the pod label healthy.
 func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
     healthy := "no"
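
utils.StringDiffIn itself is not shown in this diff; from its call sites in reconcileXenon it appears to return the elements of the first slice that are missing from the second. A minimal sketch under that assumption, with hypothetical addresses:

package main

import "fmt"

// stringDiffIn returns the strings in actual that do not appear in expect,
// mirroring how reconcileXenon derives toRemove (raft members no longer
// expected) and toAdd (expected members not yet in the raft status).
func stringDiffIn(actual, expect []string) []string {
    seen := make(map[string]struct{}, len(expect))
    for _, s := range expect {
        seen[s] = struct{}{}
    }
    var diff []string
    for _, s := range actual {
        if _, ok := seen[s]; !ok {
            diff = append(diff, s)
        }
    }
    return diff
}

func main() {
    current := []string{"node-0:8801", "node-3:8801"}
    expected := []string{"node-0:8801", "node-1:8801"}
    fmt.Println(stringDiffIn(current, expected)) // [node-3:8801] -> ClusterRemove
    fmt.Println(stringDiffIn(expected, current)) // [node-1:8801] -> ClusterAdd
}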
