diff --git a/README.md b/README.md index 85cc7509e7..544691b78d 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,14 @@ These are top level keys in the Descheduler Policy that you can use to configure | `maxNoOfPodsToEvictTotal` |`int`| `nil` | maximum number of pods evicted per rescheduling cycle (summed through all strategies) | | `metricsCollector` |`object`| `nil` | configures collection of metrics for actual resource utilization | | `metricsCollector.enabled` |`bool`| `false` | enables kubernetes [metrics server](https://kubernetes-sigs.github.io/metrics-server/) collection | +| `prometheus` |`object`| `nil` | configures collection of Prometheus metrics for actual resource utilization | +| `prometheus.url` |`string`| `nil` | points to a Prometheus server url | +| `prometheus.insecureSkipVerify` |`bool`| `nil` | disables server certificate chain and host name verification | +| `prometheus.authToken` |`object`| `nil` | sets Prometheus server authentication token | +| `prometheus.authToken.raw` |`string`| `nil` | set the authentication token as a raw string (takes precedence over secretReference) | +| `prometheus.authToken.secretReference` |`object`| `nil` | read the authentication token from a kubernetes secret (the secret is expected to contain the token under `prometheusAuthToken` data key) | +| `prometheus.authToken.secretReference.namespace` |`string`| `nil` | authentication token kubernetes secret namespace (the current rbac allows getting secrets from kube-system namespace) | +| `prometheus.authToken.secretReference.name` |`string`| `nil` | authentication token kubernetes secret name | ### Evictor Plugin configuration (Default Evictor) @@ -162,6 +170,13 @@ maxNoOfPodsToEvictPerNamespace: 5000 # you don't need to set this, unlimited if maxNoOfPodsToEvictTotal: 5000 # you don't need to set this, unlimited if not set metricsCollector: enabled: true # you don't need to set this, metrics are not collected if not set +prometheus: # you don't need to set this, 
prometheus client will not get created if not set + url: http://prometheus-kube-prometheus-prometheus.prom.svc.cluster.local + insecureSkipVerify: true + authToken: + secretReference: + namespace: "kube-system" + name: "authtoken" profiles: - name: ProfileName pluginConfig: @@ -287,6 +302,11 @@ design for scheduling pods onto nodes. This means that resource usage as reporte like `kubectl top`) may differ from the calculated consumption, due to these components reporting actual usage metrics. Metrics-based descheduling can be enabled by setting `metricsUtilization.metricsServer` field. In order to have the plugin consume the metrics the metric collector needs to be configured as well. +Alternatively, it is possible to create a prometheus client and configure a prometheus query to consume +metrics outside of the kubernetes metrics server. The query is expected to return a vector of values for +each node. The values are expected to be any real number within the [0, 1] interval. During eviction only +a single pod is evicted at most from each overutilized node. There's currently no support for evicting +more. The Kubernetes metrics server takes precedence over Prometheus. See `metricsCollector` field at [Top Level configuration](#top-level-configuration) for available options. 
**Parameters:** @@ -300,6 +320,7 @@ See `metricsCollector` field at [Top Level configuration](#top-level-configurati |`evictableNamespaces`|(see [namespace filtering](#namespace-filtering))| |`metricsUtilization`|object| |`metricsUtilization.metricsServer`|bool| +|`metricsUtilization.prometheus.query`|string| **Example:** @@ -322,6 +343,8 @@ profiles: "pods": 50 metricsUtilization: metricsServer: true + # prometheus: + # query: instance:node_cpu:rate:sum plugins: balance: enabled: diff --git a/cmd/descheduler/app/options/options.go b/cmd/descheduler/app/options/options.go index 2b0d76846f..ceb84e7fc4 100644 --- a/cmd/descheduler/app/options/options.go +++ b/cmd/descheduler/app/options/options.go @@ -20,6 +20,7 @@ package options import ( "time" + promapi "github.com/prometheus/client_golang/api" "github.com/spf13/pflag" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -49,6 +50,7 @@ type DeschedulerServer struct { Client clientset.Interface EventClient clientset.Interface MetricsClient metricsclient.Interface + PrometheusClient promapi.Client SecureServing *apiserveroptions.SecureServingOptionsWithLoopback SecureServingInfo *apiserver.SecureServingInfo DisableMetrics bool diff --git a/kubernetes/base/rbac.yaml b/kubernetes/base/rbac.yaml index ab628cb514..cd5bdab8a8 100644 --- a/kubernetes/base/rbac.yaml +++ b/kubernetes/base/rbac.yaml @@ -36,6 +36,15 @@ rules: resources: ["nodes", "pods"] verbs: ["get", "list"] --- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: descheduler-role +rules: +- apiGroups: [""] + resources: ["secrets"] + verbs: ["get", "list", "watch"] +--- apiVersion: v1 kind: ServiceAccount metadata: @@ -54,3 +63,16 @@ subjects: - name: descheduler-sa kind: ServiceAccount namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: descheduler-role-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: descheduler-role +subjects: + - name: descheduler-sa + 
kind: ServiceAccount + namespace: kube-system diff --git a/pkg/api/types.go b/pkg/api/types.go index f282d0fe3d..dc8f558fbf 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -44,6 +44,9 @@ type DeschedulerPolicy struct { // MetricsCollector configures collection of metrics about actual resource utilization MetricsCollector MetricsCollector + + // Prometheus enables metrics collection through Prometheus + Prometheus Prometheus } // Namespaces carries a list of included/excluded namespaces @@ -94,3 +97,25 @@ type MetricsCollector struct { // Later, the collection can be extended to other providers. Enabled bool } + +type Prometheus struct { + URL string + AuthToken AuthToken + InsecureSkipVerify bool +} + +type AuthToken struct { + // raw for a raw authentication token + Raw string + // secretReference references an authentication token. + // secrets are expected to be created under the descheduler's namespace. + SecretReference SecretReference +} + +// SecretReference holds a reference to a Secret +type SecretReference struct { + // namespace is the namespace of the secret. + Namespace string + // name is the name of the secret. + Name string +} diff --git a/pkg/api/v1alpha2/types.go b/pkg/api/v1alpha2/types.go index bd2dc74009..4a65940893 100644 --- a/pkg/api/v1alpha2/types.go +++ b/pkg/api/v1alpha2/types.go @@ -43,6 +43,9 @@ type DeschedulerPolicy struct { // MetricsCollector configures collection of metrics for actual resource utilization MetricsCollector MetricsCollector `json:"metricsCollector,omitempty"` + + // Prometheus enables metrics collection through Prometheus + Prometheus Prometheus `json:"prometheus,omitempty"` } type DeschedulerProfile struct { @@ -76,3 +79,25 @@ type MetricsCollector struct { // Later, the collection can be extended to other providers. 
Enabled bool `json:"enabled"` } + +type Prometheus struct { + URL string `json:"url,omitempty"` + AuthToken AuthToken `json:"authToken,omitempty"` + InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"` +} + +type AuthToken struct { + // raw for a raw authentication token + Raw string `json:"raw,omitempty"` + // secretReference references an authentication token. + // secrets are expected to be created under the descheduler's namespace. + SecretReference SecretReference `json:"secretReference,omitempty"` +} + +// SecretReference holds a reference to a Secret +type SecretReference struct { + // namespace is the namespace of the secret. + Namespace string `json:"namespace,omitempty"` + // name is the name of the secret. + Name string `json:"name,omitempty"` +} diff --git a/pkg/api/v1alpha2/zz_generated.conversion.go b/pkg/api/v1alpha2/zz_generated.conversion.go index 1b33be25cf..4300f31544 100644 --- a/pkg/api/v1alpha2/zz_generated.conversion.go +++ b/pkg/api/v1alpha2/zz_generated.conversion.go @@ -36,6 +36,16 @@ func init() { // RegisterConversions adds conversion functions to the given scheme. // Public to allow building arbitrary schemes. 
func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*AuthToken)(nil), (*api.AuthToken)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_AuthToken_To_api_AuthToken(a.(*AuthToken), b.(*api.AuthToken), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.AuthToken)(nil), (*AuthToken)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_AuthToken_To_v1alpha2_AuthToken(a.(*api.AuthToken), b.(*AuthToken), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*DeschedulerProfile)(nil), (*api.DeschedulerProfile)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha2_DeschedulerProfile_To_api_DeschedulerProfile(a.(*DeschedulerProfile), b.(*api.DeschedulerProfile), scope) }); err != nil { @@ -81,6 +91,26 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*Prometheus)(nil), (*api.Prometheus)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_Prometheus_To_api_Prometheus(a.(*Prometheus), b.(*api.Prometheus), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.Prometheus)(nil), (*Prometheus)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_Prometheus_To_v1alpha2_Prometheus(a.(*api.Prometheus), b.(*Prometheus), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*SecretReference)(nil), (*api.SecretReference)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_SecretReference_To_api_SecretReference(a.(*SecretReference), b.(*api.SecretReference), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*api.SecretReference)(nil), (*SecretReference)(nil), func(a, b interface{}, scope conversion.Scope) error 
{ + return Convert_api_SecretReference_To_v1alpha2_SecretReference(a.(*api.SecretReference), b.(*SecretReference), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*api.DeschedulerPolicy)(nil), (*DeschedulerPolicy)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_api_DeschedulerPolicy_To_v1alpha2_DeschedulerPolicy(a.(*api.DeschedulerPolicy), b.(*DeschedulerPolicy), scope) }); err != nil { @@ -99,6 +129,32 @@ func RegisterConversions(s *runtime.Scheme) error { return nil } +func autoConvert_v1alpha2_AuthToken_To_api_AuthToken(in *AuthToken, out *api.AuthToken, s conversion.Scope) error { + out.Raw = in.Raw + if err := Convert_v1alpha2_SecretReference_To_api_SecretReference(&in.SecretReference, &out.SecretReference, s); err != nil { + return err + } + return nil +} + +// Convert_v1alpha2_AuthToken_To_api_AuthToken is an autogenerated conversion function. +func Convert_v1alpha2_AuthToken_To_api_AuthToken(in *AuthToken, out *api.AuthToken, s conversion.Scope) error { + return autoConvert_v1alpha2_AuthToken_To_api_AuthToken(in, out, s) +} + +func autoConvert_api_AuthToken_To_v1alpha2_AuthToken(in *api.AuthToken, out *AuthToken, s conversion.Scope) error { + out.Raw = in.Raw + if err := Convert_api_SecretReference_To_v1alpha2_SecretReference(&in.SecretReference, &out.SecretReference, s); err != nil { + return err + } + return nil +} + +// Convert_api_AuthToken_To_v1alpha2_AuthToken is an autogenerated conversion function. 
+func Convert_api_AuthToken_To_v1alpha2_AuthToken(in *api.AuthToken, out *AuthToken, s conversion.Scope) error { + return autoConvert_api_AuthToken_To_v1alpha2_AuthToken(in, out, s) +} + func autoConvert_v1alpha2_DeschedulerPolicy_To_api_DeschedulerPolicy(in *DeschedulerPolicy, out *api.DeschedulerPolicy, s conversion.Scope) error { if in.Profiles != nil { in, out := &in.Profiles, &out.Profiles @@ -118,6 +174,9 @@ func autoConvert_v1alpha2_DeschedulerPolicy_To_api_DeschedulerPolicy(in *Desched if err := Convert_v1alpha2_MetricsCollector_To_api_MetricsCollector(&in.MetricsCollector, &out.MetricsCollector, s); err != nil { return err } + if err := Convert_v1alpha2_Prometheus_To_api_Prometheus(&in.Prometheus, &out.Prometheus, s); err != nil { + return err + } return nil } @@ -140,6 +199,9 @@ func autoConvert_api_DeschedulerPolicy_To_v1alpha2_DeschedulerPolicy(in *api.Des if err := Convert_api_MetricsCollector_To_v1alpha2_MetricsCollector(&in.MetricsCollector, &out.MetricsCollector, s); err != nil { return err } + if err := Convert_api_Prometheus_To_v1alpha2_Prometheus(&in.Prometheus, &out.Prometheus, s); err != nil { + return err + } return nil } @@ -307,3 +369,53 @@ func autoConvert_api_Plugins_To_v1alpha2_Plugins(in *api.Plugins, out *Plugins, func Convert_api_Plugins_To_v1alpha2_Plugins(in *api.Plugins, out *Plugins, s conversion.Scope) error { return autoConvert_api_Plugins_To_v1alpha2_Plugins(in, out, s) } + +func autoConvert_v1alpha2_Prometheus_To_api_Prometheus(in *Prometheus, out *api.Prometheus, s conversion.Scope) error { + out.URL = in.URL + if err := Convert_v1alpha2_AuthToken_To_api_AuthToken(&in.AuthToken, &out.AuthToken, s); err != nil { + return err + } + out.InsecureSkipVerify = in.InsecureSkipVerify + return nil +} + +// Convert_v1alpha2_Prometheus_To_api_Prometheus is an autogenerated conversion function. 
+func Convert_v1alpha2_Prometheus_To_api_Prometheus(in *Prometheus, out *api.Prometheus, s conversion.Scope) error { + return autoConvert_v1alpha2_Prometheus_To_api_Prometheus(in, out, s) +} + +func autoConvert_api_Prometheus_To_v1alpha2_Prometheus(in *api.Prometheus, out *Prometheus, s conversion.Scope) error { + out.URL = in.URL + if err := Convert_api_AuthToken_To_v1alpha2_AuthToken(&in.AuthToken, &out.AuthToken, s); err != nil { + return err + } + out.InsecureSkipVerify = in.InsecureSkipVerify + return nil +} + +// Convert_api_Prometheus_To_v1alpha2_Prometheus is an autogenerated conversion function. +func Convert_api_Prometheus_To_v1alpha2_Prometheus(in *api.Prometheus, out *Prometheus, s conversion.Scope) error { + return autoConvert_api_Prometheus_To_v1alpha2_Prometheus(in, out, s) +} + +func autoConvert_v1alpha2_SecretReference_To_api_SecretReference(in *SecretReference, out *api.SecretReference, s conversion.Scope) error { + out.Namespace = in.Namespace + out.Name = in.Name + return nil +} + +// Convert_v1alpha2_SecretReference_To_api_SecretReference is an autogenerated conversion function. +func Convert_v1alpha2_SecretReference_To_api_SecretReference(in *SecretReference, out *api.SecretReference, s conversion.Scope) error { + return autoConvert_v1alpha2_SecretReference_To_api_SecretReference(in, out, s) +} + +func autoConvert_api_SecretReference_To_v1alpha2_SecretReference(in *api.SecretReference, out *SecretReference, s conversion.Scope) error { + out.Namespace = in.Namespace + out.Name = in.Name + return nil +} + +// Convert_api_SecretReference_To_v1alpha2_SecretReference is an autogenerated conversion function. 
+func Convert_api_SecretReference_To_v1alpha2_SecretReference(in *api.SecretReference, out *SecretReference, s conversion.Scope) error { + return autoConvert_api_SecretReference_To_v1alpha2_SecretReference(in, out, s) +} diff --git a/pkg/api/v1alpha2/zz_generated.deepcopy.go b/pkg/api/v1alpha2/zz_generated.deepcopy.go index bfaf198785..6023be2aa7 100644 --- a/pkg/api/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha2/zz_generated.deepcopy.go @@ -25,6 +25,23 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthToken) DeepCopyInto(out *AuthToken) { + *out = *in + out.SecretReference = in.SecretReference + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthToken. +func (in *AuthToken) DeepCopy() *AuthToken { + if in == nil { + return nil + } + out := new(AuthToken) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = *in @@ -57,6 +74,7 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { **out = **in } out.MetricsCollector = in.MetricsCollector + out.Prometheus = in.Prometheus return } @@ -182,3 +200,36 @@ func (in *Plugins) DeepCopy() *Plugins { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Prometheus) DeepCopyInto(out *Prometheus) { + *out = *in + out.AuthToken = in.AuthToken + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Prometheus. 
+func (in *Prometheus) DeepCopy() *Prometheus { + if in == nil { + return nil + } + out := new(Prometheus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SecretReference) DeepCopyInto(out *SecretReference) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference. +func (in *SecretReference) DeepCopy() *SecretReference { + if in == nil { + return nil + } + out := new(SecretReference) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/api/zz_generated.deepcopy.go b/pkg/api/zz_generated.deepcopy.go index 2ac45bb6a8..5a913a9b3b 100644 --- a/pkg/api/zz_generated.deepcopy.go +++ b/pkg/api/zz_generated.deepcopy.go @@ -25,6 +25,23 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthToken) DeepCopyInto(out *AuthToken) { + *out = *in + out.SecretReference = in.SecretReference + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthToken. +func (in *AuthToken) DeepCopy() *AuthToken { + if in == nil { + return nil + } + out := new(AuthToken) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { *out = *in @@ -57,6 +74,7 @@ func (in *DeschedulerPolicy) DeepCopyInto(out *DeschedulerPolicy) { **out = **in } out.MetricsCollector = in.MetricsCollector + out.Prometheus = in.Prometheus return } @@ -232,6 +250,23 @@ func (in *PriorityThreshold) DeepCopy() *PriorityThreshold { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *Prometheus) DeepCopyInto(out *Prometheus) { + *out = *in + out.AuthToken = in.AuthToken + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Prometheus. +func (in *Prometheus) DeepCopy() *Prometheus { + if in == nil { + return nil + } + out := new(Prometheus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in ResourceThresholds) DeepCopyInto(out *ResourceThresholds) { { @@ -253,3 +288,19 @@ func (in ResourceThresholds) DeepCopy() ResourceThresholds { in.DeepCopyInto(out) return *out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SecretReference) DeepCopyInto(out *SecretReference) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretReference. +func (in *SecretReference) DeepCopy() *SecretReference { + if in == nil { + return nil + } + out := new(SecretReference) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/descheduler/client/client.go b/pkg/descheduler/client/client.go index c5ff5e018a..6a46b241c7 100644 --- a/pkg/descheduler/client/client.go +++ b/pkg/descheduler/client/client.go @@ -17,17 +17,30 @@ limitations under the License. package client import ( + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "time" + + promapi "github.com/prometheus/client_golang/api" + "github.com/prometheus/common/config" // Ensure to load all auth plugins. 
clientset "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/transport" componentbaseconfig "k8s.io/component-base/config" metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" ) +var K8sPodCAFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) { var cfg *rest.Config if len(clientConnection.Kubeconfig) != 0 { @@ -94,3 +107,70 @@ func GetMasterFromKubeconfig(filename string) (string, error) { } return "", fmt.Errorf("failed to get master address from kubeconfig: cluster information not found") } + +func loadCAFile(filepath string) (*x509.CertPool, error) { + caCert, err := ioutil.ReadFile(filepath) + if err != nil { + return nil, err + } + + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to append CA certificate to the pool") + } + + return caCertPool, nil +} + +func CreatePrometheusClient(prometheusURL, authToken string, insecureSkipVerify bool) (promapi.Client, error) { + // Ignore TLS verify errors if InsecureSkipVerify is set + roundTripper := promapi.DefaultRoundTripper + if insecureSkipVerify { + roundTripper = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } else { + // Retrieve Pod CA cert + caCertPool, err := loadCAFile(K8sPodCAFilePath) + if err != nil { + return nil, fmt.Errorf("Error loading CA file: %v", err) + } + + // Get Prometheus Host + u, err := url.Parse(prometheusURL) + if err != nil { + return nil, fmt.Errorf("Error parsing prometheus URL: %v", err) + } + roundTripper = transport.NewBearerAuthRoundTripper( + 
authToken, + &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + ServerName: u.Host, + }, + }, + ) + } + + if authToken != "" { + return promapi.NewClient(promapi.Config{ + Address: prometheusURL, + RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(authToken), roundTripper), + }) + } + return promapi.NewClient(promapi.Config{ + Address: prometheusURL, + }) +} diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index 2b18852889..5bd0b0b8cc 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -23,6 +23,8 @@ import ( "strconv" "time" + promapi "github.com/prometheus/client_golang/api" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -34,14 +36,18 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilversion "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" fakeclientset "k8s.io/client-go/kubernetes/fake" + corev1listers "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" + "k8s.io/client-go/util/workqueue" componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" @@ -62,6 +68,11 @@ import ( "sigs.k8s.io/descheduler/pkg/version" ) +const ( + prometheusAuthTokenSecretKey = "prometheusAuthToken" + workQueueKey = "key" +) + type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status type profileRunner struct { @@ -70,15 +81,18 @@ type profileRunner struct { } type descheduler struct { - rs 
*options.DeschedulerServer - ir *informerResources - getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc - sharedInformerFactory informers.SharedInformerFactory - deschedulerPolicy *api.DeschedulerPolicy - eventRecorder events.EventRecorder - podEvictor *evictions.PodEvictor - podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) - metricsCollector *metricscollector.MetricsCollector + rs *options.DeschedulerServer + ir *informerResources + getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc + sharedInformerFactory informers.SharedInformerFactory + namespacedSecretsLister corev1listers.SecretNamespaceLister + deschedulerPolicy *api.DeschedulerPolicy + eventRecorder events.EventRecorder + podEvictor *evictions.PodEvictor + podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error) + metricsCollector *metricscollector.MetricsCollector + prometheusClient promapi.Client + queue workqueue.RateLimitingInterface } type informerResources struct { @@ -125,7 +139,7 @@ func (ir *informerResources) CopyTo(fakeClient *fakeclientset.Clientset, newFact return nil } -func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) { +func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) { podInformer := sharedInformerFactory.Core().V1().Pods().Informer() ir := newInformerResources(sharedInformerFactory) @@ -156,16 +170,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche WithMetricsEnabled(!rs.DisableMetrics), ) - var 
metricsCollector *metricscollector.MetricsCollector - if deschedulerPolicy.MetricsCollector.Enabled { - nodeSelector := "" - if deschedulerPolicy.NodeSelector != nil { - nodeSelector = *deschedulerPolicy.NodeSelector - } - metricsCollector = metricscollector.NewMetricsCollector(rs.Client, rs.MetricsClient, nodeSelector) - } - - return &descheduler{ + desch := &descheduler{ rs: rs, ir: ir, getPodsAssignedToNode: getPodsAssignedToNode, @@ -174,8 +179,88 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche eventRecorder: eventRecorder, podEvictor: podEvictor, podEvictionReactionFnc: podEvictionReactionFnc, - metricsCollector: metricsCollector, - }, nil + prometheusClient: rs.PrometheusClient, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ""), + } + + if deschedulerPolicy.MetricsCollector.Enabled { + nodeSelector := "" + if deschedulerPolicy.NodeSelector != nil { + nodeSelector = *deschedulerPolicy.NodeSelector + } + desch.metricsCollector = metricscollector.NewMetricsCollector(rs.Client, rs.MetricsClient, nodeSelector) + } + + if namespacedSharedInformerFactory != nil { + namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(desch.eventHandler()) + desch.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(deschedulerPolicy.Prometheus.AuthToken.SecretReference.Namespace) + } + + return desch, nil +} + +func (d *descheduler) run(workers int, ctx context.Context) { + defer utilruntime.HandleCrash() + defer d.queue.ShutDown() + + klog.Infof("Starting authentication secret reconciler") + defer klog.Infof("Shutting down authentication secret reconciler") + + go wait.UntilWithContext(ctx, d.runWorker, time.Second) + + <-ctx.Done() +} + +func (d *descheduler) runWorker(ctx context.Context) { + for d.processNextWorkItem(ctx) { + } +} + +func (d *descheduler) processNextWorkItem(ctx context.Context) bool { + dsKey, quit := d.queue.Get() 
+ if quit { + return false + } + defer d.queue.Done(dsKey) + + err := d.sync() + if err == nil { + d.queue.Forget(dsKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err)) + d.queue.AddRateLimited(dsKey) + + return true +} + +func (d *descheduler) sync() error { + ns := d.deschedulerPolicy.Prometheus.AuthToken.SecretReference.Namespace + name := d.deschedulerPolicy.Prometheus.AuthToken.SecretReference.Name + secretObj, err := d.namespacedSecretsLister.Get(name) + if err != nil { + return fmt.Errorf("unable to get %v/%v secret", ns, name) + } + authToken := string(secretObj.Data[prometheusAuthTokenSecretKey]) + if authToken == "" { + return fmt.Errorf("prometheus authentication token secret missing %q data or empty", prometheusAuthTokenSecretKey) + } + klog.V(2).Infof("authentication secret token updated, recreating prometheus client") + prometheusClient, err := client.CreatePrometheusClient(d.deschedulerPolicy.Prometheus.URL, authToken, d.deschedulerPolicy.Prometheus.InsecureSkipVerify) + if err != nil { + return fmt.Errorf("unable to create a prometheus client: %v", err) + } + d.prometheusClient = prometheusClient + return nil +} + +func (d *descheduler) eventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { d.queue.Add(workQueueKey) }, + UpdateFunc: func(old, new interface{}) { d.queue.Add(workQueueKey) }, + DeleteFunc: func(obj interface{}) { d.queue.Add(workQueueKey) }, + } } func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error { @@ -255,6 +340,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac frameworkprofile.WithPodEvictor(d.podEvictor), frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), frameworkprofile.WithMetricsCollector(d.metricsCollector), + frameworkprofile.WithPrometheusClient(d.prometheusClient), ) if err != nil { klog.ErrorS(err, "unable to create a 
profile", "profile", profile.Name) @@ -423,7 +509,24 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient) defer eventBroadcaster.Shutdown() - descheduler, err := newDescheduler(rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory) + var namespacedSharedInformerFactory informers.SharedInformerFactory + if deschedulerPolicy.Prometheus.URL != "" { + promConfig := deschedulerPolicy.Prometheus + // Raw auth token takes precedence + if len(promConfig.AuthToken.Raw) > 0 { + klog.V(2).Infof("Creating Prometheus client") + prometheusClient, err := client.CreatePrometheusClient(deschedulerPolicy.Prometheus.URL, promConfig.AuthToken.Raw, deschedulerPolicy.Prometheus.InsecureSkipVerify) + if err != nil { + return fmt.Errorf("unable to create a prometheus client: %v", err) + } + rs.PrometheusClient = prometheusClient + } else if promConfig.AuthToken.SecretReference.Name != "" { + // Will get reconciled + namespacedSharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields), informers.WithNamespace(deschedulerPolicy.Prometheus.AuthToken.SecretReference.Namespace)) + } + } + + descheduler, err := newDescheduler(rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory, namespacedSharedInformerFactory) if err != nil { span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error()))) return err @@ -448,6 +551,12 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer } } + if namespacedSharedInformerFactory != nil { + namespacedSharedInformerFactory.Start(ctx.Done()) + namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done()) + go descheduler.run(1, ctx) + } + wait.NonSlidingUntil(func() { // A next context is created here intentionally to avoid nesting the spans 
via context. sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil") diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index 312c6b271c..8f726c7fa2 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -164,7 +164,7 @@ func initDescheduler(t *testing.T, ctx context.Context, internalDeschedulerPolic sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields)) eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client) - descheduler, err := newDescheduler(rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory) + descheduler, err := newDescheduler(rs, internalDeschedulerPolicy, "v1", eventRecorder, sharedInformerFactory, nil) if err != nil { eventBroadcaster.Shutdown() t.Fatalf("Unable to create a descheduler instance: %v", err) diff --git a/pkg/framework/fake/fake.go b/pkg/framework/fake/fake.go index d274228935..681a9275bf 100644 --- a/pkg/framework/fake/fake.go +++ b/pkg/framework/fake/fake.go @@ -11,6 +11,8 @@ import ( "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + + promapi "github.com/prometheus/client_golang/api" ) type HandleImpl struct { @@ -20,6 +22,7 @@ type HandleImpl struct { EvictorFilterImpl frameworktypes.EvictorPlugin PodEvictorImpl *evictions.PodEvictor MetricsCollectorImpl *metricscollector.MetricsCollector + PrometheusClientImpl promapi.Client } var _ frameworktypes.Handle = &HandleImpl{} @@ -28,6 +31,10 @@ func (hi *HandleImpl) ClientSet() clientset.Interface { return hi.ClientsetImpl } +func (hi *HandleImpl) PrometheusClient() promapi.Client { + return hi.PrometheusClientImpl +} + func (hi *HandleImpl) MetricsCollector() *metricscollector.MetricsCollector { return hi.MetricsCollectorImpl } diff --git 
a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go index 8abb48465f..0ab93fb018 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go @@ -56,7 +56,18 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f return nil, fmt.Errorf("want args to be of type LowNodeUtilizationArgs, got %T", args) } - setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds) + if lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus.Query != "" { + uResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds) + oResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.TargetThresholds) + if len(uResourceNames) != 1 || uResourceNames[0] != ResourceMetrics { + return nil, fmt.Errorf("thresholds are expected to specify a single instance of %q resource, got %v instead", ResourceMetrics, uResourceNames) + } + if len(oResourceNames) != 1 || oResourceNames[0] != ResourceMetrics { + return nil, fmt.Errorf("targetThresholds are expected to specify a single instance of %q resource, got %v instead", ResourceMetrics, oResourceNames) + } + } else { + setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds) + } underutilizationCriteria := []interface{}{ "CPU", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourceCPU], @@ -95,6 +106,11 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f return nil, fmt.Errorf("metrics client not initialized") } usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector()) + } else if lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus.Query != "" { + if handle.PrometheusClient() 
== nil { + return nil, fmt.Errorf("prometheus client not initialized") + } + usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), handle.PrometheusClient(), lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus.Query) } else { usageClient = newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc()) } diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go index 1bd895c3c7..324cea7f38 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go +++ b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go @@ -38,6 +38,8 @@ import ( frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" "sigs.k8s.io/descheduler/pkg/utils" "sigs.k8s.io/descheduler/test" + + "github.com/prometheus/common/model" ) func TestLowNodeUtilization(t *testing.T) { @@ -1360,3 +1362,160 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) { }) } } + +func withLocalStorage(pod *v1.Pod) { + // A pod with local storage. + test.SetNormalOwnerRef(pod) + pod.Spec.Volumes = []v1.Volume{ + { + Name: "sample", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{Path: "somePath"}, + EmptyDir: &v1.EmptyDirVolumeSource{ + SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI), + }, + }, + }, + } + // A Mirror Pod. + pod.Annotations = test.GetMirrorPodAnnotation() +} + +func withCriticalPod(pod *v1.Pod) { + // A Critical Pod. 
+ test.SetNormalOwnerRef(pod) + pod.Namespace = "kube-system" + priority := utils.SystemCriticalPriority + pod.Spec.Priority = &priority +} + +func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) { + n1NodeName := "n1" + n2NodeName := "n2" + n3NodeName := "n3" + + testCases := []struct { + name string + useDeviationThresholds bool + thresholds, targetThresholds api.ResourceThresholds + query string + samples []model.Sample + nodes []*v1.Node + pods []*v1.Pod + expectedPodsEvicted uint + evictedPods []string + evictableNamespaces *api.Namespaces + }{ + { + name: "with instance:node_cpu:rate:sum query", + thresholds: api.ResourceThresholds{ + v1.ResourceName("MetricResource"): 30, + }, + targetThresholds: api.ResourceThresholds{ + v1.ResourceName("MetricResource"): 50, + }, + query: "instance:node_cpu:rate:sum", + samples: []model.Sample{ + sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561), + sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522), + sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104), + }, + nodes: []*v1.Node{ + test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil), + test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil), + test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil), + }, + pods: []*v1.Pod{ + test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef), + test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef), + // These won't be evicted. 
+ test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage), + test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod), + test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), + }, + expectedPodsEvicted: 1, + }, + } + + for _, tc := range testCases { + testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var objs []runtime.Object + for _, node := range tc.nodes { + objs = append(objs, node) + } + for _, pod := range tc.pods { + objs = append(objs, pod) + } + + fakeClient := fake.NewSimpleClientset(objs...) + + podsForEviction := make(map[string]struct{}) + for _, pod := range tc.evictedPods { + podsForEviction[pod] = struct{}{} + } + + evictionFailed := false + if len(tc.evictedPods) > 0 { + fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.CreateAction) + obj := getAction.GetObject() + if eviction, ok := obj.(*policy.Eviction); ok { + if _, exists := podsForEviction[eviction.Name]; exists { + return true, obj, nil + } + evictionFailed = true + return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name) + } + return true, obj, nil + }) + } + + handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil) + if err != nil { + t.Fatalf("Unable to initialize a framework handle: %v", err) + } + + handle.PrometheusClientImpl = &fakePromClient{ + result: tc.samples, + } + plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{ + Thresholds: tc.thresholds, + TargetThresholds: tc.targetThresholds, + UseDeviationThresholds: tc.useDeviationThresholds, + EvictableNamespaces: tc.evictableNamespaces, + MetricsUtilization: MetricsUtilization{ + Prometheus: Prometheus{ + Query: 
tc.query, + }, + }, + }, + handle) + if err != nil { + t.Fatalf("Unable to initialize the plugin: %v", err) + } + + status := plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes) + if status != nil { + t.Fatalf("Balance.err: %v", status.Err) + } + + podsEvicted := podEvictor.TotalEvicted() + if expectedPodsEvicted != podsEvicted { + t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted) + } + if evictionFailed { + t.Errorf("Pod evictions failed unexpectedly") + } + } + } + t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted)) + } +} diff --git a/pkg/framework/plugins/nodeutilization/nodeutilization.go b/pkg/framework/plugins/nodeutilization/nodeutilization.go index a694cacc83..980847a7c3 100644 --- a/pkg/framework/plugins/nodeutilization/nodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/nodeutilization.go @@ -34,6 +34,8 @@ import ( "sigs.k8s.io/descheduler/pkg/utils" ) +const ResourceMetrics = v1.ResourceName("MetricResource") + // NodeUsage stores a node's info, pods on it, thresholds and its resource usage type NodeUsage struct { node *v1.Node @@ -92,6 +94,10 @@ func getNodeThresholds( if len(node.Status.Allocatable) > 0 { nodeCapacity = node.Status.Allocatable } + if len(resourceNames) == 1 && resourceNames[0] == ResourceMetrics { + // Make ResourceMetrics 100% => 100 points + nodeCapacity[ResourceMetrics] = *resource.NewQuantity(int64(100), resource.DecimalSI) + } nodeThresholdsMap[node.Name] = NodeThresholds{ lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{}, @@ -327,15 +333,27 @@ func evictPods( if !preEvictionFilterWithOptions(pod) { continue } + + // In case podUsage does not support resource counting (e.g. provided metric + // does not quantify pod resource utilization) allow to evict only a single + // pod. It is recommended to run the descheduling cycle more often + // so the plugin can perform more evictions towards the re-distribution. 
+ singleEviction := false podUsage, err := usageClient.podUsage(pod) if err != nil { - klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err) - continue + if _, ok := err.(*notSupportedError); !ok { + klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err) + continue + } + singleEviction = true } err = podEvictor.Evict(ctx, pod, evictOptions) if err == nil { klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod)) - + if singleEviction { + klog.V(3).InfoS("Currently, only a single pod eviction is allowed") + break + } for name := range totalAvailableUsage { if name == v1.ResourcePods { nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI)) diff --git a/pkg/framework/plugins/nodeutilization/types.go b/pkg/framework/plugins/nodeutilization/types.go index 8e005fa02d..4c15edc339 100644 --- a/pkg/framework/plugins/nodeutilization/types.go +++ b/pkg/framework/plugins/nodeutilization/types.go @@ -57,4 +57,14 @@ type MetricsUtilization struct { // metricsServer enables metrics from a kubernetes metrics server. // Please see https://kubernetes-sigs.github.io/metrics-server/ for more. MetricsServer bool `json:"metricsServer,omitempty"` + + // prometheus enables metrics collection through a prometheus query. + Prometheus Prometheus `json:"prometheus,omitempty"` +} + +type Prometheus struct { + // query returning a vector of samples, each sample labeled with `instance` + // corresponding to a node name with each sample value as a real number + // in <0; 1> interval. 
+ Query string `json:"query,omitempty"` } diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go index 0bf89aca70..5ca1d0db22 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients.go +++ b/pkg/framework/plugins/nodeutilization/usageclients.go @@ -18,8 +18,15 @@ package nodeutilization import ( "context" + "encoding/json" "fmt" + "net/http" + "net/url" + "time" + promapi "github.com/prometheus/client_golang/api" + promv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,6 +39,28 @@ import ( "sigs.k8s.io/descheduler/pkg/utils" ) +type UsageClientType int + +const ( + requestedUsageClientType UsageClientType = iota + actualUsageClientType + prometheusUsageClientType +) + +type notSupportedError struct { + usageClientType UsageClientType +} + +func (e notSupportedError) Error() string { + return "maximum number of evicted pods per node reached" +} + +func newNotSupportedError(usageClientType UsageClientType) *notSupportedError { + return ¬SupportedError{ + usageClientType: usageClientType, + } +} + type usageClient interface { // Both low/high node utilization plugins are expected to invoke sync right // after Balance method is invoked. 
There's no cache invalidation so each @@ -190,3 +219,110 @@ func (client *actualUsageClient) sync(nodes []*v1.Node) error { return nil } + +type prometheusUsageClient struct { + getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc + promClient promapi.Client + promQuery string + + _pods map[string][]*v1.Pod + _nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity +} + +var _ usageClient = &actualUsageClient{} + +func newPrometheusUsageClient( + getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc, + promClient promapi.Client, + promQuery string, +) *prometheusUsageClient { + return &prometheusUsageClient{ + getPodsAssignedToNode: getPodsAssignedToNode, + promClient: promClient, + promQuery: promQuery, + } +} + +func (client *prometheusUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity { + return client._nodeUtilization[node] +} + +func (client *prometheusUsageClient) pods(node string) []*v1.Pod { + return client._pods[node] +} + +func (client *prometheusUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) { + return nil, newNotSupportedError(prometheusUsageClientType) +} + +type fakePromClient struct { + result interface{} +} + +type fakePayload struct { + Status string `json:"status"` + Data queryResult `json:"data"` +} + +type queryResult struct { + Type model.ValueType `json:"resultType"` + Result interface{} `json:"result"` +} + +func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL { + return &url.URL{} +} + +func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) { + jsonData, err := json.Marshal(fakePayload{ + Status: "success", + Data: queryResult{ + Type: model.ValVector, + Result: client.result, + }, + }) + + return &http.Response{StatusCode: 200}, jsonData, err +} + +func (client *prometheusUsageClient) resourceNames() []v1.ResourceName { + return []v1.ResourceName{ResourceMetrics} +} + +func (client 
*prometheusUsageClient) sync(nodes []*v1.Node) error { + client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity) + client._pods = make(map[string][]*v1.Pod) + + results, warnings, err := promv1.NewAPI(client.promClient).Query(context.TODO(), client.promQuery, time.Now()) + if err != nil { + return fmt.Errorf("unable to capture prometheus metrics: %v", err) + } + if len(warnings) > 0 { + klog.Infof("prometheus metrics warnings: %v", warnings) + } + + nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity) + for _, sample := range results.(model.Vector) { + nodeName := string(sample.Metric["instance"]) + nodeUsages[nodeName] = map[v1.ResourceName]*resource.Quantity{ + v1.ResourceName("MetricResource"): resource.NewQuantity(int64(sample.Value*100), resource.DecimalSI), + } + } + + for _, node := range nodes { + if _, exists := nodeUsages[node.Name]; !exists { + return fmt.Errorf("unable to find metric entry for %v", node.Name) + } + pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil) + if err != nil { + klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err) + return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) + } + + // store the snapshot of pods from the same (or the closest) node utilization computation + client._pods[node.Name] = pods + client._nodeUtilization[node.Name] = nodeUsages[node.Name] + } + + return nil +} diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go index 016335520a..2af29923e3 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients_test.go +++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + "github.com/prometheus/common/model" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime/schema" @@ -59,7 +61,7 @@ func 
updateMetricsAndCheckNodeUtilization( } err = usageClient.sync(nodes) if err != nil { - t.Fatalf("failed to capture a snapshot: %v", err) + t.Fatalf("failed to sync a snapshot: %v", err) } nodeUtilization := usageClient.nodeUtilization(nodeName) t.Logf("current node cpu usage: %v\n", nodeUtilization[v1.ResourceCPU].MilliValue()) @@ -135,3 +137,70 @@ func TestActualUsageClient(t *testing.T) { metricsClientset, collector, usageClient, nodes, n2.Name, n2metrics, ) } + +func sample(metricName, nodeName string, value float64) model.Sample { + return model.Sample{ + Metric: model.Metric{ + "__name__": model.LabelValue(metricName), + "instance": model.LabelValue(nodeName), + }, + Value: model.SampleValue(value), + Timestamp: 1728991761711, + } +} + +func TestPrometheusUsageClient(t *testing.T) { + n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil) + + nodes := []*v1.Node{n1, n2, n3} + + p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil) + p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil) + p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil) + p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil) + + pClient := &fakePromClient{ + result: []model.Sample{ + sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104), + sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522), + sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561), + }, + } + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3) + + ctx := context.TODO() + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer) + if err != nil { + 
t.Fatalf("Build get pods assigned to node function error: %v", err) + } + + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum") + err = prometheusUsageClient.sync(nodes) + if err != nil { + t.Fatalf("unable to sync prometheus metrics: %v", err) + } + + for _, node := range nodes { + nodeUtil := prometheusUsageClient.nodeUtilization(node.Name) + fmt.Printf("nodeUtil[%v]: %v\n", node.Name, nodeUtil) + } + + nodeThresholds := NodeThresholds{ + lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{ + ResourceMetrics: resource.NewQuantity(int64(30), resource.DecimalSI), + }, + highResourceThreshold: map[v1.ResourceName]*resource.Quantity{ + ResourceMetrics: resource.NewQuantity(int64(50), resource.DecimalSI), + }, + } + + fmt.Printf("nodeThresholds: %#v\n", nodeThresholds) +} diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go index f142b0cdb7..4038d22d38 100644 --- a/pkg/framework/profile/profile.go +++ b/pkg/framework/profile/profile.go @@ -18,6 +18,7 @@ import ( "fmt" "time" + promapi "github.com/prometheus/client_golang/api" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -68,6 +69,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev // handleImpl implements the framework handle which gets passed to plugins type handleImpl struct { clientSet clientset.Interface + prometheusClient promapi.Client metricsCollector *metricscollector.MetricsCollector getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc sharedInformerFactory informers.SharedInformerFactory @@ -81,6 +83,10 @@ func (hi *handleImpl) ClientSet() clientset.Interface { return hi.clientSet } +func (hi *handleImpl) PrometheusClient() promapi.Client { + return hi.prometheusClient +} + func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector { 
return hi.metricsCollector } @@ -131,6 +137,7 @@ type Option func(*handleImplOpts) type handleImplOpts struct { clientSet clientset.Interface + prometheusClient promapi.Client sharedInformerFactory informers.SharedInformerFactory getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc podEvictor *evictions.PodEvictor @@ -144,6 +151,13 @@ func WithClientSet(clientSet clientset.Interface) Option { } } +// WithPrometheusClient sets Prometheus client for the scheduling frameworkImpl. +func WithPrometheusClient(prometheusClient promapi.Client) Option { + return func(o *handleImplOpts) { + o.prometheusClient = prometheusClient + } +} + func WithSharedInformerFactory(sharedInformerFactory informers.SharedInformerFactory) Option { return func(o *handleImplOpts) { o.sharedInformerFactory = sharedInformerFactory @@ -267,6 +281,7 @@ func NewProfile(config api.DeschedulerProfile, reg pluginregistry.Registry, opts podEvictor: hOpts.podEvictor, }, metricsCollector: hOpts.metricsCollector, + prometheusClient: hOpts.prometheusClient, } pluginNames := append(config.Plugins.Deschedule.Enabled, config.Plugins.Balance.Enabled...) diff --git a/pkg/framework/types/types.go b/pkg/framework/types/types.go index 6cb95ca247..2480e06b02 100644 --- a/pkg/framework/types/types.go +++ b/pkg/framework/types/types.go @@ -26,6 +26,8 @@ import ( "sigs.k8s.io/descheduler/pkg/descheduler/evictions" "sigs.k8s.io/descheduler/pkg/descheduler/metricscollector" podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod" + + promapi "github.com/prometheus/client_golang/api" ) // Handle provides handles used by plugins to retrieve a kubernetes client set, @@ -34,6 +36,7 @@ import ( type Handle interface { // ClientSet returns a kubernetes clientSet. ClientSet() clientset.Interface + PrometheusClient() promapi.Client Evictor() Evictor GetPodsAssignedToNodeFunc() podutil.GetPodsAssignedToNodeFunc SharedInformerFactory() informers.SharedInformerFactory