Commit

[nodeutilization]: prometheus usage client through kubernetes metrics
ingvagabund committed Nov 16, 2024
1 parent 974dedb commit 477104c
Showing 8 changed files with 561 additions and 7 deletions.
22 changes: 22 additions & 0 deletions kubernetes/base/rbac.yaml
@@ -36,6 +36,15 @@ rules:
  resources: ["nodes", "pods"]
  verbs: ["get", "list"]
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: descheduler-role
rules:
- apiGroups: [""]
  resources: ["secrets"]
  verbs: ["get"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
@@ -54,3 +63,16 @@ subjects:
- name: descheduler-sa
  kind: ServiceAccount
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: descheduler-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: descheduler-role
subjects:
- name: descheduler-sa
  kind: ServiceAccount
  namespace: kube-system
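
The Role and RoleBinding above let the descheduler read a secret holding a Prometheus auth token. For orientation, a minimal sketch of a DeschedulerPolicy that exercises the new Prometheus path might look like the following; the argument names mirror the Go fields in this diff, while the YAML casing, URL, query, and secret name are illustrative assumptions:

apiVersion: "descheduler/v1alpha2"
kind: "DeschedulerPolicy"
profiles:
  - name: default
    pluginConfig:
      - name: "LowNodeUtilization"
        args:
          # With a Prometheus URL set, thresholds must reference the single
          # MetricResource resource (see the validation in lownodeutilization.go below).
          thresholds:
            MetricResource: 30
          targetThresholds:
            MetricResource: 50
          metricsUtilization:
            prometheus:
              url: https://prometheus.example.org    # illustrative
              query: instance:node_cpu:rate:sum      # recording rule used in the test below
              authToken:
                secretReference:                     # read via the Role above
                  namespace: kube-system
                  name: prometheus-auth-token        # illustrative
    plugins:
      balance:
        enabled:
          - "LowNodeUtilization"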
2 changes: 1 addition & 1 deletion pkg/descheduler/descheduler.go
@@ -135,7 +135,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
		v1.SchemeGroupVersion.WithResource("namespaces"),                 // Used by the defaultevictor plugin
		schedulingv1.SchemeGroupVersion.WithResource("priorityclasses"),  // Used by the defaultevictor plugin
		policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), // Used by the defaultevictor plugin

		v1.SchemeGroupVersion.WithResource("secrets"), // Used by LowNodeUtilization plugin
	)

	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
123 changes: 121 additions & 2 deletions pkg/framework/plugins/nodeutilization/lownodeutilization.go
@@ -18,20 +18,36 @@ package nodeutilization

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"net/url"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/transport"
	"k8s.io/klog/v2"

	"sigs.k8s.io/descheduler/pkg/api"
	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"

	promapi "github.com/prometheus/client_golang/api"
	"github.com/prometheus/common/config"
)

const (
	LowNodeUtilizationPluginName = "LowNodeUtilization"
	K8sPodCAFilePath             = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)

// LowNodeUtilization evicts pods from overutilized nodes to underutilized nodes. Note that CPU/Memory requests are used
// to calculate nodes' utilization and not the actual resource usage.
@@ -44,18 +60,44 @@ type LowNodeUtilization struct {
	overutilizationCriteria []interface{}
	resourceNames           []v1.ResourceName
	usageClient             usageClient
	promClient              promapi.Client
}

var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}

func loadCAFile(filepath string) (*x509.CertPool, error) {
	caCert, err := ioutil.ReadFile(filepath)
	if err != nil {
		return nil, err
	}

	caCertPool := x509.NewCertPool()
	if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
		return nil, fmt.Errorf("failed to append CA certificate to the pool")
	}

	return caCertPool, nil
}
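
loadCAFile simply wraps reading a PEM bundle into an x509.CertPool. A quick sanity-check test, not part of this commit, could generate a throwaway self-signed CA and round-trip it through the helper:

package nodeutilization

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"math/big"
	"os"
	"path/filepath"
	"testing"
	"time"
)

func TestLoadCAFile(t *testing.T) {
	// Generate a throwaway self-signed CA certificate.
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		t.Fatal(err)
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "test-ca"},
		NotBefore:             time.Now(),
		NotAfter:              time.Now().Add(time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		t.Fatal(err)
	}

	// Write it out as PEM and load it back through the helper.
	caPath := filepath.Join(t.TempDir(), "ca.crt")
	pemBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der})
	if err := os.WriteFile(caPath, pemBytes, 0o600); err != nil {
		t.Fatal(err)
	}
	if _, err := loadCAFile(caPath); err != nil {
		t.Fatalf("loadCAFile failed: %v", err)
	}
}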

// NewLowNodeUtilization builds plugin from its arguments while passing a handle
func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
	lowNodeUtilizationArgsArgs, ok := args.(*LowNodeUtilizationArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type LowNodeUtilizationArgs, got %T", args)
	}

	if lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus.URL != "" {
		uResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)
		oResourceNames := getResourceNames(lowNodeUtilizationArgsArgs.TargetThresholds)
		if len(uResourceNames) != 1 || uResourceNames[0] != ResourceMetrics {
			return nil, fmt.Errorf("thresholds are expected to specify a single instance of %q resource, got %v instead", ResourceMetrics, uResourceNames)
		}
		if len(oResourceNames) != 1 || oResourceNames[0] != ResourceMetrics {
			return nil, fmt.Errorf("targetThresholds are expected to specify a single instance of %q resource, got %v instead", ResourceMetrics, oResourceNames)
		}
	} else {
		setDefaultForLNUThresholds(lowNodeUtilizationArgsArgs.Thresholds, lowNodeUtilizationArgsArgs.TargetThresholds, lowNodeUtilizationArgsArgs.UseDeviationThresholds)
	}

	underutilizationCriteria := []interface{}{
		"CPU", lowNodeUtilizationArgsArgs.Thresholds[v1.ResourceCPU],
@@ -91,6 +133,83 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
	var usageClient usageClient
	if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer {
		usageClient = newActualUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
	} else if lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus.URL != "" {
		promConfig := lowNodeUtilizationArgsArgs.MetricsUtilization.Prometheus

		var authToken string
		// The raw auth token takes precedence over a secret reference.
		if len(promConfig.AuthToken.Raw) > 0 {
			authToken = promConfig.AuthToken.Raw
		} else if promConfig.AuthToken.SecretReference.Name != "" {
			secretObj, err := handle.ClientSet().CoreV1().Secrets(promConfig.AuthToken.SecretReference.Namespace).Get(context.TODO(), promConfig.AuthToken.SecretReference.Name, metav1.GetOptions{})
			if err != nil {
				return nil, fmt.Errorf("unable to get secret with prometheus authentication token: %v", err)
			}
			authToken = string(secretObj.Data[PrometheusAuthTokenKey])
			if authToken == "" {
				return nil, fmt.Errorf("prometheus authentication token secret missing %q data or empty", PrometheusAuthTokenKey)
			}
		}

		roundTripper := promapi.DefaultRoundTripper
		if promConfig.EnableAuthentication {
			// Retrieve the pod's in-cluster CA certificate.
			caCertPool, err := loadCAFile(K8sPodCAFilePath)
			if err != nil {
				return nil, fmt.Errorf("error loading CA file: %v", err)
			}

			// Get the Prometheus host for TLS server-name verification.
			u, err := url.Parse(promConfig.URL)
			if err != nil {
				return nil, fmt.Errorf("error parsing prometheus URL: %v", err)
			}
			roundTripper = transport.NewBearerAuthRoundTripper(
				authToken,
				&http.Transport{
					Proxy: http.ProxyFromEnvironment,
					DialContext: (&net.Dialer{
						Timeout:   30 * time.Second,
						KeepAlive: 30 * time.Second,
					}).DialContext,
					TLSHandshakeTimeout: 10 * time.Second,
					TLSClientConfig: &tls.Config{
						RootCAs:    caCertPool,
						ServerName: u.Host,
					},
				},
			)
		} else if promConfig.InsecureSkipVerify {
			// Ignore TLS verification errors when InsecureSkipVerify is set.
			roundTripper = &http.Transport{
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   30 * time.Second,
					KeepAlive: 30 * time.Second,
				}).DialContext,
				TLSHandshakeTimeout: 10 * time.Second,
				TLSClientConfig:     &tls.Config{InsecureSkipVerify: true},
			}
		}

		var promClient promapi.Client
		if authToken != "" {
			promClient, err = promapi.NewClient(promapi.Config{
				Address:      promConfig.URL,
				RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(authToken), roundTripper),
			})
			if err != nil {
				return nil, fmt.Errorf("unable to create a new prom client: %v", err)
			}
		} else {
			promClient, err = promapi.NewClient(promapi.Config{
				Address: promConfig.URL,
			})
			if err != nil {
				return nil, fmt.Errorf("unable to create a new prom client: %v", err)
			}
		}

		usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), promClient, promConfig.Query)
		// Reset all resource names to just ResourceMetrics.
		resourceNames = []v1.ResourceName{ResourceMetrics}
	} else {
		usageClient = newRequestedUsageClient(resourceNames, handle.GetPodsAssignedToNodeFunc())
	}
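
newPrometheusUsageClient itself is defined in a file not shown in this excerpt. As a rough sketch of what it has to do with the client built above, an instant query through the v1 API of prometheus/client_golang can map node instances to utilization values; the helper name and the "instance" label choice are assumptions based on the test data below:

package nodeutilization

import (
	"context"
	"fmt"
	"time"

	promapi "github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
)

// nodeUsageFromPrometheus evaluates an instant query and returns one
// utilization value per node, keyed by the node's "instance" label.
// Illustrative helper, not the commit's actual implementation.
func nodeUsageFromPrometheus(ctx context.Context, client promapi.Client, query string) (map[string]float64, error) {
	result, warnings, err := promv1.NewAPI(client).Query(ctx, query, time.Now())
	if err != nil {
		return nil, fmt.Errorf("unable to query prometheus: %v", err)
	}
	if len(warnings) > 0 {
		fmt.Printf("prometheus query warnings: %v\n", warnings)
	}
	vector, ok := result.(model.Vector)
	if !ok {
		return nil, fmt.Errorf("expected a vector result, got %T", result)
	}
	usage := make(map[string]float64, len(vector))
	for _, s := range vector {
		// Each sample carries the node name in its "instance" label.
		usage[string(s.Metric["instance"])] = float64(s.Value)
	}
	return usage, nil
}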
164 changes: 164 additions & 0 deletions pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
@@ -38,6 +38,8 @@ import (
	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
	"sigs.k8s.io/descheduler/pkg/utils"
	"sigs.k8s.io/descheduler/test"

	"github.com/prometheus/common/model"
)

func TestLowNodeUtilization(t *testing.T) {
@@ -1360,3 +1362,165 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
		})
	}
}

func withLocalStorage(pod *v1.Pod) {
	// A pod with local storage.
	test.SetNormalOwnerRef(pod)
	pod.Spec.Volumes = []v1.Volume{
		{
			Name: "sample",
			VolumeSource: v1.VolumeSource{
				HostPath: &v1.HostPathVolumeSource{Path: "somePath"},
				EmptyDir: &v1.EmptyDirVolumeSource{
					SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI),
				},
			},
		},
	}
	// A mirror pod.
	pod.Annotations = test.GetMirrorPodAnnotation()
}

func withCriticalPod(pod *v1.Pod) {
	// A critical pod.
	test.SetNormalOwnerRef(pod)
	pod.Namespace = "kube-system"
	priority := utils.SystemCriticalPriority
	pod.Spec.Priority = &priority
}

func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
	n1NodeName := "n1"
	n2NodeName := "n2"
	n3NodeName := "n3"

	testCases := []struct {
		name                         string
		useDeviationThresholds       bool
		thresholds, targetThresholds api.ResourceThresholds
		query                        string
		samples                      []model.Sample
		nodes                        []*v1.Node
		pods                         []*v1.Pod
		expectedPodsEvicted          uint
		evictedPods                  []string
		evictableNamespaces          *api.Namespaces
	}{
		{
			name: "with instance:node_cpu:rate:sum query",
			thresholds: api.ResourceThresholds{
				v1.ResourceName("MetricResource"): 30,
			},
			targetThresholds: api.ResourceThresholds{
				v1.ResourceName("MetricResource"): 50,
			},
			query: "instance:node_cpu:rate:sum",
			samples: []model.Sample{
				sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
				sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
				sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
			},
			nodes: []*v1.Node{
				test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
				test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil),
				test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil),
			},
			pods: []*v1.Pod{
				test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
				test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
				test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
				test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
				test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
				// These won't be evicted.
				test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
				test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage),
				test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod),
				test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
			},
			expectedPodsEvicted: 1,
		},
	}

	for _, tc := range testCases {
		testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) {
			return func(t *testing.T) {
				ctx, cancel := context.WithCancel(context.Background())
				defer cancel()

				var objs []runtime.Object
				for _, node := range tc.nodes {
					objs = append(objs, node)
				}
				for _, pod := range tc.pods {
					objs = append(objs, pod)
				}

				fakeClient := fake.NewSimpleClientset(objs...)

				podsForEviction := make(map[string]struct{})
				for _, pod := range tc.evictedPods {
					podsForEviction[pod] = struct{}{}
				}

				evictionFailed := false
				if len(tc.evictedPods) > 0 {
					fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
						getAction := action.(core.CreateAction)
						obj := getAction.GetObject()
						if eviction, ok := obj.(*policy.Eviction); ok {
							if _, exists := podsForEviction[eviction.Name]; exists {
								return true, obj, nil
							}
							evictionFailed = true
							return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name)
						}
						return true, obj, nil
					})
				}

				handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil)
				if err != nil {
					t.Fatalf("Unable to initialize a framework handle: %v", err)
				}

				plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
					Thresholds:             tc.thresholds,
					TargetThresholds:       tc.targetThresholds,
					UseDeviationThresholds: tc.useDeviationThresholds,
					EvictableNamespaces:    tc.evictableNamespaces,
					MetricsUtilization: MetricsUtilization{
						Prometheus: Prometheus{
							URL: "http://prometheus.example.orgname",
							AuthToken: AuthToken{
								Raw: "XXXXX",
							},
						},
					},
				},
					handle)
				if err != nil {
					t.Fatalf("Unable to initialize the plugin: %v", err)
				}

				pClient := &fakePromClient{
					result: tc.samples,
				}

				plugin.(*LowNodeUtilization).usageClient = newPrometheusUsageClient(handle.GetPodsAssignedToNodeFunc(), pClient, tc.query)
				status := plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes)
				if status != nil {
					t.Fatalf("Balance.err: %v", status.Err)
				}

				podsEvicted := podEvictor.TotalEvicted()
				if expectedPodsEvicted != podsEvicted {
					t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted)
				}
				if evictionFailed {
					t.Errorf("Pod evictions failed unexpectedly")
				}
			}
		}
		t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted))
	}
}
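
The sample helper and fakePromClient used above come from parts of the commit not shown in this excerpt (fakePromClient serves tc.samples back as a canned vector result). A plausible reconstruction of the helper, assuming the usage client keys node usage off the "instance" label, would be:

// Hypothetical reconstruction of the sample helper referenced above; the
// actual definition lives in a file not included in this excerpt.
func sample(metricName, nodeName string, value float64) model.Sample {
	return model.Sample{
		Metric: model.Metric{
			"__name__": model.LabelValue(metricName),
			"instance": model.LabelValue(nodeName),
		},
		Value:     model.SampleValue(value),
		Timestamp: model.Now(),
	}
}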
