From d7abeddbab9dd194bbf102b13889fdcdbe56d49e Mon Sep 17 00:00:00 2001
From: wackxu
Date: Thu, 28 Dec 2017 09:41:50 +0800
Subject: [PATCH 1/2] add qos controller

---
 .../qoscontroller/disk_io_controller.go       |  31 ++
 pkg/kubelet/qoscontroller/doc.go              |  20 +
 .../qoscontroller/memory_controller.go        | 214 +++++++++
 .../qoscontroller/network_io_controller.go    | 331 +++++++++++++
 .../qoscontroller/overall_controller.go       | 150 ++++++
 pkg/kubelet/qoscontroller/qos_controller.go   |  74 +++
 pkg/kubelet/qoscontroller/qos_metrics.go      | 335 ++++++++++++++
 .../qoscontroller/qos_resource_status.go      | 128 +++++
 pkg/kubelet/qoscontroller/sla_controller.go   | 436 ++++++++++++++++++
 .../qoscontroller/sla_controller_test.go      | 119 +++++
 10 files changed, 1838 insertions(+)
 create mode 100644 pkg/kubelet/qoscontroller/disk_io_controller.go
 create mode 100644 pkg/kubelet/qoscontroller/doc.go
 create mode 100644 pkg/kubelet/qoscontroller/memory_controller.go
 create mode 100644 pkg/kubelet/qoscontroller/network_io_controller.go
 create mode 100644 pkg/kubelet/qoscontroller/overall_controller.go
 create mode 100644 pkg/kubelet/qoscontroller/qos_controller.go
 create mode 100644 pkg/kubelet/qoscontroller/qos_metrics.go
 create mode 100644 pkg/kubelet/qoscontroller/qos_resource_status.go
 create mode 100644 pkg/kubelet/qoscontroller/sla_controller.go
 create mode 100644 pkg/kubelet/qoscontroller/sla_controller_test.go

diff --git a/pkg/kubelet/qoscontroller/disk_io_controller.go b/pkg/kubelet/qoscontroller/disk_io_controller.go
new file mode 100644
index 0000000000000..1d33eddd3d72d
--- /dev/null
+++ b/pkg/kubelet/qoscontroller/disk_io_controller.go
@@ -0,0 +1,31 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package qoscontroller
+
+// Disk I/O Controller
+type DiskIOController struct {
+}
+
+// initialize of QosController is implemented by DiskIOController and does all the initialization work
+func (dc *DiskIOController) initialize(qosResourceStatus *QosResourceStatus) error {
+	return nil
+}
+
+// process of QosController is implemented by DiskIOController and does everything a disk I/O controller has to do
+func (dc *DiskIOController) process(qosResourceStatus *QosResourceStatus) error {
+	return nil
+}
diff --git a/pkg/kubelet/qoscontroller/doc.go b/pkg/kubelet/qoscontroller/doc.go
new file mode 100644
index 0000000000000..a850e86111d1d
--- /dev/null
+++ b/pkg/kubelet/qoscontroller/doc.go
@@ -0,0 +1,20 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +// package qoscontroller guarantees the resource availability for the primary application. It constantly watches the node +// and application status. In case of performance drop, correction actions like freeze or kill are triggered +// to ensure primary applications' stability. +package qoscontroller diff --git a/pkg/kubelet/qoscontroller/memory_controller.go b/pkg/kubelet/qoscontroller/memory_controller.go new file mode 100644 index 0000000000000..d6f52ab93d53e --- /dev/null +++ b/pkg/kubelet/qoscontroller/memory_controller.go @@ -0,0 +1,214 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package qoscontroller + +import ( + "github.com/golang/glog" + "k8s.io/api/core/v1" + "math" + "sort" +) + +// Memory Controller +type MemController struct { +} + +type By func(p1, p2 *v1.Pod) bool + +func (by By) Sort(pods []*v1.Pod) { + ps := &podSorter{ + pods: pods, + by: by, + } + sort.Sort(ps) +} + +type podSorter struct { + pods []*v1.Pod + by func(p1, p2 *v1.Pod) bool +} + +func (s *podSorter) Len() int { + return len(s.pods) +} + +func (s *podSorter) Swap(i, j int) { + s.pods[i], s.pods[j] = s.pods[j], s.pods[i] +} + +func (s *podSorter) Less(i, j int) bool { + return s.by(s.pods[i], s.pods[j]) +} + +//Function to sort secondary/best effort pod list based on memory Usage +func sortPodListMemoryUsage(qosResourceStatus *QosResourceStatus, pods []*v1.Pod) { + By(func(p1, p2 *v1.Pod) bool { + p1ID := p1.UID + _, ok1 := qosResourceStatus.podResourceSummary[p1ID] + p2ID := p2.UID + _, ok2 := qosResourceStatus.podResourceSummary[p2ID] + if !ok1 || !ok2 { + glog.Errorf("Cannot obtain pod IDs during pod sorting") + return false + } + p1MemoryUsage := qosResourceStatus.podResourceSummary[p1ID].memoryResourceUsage.currentUsage + p2MemoryUsage := qosResourceStatus.podResourceSummary[p2ID].memoryResourceUsage.currentUsage + return p1MemoryUsage > p2MemoryUsage + }).Sort(pods) + return +} + +// initialize of QosController is implemented by MemController and does all the initialization works +func (mc *MemController) initialize(qosResourceStatus *QosResourceStatus) *QosResourceStatus { + return qosResourceStatus +} + +// process of QosController is implemented by MemController and does all what a memory controller has to do +func (mc *MemController) process(qosResourceStatus *QosResourceStatus) *QosResourceStatus { + + var podRequestedMemory uint64 + var podMemoryThreshold float64 + + sortedSecondaryList := false + secondaryPods := qosResourceStatus.secondaryPodList + //Check the memory usage for each primary pod + for _, pod := range qosResourceStatus.primaryPodList { + podID := (*pod).UID + _, ok := qosResourceStatus.podResourceSummary[podID] + if !ok { + continue + } + podRequestedMemory = 0 + + //Calculate the pod requested memory using the requested memory for each container + for _, container := range pod.Spec.Containers { + podRequestedMemory += uint64(container.Resources.Requests.Memory().Value()) + } + + //Calculate the pod memory threshold based on the configured 
threshold rate + thresholdRate := 1 - qosResourceStatus.QosConfig.MemoryConfig.PodMemoryThresholdRate + podMemoryThreshold = float64(podRequestedMemory) * thresholdRate + + //Get the pod ID and use it to obtain the acquired memory statistics for last N samples + podMemoryUsage := qosResourceStatus.podResourceSummary[podID].memoryResourceUsage.currentUsage + podMemoryUsageSamples := qosResourceStatus.podResourceSummary[podID].memoryResourceUsage.samples + monitoringInterval := float64(qosResourceStatus.QosConfig.MonitoringInterval) + + //Calculate predicted memory usage + predictedMemoryUsage := calculatePredictedUsage(podMemoryUsage, podMemoryUsageSamples, monitoringInterval) + glog.Infof("pod=%v Current usage = %v predicted usage =%v threshold=%v", pod.Name, podMemoryUsage, predictedMemoryUsage, podMemoryThreshold) + //Check if the current pod memory usage is greater than the pod memory threshold + if float64(podMemoryUsage) > podMemoryThreshold && predictedMemoryUsage > podMemoryThreshold { + + if sortedSecondaryList == false { + //Sort the secondary pod list based on decreasing usage of memory + sortPodListMemoryUsage(qosResourceStatus, secondaryPods) + sortedSecondaryList = true + } + //Update the action list with the secondary pods to be killed + secondaryPods = updateActionList(podMemoryUsage, + podRequestedMemory, + &(qosResourceStatus.ActionList), + secondaryPods, + qosResourceStatus.QosConfig.MemoryConfig.ProcessMultiPod) + } + } + return qosResourceStatus +} + +//Function to calculate the predicted usage for the pod based on the rate of increase/decrease using N samples +func calculatePredictedUsage(currentUsage uint64, usageSamples []uint64, monitoringInterval float64) (predictedUsage float64) { + + var aggregateRate, averageRate, actualSamples, actualUsage float64 + var currentSample, previousSample float64 + + currentSample = float64(currentUsage) + aggregateRate = 0 + actualSamples = 0 + actualUsage = currentSample + + //Calculate the rate of increase for last N samples + for i := len(usageSamples); i > 1; i-- { + previousSample = float64(usageSamples[i-2]) + actualUsage += previousSample + if currentSample > previousSample { + if previousSample > 0 { + aggregateRate += (currentSample - previousSample) / previousSample + actualSamples++ + } + } else { + if currentSample > 0 { + aggregateRate -= (previousSample - currentSample) / currentSample + actualSamples++ + } + } + currentSample = previousSample + } + + //Calculate the average Usage and rate of increase + averageRate = aggregateRate / actualSamples + actualUsage = actualUsage / actualSamples + //Calculate the predicted usage in the next monitoring interval based on the increase/decrease rate + rate := math.Pow((1 + averageRate), monitoringInterval) + predictedUsage = actualUsage * rate + + return predictedUsage + +} + +func updateActionList(podMemoryUsage uint64, + primaryPodRequestedMemory uint64, + actionList *[]*Action, + secondaryPods []*v1.Pod, + processMultiPod bool) []*v1.Pod { + + var secondaryPodRequestedMemory uint64 + var revocableMemory uint64 + revocableMemory = 0 + + i := 0 + //Check the secondary pods to be killed + for _, pod := range secondaryPods { + //Populate the action list with the secondary pod to be killed + var action Action + action.Target = pod + action.ActionType = KillPod + *actionList = append(*actionList, &action) + i++ + glog.Infof("Secondary Pod %v added to action list", pod.Name) + //Check if the option of killing multiple secondary pods is enabled + if processMultiPod == false { + 
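+			// Single-pod mode: stop after the first kill action; the remaining
+			// candidates are handed back for the next monitoring cycle.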
return secondaryPods[i:] + } + + //Consider the memory that will be released by killing the secondary pod + secondaryPodRequestedMemory = 0 + for _, container := range pod.Spec.Containers { + secondaryPodRequestedMemory += uint64(container.Resources.Requests.Memory().Value()) + } + + //Calculate the total revocable memory corresponding to secondary pods + revocableMemory += secondaryPodRequestedMemory + + //Check if the memory revoked meets the primary pods requested memory + //Some threshold of requested memory like 95% can be considered if required + if (podMemoryUsage + revocableMemory) > primaryPodRequestedMemory { + return secondaryPods[i:] + } + } + return secondaryPods[i:] +} diff --git a/pkg/kubelet/qoscontroller/network_io_controller.go b/pkg/kubelet/qoscontroller/network_io_controller.go new file mode 100644 index 0000000000000..92244166c796e --- /dev/null +++ b/pkg/kubelet/qoscontroller/network_io_controller.go @@ -0,0 +1,331 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package qoscontroller + +import ( + "github.com/golang/glog" + "k8s.io/api/core/v1" + qosUtil "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" + "strconv" +) + +// Network I/O Controller +type NetworkIOController struct { +} + +//Function to sort secondary/best effort pod list based on network IO Usage +func sortPodListNetworkUsage(qosResourceStatus *QosResourceStatus, pods []*v1.Pod) { + By(func(p1, p2 *v1.Pod) bool { + p1ID := p1.UID + _, ok1 := qosResourceStatus.podResourceSummary[p1ID] + p2ID := p2.UID + _, ok2 := qosResourceStatus.podResourceSummary[p2ID] + if !ok1 || !ok2 { + glog.Errorf("Cannot obtain pod IDs during pod sorting") + return false + } + p1NetworkIOUsage := qosResourceStatus.podResourceSummary[p1ID].networkIOResourceUsage.currentUsage + p2NetworkIOUsage := qosResourceStatus.podResourceSummary[p2ID].networkIOResourceUsage.currentUsage + return p1NetworkIOUsage > p2NetworkIOUsage + }).Sort(pods) + return +} + +// initialize of QosController is implemented by NetworkIOController and does all the initialization works +func (nc *NetworkIOController) initialize(qosResourceStatus *QosResourceStatus) error { + return nil +} + +// process of QosController is implemented by NetworkIOController and does all what a network I/O controller has to do +func (nc *NetworkIOController) process(qosResourceStatus *QosResourceStatus) *QosResourceStatus { + + processUnfreezable := true + //sortedSecondaryList := false + secondaryPods := qosResourceStatus.secondaryPodList + bestEffortPods := qosResourceStatus.bestEffortPodList + + //Currently the network IO capacity of node is obtained from configuration file + nodeNetworkIO := uint64(qosResourceStatus.QosConfig.NetworkIOConfig.NodeNetworkIOCapacity) + //TBD to get the network IO capacity of node based on the network interface card + //nodeNetworkIO, err := machine.GetMachineNetworkIOCapacity() + //if err != nil { + // glog.Errorf("Cannot obtain Node Network IO Capacity") + // return err + //} + + //First check the network IO usage of 
the node + //Calculate the node network IO threshold based on the node configured threshold rate + nodeThresholdRate := 1 - qosResourceStatus.QosConfig.NetworkIOConfig.NodeNetworkIOThresholdRate + nodeNetworkIOThreshold := float64(nodeNetworkIO) * nodeThresholdRate + + //Get the acquired network statistics for last N samples + nodeNetworkIOUsage := qosResourceStatus.nodeResourceSummary.networkIOResourceUsage.currentUsage + nodeNetworkIOUsageSamples := qosResourceStatus.nodeResourceSummary.networkIOResourceUsage.samples + monitoringInterval := float64(qosResourceStatus.QosConfig.MonitoringInterval) + + //Calculate predicted network usage + predictedNodeNetworkIOUsage := calculatePredictedUsage(nodeNetworkIOUsage, nodeNetworkIOUsageSamples, monitoringInterval) + glog.Infof("Node Network IO usage = %v Node Threshold = %v Predicted Usage = %v",nodeNetworkIOUsage,nodeNetworkIOThreshold,predictedNodeNetworkIOUsage) + //Check if the current/predicted node network IO usage is greater than the node network IO threshold + if float64(nodeNetworkIOUsage) > nodeNetworkIOThreshold && predictedNodeNetworkIOUsage > nodeNetworkIOThreshold { + + if predictedNodeNetworkIOUsage > float64(nodeNetworkIOUsage) { + nodeNetworkIOUsage = uint64(predictedNodeNetworkIOUsage) + } + revocableNetworkIO := uint64(0) + + if bestEffortPods != nil { + //Sort the best effort pod list based on decreasing usage of network IO + sortPodListNetworkUsage(qosResourceStatus, bestEffortPods) + + //Update the action list with the best effort pods to be freezed + bestEffortPods, revocableNetworkIO = updateActionListForNetworkIOController(nodeNetworkIOUsage, + nodeNetworkIO, + &(qosResourceStatus.ActionList), + bestEffortPods, + qosResourceStatus.QosConfig.NetworkIOConfig.ProcessMultiPod) + processUnfreezable = false + } + // Consider freezing secondary pods if there are no besteffort pods to freeze OR + // network IO usage will be above threshold even after freezing best effort pods + if secondaryPods != nil && (processUnfreezable == true || + ((float64(nodeNetworkIOUsage - revocableNetworkIO) > nodeNetworkIOThreshold) && + qosResourceStatus.QosConfig.NetworkIOConfig.ProcessMultiPod == true )) { + //Sort the secondary pod list based on decreasing usage of network IO + sortPodListNetworkUsage(qosResourceStatus, secondaryPods) + nodeNetworkIOUsage -= revocableNetworkIO + //Update the action list with the secondary pods to be freezed + secondaryPods, revocableNetworkIO = updateActionListForNetworkIOController(nodeNetworkIOUsage, + nodeNetworkIO, + &(qosResourceStatus.ActionList), + secondaryPods, + qosResourceStatus.QosConfig.NetworkIOConfig.ProcessMultiPod) + } + // Set flag to NOT unfreeze any pods + processUnfreezable = false + } + + //To be included/considered after Network IO is consider reclaimable by scheduler + //Check the network usage for each primary pod. 
+ /*for _, pod := range qosResourceStatus.primaryPodList { + podID := (*pod).UID + _, ok := qosResourceStatus.podResourceSummary[podID] + if !ok { + continue + } + + //Get the pod requested network IO + desiredNetworkIORateStr, _ := pod.Annotations[api.DesiredNetworkIORate] + podRequestedNetworkIO, _ := strconv.Atoi (desiredNetworkIORateStr) + + //Do not consider pods that do not specify the network IO + if podRequestedNetworkIO == 0 { + continue + } + + //Calculate the pod network IO threshold based on the configured threshold rate + thresholdRate := 1 - qosResourceStatus.QosConfig.NetworkIOConfig.PodNetworkIOThresholdRate + podNetworkIOThreshold := float64(podRequestedNetworkIO) * thresholdRate + + //Get the pod ID and use it to obtain the acquired network statistics for last N samples + podNetworkIOUsage := qosResourceStatus.podResourceSummary[podID].networkIOResourceUsage.currentUsage + podNetworkIOUsageSamples := qosResourceStatus.podResourceSummary[podID].networkIOResourceUsage.samples + monitoringInterval := float64(qosResourceStatus.QosConfig.MonitoringInterval) + + glog.Infof("Pod Network IO usage = %v Pod Threshold = %v",podNetworkIOUsage,podNetworkIOThreshold) + //Check if the current pod network IO usage is less than the pod network IO threshold + if float64(podNetworkIOUsage) < podNetworkIOThreshold { + + //Calculate predicted network usage + predictedNetworkIOUsage := calculatePredictedUsage(podNetworkIOUsage, podNetworkIOUsageSamples, monitoringInterval) + + //Check if predicted usage is less than the pod network IO threshold + if predictedNetworkIOUsage < podNetworkIOThreshold { + //No action wrt the current primary pod, continue to next primary pod + continue + } + } + processUnfreezable = false + //Sort the secondary pod list based on decreasing usage of network IO + if sortedSecondaryList == false { + sortPodListNetworkUsage(qosResourceStatus, secondaryPods) + sortedSecondaryList = true + } + //Update the action list with the secondary pods to be killed + secondaryPods = updateActionListForNetworkIOController(podNetworkIOUsage, + uint64(podRequestedNetworkIO), + &(qosResourceStatus.ActionList), + secondaryPods, + qosResourceStatus.QosConfig.NetworkIOConfig.ProcessMultiPod) + }*/ + + //Process the unfreeze pod list if both node usage and pod level usages are within thresholds + if processUnfreezable == true { + processUnfreezePodList(qosResourceStatus) + }else { + qosResourceStatus.UnfreezePodList = nil + } + return qosResourceStatus +} + +//Function to update the action list with best effort/secondary pods +func updateActionListForNetworkIOController(networkIOUsage uint64, + networkIOThreshold uint64, + actionList *[]*Action, + pods []*v1.Pod, + processMultiPod bool) ([]*v1.Pod, uint64) { + + var revocableNetworkIO uint64 + revocableNetworkIO = 0 + + i := 0 + //Check the pods to be frozen + for _, pod := range pods { + //Populate the action list with the pod to be frozen + var action Action + action.Target = pod + action.ActionType = FreezePod + *actionList = append(*actionList, &action) + i++ + glog.Infof("Pod %v added to action list", pod.Name) + //Check if the option of freezing multiple pods is enabled + if processMultiPod == false { + return pods[i:], revocableNetworkIO + } + + //Consider the networkIO that will be released by freezing the pod + desiredNetworkIORateStr, _ := pod.Annotations[v1.DesiredNetworkIORate] + podRequestedNetworkIO, _ := strconv.Atoi(desiredNetworkIORateStr) + + //Calculate the total revocable networkIO corresponding to pods + 
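+		// Pods without a DesiredNetworkIORate annotation parse to zero above
+		// and therefore contribute no revocable bandwidth here.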
revocableNetworkIO += uint64(podRequestedNetworkIO) + + //Check if the networkIO revoked meets the requested pod/node networkIO + //Some threshold of requested networkIO like 95% can be considered if required + if (networkIOUsage - revocableNetworkIO) < networkIOThreshold { + return pods[i:],revocableNetworkIO + } + } + return pods[i:], revocableNetworkIO +} + +//Function to process unfreeze list +func processUnfreezePodList(qosResourceStatus *QosResourceStatus) { + requiredNodeNetworkIO := float64(0) + //var unfreezeSecondaryPodList []*api.Pod + var unfreezeBesteffortPodList []*v1.Pod + + //Currently the network IO capacity of node is obtained from configuration file + nodeNetworkIO := uint64(qosResourceStatus.QosConfig.NetworkIOConfig.NodeNetworkIOCapacity) + + //Calculate the node threshold + nodeThresholdRate := 1 - qosResourceStatus.QosConfig.NetworkIOConfig.NodeNetworkIOThresholdRate + nodeNetworkIOThreshold := float64(nodeNetworkIO) * nodeThresholdRate + + //Get the current node network usage + nodeNetworkIOUsage := float64(qosResourceStatus.nodeResourceSummary.networkIOResourceUsage.currentUsage) + + // Process the unfreeze pod list + for i := len (qosResourceStatus.UnfreezePodList) ;i > 0 ; i-- { + pod := qosResourceStatus.UnfreezePodList [i-1] + podID := (*pod).UID + _, ok := qosResourceStatus.podResourceSummary[podID] + if !ok { + continue + } + //Get the last network IO usage + unfreezePodNetworkIO := float64(qosResourceStatus.podResourceSummary[podID].networkIOResourceUsage.currentUsage) + qosStatus := qosUtil.GetPodQOS(pod) + + //Consider unfreeze of the Secondary pods on priority + if qosStatus != v1.PodQOSBestEffort { + //To unfreeze secondary pod check if resources are available in the node + availableNetworkIO := float64(nodeNetworkIOThreshold) - nodeNetworkIOUsage - requiredNodeNetworkIO + if unfreezePodNetworkIO < availableNetworkIO { + glog.Infof("Pod %v in unfreeze list", pod.Name) + requiredNodeNetworkIO += unfreezePodNetworkIO + } else { + // Remove pod from unfreeze list + qosResourceStatus.UnfreezePodList = append(qosResourceStatus.UnfreezePodList[:i-1],qosResourceStatus.UnfreezePodList[i:]...) + } + } else { + // Create list of best effort pods to consider for unfreeze later + unfreezeBesteffortPodList = append(unfreezeBesteffortPodList, pod) + qosResourceStatus.UnfreezePodList = append(qosResourceStatus.UnfreezePodList[:i-1],qosResourceStatus.UnfreezePodList[i:]...) 
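+				// Removed from the unfreeze list here; the best-effort loop below
+				// re-adds the pod once node headroom is confirmed.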
+ } + } + + // Process the best effort pods now + for _, besteffortPod := range unfreezeBesteffortPodList { + podID := (*besteffortPod).UID + //Get the last network IO usage + unfreezePodNetworkIO := float64(qosResourceStatus.podResourceSummary[podID].networkIOResourceUsage.currentUsage) + availableNetworkIO := float64(nodeNetworkIOThreshold) - nodeNetworkIOUsage - requiredNodeNetworkIO + if unfreezePodNetworkIO < availableNetworkIO { + requiredNodeNetworkIO += unfreezePodNetworkIO + qosResourceStatus.UnfreezePodList = append(qosResourceStatus.UnfreezePodList, besteffortPod) + glog.Infof("Pod %v in unfreeze list", besteffortPod.Name) + } + } + + //To be included/considered after Network IO is consider reclaimable by scheduler + //To unfreeze secondary pods check if reclaimable resource is available with primary pods + /*for _, primaryPod := range qosResourceStatus.primaryPodList { + primaryPodID := (*primaryPod).UID + _, ok := qosResourceStatus.podResourceSummary[primaryPodID] + if !ok { + continue + } + + //Get the pod requested network IO + desiredNetworkIORateStr, _ := primaryPod.Annotations[api.DesiredNetworkIORate] + podRequestedNetworkIO, _ := strconv.Atoi(desiredNetworkIORateStr) + + //Calculate the pod network IO threshold based on the configured threshold rate + thresholdRate := 1 - qosResourceStatus.QosConfig.NetworkIOConfig.PodNetworkIOThresholdRate + podNetworkIOThreshold := float64(podRequestedNetworkIO) * thresholdRate + podNetworkIOUsage := float64(qosResourceStatus.podResourceSummary[primaryPodID].networkIOResourceUsage.currentUsage) + + requiredPodNetworkIO := float64(0) + for i := len(unfreezeSecondaryPodList); i > 0 ; i-- { + pod := unfreezeSecondaryPodList[i-1] + podID := (*pod).UID + _, ok := qosResourceStatus.podResourceSummary[podID] + if !ok { + continue + } + //Get the last network IO usage + unfreezePodNetworkIO := float64(qosResourceStatus.podResourceSummary[podID].networkIOResourceUsage.currentUsage) + availablePodNetworkIO := podNetworkIOThreshold - podNetworkIOUsage - requiredPodNetworkIO + if unfreezePodNetworkIO < availablePodNetworkIO { + requiredPodNetworkIO += unfreezePodNetworkIO + unfreezeSecondaryPodList = append(unfreezeSecondaryPodList[:i-1],unfreezeSecondaryPodList[i:]...) + } + } + } + RemovePod: + for _, pod := range unfreezeSecondaryPodList { + //Remove pod from unfreeze list + for i := len(qosResourceStatus.UnfreezePodList) ; i > 0 ; i-- { + unfreezePod := qosResourceStatus.UnfreezePodList[i-1] + if pod == unfreezePod { + qosResourceStatus.UnfreezePodList = append(qosResourceStatus.UnfreezePodList[:i-1],qosResourceStatus.UnfreezePodList[i:]...) + continue RemovePod + } + } + }*/ +} diff --git a/pkg/kubelet/qoscontroller/overall_controller.go b/pkg/kubelet/qoscontroller/overall_controller.go new file mode 100644 index 0000000000000..4e92c3af2bad9 --- /dev/null +++ b/pkg/kubelet/qoscontroller/overall_controller.go @@ -0,0 +1,150 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package qoscontroller + +import ( + "github.com/golang/glog" + "github.com/google/cadvisor/machine" + "k8s.io/api/core/v1" + "math" + + qosUtil "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" + "strconv" + "sync" +) + +// Overall Controller +type OverallController struct { +} + +var MutexNodeStability = &sync.Mutex{} +var IsNodeUnstable bool + +// initialize of QosController is implemented by OverallController and does all the initialization works +func (oc *OverallController) initialize(qosResourceStatus *QosResourceStatus) error { + qosResourceStatus.primaryPodList = nil + qosResourceStatus.secondaryPodList = nil + qosResourceStatus.bestEffortPodList = nil + qosResourceStatus.processNextController = true + qosResourceStatus.ActionList = nil + return nil +} + +// process of QosController interface is implemented by OverallController and does all what an overall controller has to do +func (oc *OverallController) process(qosResourceStatus *QosResourceStatus) error { + + var nodeMemoryRate, nodeAverageMemoryRate float64 + var nodeMemoryCurrentSample, nodeMemoryPreviousSample float64 + + nodeMemoryUsage := qosResourceStatus.nodeResourceSummary.memoryResourceUsage.currentUsage + nodeMemoryUsageSamples := qosResourceStatus.nodeResourceSummary.memoryResourceUsage.samples + monitoringInterval := qosResourceStatus.QosConfig.MonitoringInterval + nodeHighMemoryThresholdRate := qosResourceStatus.QosConfig.MemoryConfig.NodeHighMemoryRequestThresholdRate + nodeLowMemoryThresholdRate := qosResourceStatus.QosConfig.MemoryConfig.NodeLowMemoryRequestThresholdRate + + nodeMemory, err := machine.GetMachineMemoryCapacity() + if err != nil { + glog.Errorf("Cannot obtain Node Memory Capacity") + return err + } + + nodeMemoryCurrentSample = float64(nodeMemoryUsage) + nodeMemoryRate = 0 + + // Calculate the rate of increase for last N samples + for i := 0; i < len(nodeMemoryUsageSamples); i++ { + nodeMemoryPreviousSample = float64(nodeMemoryUsageSamples[i]) + nodeMemoryRate += (nodeMemoryCurrentSample - nodeMemoryPreviousSample) / nodeMemoryPreviousSample + nodeMemoryCurrentSample = nodeMemoryPreviousSample + } + + // Calculate the average rate of increase + nodeAverageMemoryRate = nodeMemoryRate / float64(len(nodeMemoryUsageSamples)) + + // Calculate the various predicted node level memory usage and node level thresholds + // in the next monitoring interval based on the increase in memory rate + nodeMemoryIncreaseRate := math.Pow((1 + nodeAverageMemoryRate), float64(monitoringInterval)) + nodePredictedMemoryUsage := float64(nodeMemoryUsage) * nodeMemoryIncreaseRate + nodeHighMemoryThreshold := float64(nodeMemory) * (1 - nodeHighMemoryThresholdRate) + nodeLowMemoryThreshold := float64(nodeMemory) * (1 - nodeLowMemoryThresholdRate) + + // Check if node memory usage greater than lower memory threshold + if float64(nodeMemoryUsage) > nodeLowMemoryThreshold { + // Check if predicted usage greater than high memory threshold + if nodePredictedMemoryUsage > nodeHighMemoryThreshold { + // Signalling Resource Estimator to stop sending resource usage to Resource Manager + glog.Infof("Node is unstable, Signalling Resource Estimator to stop sending resource usage to Resource Manager") + MutexNodeStability.Lock() + IsNodeUnstable = true + MutexNodeStability.Unlock() + } + // Node stable/unstable state retained when memory usage is between high and low memory threshold + + } else { + // Signalling Resource Estimator to keep sending resource usage to Resource Manager + MutexNodeStability.Lock() + IsNodeUnstable = false + 
MutexNodeStability.Unlock()
+	}
+
+	// classify active pods into primary, secondary and best-effort pods
+NextActivePod:
+	for _, pod := range qosResourceStatus.activePods {
+
+		//Do not consider frozen pods in the active list. Ideally paused pods have to be handled with a separate state in kubelet
+		for _, frozenPod := range qosResourceStatus.FrozenPodList {
+			if frozenPod == pod {
+				continue NextActivePod
+			}
+		}
+
+		//Do not consider pods that are not in the running state
+		if pod.Status.Phase != v1.PodRunning {
+			glog.Infof("Pod %v not in running state", pod.Name)
+			continue NextActivePod
+		}
+
+		cpuSecondaryAmountStr, okCpu := pod.Annotations[v1.CpuSecondaryAmount]
+		memSecondaryAmountStr, okMem := pod.Annotations[v1.MemSecondaryAmount]
+		if okCpu == false && okMem == false {
+			glog.Errorf("Annotations to classify the pod as a secondary pod are not present in pod %v", pod.UID)
+		}
+		cpuSecondaryAmount, _ := strconv.Atoi(cpuSecondaryAmountStr)
+		memSecondaryAmount, _ := strconv.Atoi(memSecondaryAmountStr)
+
+		qosStatus := qosUtil.GetPodQOS(pod)
+		if qosStatus == v1.PodQOSBestEffort {
+			qosResourceStatus.bestEffortPodList = append(qosResourceStatus.bestEffortPodList, pod)
+		} else if (okMem == true && memSecondaryAmount > 0) || (okCpu == true && cpuSecondaryAmount > 0) { //Pod that consumes secondary resource is a secondary pod
+			qosResourceStatus.secondaryPodList = append(qosResourceStatus.secondaryPodList, pod)
+		} else {
+			qosResourceStatus.primaryPodList = append(qosResourceStatus.primaryPodList, pod)
+		}
+	}
+	//Set the unfreeze pod list to the frozen pod list
+	for _, pod := range qosResourceStatus.FrozenPodList {
+		qosResourceStatus.UnfreezePodList = append(qosResourceStatus.UnfreezePodList, pod)
+	}
+
+	//No need to process further if there are no secondary pods, best effort pods or unfreeze pods
+	if qosResourceStatus.secondaryPodList == nil && qosResourceStatus.bestEffortPodList == nil && qosResourceStatus.UnfreezePodList == nil {
+		glog.Infof("There are no secondary, best effort or unfreezable pods, so returning to kubelet")
+		qosResourceStatus.processNextController = false
+	}
+	return nil
+}
diff --git a/pkg/kubelet/qoscontroller/qos_controller.go b/pkg/kubelet/qoscontroller/qos_controller.go
new file mode 100644
index 0000000000000..fc09f1e122a31
--- /dev/null
+++ b/pkg/kubelet/qoscontroller/qos_controller.go
@@ -0,0 +1,74 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package qoscontroller
+
+type QosController interface {
+	process(qosResourceStatus *QosResourceStatus) error
+	// Initialization function for the controllers
+	initialize(qosResourceStatus *QosResourceStatus) error
+}
+
+type QosMonitor struct {
+}
+
+func NewQosMonitor() *QosMonitor {
+	return &QosMonitor{}
+}
+
+func (qosMonitor QosMonitor) StartQosMonitor(qosResourceStatus *QosResourceStatus) error {
+	run(qosResourceStatus)
+	return nil
+}
+
+// Instance of Overall Controller
+var oc OverallController
+
+// Instance of Resource Summary
+var rs ResourceSummary
+
+// Instance of Memory Controller
+var mc MemController
+
+// Instance of Network IO Controller
+var nc NetworkIOController
+
+// Instance of Disk IO Controller
+var dc DiskIOController
+
+// Instance of SLA Controller
+var sc SlaController
+
+// run executes all the controllers sequentially and then runs the executor
+func run(qosResourceStatus *QosResourceStatus) error {
+	// Call metrics acquisition
+	rs.qosAcquireMetrics(qosResourceStatus)
+	// Call Overall Controller
+	oc.initialize(qosResourceStatus)
+	oc.process(qosResourceStatus)
+	if qosResourceStatus.processNextController == false {
+		return nil
+	}
+	// Call Memory Controller
+	mc.process(qosResourceStatus)
+	// Call Network IO Controller
+	nc.process(qosResourceStatus)
+	// Call Disk IO Controller
+	dc.process(qosResourceStatus)
+	// Call SLA Controller
+	sc.process(qosResourceStatus)
+	return nil
+}
diff --git a/pkg/kubelet/qoscontroller/qos_metrics.go b/pkg/kubelet/qoscontroller/qos_metrics.go
new file mode 100644
index 0000000000000..614be075e1dbf
--- /dev/null
+++ b/pkg/kubelet/qoscontroller/qos_metrics.go
@@ -0,0 +1,335 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package qoscontroller + +import ( + "fmt" + "github.com/golang/glog" + cadvisorapiv1 "github.com/google/cadvisor/info/v1" + cadvisorapiv2 "github.com/google/cadvisor/info/v2" + "k8s.io/api/core/v1" + kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/apimachinery/pkg/types" +) + +const MaxSamples int = 10 + +// ResourceSummary contains all the usage details of various resources +type ResourceSummary struct { + // for memory controller + memoryResourceUsage ResourceUsage + // for network IO controller + networkIOResourceUsage ResourceUsage + // for disk IO controller + diskIOResourceUsage ResourceUsage +} + +// ResourceUsage contains parameters pertaining in general to resources +type ResourceUsage struct { + threshold uint64 + currentUsage uint64 + predictedUsage uint64 + numSamples int + samples []uint64 +} + +// delete the metrics entries collected for the pod which are not currently in active pod list +func (*ResourceSummary) cleanUp(qosResourceStatus *QosResourceStatus) error { + for podId := range qosResourceStatus.podResourceSummary { + exists := false + for _, activePod := range qosResourceStatus.activePods { + if podId == activePod.ObjectMeta.UID { + exists = true + break + } + } + if !exists { + delete(qosResourceStatus.podResourceSummary, podId) + } + } + return nil +} + +// qosAcquireMetrics calculates/gathers all resources +func (rs *ResourceSummary) qosAcquireMetrics(qosResourceStatus *QosResourceStatus) error { + qosAcquireNodeMetrics(qosResourceStatus) + // loop each pod in the node and collect the pod level metrics +NextActivePod: + for _, pod := range qosResourceStatus.activePods { + //Do not collect metrics for frozen pods + for _, frozenPod := range qosResourceStatus.FrozenPodList { + if frozenPod == pod { + continue NextActivePod + } + } + qosAcquirePodMetrics(pod, qosResourceStatus) + } + err := rs.cleanUp(qosResourceStatus) + return err +} + +// qosAcquireNodeMetrics calculates/gathers all resources for the node +func qosAcquireNodeMetrics(qosResourceStatus *QosResourceStatus) error { + cadvisorOptions := cadvisorapiv2.RequestOptions{ + IdType: cadvisorapiv2.TypeName, + Count: 1, + Recursive: false, + } + + //calculate node memory usage + nodeMemoryResourceUsage := qosResourceStatus.nodeResourceSummary.memoryResourceUsage + numMemorySamples := nodeMemoryResourceUsage.numSamples + if MaxSamples == numMemorySamples { + for i := 0; i < (MaxSamples - 1); i++ { + nodeMemoryResourceUsage.samples[i] = nodeMemoryResourceUsage.samples[i+1] + } + nodeMemoryResourceUsage.samples[numMemorySamples-1] = 0 + } else if numMemorySamples < MaxSamples { + nodeMemoryResourceUsage.samples = append(nodeMemoryResourceUsage.samples, 0) + numMemorySamples += 1 + } + + derivedStatsMap, err := qosResourceStatus.cadvisor.DerivedStats("/", cadvisorOptions) + if err != nil { + return fmt.Errorf("Get DerivedStats error:%v", err) + } + + latestMemoryIOUsage := derivedStatsMap["/"].LatestUsage.Memory + nodeMemoryResourceUsage.numSamples = numMemorySamples + nodeMemoryResourceUsage.samples[numMemorySamples-1] = latestMemoryIOUsage + nodeMemoryResourceUsage.currentUsage = latestMemoryIOUsage + qosResourceStatus.nodeResourceSummary.memoryResourceUsage = nodeMemoryResourceUsage + + glog.Infof("--Acquired %d samples for Memory IO metrics for Node . 
Latest usage %v --", nodeMemoryResourceUsage.numSamples, nodeMemoryResourceUsage.currentUsage) + + // calculate network IO usage + nodeNetworkIOResourceUsage := qosResourceStatus.nodeResourceSummary.networkIOResourceUsage + numNetworkIOSamples := nodeNetworkIOResourceUsage.numSamples + + if MaxSamples == numNetworkIOSamples { + for i := 0; i < (MaxSamples - 1); i++ { + nodeNetworkIOResourceUsage.samples[i] = nodeNetworkIOResourceUsage.samples[i+1] + } + nodeNetworkIOResourceUsage.samples[numNetworkIOSamples-1] = 0 + } else if numNetworkIOSamples < MaxSamples { + nodeNetworkIOResourceUsage.samples = append(nodeNetworkIOResourceUsage.samples, 0) + numNetworkIOSamples += 1 + } + + derivedStatsNetworkIOMap, err := qosResourceStatus.cadvisor.DerivedStats("/", cadvisorOptions) + if err != nil { + return fmt.Errorf("Get DerivedStats error:%v", err) + } + + latestNetworkIOUsage := derivedStatsNetworkIOMap["/"].LatestUsage.Network + nodeNetworkIOResourceUsage.samples[numNetworkIOSamples-1] = latestNetworkIOUsage + if numNetworkIOSamples > 1 { + latestNetworkIOUsage -= nodeNetworkIOResourceUsage.samples[numNetworkIOSamples-2] + nodeNetworkIOResourceUsage.samples[numNetworkIOSamples-2] = latestNetworkIOUsage + nodeNetworkIOResourceUsage.currentUsage = latestNetworkIOUsage + } + + nodeNetworkIOResourceUsage.numSamples = numNetworkIOSamples + //nodeNetworkIOResourceUsage.samples[numNetworkIOSamples-1] = latestNetworkIOUsage + //nodeNetworkIOResourceUsage.currentUsage = latestNetworkIOUsage + qosResourceStatus.nodeResourceSummary.networkIOResourceUsage = nodeNetworkIOResourceUsage + + glog.Infof("--Acquired %d samples for Network IO metrics for Node . Value read = %v Latest usage %v --", + nodeNetworkIOResourceUsage.numSamples, nodeNetworkIOResourceUsage.samples[numNetworkIOSamples-1],nodeNetworkIOResourceUsage.currentUsage) + + return nil +} + +// qosAcquirePodMetrics calculates/gathers all resources for each of the pod and fills up the map +func qosAcquirePodMetrics(pod *api.Pod, qosResourceStatus *QosResourceStatus) error { + acquireMemoryMetrics(pod, qosResourceStatus) + acquireDiskIOMetrics(pod, qosResourceStatus) + acquireNetworkIOMetrics(pod, qosResourceStatus) + return nil +} + +// acquireMemoryMetrics calculates/gathers memory related resources for each of the pod and fills up the map +func acquireMemoryMetrics(pod *v1.Pod, qosResourceStatus *QosResourceStatus) error { + var resourceSummary *ResourceSummary + cadvisorOptions := cadvisorapiv2.RequestOptions{ + IdType: cadvisorapiv2.TypeName, + Count: 1, + Recursive: false, + } + + podId := pod.ObjectMeta.UID + nameToId := make(map[string]string) + + //check if there is an entry in the map, if it doesn't exist we end up creating one towards the end + //of this function + _, ok := qosResourceStatus.podResourceSummary[podId] + if ok { + resourceSummary = qosResourceStatus.podResourceSummary[podId] + } else { + resourceSummary = new(ResourceSummary) + } + resourceUsage := resourceSummary.memoryResourceUsage + numSamples := resourceUsage.numSamples + if MaxSamples == numSamples { + for i := 0; i < (MaxSamples - 1); i++ { + resourceUsage.samples[i] = resourceUsage.samples[i+1] + } + resourceUsage.samples[numSamples-1] = 0 + } else if numSamples < MaxSamples { + resourceUsage.samples = append(resourceUsage.samples, 0) + numSamples += 1 + } + + //get mappings from containers' Names to Ids + for _, containerStatus := range pod.Status.ContainerStatuses { + if len(containerStatus.Name) > 0 && len(containerStatus.ContainerID) >= 9 { + 
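+			// ContainerID has the form "<runtime>://<id>", e.g. "docker://<id>";
+			// slicing off the first 9 characters strips the "docker://" scheme prefix.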
nameToId[containerStatus.Name] = containerStatus.ContainerID[9:] + } + } + + for _, container := range pod.Spec.Containers { + containerId, ok := nameToId[container.Name] + //glog.Infof("-----------------containerName:%v containerId:%v", container.Name, containerId) + if !ok { + + return fmt.Errorf("could not found the ContainerId of container: %v", container.Name) + } + + derivedStatsMap, err := qosResourceStatus.cadvisor.DerivedStats("/"+containerId, cadvisorOptions) + if err != nil { + return fmt.Errorf("Get DerivedStats error:%v", err) + } + latestUsage := derivedStatsMap["/"+containerId].LatestUsage.Memory + resourceUsage.samples[numSamples-1] += latestUsage + + } + resourceUsage.numSamples = numSamples + resourceUsage.currentUsage = resourceUsage.samples[numSamples-1] + resourceSummary.memoryResourceUsage = resourceUsage + qosResourceStatus.podResourceSummary[podId] = resourceSummary + + glog.Infof("-----Acquired %d samples for memory metrics for Pod:%v PodId:%v Latest usage %v----------", resourceUsage.numSamples, pod.Name, podId, resourceUsage.currentUsage) + /* + for i:=0;i 1 { + latestUsage -= resourceUsage.samples[numSamples-2] + resourceUsage.samples[numSamples-2] = latestUsage + resourceUsage.currentUsage = resourceUsage.samples[numSamples-2] + } + + //resourceUsage.samples[numSamples-1] = latestUsage + + resourceUsage.numSamples = numSamples + //resourceUsage.currentUsage = resourceUsage.samples[numSamples-1] + resourceSummary.networkIOResourceUsage = resourceUsage + qosResourceStatus.podResourceSummary[podId] = resourceSummary + + glog.Infof("-----Acquired %d samples for network IO stats for Pod:%v PodId:%v Latest usage %v----------", resourceUsage.numSamples, pod.Name, podId, resourceUsage.currentUsage) + + //for i:=0;i podRealLatency { + podRealLatency = v.Value.(float64) + } + } + + //podRealLatency := podMetricValue[len(podMetricValue)-1].Value.(float64) + glog.Infof("Get pod real latency from ops agent: %v", podRealLatency) + return float64(podRealLatency), nil +} + +func (sc *SlaController) GetPodDeploymentFromReplicaSet(rs *extensions.ReplicaSet, +qosResourceStatus *QosResourceStatus) ([]extensions.Deployment, error) { + + var deployments []extensions.Deployment + + if len(rs.Labels) == 0 { + //glog.Infof("No deployments found for ReplicaSet %v because it has no labels", rs.Name) + return nil, fmt.Errorf("No deployments found for ReplicaSet %v because it has no labels", rs.Name) + } + + ds, err := qosResourceStatus.kubeClient.Extensions().Deployments(rs.Namespace).List(metav1.ListOptions{}) + + if err != nil { + //glog.Infof("Failed list deployments for pod target latency: %v", err) + return nil, fmt.Errorf("Failed list deployments for pod target latency: %v", err) + } + + for _, d := range ds.Items { + + /* + if d.Namespace != rs.Namespace { + glog.Infof("Deployment namespace: %v is not the same as replica set namespace: %v, so skip. ", + d.Namespace, rs.Namespace) + continue + } + */ + + selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) + + if err != nil { + //glog.Infof("Invalid label selector for pod target latency: %v", err) + return nil, fmt.Errorf("Invalid label selector for pod target latency: %v", err) + } else { + glog.Infof("Deployment selector for pod target latency: %v ", selector) + } + + // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything. 
+ if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)){ + glog.Infoln("Replica set labels: ", rs.Labels) + glog.Infoln("Deployment selector is empty or not mathc to replica set label, so skip") + continue + } + + deployments = append(deployments, d) + } + + if len(deployments) == 0 { + return nil, fmt.Errorf("could not find deployments for ReplicaSet %v in namespace %v with labels: %v", + rs.Name, rs.Namespace, rs.Labels) + } else { + //glog.Infof("Get pod deployments from replca set: %v", deployments) + return deployments, nil + } +} + +func (sc *SlaController) GetPodDeploymentFromApiServer(pod *v1.Pod, +qosResourceStatus *QosResourceStatus) (*extensions.Deployment, error) { + var selector labels.Selector + rss, err := qosResourceStatus.kubeClient.Extensions().ReplicaSets(pod.Namespace).List(api.ListOptions{}) + + if err != nil { + return nil, fmt.Errorf("Failed list replica sets for pod target latency: %v", err) + } + + for _, rs := range rss.Items { + + /* + if rs.Namespace != pod.Namespace { + glog.Infof("Replica set namespace: %v is not the same as pod namespace: %v, so skip. ", + rs.Namespace, pod.Namespace) + continue + } + */ + + selector, err = metav1.LabelSelectorAsSelector(rs.Spec.Selector) + + if err != nil { + return nil, fmt.Errorf("Failed get selector for pod target latency: %v", err) + } else { + glog.Infof("Replica set selector for pod target latency: %v ", selector) + } + + // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + glog.Infoln("pod labels: ", pod.Labels) + glog.Infoln("Replica set selector is empty or not match to pod lable, so skip") + continue + } + + deployments, err := sc.GetPodDeploymentFromReplicaSet(&rs, qosResourceStatus) + + if err == nil && len(deployments) > 0 { + //glog.Infof("Get pod deployment for pod target latency: %v ", deployments[0]) + return &deployments[0], nil + } + } + + return nil, fmt.Errorf("Failed get pod deployment from api server for pod target latency: %v", err) +} + +func (sc *SlaController) GetNodeMetricRespFromOpsAgent(opsAgentEndpoint string) (*NodeMetricResp, error) { + var nodeMetricResp *NodeMetricResp + req, err := http.NewRequest("GET", opsAgentEndpoint, nil) + + if err != nil { + //glog.Infof("Failed http NewRequest to ops agent for node real latency: %v", err) + return nodeMetricResp, fmt.Errorf("Failed http NewRequest to ops agent for node real latency: %v", err) + } + + //glog.Infof("Http request to ops agent for node real latency: %v ", req) + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + + if err != nil { + //glog.Infof("Failed httpClient.Do for node real latency: %v", err) + return nodeMetricResp, fmt.Errorf("Failed httpClient.Do for node real latency: %v", err) + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + errInfo, _ := ioutil.ReadAll(resp.Body) + //glog.Infof("Failed get node real latency with code: %d and info: %s", resp.StatusCode, string(errInfo)) + return nodeMetricResp, fmt.Errorf("Failed get node real latency with code: %d and info: %v", + resp.StatusCode, string(errInfo)) + } + + //glog.Infof("Http response body from ops agent for node real latency: %v ", resp.Body) + err = json.NewDecoder(resp.Body).Decode(&nodeMetricResp) + + if err != nil { + //glog.Infof("Failed decode node real latency from ops agent: %v", err) + return nodeMetricResp, fmt.Errorf("Failed decode node real latency from ops agent: %v", err) + } + + 
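+	// nodeMetricResp now holds the decoded latency samples keyed by pod IP,
+	// ready for per-pod lookup in GetPodRealLatencyFromOpsAgent.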
//glog.Infof("Node real latency returned from ops agent: %v ", *nodeMetricResp) + return nodeMetricResp, nil +} + +func (sc *SlaController) process(qosResourceStatus *QosResourceStatus) *QosResourceStatus { + + var podTargetLatency float64 + var podLatencyThreshold float64 + var podRealLatency float64 + var opsAgentEndpoint string + opsAgentApiPath := qosResourceStatus.QosConfig.SlaConfig.OpsAgentApiPath + defalutOpsAgentApiPath := ":11808/api/v1/pod-metrics/latency" + sortedSecondaryList := false + + if qosResourceStatus.primaryPodList == nil || len(qosResourceStatus.primaryPodList) == 0 { + glog.Infoln("Primary pod list is empty, no action from SLA controller") + return qosResourceStatus + } + + if qosResourceStatus.secondaryPodList == nil || len(qosResourceStatus.secondaryPodList) == 0 { + glog.Infoln("Secondary pod list is empty, no action from SLA controller") + return qosResourceStatus + } + + actionListSize := len(qosResourceStatus.ActionList) + glog.Infoln("Current action list size: ", actionListSize) + + if actionListSize != 0 { + glog.Infof("Action list is not empty, the first action is: %v, skip processing primary pods", + qosResourceStatus.ActionList[0]) + return qosResourceStatus + } + + hostIP := qosResourceStatus.primaryPodList[0].Status.HostIP + + if opsAgentApiPath == "" { + glog.Infof("OpsAgentApiPath is nil or empty, use the default one: %v", defalutOpsAgentApiPath) + opsAgentEndpoint = "http://" + hostIP + defalutOpsAgentApiPath + } else { + opsAgentEndpoint = "http://" + hostIP + opsAgentApiPath + } + + //glog.Infof("endpoint for ops-agent %s", opsAgentEndpoint) + nodeMetricResp, err := sc.GetNodeMetricRespFromOpsAgent(opsAgentEndpoint) + + if err != nil { + glog.Infof("Failed get node real latency from ops agent: %v ", err) + return qosResourceStatus + } + + //Check the SLA status for each primary pod + glog.Infoln("Start processing each primary pod") + + for _, pod := range qosResourceStatus.primaryPodList { + podID := pod.UID + podIP := pod.Status.PodIP + glog.Infof("SLA controller is processing primary pod with ID: %v at IP: %v ", podID, podIP) + + //Calculate the pod latency threshold based on the configured threshold rate + //podTargetLatency, err = sc.GetPodTargetLatency(pod, qosResourceStatus) + podTargetLatency, err = sc.GetPodTargetLatencyFromAnnotation(pod) + + if err != nil || podTargetLatency == 0 { + glog.Infof("No target latency for primary pod at IP: %v with error: %v, skip it", podIP, err) + continue + } + + thresholdRate := 1 + qosResourceStatus.QosConfig.SlaConfig.PodLatencyThresholdRate + podLatencyThreshold = podTargetLatency * thresholdRate + podRealLatency, err = sc.GetPodRealLatencyFromOpsAgent(nodeMetricResp, podIP) + + if err != nil || podRealLatency == 0 { + glog.Infof("No real latency for primary pod at IP: %v with error: %v, skip it", podIP, err) + continue + } + + //Check if the current latency is greater than the pod latency threshold or the action list is empty + if podRealLatency <= podLatencyThreshold { + glog.Infof("Real latency: %v < target latency: %v for primary pod at IP: %v, skip it", + podRealLatency, podTargetLatency, podIP) + continue + } + + glog.Infof("Real latency: %v > target latency: %v for primary pod at IP: %v, kill it", + podRealLatency, podTargetLatency, podIP) + + if sortedSecondaryList == false && len(qosResourceStatus.secondaryPodList) > 1 { + //Sort the secondary pod list based on decreasing start time + sortPodListStartTime(qosResourceStatus.secondaryPodList) + sortedSecondaryList = true + } + + //Update the 
action list with the secondary pods to be killed + qosResourceStatus.secondaryPodList = slaUpdateActionList(&(qosResourceStatus.ActionList), + qosResourceStatus.secondaryPodList, qosResourceStatus.QosConfig.SlaConfig.ProcessMultiPod) + + IsNodeUnstable = true + glog.Infoln("SLA controller set IsNodeUnstable to true when checking primary pods") + return qosResourceStatus + } + + glog.Infoln("Start processing each secondary pod") + + //Check the SLA status for each secondary pod + for _, pod := range qosResourceStatus.secondaryPodList { + podID := pod.UID + podIP := pod.Status.PodIP + glog.Infof("SLA controller is processing secondary pod with ID: %v at IP: %v ", podID, podIP) + + //Calculate the pod latency threshold based on the configured threshold rate + //podTargetLatency, err = sc.GetPodTargetLatency(pod, qosResourceStatus) + podTargetLatency, err = sc.GetPodTargetLatencyFromAnnotation(pod) + + if err != nil || podTargetLatency == 0 { + glog.Infof("No target latency for secondary pod at IP: %v with error: %v, skip it", podIP, err) + continue + } + + thresholdRate := 1 + qosResourceStatus.QosConfig.SlaConfig.PodLatencyThresholdRate + podLatencyThreshold = podTargetLatency * thresholdRate + podRealLatency, err = sc.GetPodRealLatencyFromOpsAgent(nodeMetricResp, podIP) + + if err != nil || podRealLatency == 0 { + glog.Infof("No real latency for secondary pod at IP: %v with error: %v, skip it", podIP, err) + continue + } + + //Check if the current latency is greater than the pod latency threshold or the action list is empty + if podRealLatency <= podLatencyThreshold { + glog.Infof("Real latency: %v < target latency: %v for secondary pod at IP: %v, skip it", + podRealLatency, podTargetLatency, podIP) + continue + } + + glog.Infof("Real latency: %v > target latency: %v for secondary pod at IP: %v, kill it", + podRealLatency, podTargetLatency, podIP) + + if sortedSecondaryList == false && len(qosResourceStatus.secondaryPodList) > 1 { + //Sort the secondary pod list based on decreasing start time + sortPodListStartTime(qosResourceStatus.secondaryPodList) + sortedSecondaryList = true + } + + //Update the action list with the secondary pods to be killed + qosResourceStatus.secondaryPodList = slaUpdateActionList(&(qosResourceStatus.ActionList), + qosResourceStatus.secondaryPodList, qosResourceStatus.QosConfig.SlaConfig.ProcessMultiPod) + + IsNodeUnstable = true + glog.Infoln("SLA controller set IsNodeUnstable to true when checking secondary pods") + return qosResourceStatus + } + + return qosResourceStatus +} + +func slaUpdateActionList(actionList *[]*Action, secondaryPods []*v1.Pod, processMultiPod bool) []*v1.Pod { + + i := 0 + + //Check the secondary pods to be killed + for _, pod := range secondaryPods { + + //Populate the action list with the secondary pod to be killed + var action Action + action.Target = pod + action.ActionType = KillPod + *actionList = append(*actionList, &action) + i++ + glog.Infof("Secondary Pod %v added to action list", pod.Name) + + //Check if the option of killing multiple secondary pods is enabled + if processMultiPod == false { + glog.Infof("processMultiPod is: ", processMultiPod) + return secondaryPods[i:] + } + } + + return secondaryPods[i:] +} + +//Function to sort secondary/best effort pod list based on start time +func sortPodListStartTime(pods []*v1.Pod) { + By(func(p1, p2 *v1.Pod) bool { + p1StartTime := p1.Status.StartTime.Time + p2StartTime := p2.Status.StartTime.Time + return p1StartTime.After(p2StartTime) + }).Sort(pods) + return +} diff --git 
a/pkg/kubelet/qoscontroller/sla_controller_test.go b/pkg/kubelet/qoscontroller/sla_controller_test.go new file mode 100644 index 0000000000000..ba6a2843e8a46 --- /dev/null +++ b/pkg/kubelet/qoscontroller/sla_controller_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package qoscontroller + +import ( + "testing" + "encoding/json" + "bytes" + "net/http" + "io/ioutil" +) + +func TestDecodeNodeRealLatencyFromString(t *testing.T) { + var nodeMetricResp *NodeMetricResp + var nodeMetricValue []*NodeMetricPoint + s := "{\"metric_name\":\"latency\",\"metric_value\":{\"129.188.37.75\":[{\"value\":0,\"tags\":{\"ip\":\"129.188.37.75\",\"transaction_type\":\"/\"},\"timestamp\":1473250120}]}}" + b := bytes.NewBufferString(s) + err := json.NewDecoder(b).Decode(&nodeMetricResp) + + if err != nil { + t.Log("Failed decode node real latency from string: ", err) + } else { + t.Log("nodeMetricResp: ", *nodeMetricResp) + } + + nodeMetricValue = nodeMetricResp.MetricValue["129.188.37.75"] + t.Log("nodeMetricValue: ", nodeMetricValue) + podRealLatency := nodeMetricValue[0].Value.(float64) + t.Log("podRealLatency: ", podRealLatency) +} + +func TestGetNodeRealLatency(t *testing.T) { + var nodeMetricResp *NodeMetricResp + hostIP := "10.162.215.149" + opsAgentEndpoint := "http://" + hostIP + ":11808/api/v1/pod-metrics/latency" + req, err := http.NewRequest("GET", opsAgentEndpoint, nil) + + if err != nil { + t.Log("Failed http NewRequest to ops agent for node real latency: ", err) + return + } + + t.Log("Http request to ops agent for node real latency: ", req) + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + + if err != nil { + t.Log("Failed httpClient.Do for node real latency: ", err) + return + } + + t.Log("Http response from ops agent for node real latency: ", resp) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + errInfo, _ := ioutil.ReadAll(resp.Body) + t.Logf("Failed get node real latency with code: %d and info: %s", resp.StatusCode, string(errInfo)) + return + } + + t.Log("Http response body from ops agent for node real latency: ", resp.Body) + err = json.NewDecoder(resp.Body).Decode(&nodeMetricResp) + + if err != nil { + t.Log("Failed decode node real latency from ops agent: ", err) + return + } + + podMetricValue := nodeMetricResp.MetricValue["192.168.0.9"] + + if podMetricValue == nil { + t.Log("No pod metric value from pod ") + } else { + t.Log("Get pod real latency from ops agent: ", podMetricValue[len(podMetricValue)-1].Value.(float64)) + } +} + +func TestUpdateActionList(t *testing.T) { + + var secondaryPods []string + var actionList []string + pod1 := "pod1" + secondaryPods = append(secondaryPods, pod1) + pod2 := "pod2" + secondaryPods = append(secondaryPods, pod2) + i := 0 + processMultiPod := false + + //Check the secondary pods to be killed + for _, pod := range secondaryPods { + + //Populate the action list with the secondary pod to be killed + actionList = append(actionList, "KillPod") + i++ + 
t.Log("Secondary Pod %v added to action list", pod) + + //Check if the option of killing multiple secondary pods is enabled + if processMultiPod == false { + t.Log("processMultiPod is: ", processMultiPod) + t.Log("Return secondary pods: ", secondaryPods[i:]) + } + } + + t.Log("Return secondary pods: ", secondaryPods[i:]) +} From d30f6b6bac637e785d67c173998f3a29ce082ede Mon Sep 17 00:00:00 2001 From: wackxu Date: Tue, 9 Jan 2018 14:49:09 +0800 Subject: [PATCH 2/2] fix error --- pkg/apis/core/types.go | 32 +++++++++ pkg/apis/extensions/types.go | 3 + pkg/kubelet/cadvisor/cadvisor_linux.go | 4 ++ pkg/kubelet/cadvisor/cadvisor_unsupported.go | 4 ++ pkg/kubelet/cadvisor/cadvisor_windows.go | 4 ++ pkg/kubelet/cadvisor/types.go | 3 + pkg/kubelet/qoscontroller/qos_metrics.go | 2 +- pkg/kubelet/qoscontroller/sla_controller.go | 4 +- .../admission/formatdeployment/admission.go | 72 +++++++++++++++++++ staging/src/k8s.io/api/core/v1/types.go | 43 +++++++++++ .../k8s.io/api/extensions/v1beta1/types.go | 3 + .../google/cadvisor/info/v2/container.go | 4 ++ .../google/cadvisor/summary/percentiles.go | 7 ++ .../google/cadvisor/summary/summary.go | 11 ++- 14 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 plugin/pkg/admission/formatdeployment/admission.go diff --git a/pkg/apis/core/types.go b/pkg/apis/core/types.go index e036d0721b9ad..7a87c969a4a26 100644 --- a/pkg/apis/core/types.go +++ b/pkg/apis/core/types.go @@ -4575,3 +4575,35 @@ const ( // DefaultHardPodAffinityWeight defines the weight of the implicit PreferredDuringScheduling affinity rule. DefaultHardPodAffinitySymmetricWeight int32 = 1 ) + +type SlaProfile struct { + //Spec SlaSpec + SlaSpec map[string]float32 + AppClass AppClass + TestDimensions []TestDimension + WorkLoads string +} + +type SlaSpecType string + +const ( + SlaSpecQPS SlaSpecType = "targetQPS" + SlaSpecRT SlaSpecType = "targetRT" +) + +type AppClass string + +const ( + AppClassService AppClass = "service" + AppClassJob AppClass = "job" +) + +type TestDimension string + +const ( + TestScaleUp TestDimension = "scale-up" + TestScaleOut TestDimension = "scale-out" + TestInterference TestDimension = "interference" + TestNetworkIO TestDimension = "network-io" + TestHeterogeneity TestDimension = "heterogeneity" +) \ No newline at end of file diff --git a/pkg/apis/extensions/types.go b/pkg/apis/extensions/types.go index e36972846bcf1..9dfe315eda005 100644 --- a/pkg/apis/extensions/types.go +++ b/pkg/apis/extensions/types.go @@ -92,6 +92,9 @@ type Deployment struct { } type DeploymentSpec struct { + // SLA Requirements for user's application + Sla *api.SlaProfile + // Number of desired pods. This is a pointer to distinguish between explicit // zero and not specified. Defaults to 1. 
diff --git a/pkg/kubelet/cadvisor/cadvisor_linux.go b/pkg/kubelet/cadvisor/cadvisor_linux.go
index 948cb651b3006..3d0b23874a330 100644
--- a/pkg/kubelet/cadvisor/cadvisor_linux.go
+++ b/pkg/kubelet/cadvisor/cadvisor_linux.go
@@ -255,3 +255,7 @@ func (cc *cadvisorClient) HasDedicatedImageFs() (bool, error) {
 	}
 	return imageFsInfo.Device != rootFsInfo.Device, nil
 }
+
+func (cc *cadvisorClient) DerivedStats(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.DerivedStats, error) {
+	return cc.GetDerivedStats(name, options)
+}
\ No newline at end of file
diff --git a/pkg/kubelet/cadvisor/cadvisor_unsupported.go b/pkg/kubelet/cadvisor/cadvisor_unsupported.go
index f1ae9486d8735..a2d1b199b1526 100644
--- a/pkg/kubelet/cadvisor/cadvisor_unsupported.go
+++ b/pkg/kubelet/cadvisor/cadvisor_unsupported.go
@@ -84,3 +84,7 @@ func (cu *cadvisorUnsupported) HasDedicatedImageFs() (bool, error) {
 func (c *cadvisorUnsupported) GetFsInfoByFsUUID(uuid string) (cadvisorapiv2.FsInfo, error) {
 	return cadvisorapiv2.FsInfo{}, nil
 }
+
+func (cu *cadvisorUnsupported) DerivedStats(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.DerivedStats, error) {
+	return nil, unsupportedErr
+}
\ No newline at end of file
diff --git a/pkg/kubelet/cadvisor/cadvisor_windows.go b/pkg/kubelet/cadvisor/cadvisor_windows.go
index 6ce1f8d1b38fd..f84dd3382fd25 100644
--- a/pkg/kubelet/cadvisor/cadvisor_windows.go
+++ b/pkg/kubelet/cadvisor/cadvisor_windows.go
@@ -84,3 +84,7 @@ func (cu *cadvisorClient) HasDedicatedImageFs() (bool, error) {
 func (c *cadvisorClient) GetFsInfoByFsUUID(uuid string) (cadvisorapiv2.FsInfo, error) {
 	return cadvisorapiv2.FsInfo{}, nil
 }
+
+func (cc *cadvisorClient) DerivedStats(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.DerivedStats, error) {
+	return nil, nil
+}
\ No newline at end of file
diff --git a/pkg/kubelet/cadvisor/types.go b/pkg/kubelet/cadvisor/types.go
index 4777798f2ff4d..4fb61026bb6bd 100644
--- a/pkg/kubelet/cadvisor/types.go
+++ b/pkg/kubelet/cadvisor/types.go
@@ -48,6 +48,9 @@ type Interface interface {
 	// GetFsInfoByFsUUID returns the stats of the filesystem with the specified
 	// uuid.
 	GetFsInfoByFsUUID(uuid string) (cadvisorapiv2.FsInfo, error)
+
+	// DerivedStats returns the derived stats for the requested container based on request options.
+	DerivedStats(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.DerivedStats, error)
 }
 
 // ImageFsInfoProvider informs cAdvisor how to find imagefs for container images.
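A short sketch of how a kubelet-side consumer could call the new pass-through; the helper name and the option values are assumptions for illustration, not part of this patch.

```go
package example // hypothetical; for illustration only

import (
	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
)

// latestDerivedStats fetches the most recent derived stats for one
// container (and its children) through the extended Interface.
func latestDerivedStats(cc cadvisor.Interface, containerName string) (map[string]cadvisorapiv2.DerivedStats, error) {
	options := cadvisorapiv2.RequestOptions{
		IdType:    cadvisorapiv2.TypeName, // address the container by name
		Count:     1,                      // only the latest sample is needed
		Recursive: true,                   // include child containers
	}
	return cc.DerivedStats(containerName, options)
}
```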
diff --git a/pkg/kubelet/qoscontroller/qos_metrics.go b/pkg/kubelet/qoscontroller/qos_metrics.go
index 614be075e1dbf..b74008a1e1b04 100644
--- a/pkg/kubelet/qoscontroller/qos_metrics.go
+++ b/pkg/kubelet/qoscontroller/qos_metrics.go
@@ -155,7 +155,7 @@ func qosAcquireNodeMetrics(qosResourceStatus *QosResourceStatus) error {
 }
 
 // qosAcquirePodMetrics calculates/gathers all resources for each of the pod and fills up the map
-func qosAcquirePodMetrics(pod *api.Pod, qosResourceStatus *QosResourceStatus) error {
+func qosAcquirePodMetrics(pod *v1.Pod, qosResourceStatus *QosResourceStatus) error {
 	acquireMemoryMetrics(pod, qosResourceStatus)
 	acquireDiskIOMetrics(pod, qosResourceStatus)
 	acquireNetworkIOMetrics(pod, qosResourceStatus)
diff --git a/pkg/kubelet/qoscontroller/sla_controller.go b/pkg/kubelet/qoscontroller/sla_controller.go
index 9c83163f400a8..6a759577eac8c 100644
--- a/pkg/kubelet/qoscontroller/sla_controller.go
+++ b/pkg/kubelet/qoscontroller/sla_controller.go
@@ -21,7 +21,7 @@ import (
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/kubernetes/pkg/apis/extensions"
+	extensions "k8s.io/api/extensions/v1beta1"
 	"k8s.io/kubernetes/plugin/pkg/admission/formatdeployment"
 	"net/http"
 	"fmt"
@@ -168,7 +168,7 @@ qosResourceStatus *QosResourceStatus) ([]extensions.Deployment, error) {
 func (sc *SlaController) GetPodDeploymentFromApiServer(pod *v1.Pod,
 	qosResourceStatus *QosResourceStatus) (*extensions.Deployment, error) {
 	var selector labels.Selector
-	rss, err := qosResourceStatus.kubeClient.Extensions().ReplicaSets(pod.Namespace).List(api.ListOptions{})
+	rss, err := qosResourceStatus.kubeClient.Extensions().ReplicaSets(pod.Namespace).List(metav1.ListOptions{})
 
 	if err != nil {
 		return nil, fmt.Errorf("Failed list replica sets for pod target latency: %v", err)
diff --git a/plugin/pkg/admission/formatdeployment/admission.go b/plugin/pkg/admission/formatdeployment/admission.go
new file mode 100644
index 0000000000000..2962ba05179ec
--- /dev/null
+++ b/plugin/pkg/admission/formatdeployment/admission.go
@@ -0,0 +1,72 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package formatdeployment
+
+import (
+	"io"
+	"strconv"
+
+	"github.com/golang/glog"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apiserver/pkg/admission"
+	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/apis/extensions"
+)
+
+const TargetLatencyField = "pod.alpha.kubernetes.io/targetlatency"
+
+type formatdeployment struct {
+	*admission.Handler
+}
+
+func (fd formatdeployment) Admit(attributes admission.Attributes) (err error) {
+	// Ignore all calls to subresources or resources other than deployments.
+	if attributes == nil || len(attributes.GetSubresource()) != 0 || attributes.GetResource() != extensions.Resource("deployments").WithVersion(attributes.GetResource().Version) {
+		return nil
+	}
+	deployment, ok := attributes.GetObject().(*extensions.Deployment)
+	glog.V(3).Infof("FormatDeployment admission got deployment: %v", attributes.GetObject())
+	if !ok {
+		return apierrors.NewBadRequest("Resource was marked with kind Deployment but was unable to be converted")
+	}
+
+	if deployment.Spec.Sla != nil && deployment.Spec.Sla.SlaSpec != nil {
+		rt, ok := deployment.Spec.Sla.SlaSpec[string(api.SlaSpecRT)]
+		if ok {
+			if deployment.Spec.Template.ObjectMeta.Annotations == nil {
+				deployment.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
+			}
+			deployment.Spec.Template.ObjectMeta.Annotations[TargetLatencyField] = strconv.FormatFloat(float64(rt), 'f', 2, 32)
+		}
+	}
+
+	return nil
+}
+
+// NewFormatDeployment creates a new format deployment admission control handler
+func NewFormatDeployment() admission.Interface {
+	return &formatdeployment{
+		Handler: admission.NewHandler(admission.Create, admission.Update),
+	}
+}
+
+// Register registers a plugin
+func Register(plugins *admission.Plugins) {
+	plugins.Register("FormatDeployment", func(config io.Reader) (admission.Interface, error) {
+		return NewFormatDeployment(), nil
+	})
+}
diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go
index 49ef610927692..e17b4560a1112 100644
--- a/staging/src/k8s.io/api/core/v1/types.go
+++ b/staging/src/k8s.io/api/core/v1/types.go
@@ -3817,6 +3817,17 @@ type NodeSystemInfo struct {
 	Architecture string `json:"architecture" protobuf:"bytes,10,opt,name=architecture"`
 }
 
+const (
+	CpuSecondaryAmount  = "cpuSecondaryAmount"
+	MemSecondaryAmount  = "memSecondaryAmount"
+	IsSecondary         = "isSecondary"
+	ResourceReclaimable = "resourceReclaimable"
+)
+
+const (
+	DesiredNetworkIORate = "desiredNetworkIORate"
+)
+
 // NodeStatus is information about the current status of a node.
 type NodeStatus struct {
 	// Capacity represents the total resources of a node.
@@ -5196,3 +5207,35 @@ const (
 	// and data streams for a single forwarded connection
 	PortForwardRequestIDHeader = "requestID"
 )
+
+type SlaProfile struct {
+	//Spec SlaSpec `json:"slaSpec,omitempty"`
+	SlaSpec        map[string]float32 `json:"slaSpec,omitempty"`
+	AppClass       AppClass           `json:"appClass,omitempty"`
+	TestDimensions []TestDimension    `json:"testDimensions,omitempty"`
+	WorkLoads      string             `json:"workloads,omitempty"`
+}
+
+type SlaSpecType string
+
+const (
+	SlaSpecQPS SlaSpecType = "targetQPS"
+	SlaSpecRT  SlaSpecType = "targetRT"
+)
+
+type AppClass string
+
+const (
+	AppClassService AppClass = "service"
+	AppClassJob     AppClass = "job"
+)
+
+type TestDimension string
+
+const (
+	TestScaleUp       TestDimension = "scale-up"
+	TestScaleOut      TestDimension = "scale-out"
+	TestInterference  TestDimension = "interference"
+	TestNetworkIO     TestDimension = "network-io"
+	TestHeterogeneity TestDimension = "heterogeneity"
+)
\ No newline at end of file
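To make the plugin's effect concrete, here is a small sketch of the SlaSpec-to-annotation mapping; the wrapping helper and package are illustrative assumptions, while the key and number formatting mirror the plugin code above.

```go
package example // hypothetical; for illustration only

import (
	"strconv"

	"k8s.io/api/core/v1"
)

// targetLatencyAnnotation mirrors FormatDeployment: a "targetRT" entry
// in the SlaProfile becomes the pod-template annotation value that the
// kubelet-side SLA controller later reads back.
func targetLatencyAnnotation(sla *v1.SlaProfile) (string, bool) {
	rt, ok := sla.SlaSpec[string(v1.SlaSpecRT)]
	if !ok {
		return "", false
	}
	// Same formatting as the plugin, e.g. 50 => "50.00" under the
	// "pod.alpha.kubernetes.io/targetlatency" key.
	return strconv.FormatFloat(float64(rt), 'f', 2, 32), true
}
```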
diff --git a/staging/src/k8s.io/api/extensions/v1beta1/types.go b/staging/src/k8s.io/api/extensions/v1beta1/types.go
index c3d9f72d7340b..cdf30b2671ad0 100644
--- a/staging/src/k8s.io/api/extensions/v1beta1/types.go
+++ b/staging/src/k8s.io/api/extensions/v1beta1/types.go
@@ -125,6 +125,9 @@ type Deployment struct {
 
 // DeploymentSpec is the specification of the desired behavior of the Deployment.
 type DeploymentSpec struct {
+	// SLA Requirements for user's application
+	Sla *v1.SlaProfile `json:"sla,omitempty"`
+
 	// Number of desired pods. This is a pointer to distinguish between explicit
 	// zero and not specified. Defaults to 1.
 	// +optional
diff --git a/vendor/github.com/google/cadvisor/info/v2/container.go b/vendor/github.com/google/cadvisor/info/v2/container.go
index d32f571cb6748..d88997d6974e5 100644
--- a/vendor/github.com/google/cadvisor/info/v2/container.go
+++ b/vendor/github.com/google/cadvisor/info/v2/container.go
@@ -177,6 +177,8 @@ type Usage struct {
 	Cpu Percentiles `json:"cpu"`
 	// Mean, Max, and 90p memory size in bytes.
 	Memory Percentiles `json:"memory"`
+	// Mean, Max, and 90p network TX usage in bytes.
+	Network Percentiles `json:"network"`
 }
 
 // latest sample collected for a container.
@@ -185,6 +187,8 @@ type InstantUsage struct {
 	Cpu uint64 `json:"cpu"`
 	// Memory usage in bytes.
 	Memory uint64 `json:"memory"`
+	// Cumulative network TX bytes.
+	Network uint64 `json:"network"`
 }
 
 type DerivedStats struct {
diff --git a/vendor/github.com/google/cadvisor/summary/percentiles.go b/vendor/github.com/google/cadvisor/summary/percentiles.go
index de92bc3a25126..71b44969d0603 100644
--- a/vendor/github.com/google/cadvisor/summary/percentiles.go
+++ b/vendor/github.com/google/cadvisor/summary/percentiles.go
@@ -130,13 +130,16 @@ func NewResource(size int) *resource {
 func GetDerivedPercentiles(stats []*info.Usage) info.Usage {
 	cpu := NewResource(len(stats))
 	memory := NewResource(len(stats))
+	network := NewResource(len(stats))
 	for _, stat := range stats {
 		cpu.Add(stat.Cpu)
 		memory.Add(stat.Memory)
+		network.Add(stat.Network)
 	}
 	usage := info.Usage{}
 	usage.Cpu = cpu.GetAllPercentiles()
 	usage.Memory = memory.GetAllPercentiles()
+	usage.Network = network.GetAllPercentiles()
 	return usage
 }
 
@@ -174,6 +177,7 @@ func GetMinutePercentiles(stats []*secondSample) info.Usage {
 	lastSample := secondSample{}
 	cpu := NewResource(len(stats))
 	memory := NewResource(len(stats))
+	network := NewResource(len(stats))
 	for _, stat := range stats {
 		if !lastSample.Timestamp.IsZero() {
 			cpuRate, err := getCpuRate(*stat, lastSample)
@@ -182,8 +186,10 @@ func GetMinutePercentiles(stats []*secondSample) info.Usage {
 			}
 			cpu.AddSample(cpuRate)
 			memory.AddSample(stat.Memory)
+			network.AddSample(stat.Network)
 		} else {
 			memory.AddSample(stat.Memory)
+			network.AddSample(stat.Network)
 		}
 		lastSample = *stat
 	}
@@ -192,5 +198,6 @@ func GetMinutePercentiles(stats []*secondSample) info.Usage {
 		PercentComplete: percent,
 		Cpu:             cpu.GetAllPercentiles(),
 		Memory:          memory.GetAllPercentiles(),
+		Network:         network.GetAllPercentiles(),
 	}
 }
diff --git a/vendor/github.com/google/cadvisor/summary/summary.go b/vendor/github.com/google/cadvisor/summary/summary.go
index 4912b9b40830f..8f6e0bb62a7f4 100644
--- a/vendor/github.com/google/cadvisor/summary/summary.go
+++ b/vendor/github.com/google/cadvisor/summary/summary.go
@@ -34,11 +34,13 @@ type secondSample struct {
 	Timestamp time.Time // time when the sample was recorded.
 	Cpu       uint64    // cpu usage
 	Memory    uint64    // memory usage
+	Network   uint64    // cumulative network TX bytes
 }
 
 type availableResources struct {
 	Cpu    bool
 	Memory bool
+	Network bool
 }
 
 type StatsSummary struct {
@@ -66,6 +68,9 @@ func (s *StatsSummary) AddSample(stat v1.ContainerStats) error {
 	if s.available.Memory {
 		sample.Memory = stat.Memory.WorkingSet
 	}
+	if s.available.Network {
+		sample.Network = stat.Network.InterfaceStats.TxBytes
+	}
 	s.secondSamples = append(s.secondSamples, &sample)
 	s.updateLatestUsage()
 	// TODO(jnagal): Use 'available' to avoid unnecessary computation.
@@ -101,6 +106,7 @@ func (s *StatsSummary) updateLatestUsage() {
 	}
 	latest := s.secondSamples[numStats-1]
 	usage.Memory = latest.Memory
+	usage.Network = latest.Network
 	if numStats > 1 {
 		previous := s.secondSamples[numStats-2]
 		cpu, err := getCpuRate(*latest, *previous)
@@ -177,7 +183,10 @@ func New(spec v1.ContainerSpec) (*StatsSummary, error) {
 	if spec.HasMemory {
 		summary.available.Memory = true
 	}
-	if !summary.available.Cpu && !summary.available.Memory {
+	if spec.HasNetwork {
+		summary.available.Network = true
+	}
+	if !summary.available.Cpu && !summary.available.Memory && !summary.available.Network {
 		return nil, fmt.Errorf("none of the resources are being tracked.")
 	}
 	summary.minuteSamples = NewSamplesBuffer(60 /* one hour */)
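Finally, a sketch of how the new Network percentiles could be read back once they flow through DerivedStats; the helper and package are assumptions for illustration, not part of this patch.

```go
package example // hypothetical; for illustration only

import v2 "github.com/google/cadvisor/info/v2"

// networkNinetiethPercentile pulls the 90th-percentile network TX value
// out of the last minute of derived stats, if it was tracked at all.
func networkNinetiethPercentile(stats v2.DerivedStats) (uint64, bool) {
	p := stats.MinuteUsage.Network
	if !p.Present {
		return 0, false
	}
	return p.Ninety, true
}
```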