Skip to content

Commit f161f8a

Browse files
committed
*: Support to manage the increase/deletion of the raft node via Operator. #221
1 parent c03590e commit f161f8a

File tree

4 files changed

+108
-8
lines changed

4 files changed

+108
-8
lines changed

internal/xenon_executor.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ type XenonExecutor interface {
3333
GetRootPassword() string
3434
SetRootPassword(rootPassword string)
3535
RaftStatus(host string) (*apiv1alpha1.RaftStatus, error)
36+
XenonPing(host string) error
3637
RaftTryToLeader(host string) error
38+
ClusterAdd(host string, toAdd string) error
39+
ClusterRemove(host string, toRemove string) error
3740
}
3841

3942
func NewXenonExecutor() XenonExecutor {
@@ -86,3 +89,41 @@ func (executor *xenonExecutor) RaftTryToLeader(host string) error {
8689
}
8790
return nil
8891
}
92+
93+
func (executor *xenonExecutor) XenonPing(host string) error {
94+
req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.XenonPing, nil))
95+
if err != nil {
96+
return err
97+
}
98+
_, err = executor.httpExecutor.Execute(req)
99+
if err != nil {
100+
return fmt.Errorf("failed to ping host[%s], err: %s", req.Req.URL, err)
101+
}
102+
return nil
103+
}
104+
105+
func (executor *xenonExecutor) ClusterAdd(host string, toAdd string) error {
106+
addHost := fmt.Sprintf("{\"address\": \"%s\"}", toAdd)
107+
req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterAdd, addHost))
108+
if err != nil {
109+
return err
110+
}
111+
_, err = executor.httpExecutor.Execute(req)
112+
if err != nil {
113+
return fmt.Errorf("failed to add host[%s] to host[%s], err: %s", addHost, req.Req.URL, err)
114+
}
115+
return nil
116+
}
117+
118+
func (executor *xenonExecutor) ClusterRemove(host string, toRemove string) error {
119+
removeHost := fmt.Sprintf("{\"address\": \"%s\"}", toRemove)
120+
req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterRemove, removeHost))
121+
if err != nil {
122+
return err
123+
}
124+
_, err = executor.httpExecutor.Execute(req)
125+
if err != nil {
126+
return fmt.Errorf("failed to remove host[%s] from host[%s], err: %s", removeHost, req.Req.URL, err)
127+
}
128+
return nil
129+
}

mysqlcluster/syncer/status.go

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,13 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
119119

120120
s.Status.ReadyNodes = len(readyNodes)
121121
if s.Status.ReadyNodes == int(*s.Spec.Replicas) && int(*s.Spec.Replicas) != 0 {
122-
s.Status.State = apiv1alpha1.ClusterReadyState
123-
clusterCondition.Type = apiv1alpha1.ConditionReady
122+
if err := s.reconcileXenon(s.Status.ReadyNodes); err != nil {
123+
clusterCondition.Message = fmt.Sprintf("%s", err)
124+
clusterCondition.Type = apiv1alpha1.ConditionError
125+
} else {
126+
s.Status.State = apiv1alpha1.ClusterReadyState
127+
clusterCondition.Type = apiv1alpha1.ConditionReady
128+
}
124129
}
125130

126131
if len(s.Status.Conditions) == 0 {
@@ -163,9 +168,13 @@ func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition {
163168
// When the cluster is ready or closed, the number of replicas changes,
164169
// indicating that the cluster is updating nodes.
165170
if oldState == apiv1alpha1.ClusterReadyState || oldState == apiv1alpha1.ClusterCloseState {
166-
if int(*s.Spec.Replicas) != s.Status.ReadyNodes {
167-
clusterCondition.Type = apiv1alpha1.ConditionUpdate
168-
s.Status.State = apiv1alpha1.ClusterUpdateState
171+
if int(*s.Spec.Replicas) > s.Status.ReadyNodes {
172+
clusterCondition.Type = apiv1alpha1.ConditionScaleOut
173+
s.Status.State = apiv1alpha1.ClusterScaleOutState
174+
return clusterCondition
175+
} else if int(*s.Spec.Replicas) < s.Status.ReadyNodes {
176+
clusterCondition.Type = apiv1alpha1.ConditionScaleIn
177+
s.Status.State = apiv1alpha1.ClusterScaleInState
169178
return clusterCondition
170179
}
171180
}
@@ -302,6 +311,53 @@ func (s *StatusSyncer) updateNodeRaftStatus(node *apiv1alpha1.NodeStatus) error
302311
return nil
303312
}
304313

314+
func (s *StatusSyncer) reconcileXenon(readyNodes int) error {
315+
expectXenonNodes := s.getExpectXenonNodes(readyNodes)
316+
for _, nodeStatus := range s.Status.Nodes {
317+
toRemove := utils.StringDiffIn(nodeStatus.RaftStatus.Nodes, expectXenonNodes)
318+
if err := s.removeNodesFromXenon(nodeStatus.Name, toRemove); err != nil {
319+
return err
320+
}
321+
toAdd := utils.StringDiffIn(expectXenonNodes, nodeStatus.RaftStatus.Nodes)
322+
if err := s.addNodesInXenon(nodeStatus.Name, toAdd); err != nil {
323+
return err
324+
}
325+
}
326+
return nil
327+
}
328+
329+
func (s *StatusSyncer) getExpectXenonNodes(readyNodes int) []string {
330+
expectXenonNodes := []string{}
331+
for i := 0; i < readyNodes; i++ {
332+
expectXenonNodes = append(expectXenonNodes, fmt.Sprintf("%s:%d", s.GetPodHostName(i), utils.XenonPort))
333+
}
334+
return expectXenonNodes
335+
}
336+
337+
func (s *StatusSyncer) removeNodesFromXenon(host string, toRemove []string) error {
338+
if err := s.XenonExecutor.XenonPing(host); err != nil {
339+
return err
340+
}
341+
for _, removeHost := range toRemove {
342+
if err := s.XenonExecutor.ClusterRemove(host, removeHost); err != nil {
343+
return err
344+
}
345+
}
346+
return nil
347+
}
348+
349+
func (s *StatusSyncer) addNodesInXenon(host string, toAdd []string) error {
350+
if err := s.XenonExecutor.XenonPing(host); err != nil {
351+
return err
352+
}
353+
for _, addHost := range toAdd {
354+
if err := s.XenonExecutor.ClusterAdd(host, addHost); err != nil {
355+
return err
356+
}
357+
}
358+
return nil
359+
}
360+
305361
// setPodHealthy set the pod lable healthy.
306362
func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
307363
healthy := "no"

utils/common.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ func StringDiffIn(actual, desired []string) []string {
110110
diff = append(diff, aStr)
111111
}
112112
}
113-
114113
return diff
115114
}
116115

@@ -120,7 +119,6 @@ func stringIn(str string, strs []string) (int, bool) {
120119
return i, true
121120
}
122121
}
123-
124122
return 0, false
125123
}
126124

@@ -133,6 +131,5 @@ func UnmarshalJSON(in io.Reader, obj interface{}) error {
133131
if err = json.Unmarshal(body, obj); err != nil {
134132
return fmt.Errorf("error unmarshal data, error: %s, body: %s", err, string(body))
135133
}
136-
137134
return nil
138135
}

utils/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ var (
4141
XenonHttpUrls = map[XenonHttpUrl]string{
4242
RaftStatus: http.MethodGet,
4343
RaftTryToLeader: http.MethodPost,
44+
XenonPing: http.MethodGet,
45+
ClusterAdd: http.MethodPost,
46+
ClusterRemove: http.MethodPost,
4447
}
4548
)
4649

@@ -161,5 +164,8 @@ type XenonHttpUrl string
161164

162165
const (
163166
RaftStatus XenonHttpUrl = "/v1/raft/status"
167+
XenonPing XenonHttpUrl = "/v1/xenon/ping"
168+
ClusterAdd XenonHttpUrl = "/v1/cluster/add"
169+
ClusterRemove XenonHttpUrl = "/v1/cluster/remove"
164170
RaftTryToLeader XenonHttpUrl = "/v1/raft/trytoleader"
165171
)

0 commit comments

Comments
 (0)