Skip to content

Commit

Permalink
Merge pull request #813 from googs1025/feature/contextual_logging
Browse files Browse the repository at this point in the history
feature: use contextual logging
  • Loading branch information
k8s-ci-robot authored Nov 3, 2024
2 parents 44370e5 + 59a8b1c commit aadfeeb
Show file tree
Hide file tree
Showing 22 changed files with 334 additions and 246 deletions.
64 changes: 42 additions & 22 deletions pkg/capacityscheduling/capacity_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
pdbLister: getPDBLister(handle.SharedInformerFactory()),
}
logger := klog.FromContext(ctx)

client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
if err != nil {
Expand Down Expand Up @@ -187,7 +188,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
},
},
)
klog.InfoS("CapacityScheduling start")
logger.Info("CapacityScheduling start")
return c, nil
}

Expand Down Expand Up @@ -288,17 +289,19 @@ func (c *CapacityScheduling) PreFilterExtensions() framework.PreFilterExtensions

// AddPod from pre-computed data in cycleState.
func (c *CapacityScheduling) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
logger := klog.FromContext(ctx)

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(cycleState)
if err != nil {
klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return framework.NewStatus(framework.Error, err.Error())
}

elasticQuotaInfo := elasticQuotaSnapshotState.elasticQuotaInfos[podToAdd.Pod.Namespace]
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.addPodIfNotPresent(podToAdd.Pod)
if err != nil {
klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(podToAdd.Pod))
logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(podToAdd.Pod))
}
}

Expand All @@ -307,17 +310,19 @@ func (c *CapacityScheduling) AddPod(ctx context.Context, cycleState *framework.C

// RemovePod from pre-computed data in cycleState.
func (c *CapacityScheduling) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
logger := klog.FromContext(ctx)

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(cycleState)
if err != nil {
klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return framework.NewStatus(framework.Error, err.Error())
}

elasticQuotaInfo := elasticQuotaSnapshotState.elasticQuotaInfos[podToRemove.Pod.Namespace]
if elasticQuotaInfo != nil {
err = elasticQuotaInfo.deletePodIfPresent(podToRemove.Pod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(podToRemove.Pod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(podToRemove.Pod))
}
}

Expand Down Expand Up @@ -348,11 +353,13 @@ func (c *CapacityScheduling) Reserve(ctx context.Context, state *framework.Cycle
c.Lock()
defer c.Unlock()

logger := klog.FromContext(ctx)

elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace]
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.addPodIfNotPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
return framework.NewStatus(framework.Error, err.Error())
}
}
Expand All @@ -363,11 +370,13 @@ func (c *CapacityScheduling) Unreserve(ctx context.Context, state *framework.Cyc
c.Lock()
defer c.Unlock()

logger := klog.FromContext(ctx)

elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace]
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.deletePodIfPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
}
}
}
Expand Down Expand Up @@ -400,14 +409,16 @@ func (p *preemptor) CandidatesToVictimsMap(candidates []preemption.Candidate) ma
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func (p *preemptor) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
logger := klog.FromContext(context.TODO())

if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
klog.V(5).InfoS("Pod is not eligible for preemption because of its preemptionPolicy", "pod", klog.KObj(pod), "preemptionPolicy", v1.PreemptNever)
logger.V(5).Info("Pod is not eligible for preemption because of its preemptionPolicy", "pod", klog.KObj(pod), "preemptionPolicy", v1.PreemptNever)
return false, "not eligible due to preemptionPolicy=Never."
}

preFilterState, err := getPreFilterState(p.state)
if err != nil {
klog.V(5).InfoS("Failed to read preFilterState from cycleState, err: %s", err, "preFilterStateKey", preFilterStateKey)
logger.V(5).Info("Failed to read preFilterState from cycleState, err: %s", err, "preFilterStateKey", preFilterStateKey)
return false, "not eligible due to failed to read from cycleState"
}

Expand All @@ -422,7 +433,7 @@ func (p *preemptor) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(p.state)
if err != nil {
klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return true, ""
}

Expand Down Expand Up @@ -480,25 +491,27 @@ func (p *preemptor) SelectVictimsOnNode(
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {

logger := klog.FromContext(ctx)

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(state)
if err != nil {
msg := "Failed to read elasticQuotaSnapshot from cycleState"
klog.ErrorS(err, msg, "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, msg, "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return nil, 0, framework.NewStatus(framework.Unschedulable, msg)
}

preFilterState, err := getPreFilterState(state)
if err != nil {
msg := "Failed to read preFilterState from cycleState"
klog.ErrorS(err, msg, "preFilterStateKey", preFilterStateKey)
logger.Error(err, msg, "preFilterStateKey", preFilterStateKey)
return nil, 0, framework.NewStatus(framework.Unschedulable, msg)
}

var nominatedPodsReqInEQWithPodReq framework.Resource
var nominatedPodsReqWithPodReq framework.Resource
podReq := preFilterState.podReq

logger := klog.FromContext(ctx)
removePod := func(rpi *framework.PodInfo) error {
if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
return err
Expand Down Expand Up @@ -625,22 +638,22 @@ func (p *preemptor) SelectVictimsOnNode(
return false, err
}
victims = append(victims, pi.Pod)
klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node()))
logger.V(5).Info("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node()))
}

if preemptorWithElasticQuota && (preemptorElasticQuotaInfo.usedOverMaxWith(&nominatedPodsReqInEQWithPodReq) || elasticQuotaInfos.aggregatedUsedOverMinWith(nominatedPodsReqWithPodReq)) {
if err := removePod(pi); err != nil {
return false, err
}
victims = append(victims, pi.Pod)
klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), " node", klog.KObj(nodeInfo.Node()))
logger.V(5).Info("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), " node", klog.KObj(nodeInfo.Node()))
}

return fits, nil
}
for _, pi := range violatingVictims {
if fits, err := reprievePod(pi); err != nil {
klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
logger.Error(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
return nil, 0, framework.AsStatus(err)
} else if !fits {
numViolatingVictim++
Expand All @@ -649,7 +662,7 @@ func (p *preemptor) SelectVictimsOnNode(
// Now we try to reprieve non-violating victims.
for _, pi := range nonViolatingVictims {
if _, err := reprievePod(pi); err != nil {
klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
logger.Error(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
return nil, 0, framework.AsStatus(err)
}
}
Expand Down Expand Up @@ -694,6 +707,9 @@ func (c *CapacityScheduling) deleteElasticQuota(obj interface{}) {
}

func (c *CapacityScheduling) addPod(obj interface{}) {
ctx := context.TODO()
logger := klog.FromContext(ctx)

pod := obj.(*v1.Pod)

c.Lock()
Expand All @@ -703,8 +719,8 @@ func (c *CapacityScheduling) addPod(obj interface{}) {
// If elasticQuotaInfo is nil, try to list ElasticQuotas through elasticQuotaLister
if elasticQuotaInfo == nil {
var eqList v1alpha1.ElasticQuotaList
if err := c.client.List(context.Background(), &eqList, client.InNamespace(pod.Namespace)); err != nil {
klog.ErrorS(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace)
if err := c.client.List(ctx, &eqList, client.InNamespace(pod.Namespace)); err != nil {
logger.Error(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace)
return
}

Expand All @@ -724,11 +740,13 @@ func (c *CapacityScheduling) addPod(obj interface{}) {

err := elasticQuotaInfo.addPodIfNotPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
}
}

func (c *CapacityScheduling) updatePod(oldObj, newObj interface{}) {
logger := klog.FromContext(context.TODO())

oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)

Expand All @@ -744,13 +762,15 @@ func (c *CapacityScheduling) updatePod(oldObj, newObj interface{}) {
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.deletePodIfPresent(newPod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(newPod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(newPod))
}
}
}
}

func (c *CapacityScheduling) deletePod(obj interface{}) {
logger := klog.FromContext(context.TODO())

pod := obj.(*v1.Pod)
c.Lock()
defer c.Unlock()
Expand All @@ -759,7 +779,7 @@ func (c *CapacityScheduling) deletePod(obj interface{}) {
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.deletePodIfPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/capacityscheduling/capacity_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func TestReserve(t *testing.T) {

state := framework.NewCycleState()
for i, pod := range tt.pods {
got := cs.Reserve(nil, state, pod, "node-a")
got := cs.Reserve(context.TODO(), state, pod, "node-a")
if got.Code() != tt.expectedCodes[i] {
t.Errorf("expected %v, got %v : %v", tt.expected[i], got.Code(), got.Message())
}
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestUnreserve(t *testing.T) {

state := framework.NewCycleState()
for i, pod := range tt.pods {
cs.Unreserve(nil, state, pod, "node-a")
cs.Unreserve(context.TODO(), state, pod, "node-a")
if !reflect.DeepEqual(cs.elasticQuotaInfos["ns1"], tt.expected[i]["ns1"]) {
t.Errorf("expected %#v, got %#v", tt.expected[i]["ns1"].Used, cs.elasticQuotaInfos["ns1"].Used)
}
Expand Down
33 changes: 18 additions & 15 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *framework.CycleState, *corev1.Pod) Status
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
CalculateAssignedPods(string, string) int
ActivateSiblings(pod *corev1.Pod, state *framework.CycleState)
GetCreationTimestamp(context.Context, *corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(context.Context, string)
CalculateAssignedPods(context.Context, string, string) int
ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState)
BackoffPodGroup(string, time.Duration)
}

Expand Down Expand Up @@ -112,7 +112,8 @@ func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Durati

// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) {
func (pgMgr *PodGroupManager) ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState) {
lh := klog.FromContext(ctx)
pgName := util.GetPodGroupLabel(pod)
if pgName == "" {
return
Expand All @@ -129,7 +130,7 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}),
)
if err != nil {
klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName)
lh.Error(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName)
return
}

Expand Down Expand Up @@ -159,7 +160,8 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// 2. the total number of pods in the podgroup is less than the minimum number of pods
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
lh := klog.FromContext(ctx)
lh.V(5).Info("Pre-filter", "pod", klog.KObj(pod))
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pg == nil {
return nil
Expand Down Expand Up @@ -202,7 +204,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
minResources[corev1.ResourcePods] = *podQuantity
err = CheckClusterResource(ctx, nodes, minResources, pgFullName)
if err != nil {
klog.ErrorS(err, "Failed to PreFilter", "podGroup", klog.KObj(pg))
lh.Error(err, "Failed to PreFilter", "podGroup", klog.KObj(pg))
return err
}
pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout)
Expand All @@ -220,7 +222,7 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
assigned := pgMgr.CalculateAssignedPods(ctx, pg.Name, pg.Namespace)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod in not included in the snapshot during the current scheduling cycle.
if int32(assigned)+1 >= pg.Spec.MinMember {
Expand All @@ -243,20 +245,20 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
func (pgMgr *PodGroupManager) GetCreationTimestamp(ctx context.Context, pod *corev1.Pod, ts time.Time) time.Time {
pgName := util.GetPodGroupLabel(pod)
if len(pgName) == 0 {
return ts
}
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
return ts
}
return pg.CreationTimestamp.Time
}

// DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter.
func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) {
func (pgMgr *PodGroupManager) DeletePermittedPodGroup(_ context.Context, pgFullName string) {
pgMgr.permittedPG.Delete(pgFullName)
}

Expand All @@ -274,10 +276,11 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod)
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
func (pgMgr *PodGroupManager) CalculateAssignedPods(ctx context.Context, podGroupName, namespace string) int {
lh := klog.FromContext(ctx)
nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
if err != nil {
klog.ErrorS(err, "Cannot get nodeInfos from frameworkHandle")
lh.Error(err, "Cannot get nodeInfos from frameworkHandle")
return 0
}
var count int
Expand Down Expand Up @@ -354,6 +357,6 @@ func getNodeResource(ctx context.Context, info *framework.NodeInfo, desiredPodGr
leftResource.ScalarResources[k] = allocatableEx - requestEx
}
}
klog.V(4).InfoS("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource)
logger.V(4).Info("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource)
return &leftResource
}
Loading

0 comments on commit aadfeeb

Please sign in to comment.