Skip to content

Commit

Permalink
chore: Additional upstream metrics (#1672)
Browse files Browse the repository at this point in the history
  • Loading branch information
jigisha620 authored Sep 25, 2024
1 parent 0b52bb1 commit 769f811
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 6 deletions.
3 changes: 3 additions & 0 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
NodesDrainedTotal.With(prometheus.Labels{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
}).Inc()
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
Expand Down
12 changes: 11 additions & 1 deletion pkg/controllers/node/termination/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
func init() {
crmetrics.Registry.MustRegister(
TerminationDurationSeconds,
NodeLifetimeDurationSeconds)
NodeLifetimeDurationSeconds,
NodesDrainedTotal)
}

const dayDuration = time.Hour * 24
Expand All @@ -44,6 +45,15 @@ var (
},
[]string{metrics.NodePoolLabel},
)
// NodesDrainedTotal counts nodes drained by Karpenter. It carries a single
// label, metrics.NodePoolLabel; the termination controller increments it with
// the value of the node's v1.NodePoolLabelKey label.
NodesDrainedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "drained_total",
Help: "The total number of nodes drained by Karpenter",
},
[]string{metrics.NodePoolLabel},
)
NodeLifetimeDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/node/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ var _ = Describe("Termination", func() {
metrics.NodesTerminatedTotal.Reset()
termination.TerminationDurationSeconds.Reset()
termination.NodeLifetimeDurationSeconds.Reset()
termination.NodesDrainedTotal.Reset()
})

Context("Reconciliation", func() {
Expand Down Expand Up @@ -841,6 +842,7 @@ var _ = Describe("Termination", func() {
node = ExpectNodeExists(ctx, env.Client, node.Name)
// Reconcile twice, once to set the NodeClaim to terminating, another to check the instance termination status (and delete the node).
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectMetricCounterValue(termination.NodesDrainedTotal, 1, map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
ExpectObjectReconciled(ctx, env.Client, terminationController, node)

m, ok := FindMetricWithLabelValues("karpenter_nodes_terminated_total", map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/node/termination/terminator/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
},
},
}); err != nil {
var apiStatus apierrors.APIStatus
if errors.As(err, &apiStatus) {
code := apiStatus.Status().Code
NodesEvictionRequestsTotal.With(map[string]string{CodeLabel: fmt.Sprint(code)}).Inc()
}
// status codes for the eviction API are defined here:
// https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
Expand All @@ -199,6 +204,7 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
log.FromContext(ctx).Error(err, "failed evicting pod")
return false
}
NodesEvictionRequestsTotal.With(map[string]string{CodeLabel: "200"}).Inc()
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
return true
}
43 changes: 43 additions & 0 deletions pkg/controllers/node/termination/terminator/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terminator

import (
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

"sigs.k8s.io/karpenter/pkg/metrics"
)

const (
// CodeLabel is the name of the only label on NodesEvictionRequestsTotal. Its
// value is the status code of an eviction request, rendered as a string
// (for example "200" for a successful eviction).
CodeLabel = "code"
)

// NodesEvictionRequestsTotal counts the eviction requests made by Karpenter,
// partitioned by CodeLabel. The eviction queue records "200" when an eviction
// succeeds and the API status code when the error exposes one.
var NodesEvictionRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "eviction_requests_total",
Help: "The total number of eviction requests made by Karpenter",
},
[]string{CodeLabel},
)

// init registers NodesEvictionRequestsTotal with the controller-runtime
// metrics registry. MustRegister panics if registration fails.
func init() {
crmetrics.Registry.MustRegister(NodesEvictionRequestsTotal)
}
4 changes: 4 additions & 0 deletions pkg/controllers/node/termination/terminator/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ var _ = Describe("Eviction/Queue", func() {
Labels: testLabels,
},
})
terminator.NodesEvictionRequestsTotal.Reset()
})

Context("Eviction API", func() {
Expand All @@ -102,11 +103,13 @@ var _ = Describe("Eviction/Queue", func() {
It("should succeed with no event when the pod UID conflicts", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue())
ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "409"})
Expect(recorder.Events()).To(HaveLen(0))
})
It("should succeed with an evicted event when there are no PDBs", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue())
ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "200"})
Expect(recorder.Calls("Evicted")).To(Equal(1))
})
It("should succeed with no event when there are PDBs that allow an eviction", func() {
Expand All @@ -130,6 +133,7 @@ var _ = Describe("Eviction/Queue", func() {
})
ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "500"})
})
It("should ensure that calling Evict() is valid while making Add() calls", func() {
cancelCtx, cancel := context.WithCancel(ctx)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,14 @@ func (p *Provisioner) GetPendingPods(ctx context.Context) ([]*corev1.Pod, error)
if err != nil {
return nil, fmt.Errorf("listing pods, %w", err)
}
pods = lo.Reject(pods, func(po *corev1.Pod, _ int) bool {
rejectedPods, pods := lo.FilterReject(pods, func(po *corev1.Pod, _ int) bool {
if err := p.Validate(ctx, po); err != nil {
log.FromContext(ctx).WithValues("Pod", klog.KRef(po.Namespace, po.Name)).V(1).Info(fmt.Sprintf("ignoring pod, %s", err))
return true
}
return false
})
scheduler.IgnoredPodCount.Set(float64(len(rejectedPods)))
p.consolidationWarnings(ctx, pods)
return pods, nil
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/controllers/provisioning/scheduling/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func init() {
crmetrics.Registry.MustRegister(SchedulingDurationSeconds, QueueDepth)
crmetrics.Registry.MustRegister(SchedulingDurationSeconds, QueueDepth, IgnoredPodCount)
}

const (
Expand Down Expand Up @@ -58,4 +58,11 @@ var (
schedulingIDLabel,
},
)
// IgnoredPodCount is an unlabelled gauge holding the number of pods ignored
// during scheduling. The provisioner overwrites it (Set, not Add) with the
// number of pending pods that failed validation, so it reflects only the
// most recent evaluation.
IgnoredPodCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Name: "ignored_pod_count",
Help: "Number of pods ignored during scheduling by Karpenter",
},
)
)
4 changes: 4 additions & 0 deletions pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
Expand Down Expand Up @@ -98,6 +100,7 @@ var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
cloudProvider.Reset()
cluster.Reset()
pscheduling.IgnoredPodCount.Set(0)
})

var _ = Describe("Provisioning", func() {
Expand Down Expand Up @@ -1370,6 +1373,7 @@ var _ = Describe("Provisioning", func() {
PersistentVolumeClaims: []string{"invalid"},
})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
ExpectMetricGaugeValue(pscheduling.IgnoredPodCount, 1, nil)
ExpectNotScheduled(ctx, env.Client, pod)
})
It("should schedule with an empty storage class if the pvc is bound", func() {
Expand Down
17 changes: 15 additions & 2 deletions pkg/controllers/state/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type Cluster struct {
// changed about the cluster that might make consolidation possible. By recording
// the state, interested disruption methods can check to see if this has changed to
// optimize and not try to disrupt if nothing about the cluster has changed.
clusterState time.Time
antiAffinityPods sync.Map // pod namespaced name -> *corev1.Pod of pods that have required anti affinities
clusterState time.Time
unsyncedStartTime time.Time
antiAffinityPods sync.Map // pod namespaced name -> *corev1.Pod of pods that have required anti affinities
}

func NewCluster(clk clock.Clock, client client.Client) *Cluster {
Expand All @@ -82,6 +83,18 @@ func NewCluster(clk clock.Clock, client client.Client) *Cluster {
//
//nolint:gocyclo
func (c *Cluster) Synced(ctx context.Context) (synced bool) {
// Set the metric depending on the result of the Synced() call
defer func() {
if synced {
c.unsyncedStartTime = time.Time{}
ClusterStateUnsyncedTimeSeconds.With(map[string]string{}).Set(0)
} else {
if c.unsyncedStartTime.IsZero() {
c.unsyncedStartTime = c.clock.Now()
}
ClusterStateUnsyncedTimeSeconds.With(map[string]string{}).Set(c.clock.Since(c.unsyncedStartTime).Seconds())
}
}()
// Set the metric to whatever the result of the Synced() call is
defer func() {
ClusterStateSynced.Set(lo.Ternary[float64](synced, 1, 0))
Expand Down
11 changes: 10 additions & 1 deletion pkg/controllers/state/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,17 @@ var (
Help: "Returns 1 if cluster state is synced and 0 otherwise. Synced checks that nodeclaims and nodes that are stored in the APIServer have the same representation as Karpenter's cluster state",
},
)
// ClusterStateUnsyncedTimeSeconds reports, in seconds, how long cluster state
// has been continuously unsynced. Cluster.Synced sets it to 0 whenever the
// cluster is synced, and otherwise to the time elapsed since the first
// unsynced observation. It is declared as a vec with no label names, so
// callers address its single series with an empty label map.
ClusterStateUnsyncedTimeSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: stateSubsystem,
Name: "unsynced_time_seconds",
Help: "The time for which cluster state is not synced",
},
[]string{},
)
)

func init() {
crmetrics.Registry.MustRegister(ClusterStateNodesCount, ClusterStateSynced)
crmetrics.Registry.MustRegister(ClusterStateNodesCount, ClusterStateSynced, ClusterStateUnsyncedTimeSeconds)
}
22 changes: 22 additions & 0 deletions pkg/controllers/state/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var _ = AfterSuite(func() {

var _ = BeforeEach(func() {
fakeClock.SetTime(time.Now())
state.ClusterStateUnsyncedTimeSeconds.Reset()
cloudProvider.InstanceTypes = fake.InstanceTypesAssorted()
nodePool = test.NodePool(v1.NodePool{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
ExpectApplied(ctx, env.Client, nodePool)
Expand Down Expand Up @@ -1126,6 +1127,27 @@ var _ = Describe("Cluster State Sync", func() {
Expect(cluster.Synced(ctx)).To(BeTrue())
ExpectMetricGaugeValue(state.ClusterStateSynced, 1.0, nil)
ExpectMetricGaugeValue(state.ClusterStateNodesCount, 1000.0, nil)
metric, found := FindMetricWithLabelValues("karpenter_cluster_state_unsynced_time_seconds", map[string]string{})
Expect(found).To(BeTrue())
Expect(metric.GetGauge().GetValue()).To(BeEquivalentTo(0))
})
// Verifies that the unsynced-time gauge grows with the (fake) clock while the
// cluster stays unsynced: a NodeClaim without a provider ID keeps Synced()
// false, and after stepping the clock two minutes the gauge must read >= 120s.
It("should emit cluster_state_unsynced_time_seconds metric when cluster state is unsynced", func() {
nodeClaim := test.NodeClaim(v1.NodeClaim{
Status: v1.NodeClaimStatus{
ProviderID: "",
},
})
// NOTE(review): ProviderID is cleared again here even though the literal above
// already passes "". Presumably test.NodeClaim fills in a default provider ID
// when the override is empty, which would make this assignment the one that
// matters — confirm before removing either.
nodeClaim.Status.ProviderID = ""
ExpectApplied(ctx, env.Client, nodeClaim)
ExpectReconcileSucceeded(ctx, nodeClaimController, client.ObjectKeyFromObject(nodeClaim))
// First unsynced observation: this call starts the unsynced timer.
Expect(cluster.Synced(ctx)).To(BeFalse())

fakeClock.Step(2 * time.Minute)
ExpectReconcileSucceeded(ctx, nodeClaimController, client.ObjectKeyFromObject(nodeClaim))
// Second unsynced observation: the gauge is set to the time since the first one.
Expect(cluster.Synced(ctx)).To(BeFalse())
metric, found := FindMetricWithLabelValues("karpenter_cluster_state_unsynced_time_seconds", map[string]string{})
Expect(found).To(BeTrue())
Expect(metric.GetGauge().GetValue()).To(BeNumerically(">=", 120))
})
It("should consider the cluster state synced when nodes don't have provider id", func() {
// Deploy 1000 nodes and sync them all with the cluster
Expand Down

0 comments on commit 769f811

Please sign in to comment.