diff --git a/kubernetes/base/rbac.yaml b/kubernetes/base/rbac.yaml
index ab628cb514..dfd211a56f 100644
--- a/kubernetes/base/rbac.yaml
+++ b/kubernetes/base/rbac.yaml
@@ -36,6 +36,15 @@ rules:
   resources: ["nodes", "pods"]
   verbs: ["get", "list"]
 ---
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: descheduler-role
+rules:
+- apiGroups: [""]
+  resources: ["secrets"]
+  verbs: ["get"]
+---
 apiVersion: v1
 kind: ServiceAccount
 metadata:
@@ -54,3 +63,16 @@ subjects:
 - name: descheduler-sa
   kind: ServiceAccount
   namespace: kube-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: descheduler-role-binding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: descheduler-role
+subjects:
+  - name: descheduler-sa
+    kind: ServiceAccount
+    namespace: kube-system
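The new Role and RoleBinding only grant the descheduler service account read access to Secrets; the token itself still has to be provided by the operator. A minimal sketch of a matching Secret, assuming the descheduler runs in kube-system and the token is consumed through the secretReference field added in types.go further down (the Secret name is illustrative; the data key must match the PrometheusAuthTokenKey constant):

apiVersion: v1
kind: Secret
metadata:
  name: prometheus-auth-token   # illustrative name, referenced from the plugin args
  namespace: kube-system
type: Opaque
stringData:
  PrometheusAuthToken: "<bearer token for the Prometheus endpoint>"
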
diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go
index d5e853d0ae..4ac5ef507a 100644
--- a/pkg/descheduler/descheduler.go
+++ b/pkg/descheduler/descheduler.go
@@ -135,7 +135,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
 		v1.SchemeGroupVersion.WithResource("namespaces"),                 // Used by the defaultevictor plugin
 		schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),  // Used by the defaultevictor plugin
 		policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), // Used by the defaultevictor plugin
-
+		v1.SchemeGroupVersion.WithResource("secrets"),                    // Used by the LowNodeUtilization plugin
 	)
 	// Used by the defaultevictor plugin
 	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
index a0c66531bd..ed64361974 100644
--- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go
+++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
@@ -18,20 +18,36 @@ package nodeutilization
 
 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
 	"fmt"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/transport"
 	"k8s.io/klog/v2"
 
+	"sigs.k8s.io/descheduler/pkg/api"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
 	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
+
+	promapi "github.com/prometheus/client_golang/api"
+	"github.com/prometheus/common/config"
 )
 
-const LowNodeUtilizationPluginName = "LowNodeUtilization"
+const (
+	LowNodeUtilizationPluginName = "LowNodeUtilization"
+	K8sPodCAFilePath             = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+)
 
 // LowNodeUtilization evicts pods from overutilized nodes to underutilized nodes. Note that CPU/Memory requests are used
 // to calculate nodes' utilization and not the actual resource usage.
@@ -44,10 +60,25 @@ type LowNodeUtilization struct {
 	overutilizationCriteria []interface{}
 	resourceNames           []v1.ResourceName
 	usageClient             usageClient
+	promClient              promapi.Client
 }
 
 var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}
 
+func loadCAFile(filepath string) (*x509.CertPool, error) {
+	caCert, err := os.ReadFile(filepath)
+	if err != nil {
+		return nil, err
+	}
+
+	caCertPool := x509.NewCertPool()
+	if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
+		return nil, fmt.Errorf("failed to append CA certificate to the pool")
+	}
+
+	return caCertPool, nil
+}
+
 // NewLowNodeUtilization builds plugin from its arguments while passing a handle
 func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
 	lowNodeUtilizationArgsArgs, ok := args.(*LowNodeUtilizationArgs)
@@ -55,7 +86,18 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
 		return nil, fmt.Errorf("want args to be of type LowNodeUtilizationArgs, got %T", args)
 	}
 
-	setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds)
+	if lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus.URL != "" {
+		uResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
+		oResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.TargetThresholds)
+		if len(uResourceNames) != 1 || uResourceNames[0] != ResourceMetrics {
+			return nil, fmt.Errorf("thresholds are expected to specify a single instance of %q resource, got %v instead", ResourceMetrics, uResourceNames)
+		}
+		if len(oResourceNames) != 1 || oResourceNames[0] != ResourceMetrics {
+			return nil, fmt.Errorf("targetThresholds are expected to specify a single instance of %q resource, got %v instead", ResourceMetrics, oResourceNames)
+		}
+	} else {
+		setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds)
+	}
 
 	underutilizationCriteria := []interface{}{
 		"CPU", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourceCPU],
@@ -91,6 +133,84 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
 	var usageClient usageClient
 	if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer {
 		usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
+	} else if lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus.URL != "" {
+		promConfig := lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus
+
+		var authToken string
+		// Raw auth token takes precedence
+		if len(promConfig.AuthToken.Raw) > 0 {
+			authToken = promConfig.AuthToken.Raw
+		} else if promConfig.AuthToken.SecretReference.Name != "" {
+			secretObj, err := handle.ClientSet().CoreV1().Secrets(promConfig.AuthToken.SecretReference.Namespace).Get(context.TODO(), promConfig.AuthToken.SecretReference.Name, metav1.GetOptions{})
+			if err != nil {
+				return nil, fmt.Errorf("unable to get secret with prometheus authentication token: %v", err)
+			}
+			authToken = string(secretObj.Data[PrometheusAuthTokenKey])
+			if authToken == "" {
+				return nil, fmt.Errorf("prometheus authentication token secret missing %q data or empty", PrometheusAuthTokenKey)
+			}
+		}
+
+		// Verify the in-cluster CA when authentication is enabled, or ignore TLS verify errors if InsecureSkipVerify is set
+		roundTripper := promapi.DefaultRoundTripper
+		if promConfig.EnableAuthentication {
+			// Retrieve the Pod CA cert
+			caCertPool, err := loadCAFile(K8sPodCAFilePath)
+			if err != nil {
+				return nil, fmt.Errorf("error loading CA file: %v", err)
+			}
+
+			// Get the Prometheus host
+			u, err := url.Parse(promConfig.URL)
+			if err != nil {
+				return nil, fmt.Errorf("error parsing prometheus URL: %v", err)
+			}
+			roundTripper = transport.NewBearerAuthRoundTripper(
+				authToken,
+				&http.Transport{
+					Proxy: http.ProxyFromEnvironment,
+					DialContext: (&net.Dialer{
+						Timeout:   30 * time.Second,
+						KeepAlive: 30 * time.Second,
+					}).DialContext,
+					TLSHandshakeTimeout: 10 * time.Second,
+					TLSClientConfig: &tls.Config{
+						RootCAs:    caCertPool,
+						ServerName: u.Hostname(),
+					},
+				},
+			)
+		} else if promConfig.InsecureSkipVerify {
+			roundTripper = &http.Transport{
+				Proxy: http.ProxyFromEnvironment,
+				DialContext: (&net.Dialer{
+					Timeout:   30 * time.Second,
+					KeepAlive: 30 * time.Second,
+				}).DialContext,
+				TLSHandshakeTimeout: 10 * time.Second,
+				TLSClientConfig:     &tls.Config{InsecureSkipVerify: true},
+			}
+		}
+
+		var promClient promapi.Client
+		if authToken != "" {
+			promClient, err = promapi.NewClient(promapi.Config{
+				Address:      promConfig.URL,
+				RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(authToken), roundTripper),
+			})
+		} else {
+			promClient, err = promapi.NewClient(promapi.Config{
+				Address:      promConfig.URL,
+				RoundTripper: roundTripper,
+			})
+		}
+		if err != nil {
+			return nil, fmt.Errorf("unable to create a new prom client: %v", err)
+		}
+
+		usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), promClient, promConfig.Query)
+		// reset all resource names to just ResourceMetrics
+		resourceNames = []v1.ResourceName{ResourceMetrics}
 	} else {
 		usageClient = newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc())
 	}
diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
index d7cc8cbc4b..85ae3a1621 100644
--- a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
+++ b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
@@ -38,6 +38,8 @@ import (
 	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
 	"sigs.k8s.io/descheduler/pkg/utils"
 	"sigs.k8s.io/descheduler/test"
+
+	"github.com/prometheus/common/model"
 )
 
 func TestLowNodeUtilization(t *testing.T) {
@@ -1360,3 +1362,165 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
 		})
 	}
 }
+
+func withLocalStorage(pod *v1.Pod) {
+	// A pod with local storage.
+	test.SetNormalOwnerRef(pod)
+	pod.Spec.Volumes = []v1.Volume{
+		{
+			Name: "sample",
+			VolumeSource: v1.VolumeSource{
+				HostPath: &v1.HostPathVolumeSource{Path: "somePath"},
+				EmptyDir: &v1.EmptyDirVolumeSource{
+					SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI),
+				},
+			},
+		},
+	}
+	// A Mirror Pod.
+	pod.Annotations = test.GetMirrorPodAnnotation()
+}
+
+func withCriticalPod(pod *v1.Pod) {
+	// A Critical Pod.
+	test.SetNormalOwnerRef(pod)
+	pod.Namespace = "kube-system"
+	priority := utils.SystemCriticalPriority
+	pod.Spec.Priority = &priority
+}
+
+func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
+	n1NodeName := "n1"
+	n2NodeName := "n2"
+	n3NodeName := "n3"
+
+	testCases := []struct {
+		name                         string
+		useDeviationThresholds       bool
+		thresholds, targetThresholds api.ResourceThresholds
+		query                        string
+		samples                      []model.Sample
+		nodes                        []*v1.Node
+		pods                         []*v1.Pod
+		expectedPodsEvicted          uint
+		evictedPods                  []string
+		evictableNamespaces          *api.Namespaces
+	}{
+		{
+			name: "with instance:node_cpu:rate:sum query",
+			thresholds: api.ResourceThresholds{
+				v1.ResourceName("MetricResource"): 30,
+			},
+			targetThresholds: api.ResourceThresholds{
+				v1.ResourceName("MetricResource"): 50,
+			},
+			query: "instance:node_cpu:rate:sum",
+			samples: []model.Sample{
+				sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
+				sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
+				sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
+			},
+			nodes: []*v1.Node{
+				test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
+				test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil),
+				test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil),
+			},
+			pods: []*v1.Pod{
+				test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				// These won't be evicted.
+				test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
+				test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage),
+				test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod),
+				test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
+			},
+			expectedPodsEvicted: 1,
+		},
+	}
+
+	for _, tc := range testCases {
+		testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) {
+			return func(t *testing.T) {
+				ctx, cancel := context.WithCancel(context.Background())
+				defer cancel()
+
+				var objs []runtime.Object
+				for _, node := range tc.nodes {
+					objs = append(objs, node)
+				}
+				for _, pod := range tc.pods {
+					objs = append(objs, pod)
+				}
+
+				fakeClient := fake.NewSimpleClientset(objs...)
+
+				podsForEviction := make(map[string]struct{})
+				for _, pod := range tc.evictedPods {
+					podsForEviction[pod] = struct{}{}
+				}
+
+				evictionFailed := false
+				if len(tc.evictedPods) > 0 {
+					fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
+						getAction := action.(core.CreateAction)
+						obj := getAction.GetObject()
+						if eviction, ok := obj.(*policy.Eviction); ok {
+							if _, exists := podsForEviction[eviction.Name]; exists {
+								return true, obj, nil
+							}
+							evictionFailed = true
+							return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name)
+						}
+						return true, obj, nil
+					})
+				}
+
+				handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil)
+				if err != nil {
+					t.Fatalf("Unable to initialize a framework handle: %v", err)
+				}
+
+				plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
+					Thresholds:             tc.thresholds,
+					TargetThresholds:       tc.targetThresholds,
+					UseDeviationThresholds: tc.useDeviationThresholds,
+					EvictableNamespaces:    tc.evictableNamespaces,
+					MetricsUtilization: MetricsUtilization{
+						Prometheus: Prometheus{
+							URL: "http://prometheus.example.orgname",
+							AuthToken: AuthToken{
+								Raw: "XXXXX",
+							},
+						},
+					},
+				},
+					handle)
+				if err != nil {
+					t.Fatalf("Unable to initialize the plugin: %v", err)
+				}
+
+				pClient := &fakePromClient{
+					result: tc.samples,
+				}
+
+				plugin.(*LowNodeUtilization).usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), pClient, tc.query)
+				status := plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes)
+				if status != nil {
+					t.Fatalf("Balance.err: %v", status.Err)
+				}
+
+				podsEvicted := podEvictor.TotalEvicted()
+				if expectedPodsEvicted != podsEvicted {
+					t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted)
+				}
+				if evictionFailed {
+					t.Errorf("Pod evictions failed unexpectedly")
+				}
+			}
+		}
+		t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted))
+	}
+}
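For orientation, a sketch of how the MetricResource thresholds in the test above map onto the fixed 1000-point node capacity introduced in nodeutilization.go below; the numbers are the test's own samples and the arithmetic is illustrative only:

thresholds:
  MetricResource: 30        # low watermark  -> 30% of 1000 points = 300
targetThresholds:
  MetricResource: 50        # high watermark -> 50% of 1000 points = 500
# Sampled utilization is scaled by 1000 in the prometheus usage client:
#   n1: 0.5695 -> 569 points  -> above 500, overutilized (eviction source)
#   n2: 0.4245 -> 424 points  -> between the watermarks, left untouched
#   n3: 0.2038 -> 203 points  -> below 300, underutilized (eviction target)
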
diff --git a/pkg/framework/plugins/nodeutilization/nodeutilization.go b/pkg/framework/plugins/nodeutilization/nodeutilization.go
index a694cacc83..9a9848cffc 100644
--- a/pkg/framework/plugins/nodeutilization/nodeutilization.go
+++ b/pkg/framework/plugins/nodeutilization/nodeutilization.go
@@ -34,6 +34,8 @@ import (
 	"sigs.k8s.io/descheduler/pkg/utils"
 )
 
+const ResourceMetrics = v1.ResourceName("MetricResource")
+
 // NodeUsage stores a node's info, pods on it, thresholds and its resource usage
 type NodeUsage struct {
 	node *v1.Node
@@ -92,6 +94,8 @@ func getNodeThresholds(
 		if len(node.Status.Allocatable) > 0 {
 			nodeCapacity = node.Status.Allocatable
 		}
+		// Make ResourceMetrics 100% => 1000 points so percentage thresholds translate directly into points (e.g. 30% => 300)
+		nodeCapacity[ResourceMetrics] = *resource.NewQuantity(int64(1000), resource.DecimalSI)
 
 		nodeThresholdsMap[node.Name] = NodeThresholds{
 			lowResourceThreshold:  map[v1.ResourceName]*resource.Quantity{},
@@ -327,15 +331,27 @@ func evictPods(
 			if !preEvictionFilterWithOptions(pod) {
 				continue
 			}
+
+			// In case podUsage does not support resource counting (e.g. the provided
+			// metric does not quantify pod resource utilization), evict only a single
+			// pod per node and cycle. It is recommended to run the descheduling cycle
+			// more often so the plugin converges towards the desired distribution.
+			singleEviction := false
 			podUsage, err := usageClient.podUsage(pod)
 			if err != nil {
-				klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
-				continue
+				if _, ok := err.(*notSupportedError); !ok {
+					klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
+					continue
+				}
+				singleEviction = true
 			}
 			err = podEvictor.Evict(ctx, pod, evictOptions)
 			if err == nil {
 				klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
-
+				if singleEviction {
+					klog.V(3).InfoS("Only a single pod eviction is allowed when pod usage is unknown; stopping evictions on this node")
+					break
+				}
 				for name := range totalAvailableUsage {
 					if name == v1.ResourcePods {
 						nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
diff --git a/pkg/framework/plugins/nodeutilization/types.go b/pkg/framework/plugins/nodeutilization/types.go
index 8e005fa02d..0d31af89e5 100644
--- a/pkg/framework/plugins/nodeutilization/types.go
+++ b/pkg/framework/plugins/nodeutilization/types.go
@@ -57,4 +57,34 @@ type MetricsUtilization struct {
 	// metricsServer enables metrics from a kubernetes metrics server.
 	// Please see https://kubernetes-sigs.github.io/metrics-server/ for more.
 	MetricsServer bool `json:"metricsServer,omitempty"`
+
+	// prometheus enables metrics collection through a prometheus query.
+	Prometheus Prometheus `json:"prometheus,omitempty"`
+}
+
+type Prometheus struct {
+	URL                  string    `json:"url,omitempty"`
+	Query                string    `json:"query,omitempty"`
+	AuthToken            AuthToken `json:"authToken,omitempty"`
+	EnableAuthentication bool      `json:"enableAuthentication,omitempty"`
+	InsecureSkipVerify   bool      `json:"insecureSkipVerify,omitempty"`
+}
+
+const PrometheusAuthTokenKey = "PrometheusAuthToken"
+
+type AuthToken struct {
+	// raw is a raw authentication token value.
+	Raw string `json:"raw,omitempty"`
+	// secretReference references an authentication token.
+	// secrets are expected to be created under the descheduler's namespace.
+	SecretReference SecretReference `json:"secretReference,omitempty"`
+}
+
+// SecretReference holds a reference to a Secret
+type SecretReference struct {
+	// namespace is the namespace of the secret.
+	Namespace string `json:"namespace,omitempty"`
+	// name is the name of the secret.
+	// Required
+	Name string `json:"name,omitempty"`
 }
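A sketch of how the new fields compose in a policy file, assuming the v1alpha2 DeschedulerPolicy format; the URL, query, and secret name are placeholders, and the thresholds may only name the MetricResource pseudo-resource, as enforced in NewLowNodeUtilization above:

apiVersion: "descheduler/v1alpha2"
kind: "DeschedulerPolicy"
profiles:
  - name: prometheus-utilization
    pluginConfig:
      - name: "LowNodeUtilization"
        args:
          thresholds:
            "MetricResource": 30
          targetThresholds:
            "MetricResource": 50
          metricsUtilization:
            prometheus:
              url: "https://prometheus.example.com"   # placeholder
              query: "instance:node_cpu:rate:sum"
              enableAuthentication: true
              authToken:
                secretReference:
                  namespace: "kube-system"
                  name: "prometheus-auth-token"        # matches the Secret sketched earlier
    plugins:
      balance:
        enabled:
          - "LowNodeUtilization"
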
diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go
index 2717356f21..c69a5b21a6 100644
--- a/pkg/framework/plugins/nodeutilization/usageclients.go
+++ b/pkg/framework/plugins/nodeutilization/usageclients.go
@@ -18,8 +18,15 @@ package nodeutilization
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"net/http"
+	"net/url"
+	"time"
 
+	promapi "github.com/prometheus/client_golang/api"
+	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -31,6 +38,28 @@ import (
 	"sigs.k8s.io/descheduler/pkg/utils"
 )
 
+type UsageClientType int
+
+const (
+	requestedUsageClientType UsageClientType = iota
+	actualUsageClientType
+	prometheusUsageClientType
+)
+
+type notSupportedError struct {
+	usageClientType UsageClientType
+}
+
+func (e notSupportedError) Error() string {
+	return "pod usage counting is not supported by this usage client type"
+}
+
+func newNotSupportedError(usageClientType UsageClientType) *notSupportedError {
+	return &notSupportedError{
+		usageClientType: usageClientType,
+	}
+}
+
 type usageClient interface {
 	// Both low/high node utilization plugins are expected to invoke sync right
 	// after Balance method is invoked. There's no cache invalidation so each
@@ -184,3 +213,109 @@ func (client *actualUsageClient) sync(nodes []*v1.Node) error {
 
 	return nil
 }
+
+type prometheusUsageClient struct {
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
+	promClient            promapi.Client
+	promQuery             string
+
+	_pods            map[string][]*v1.Pod
+	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
+}
+
+var _ usageClient = &prometheusUsageClient{}
+
+func newPrometheusUsageClient(
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
+	promClient promapi.Client,
+	promQuery string,
+) *prometheusUsageClient {
+	return &prometheusUsageClient{
+		getPodsAssignedToNode: getPodsAssignedToNode,
+		promClient:            promClient,
+		promQuery:             promQuery,
+	}
+}
+
+func (client *prometheusUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
+	return client._nodeUtilization[node]
+}
+
+func (client *prometheusUsageClient) pods(node string) []*v1.Pod {
+	return client._pods[node]
+}
+
+func (client *prometheusUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
+	return nil, newNotSupportedError(prometheusUsageClientType)
+}
+
+type fakePromClient struct {
+	result interface{}
+}
+
+type fakePayload struct {
+	Status string      `json:"status"`
+	Data   queryResult `json:"data"`
+}
+
+type queryResult struct {
+	Type   model.ValueType `json:"resultType"`
+	Result interface{}     `json:"result"`
+}
+
+func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL {
+	return &url.URL{}
+}
+func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) {
+	jsonData, err := json.Marshal(fakePayload{
+		Status: "success",
+		Data: queryResult{
+			Type:   model.ValVector,
+			Result: client.result,
+		},
+	})
+
+	return &http.Response{StatusCode: 200}, jsonData, err
+}
+
+func (client *prometheusUsageClient) resourceNames() []v1.ResourceName {
+	return []v1.ResourceName{ResourceMetrics}
+}
+
+func (client *prometheusUsageClient) sync(nodes []*v1.Node) error {
+	client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
+	client._pods = make(map[string][]*v1.Pod)
+
+	results, warnings, err := promv1.NewAPI(client.promClient).Query(context.TODO(), client.promQuery, time.Now())
+	if err != nil {
+		return fmt.Errorf("unable to capture prometheus metrics: %v", err)
+	}
+	if len(warnings) > 0 {
+		klog.Infof("prometheus metrics warnings: %v", warnings)
+	}
+
+	nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity)
+	for _, sample := range results.(model.Vector) {
+		nodeName := string(sample.Metric["instance"])
+		nodeUsages[nodeName] = map[v1.ResourceName]*resource.Quantity{
+			ResourceMetrics: resource.NewQuantity(int64(sample.Value*1000), resource.DecimalSI),
+		}
+	}
+
+	for _, node := range nodes {
+		if _, exists := nodeUsages[node.Name]; !exists {
+			return fmt.Errorf("unable to find metric entry for %v", node.Name)
+		}
+		pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
+		if err != nil {
+			klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
+			return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
+		}
+
+		// store the snapshot of pods from the same (or the closest) node utilization computation
+		client._pods[node.Name] = pods
+		client._nodeUtilization[node.Name] = nodeUsages[node.Name]
+	}
+
+	return nil
+}
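sync keys node usage by the sample's "instance" label and multiplies the value by 1000, so the configured query is expected to return exactly one sample per node, labelled with the Kubernetes node name and normalized to the 0-1 range. A hypothetical recording rule of that shape; the rule name simply mirrors the query used in the tests, and the expression is an assumption rather than part of this change:

groups:
  - name: descheduler-node-utilization
    rules:
      - record: instance:node_cpu:rate:sum
        # assumed expression: fraction of non-idle CPU per node, in [0, 1]
        expr: 1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m]))
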
diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go
index 75f6e5912c..7e510905c6 100644
--- a/pkg/framework/plugins/nodeutilization/usageclients_test.go
+++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 
+	"github.com/prometheus/common/model"
 	"k8s.io/client-go/informers"
 	fakeclientset "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/metrics/pkg/apis/metrics/v1beta1"
@@ -59,7 +60,7 @@ func updateMetricsAndCheckNodeUtilization(
 	}
 	err = usageClient.sync(nodes)
 	if err != nil {
-		t.Fatalf("failed to capture a snapshot: %v", err)
+		t.Fatalf("failed to sync a snapshot: %v", err)
 	}
 	nodeUtilization := usageClient.nodeUtilization(nodeName)
 	t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue())
@@ -135,3 +136,70 @@ func TestActualUsageClient(t *testing.T) {
 		metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics,
 	)
 }
+
+func sample(metricName, nodeName string, value float64) model.Sample {
+	return model.Sample{
+		Metric: model.Metric{
+			"__name__": model.LabelValue(metricName),
+			"instance": model.LabelValue(nodeName),
+		},
+		Value:     model.SampleValue(value),
+		Timestamp: 1728991761711,
+	}
+}
+
+func TestPrometheusUsageClient(t *testing.T) {
+	n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil)
+	n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil)
+	n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil)
+
+	nodes := []*v1.Node{n1, n2, n3}
+
+	p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
+	p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
+	p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
+	p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)
+
+	pClient := &fakePromClient{
+		result: []model.Sample{
+			sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104),
+			sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522),
+			sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561),
+		},
+	}
+
+	clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)
+
+	ctx := context.TODO()
+	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
+	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
+	podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
+	if err != nil {
+		t.Fatalf("Build get pods assigned to node function error: %v", err)
+	}
+
+	sharedInformerFactory.Start(ctx.Done())
+	sharedInformerFactory.WaitForCacheSync(ctx.Done())
+
+	prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum")
+	err = prometheusUsageClient.sync(nodes)
+	if err != nil {
+		t.Fatalf("unable to sync prometheus metrics: %v", err)
+	}
+
+	for _, node := range nodes {
+		nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
+		fmt.Printf("nodeUtil[%v]: %v\n", node.Name, nodeUtil)
+	}
+
+	nodeThresholds := NodeThresholds{
+		lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{
+			v1.ResourceName("MetricResource"): resource.NewQuantity(int64(300), resource.DecimalSI),
+		},
+		highResourceThreshold: map[v1.ResourceName]*resource.Quantity{
+			v1.ResourceName("MetricResource"): resource.NewQuantity(int64(500), resource.DecimalSI),
+		},
+	}
+
+	fmt.Printf("nodeThresholds: %#v\n", nodeThresholds)
+}