From edd5da8aaf7be8037c083025bd14791c63f4e192 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Thu, 28 Mar 2024 20:08:35 +0800 Subject: [PATCH] cache assigned pod count Signed-off-by: KunWuLuan --- pkg/coscheduling/core/core.go | 117 ++++++++++++++++++----- pkg/coscheduling/core/core_test.go | 12 +-- pkg/coscheduling/coscheduling.go | 3 +- pkg/coscheduling/coscheduling_test.go | 19 ++-- test/integration/coscheduling_test.go | 132 ++++++++++++++++++++++++++ 5 files changed, 242 insertions(+), 41 deletions(-) diff --git a/pkg/coscheduling/core/core.go b/pkg/coscheduling/core/core.go index 177467614..cd7ad6fb7 100644 --- a/pkg/coscheduling/core/core.go +++ b/pkg/coscheduling/core/core.go @@ -28,8 +28,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" informerv1 "k8s.io/client-go/informers/core/v1" listerv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "sigs.k8s.io/controller-runtime/pkg/client" @@ -64,10 +66,11 @@ func (s *PermitState) Clone() framework.StateData { type Manager interface { PreFilter(context.Context, *corev1.Pod) error Permit(context.Context, *framework.CycleState, *corev1.Pod) Status + Unreserve(context.Context, *corev1.Pod) GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) + GetAssignedPodCount(string) int GetCreationTimestamp(context.Context, *corev1.Pod, time.Time) time.Time DeletePermittedPodGroup(context.Context, string) - CalculateAssignedPods(context.Context, string, string) int ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState) BackoffPodGroup(string, time.Duration) } @@ -87,9 +90,34 @@ type PodGroupManager struct { backedOffPG *gocache.Cache // podLister is pod lister podLister listerv1.PodLister + // assignedPodsByPG stores the pods assumed or bound for podgroups + assignedPodsByPG map[string]sets.Set[string] sync.RWMutex } +func AddPodFactory(pgMgr *PodGroupManager) func(obj interface{}) { + return func(obj interface{}) { + p, ok := obj.(*corev1.Pod) + if !ok { + return + } + if p.Spec.NodeName == "" { + return + } + pgFullName, _ := pgMgr.GetPodGroup(context.Background(), p) + if pgFullName == "" { + return + } + pgMgr.RWMutex.Lock() + defer pgMgr.RWMutex.Unlock() + if assigned, exist := pgMgr.assignedPodsByPG[pgFullName]; exist { + assigned.Insert(p.Name) + } else { + pgMgr.assignedPodsByPG[pgFullName] = sets.New(p.Name) + } + } +} + // NewPodGroupManager creates a new operation object. 
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager { pgMgr := &PodGroupManager{ @@ -99,10 +127,43 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha podLister: podInformer.Lister(), permittedPG: gocache.New(3*time.Second, 3*time.Second), backedOffPG: gocache.New(10*time.Second, 10*time.Second), + assignedPodsByPG: map[string]sets.Set[string]{}, } + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: AddPodFactory(pgMgr), + DeleteFunc: func(obj interface{}) { + switch t := obj.(type) { + case *corev1.Pod: + pod := t + if pod.Spec.NodeName == "" { + return + } + pgMgr.Unreserve(context.Background(), pod) + return + case cache.DeletedFinalStateUnknown: + pod, ok := t.Obj.(*corev1.Pod) + if !ok { + return + } + if pod.Spec.NodeName == "" { + return + } + pgMgr.Unreserve(context.Background(), pod) + return + default: + return + } + }, + }) return pgMgr } +func (pgMgr *PodGroupManager) GetAssignedPodCount(pgName string) int { + pgMgr.RWMutex.RLock() + defer pgMgr.RWMutex.RUnlock() + return len(pgMgr.assignedPodsByPG[pgName]) +} + func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) { if backoff == time.Duration(0) { return @@ -222,16 +283,23 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle return PodGroupNotFound } - assigned := pgMgr.CalculateAssignedPods(ctx, pg.Name, pg.Namespace) + pgMgr.RWMutex.Lock() + defer pgMgr.RWMutex.Unlock() + assigned, exist := pgMgr.assignedPodsByPG[pgFullName] + if !exist { + assigned = sets.Set[string]{} + pgMgr.assignedPodsByPG[pgFullName] = assigned + } + assigned.Insert(pod.Name) - // The number of pods that have been assigned nodes is calculated from the snapshot. - // The current pod in not included in the snapshot during the current scheduling cycle. + // The number of assigned (assumed or bound) pods is tracked in assignedPodsByPG. + // The current pod has just been inserted into that set, so it is already counted. - if int32(assigned)+1 >= pg.Spec.MinMember { + if len(assigned) >= int(pg.Spec.MinMember) { return Success } - if assigned == 0 { + if len(assigned) == 1 { // Given we've reached Permit(), it's mean all PreFilter checks (minMember & minResource) - // already pass through, so if assigned == 0, it could be due to: + // already pass through, so if len(assigned) == 1, it could be due to: // - minResource get satisfied // - new pods added // In either case, we should and only should use this 0-th pod to trigger activating @@ -244,6 +312,24 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle return Wait } +// Unreserve removes the pod from assignedPodsByPG when scheduling or binding fails. +func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, pod *corev1.Pod) { + pgFullName, _ := pgMgr.GetPodGroup(ctx, pod) + if pgFullName == "" { + return + } + + pgMgr.RWMutex.Lock() + defer pgMgr.RWMutex.Unlock() + assigned, exist := pgMgr.assignedPodsByPG[pgFullName] + if exist { + assigned.Delete(pod.Name) + if len(assigned) == 0 { + delete(pgMgr.assignedPodsByPG, pgFullName) + } + } +} + // GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(ctx context.Context, pod *corev1.Pod, ts time.Time) time.Time { pgName := util.GetPodGroupLabel(pod) @@ -275,27 +361,6 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg } -// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. -func (pgMgr *PodGroupManager) CalculateAssignedPods(ctx context.Context, podGroupName, namespace string) int { - lh := klog.FromContext(ctx) - nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List() - if err != nil { - lh.Error(err, "Cannot get nodeInfos from frameworkHandle") - return 0 - } - var count int - for _, nodeInfo := range nodeInfos { - for _, podInfo := range nodeInfo.Pods { - pod := podInfo.Pod - if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" { - count++ - } - } - } - - return count -} - // CheckClusterResource checks if resource capacity of the cluster can satisfy <resourceRequest>. // It returns an error detailing the resource gap if not satisfied; otherwise returns nil. func CheckClusterResource(ctx context.Context, nodeList []*framework.NodeInfo, resourceRequest corev1.ResourceList, desiredPodGroupName string) error { diff --git a/pkg/coscheduling/core/core_test.go b/pkg/coscheduling/core/core_test.go index ba7205da4..0968492d8 100644 --- a/pkg/coscheduling/core/core_test.go +++ b/pkg/coscheduling/core/core_test.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" clicache "k8s.io/client-go/tools/cache" @@ -170,6 +171,7 @@ func TestPreFilter(t *testing.T) { scheduleTimeout: &scheduleTimeout, permittedPG: newCache(), backedOffPG: newCache(), + assignedPodsByPG: make(map[string]sets.Set[string]), } informerFactory.Start(ctx.Done()) @@ -264,19 +266,17 @@ func TestPermit(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(cs, 0) podInformer := informerFactory.Core().V1().Pods() - pgMgr := &PodGroupManager{ - client: client, - snapshotSharedLister: tu.NewFakeSharedLister(tt.existingPods, nodes), - podLister: podInformer.Lister(), - scheduleTimeout: &scheduleTimeout, - } + pgMgr := NewPodGroupManager(client, tu.NewFakeSharedLister(tt.existingPods, nodes), &scheduleTimeout, podInformer) informerFactory.Start(ctx.Done()) if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { t.Fatal("WaitForCacheSync failed") } + addFunc := AddPodFactory(pgMgr) for _, p := range tt.existingPods { podInformer.Informer().GetStore().Add(p) + // Call addFunc directly because we cannot ensure the informer has processed the existing pods before Permit is called. + addFunc(p) } if got := pgMgr.Permit(ctx, &framework.CycleState{}, tt.pod); got != tt.want { diff --git a/pkg/coscheduling/coscheduling.go b/pkg/coscheduling/coscheduling.go index a6e50122b..1dcf8705a 100644 --- a/pkg/coscheduling/coscheduling.go +++ b/pkg/coscheduling/coscheduling.go @@ -166,7 +166,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt // This indicates there are already enough Pods satisfying the PodGroup, // so don't bother to reject the whole PodGroup.
- assigned := cs.pgMgr.CalculateAssignedPods(ctx, pg.Name, pod.Namespace) + assigned := cs.pgMgr.GetAssignedPodCount(pgName) if assigned >= int(pg.Spec.MinMember) { lh.V(4).Info("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) @@ -256,6 +256,7 @@ func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleSta if pg == nil { return } + cs.pgMgr.Unreserve(ctx, pod) cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { if waitingPod.GetPod().Namespace == pod.Namespace && util.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { lh.V(3).Info("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg)) diff --git a/pkg/coscheduling/coscheduling_test.go b/pkg/coscheduling/coscheduling_test.go index 85cf73d5c..f0ade5f10 100644 --- a/pkg/coscheduling/coscheduling_test.go +++ b/pkg/coscheduling/coscheduling_test.go @@ -622,24 +622,27 @@ func TestPostFilter(t *testing.T) { cs := clientsetfake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(cs, 0) podInformer := informerFactory.Core().V1().Pods() - + pgMgr := core.NewPodGroupManager( + client, + tu.NewFakeSharedLister(tt.existingPods, nodes), + &scheduleTimeout, + podInformer, + ) pl := &Coscheduling{ frameworkHandler: f, - pgMgr: core.NewPodGroupManager( - client, - tu.NewFakeSharedLister(tt.existingPods, nodes), - &scheduleTimeout, - podInformer, - ), - scheduleTimeout: &scheduleTimeout, + pgMgr: pgMgr, + scheduleTimeout: &scheduleTimeout, } informerFactory.Start(ctx.Done()) if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { t.Fatal("WaitForCacheSync failed") } + addFunc := core.AddPodFactory(pgMgr) for _, p := range tt.existingPods { podInformer.Informer().GetStore().Add(p) + // Call addFunc directly because we cannot ensure the informer has processed the existing pods before PostFilter is called. + addFunc(p) } _, got := pl.PostFilter(ctx, framework.NewCycleState(), tt.pod, nodeStatusMap) diff --git a/test/integration/coscheduling_test.go b/test/integration/coscheduling_test.go index a2bce1d3b..18c514e81 100644 --- a/test/integration/coscheduling_test.go +++ b/test/integration/coscheduling_test.go @@ -376,6 +376,138 @@ func TestCoschedulingPlugin(t *testing.T) { } } +func TestPodCompleted(t *testing.T) { + testCtx := &testContext{} + testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background()) + + cs := kubernetes.NewForConfigOrDie(globalKubeConfig) + extClient := util.NewClientOrDie(globalKubeConfig) + testCtx.ClientSet = cs + testCtx.KubeConfig = globalKubeConfig + + if err := wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) { + groupList, _, err := cs.ServerGroupsAndResources() + if err != nil { + return false, nil + } + for _, group := range groupList { + if group.Name == scheduling.GroupName { + t.Log("The CRD is ready to serve") + return true, nil + } + } + return false, nil + }); err != nil { + t.Fatalf("Timed out waiting for CRD to be ready: %v", err) + } + + cfg, err := util.NewDefaultSchedulerComponentConfig() + if err != nil { + t.Fatal(err) + } + cfg.Profiles[0].Plugins.QueueSort = schedapi.PluginSet{ + Enabled: []schedapi.Plugin{{Name: coscheduling.Name}}, + Disabled: []schedapi.Plugin{{Name: "*"}}, + } + cfg.Profiles[0].Plugins.PreFilter.Enabled = append(cfg.Profiles[0].Plugins.PreFilter.Enabled, schedapi.Plugin{Name: coscheduling.Name}) + cfg.Profiles[0].Plugins.PostFilter.Enabled =
append(cfg.Profiles[0].Plugins.PostFilter.Enabled, schedapi.Plugin{Name: coscheduling.Name}) + cfg.Profiles[0].Plugins.Permit.Enabled = append(cfg.Profiles[0].Plugins.Permit.Enabled, schedapi.Plugin{Name: coscheduling.Name}) + cfg.Profiles[0].PluginConfig = append(cfg.Profiles[0].PluginConfig, schedapi.PluginConfig{ + Name: coscheduling.Name, + Args: &schedconfig.CoschedulingArgs{ + PermitWaitingTimeSeconds: 3, + PodGroupBackoffSeconds: 1, + }, + }) + + ns := fmt.Sprintf("integration-test-%v", string(uuid.NewUUID())) + createNamespace(t, testCtx, ns) + + testCtx = initTestSchedulerWithOptions( + t, + testCtx, + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(fwkruntime.Registry{coscheduling.Name: coscheduling.New}), + ) + syncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) + t.Log("Init scheduler success") + defer cleanupTest(t, testCtx) + + // Create Nodes. + node1Name := "fake-node-1" + node1 := st.MakeNode().Name(node1Name).Label("node", node1Name).Obj() + node1.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(6, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node1.Status.Capacity = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(6, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + pause := imageutils.GetPauseImageName() + for _, tt := range []struct { + name string + completed []*v1.Pod + pods []*v1.Pod + podGroups []*v1alpha1.PodGroup + }{ + { + name: "pg cannot be scheduled with completed pod", + completed: []*v1.Pod{ + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Node(node1Name).Phase(v1.PodSucceeded).Obj(), pause)}, + pods: []*v1.Pod{ + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-3").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-4").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-5").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-6").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + }, + podGroups: []*v1alpha1.PodGroup{ + util.MakePG("pg1-1", ns, 6, nil, nil), + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + t.Logf("Start-coscheduling-test %v", tt.name) + defer cleanupPodGroups(testCtx.Ctx, extClient, tt.podGroups) + // create pod group + if err := createPodGroups(testCtx.Ctx, extClient, tt.podGroups); err != nil { + t.Fatal(err) + } + defer cleanupPods(t, testCtx, tt.pods) + // Create the Pods; the completed Pod must not count toward minMember, so the pending Pods are expected to stay unscheduled.
+ for i := range tt.completed { + t.Logf("Creating pod %v", tt.completed[i].Name) + if _, err := cs.CoreV1().Pods(tt.completed[i].Namespace).Create(testCtx.Ctx, tt.completed[i], metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Pod %q: %v", tt.completed[i].Name, err) + } + t.Logf("Create pod %v success", tt.completed[i].Name) + } + for i := range tt.pods { + t.Logf("Creating pod %v", tt.pods[i].Name) + if _, err := cs.CoreV1().Pods(tt.pods[i].Namespace).Create(testCtx.Ctx, tt.pods[i], metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Pod %q: %v", tt.pods[i].Name, err) + } + t.Logf("Create pod %v success", tt.pods[i].Name) + } + time.Sleep(10 * time.Second) + for _, v := range tt.pods { + if podScheduled(t, cs, ns, v.Name) { + t.Fatalf("%v should not be scheduled", v.Name) + } + } + t.Logf("Case %v finished", tt.name) + }) + } +} + func TestPodgroupBackoff(t *testing.T) { testCtx := &testContext{} testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())