diff --git a/pkg/coscheduling/core/core.go b/pkg/coscheduling/core/core.go
index 2a1607231..b1b068034 100644
--- a/pkg/coscheduling/core/core.go
+++ b/pkg/coscheduling/core/core.go
@@ -85,6 +85,29 @@ type PodGroupManager struct {
 	sync.RWMutex
 }
 
+func AddPodFactory(pgMgr *PodGroupManager) func(obj interface{}) {
+	return func(obj interface{}) {
+		p, ok := obj.(*corev1.Pod)
+		if !ok {
+			return
+		}
+		if p.Spec.NodeName == "" {
+			return
+		}
+		pgFullName, _ := pgMgr.GetPodGroup(context.Background(), p)
+		if pgFullName == "" {
+			return
+		}
+		pgMgr.RWMutex.Lock()
+		defer pgMgr.RWMutex.Unlock()
+		if assigned, exist := pgMgr.assignedPodsByPG[pgFullName]; exist {
+			assigned.Insert(p.Name)
+		} else {
+			pgMgr.assignedPodsByPG[pgFullName] = sets.New(p.Name)
+		}
+	}
+}
+
 // NewPodGroupManager creates a new operation object.
 func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
 	pgMgr := &PodGroupManager{
@@ -97,10 +120,14 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
 		assignedPodsByPG:     map[string]sets.Set[string]{},
 	}
 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: AddPodFactory(pgMgr),
 		DeleteFunc: func(obj interface{}) {
 			switch t := obj.(type) {
 			case *corev1.Pod:
 				pod := t
+				if pod.Spec.NodeName == "" {
+					return
+				}
 				pgMgr.Unreserve(context.Background(), pod)
 				return
 			case cache.DeletedFinalStateUnknown:
@@ -108,6 +135,9 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
 				if !ok {
 					return
 				}
+				if pod.Spec.NodeName == "" {
+					return
+				}
 				pgMgr.Unreserve(context.Background(), pod)
 				return
 			default:
diff --git a/pkg/coscheduling/core/core_test.go b/pkg/coscheduling/core/core_test.go
index c57f659c8..8a0f5adb6 100644
--- a/pkg/coscheduling/core/core_test.go
+++ b/pkg/coscheduling/core/core_test.go
@@ -25,6 +25,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/informers"
 	clientsetfake "k8s.io/client-go/kubernetes/fake"
 	clicache "k8s.io/client-go/tools/cache"
@@ -169,6 +170,7 @@ func TestPreFilter(t *testing.T) {
 				scheduleTimeout:      &scheduleTimeout,
 				permittedPG:          newCache(),
 				backedOffPG:          newCache(),
+				assignedPodsByPG:     make(map[string]sets.Set[string]),
 			}
 
 			informerFactory.Start(ctx.Done())
@@ -263,19 +265,17 @@ func TestPermit(t *testing.T) {
 			informerFactory := informers.NewSharedInformerFactory(cs, 0)
 			podInformer := informerFactory.Core().V1().Pods()
 
-			pgMgr := &PodGroupManager{
-				client:               client,
-				snapshotSharedLister: tu.NewFakeSharedLister(tt.existingPods, nodes),
-				podLister:            podInformer.Lister(),
-				scheduleTimeout:      &scheduleTimeout,
-			}
+			pgMgr := NewPodGroupManager(client, tu.NewFakeSharedLister(tt.existingPods, nodes), &scheduleTimeout, podInformer)
 
 			informerFactory.Start(ctx.Done())
 			if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
 				t.Fatal("WaitForCacheSync failed")
 			}
+			addFunc := AddPodFactory(pgMgr)
 			for _, p := range tt.existingPods {
 				podInformer.Informer().GetStore().Add(p)
+				// We call the add function here because we cannot ensure existing pods are added before Permit is called.
+				addFunc(p)
 			}
 
 			if got := pgMgr.Permit(ctx, tt.pod); got != tt.want {
diff --git a/pkg/coscheduling/coscheduling.go b/pkg/coscheduling/coscheduling.go
index c60ba19dd..e69a09451 100644
--- a/pkg/coscheduling/coscheduling.go
+++ b/pkg/coscheduling/coscheduling.go
@@ -159,7 +159,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt
 	// This indicates there are already enough Pods satisfying the PodGroup,
 	// so don't bother to reject the whole PodGroup.
-	assigned := cs.pgMgr.GetAssignedPodCount(pg.Name)
+	assigned := cs.pgMgr.GetAssignedPodCount(pgName)
 	if assigned >= int(pg.Spec.MinMember) {
 		klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
 		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
 	}
diff --git a/pkg/coscheduling/coscheduling_test.go b/pkg/coscheduling/coscheduling_test.go
index b79f15495..1ef4140da 100644
--- a/pkg/coscheduling/coscheduling_test.go
+++ b/pkg/coscheduling/coscheduling_test.go
@@ -605,24 +605,27 @@ func TestPostFilter(t *testing.T) {
 			cs := clientsetfake.NewSimpleClientset()
 			informerFactory := informers.NewSharedInformerFactory(cs, 0)
 			podInformer := informerFactory.Core().V1().Pods()
-
+			pgMgr := core.NewPodGroupManager(
+				client,
+				tu.NewFakeSharedLister(tt.existingPods, nodes),
+				&scheduleTimeout,
+				podInformer,
+			)
 			pl := &Coscheduling{
 				frameworkHandler: f,
-				pgMgr: core.NewPodGroupManager(
-					client,
-					tu.NewFakeSharedLister(tt.existingPods, nodes),
-					&scheduleTimeout,
-					podInformer,
-				),
-				scheduleTimeout: &scheduleTimeout,
+				pgMgr:            pgMgr,
+				scheduleTimeout:  &scheduleTimeout,
 			}
 			informerFactory.Start(ctx.Done())
 			if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
 				t.Fatal("WaitForCacheSync failed")
 			}
+			addFunc := core.AddPodFactory(pgMgr)
 			for _, p := range tt.existingPods {
 				podInformer.Informer().GetStore().Add(p)
+				// We call the add function here because we cannot ensure existing pods are added before PostFilter is called.
+				addFunc(p)
 			}
 
 			_, got := pl.PostFilter(ctx, framework.NewCycleState(), tt.pod, nodeStatusMap)
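
For reviewers, a minimal standalone sketch of the bookkeeping pattern this patch introduces: a mutex-guarded map from PodGroup full name to the set of pods already bound to a node, populated by the informer AddFunc and queried by PostFilter. This is an illustration, not the patch itself; `tracker`, `add`, and `count` are hypothetical stand-ins for `PodGroupManager`, the handler returned by `AddPodFactory`, and `GetAssignedPodCount`, using only the standard library in place of `sets.Set[string]`.

package main

import (
	"fmt"
	"sync"
)

// tracker mirrors the assignedPodsByPG field guarded by the embedded RWMutex.
type tracker struct {
	sync.RWMutex
	assignedPodsByPG map[string]map[string]struct{}
}

// add mirrors the informer AddFunc: pods with no node assignment or no
// PodGroup are skipped; everything else is recorded under its group.
func (t *tracker) add(pgFullName, podName, nodeName string) {
	if nodeName == "" || pgFullName == "" {
		return
	}
	t.Lock()
	defer t.Unlock()
	if t.assignedPodsByPG[pgFullName] == nil {
		t.assignedPodsByPG[pgFullName] = map[string]struct{}{}
	}
	t.assignedPodsByPG[pgFullName][podName] = struct{}{}
}

// count mirrors GetAssignedPodCount, which PostFilter now queries by the
// full PodGroup name (pgName) rather than pg.Name.
func (t *tracker) count(pgFullName string) int {
	t.RLock()
	defer t.RUnlock()
	return len(t.assignedPodsByPG[pgFullName])
}

func main() {
	tr := &tracker{assignedPodsByPG: map[string]map[string]struct{}{}}
	tr.add("ns1/pg1", "pod-a", "node-1")
	tr.add("ns1/pg1", "pod-b", "") // unscheduled pod: ignored
	fmt.Println(tr.count("ns1/pg1")) // prints 1
}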