Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache assigned pod count #708

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hack/install-envtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ version=$(cat ${SCRIPT_ROOT}/go.mod | grep 'k8s.io/kubernetes' | grep -v '=>' |

GOPATH=$(go env GOPATH)
TEMP_DIR=${TMPDIR-/tmp}
go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
go install sigs.k8s.io/controller-runtime/tools/setup-envtest@release-0.17
KunWuLuan marked this conversation as resolved.
Show resolved Hide resolved
"${GOPATH}"/bin/setup-envtest use -p env "${version}" > "${TEMP_DIR}/setup-envtest"

115 changes: 92 additions & 23 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
informerv1 "k8s.io/client-go/informers/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -54,10 +56,11 @@ const (
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *corev1.Pod) Status
Unreserve(context.Context, *corev1.Pod)
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetAssignedPodCount(string) int
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
CalculateAssignedPods(string, string) int
ActivateSiblings(pod *corev1.Pod, state *framework.CycleState)
BackoffPodGroup(string, time.Duration)
}
Expand All @@ -77,9 +80,34 @@ type PodGroupManager struct {
backedOffPG *gochache.Cache
// podLister is pod lister
podLister listerv1.PodLister
//
KunWuLuan marked this conversation as resolved.
Show resolved Hide resolved
assignedPodsByPG map[string]sets.Set[string]
sync.RWMutex
}

func AddPodFactory(pgMgr *PodGroupManager) func(obj interface{}) {
return func(obj interface{}) {
p, ok := obj.(*corev1.Pod)
if !ok {
return
}
if p.Spec.NodeName == "" {
return
}
pgFullName, _ := pgMgr.GetPodGroup(context.Background(), p)
if pgFullName == "" {
return
}
pgMgr.RWMutex.Lock()
defer pgMgr.RWMutex.Unlock()
if assigned, exist := pgMgr.assignedPodsByPG[pgFullName]; exist {
assigned.Insert(p.Name)
} else {
pgMgr.assignedPodsByPG[pgFullName] = sets.New(p.Name)
}
}
}

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
pgMgr := &PodGroupManager{
Expand All @@ -89,10 +117,46 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
podLister: podInformer.Lister(),
permittedPG: gochache.New(3*time.Second, 3*time.Second),
backedOffPG: gochache.New(10*time.Second, 10*time.Second),
assignedPodsByPG: map[string]sets.Set[string]{},
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: AddPodFactory(pgMgr),
DeleteFunc: func(obj interface{}) {
switch t := obj.(type) {
case *corev1.Pod:
pod := t
if pod.Spec.NodeName == "" {
return
}
pgMgr.Unreserve(context.Background(), pod)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PodDelete event consists of 3 types of events:

  • Pod failed
  • Pod completed (successfully)
  • Pod get deleted

but for completed Pod, we should still count them as part of gang, right? could you also help if integration test covers this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When pod completed, it will be removed from NodeInfo. CalculateAssignedPods will count pods in NodeInfo, so we did not count completed pods previously.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will see if integration test covers this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we did not count completed pods previously.

True. I'm wondering if we fix this glitch in this PR - in DeleteFunc(), additionally check if the Pod is completed, if so, do NOT invalidate it from the assignedPodsByPG cache. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#560 (comment)

We have discussed in this issues about whether we should count completed pods.
Is there new situation to count completed pods?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It seems restart the whole Job is more conventional for now, then let's postpone the idea until new requirement emerges.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

return
case cache.DeletedFinalStateUnknown:
pod, ok := t.Obj.(*corev1.Pod)
if !ok {
return
}
if pod.Spec.NodeName == "" {
return
}
pgMgr.Unreserve(context.Background(), pod)
return
default:
return
}
},
})
return pgMgr
}

func (pgMgr *PodGroupManager) GetAssignedPodCount(pgName string) int {
pgMgr.RWMutex.RLock()
defer pgMgr.RWMutex.RUnlock()
if assigned, exist := pgMgr.assignedPodsByPG[pgName]; exist {
return len(assigned)
}
return 0
KunWuLuan marked this conversation as resolved.
Show resolved Hide resolved
}

func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) {
if backoff == time.Duration(0) {
return
Expand Down Expand Up @@ -203,15 +267,40 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Statu
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
pgMgr.RWMutex.RLock()
defer pgMgr.RWMutex.RUnlock()
assigned, exist := pgMgr.assignedPodsByPG[pgFullName]
if !exist {
assigned = sets.Set[string]{}
pgMgr.assignedPodsByPG[pgFullName] = assigned
}
assigned.Insert(pod.Name)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod in not included in the snapshot during the current scheduling cycle.
if int32(assigned)+1 >= pg.Spec.MinMember {
if len(assigned) >= int(pg.Spec.MinMember) {
return Success
}
return Wait
}

// Unreserve removes assigned pod from asssigned pod map from PodGroupInfo when schedule or bind failed.
KunWuLuan marked this conversation as resolved.
Show resolved Hide resolved
func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, pod *corev1.Pod) {
KunWuLuan marked this conversation as resolved.
Show resolved Hide resolved
pgFullName, _ := pgMgr.GetPodGroup(ctx, pod)
if pgFullName == "" {
return
}

pgMgr.RWMutex.Lock()
defer pgMgr.RWMutex.Unlock()
assigned, exist := pgMgr.assignedPodsByPG[pgFullName]
if exist {
assigned.Delete(pod.Name)
if len(assigned) == 0 {
delete(pgMgr.assignedPodsByPG, pgFullName)
}
}
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
pgName := util.GetPodGroupLabel(pod)
Expand Down Expand Up @@ -243,26 +332,6 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod)
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
if err != nil {
klog.ErrorS(err, "Cannot get nodeInfos from frameworkHandle")
return 0
}
var count int
for _, nodeInfo := range nodeInfos {
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" {
count++
}
}
}

return count
}

// CheckClusterResource checks if resource capacity of the cluster can satisfy <resourceRequest>.
// It returns an error detailing the resource gap if not satisfied; otherwise returns nil.
func CheckClusterResource(nodeList []*framework.NodeInfo, resourceRequest corev1.ResourceList, desiredPodGroupName string) error {
Expand Down
12 changes: 6 additions & 6 deletions pkg/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clicache "k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -169,6 +170,7 @@ func TestPreFilter(t *testing.T) {
scheduleTimeout: &scheduleTimeout,
permittedPG: newCache(),
backedOffPG: newCache(),
assignedPodsByPG: make(map[string]sets.Set[string]),
}

informerFactory.Start(ctx.Done())
Expand Down Expand Up @@ -263,19 +265,17 @@ func TestPermit(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(cs, 0)
podInformer := informerFactory.Core().V1().Pods()

pgMgr := &PodGroupManager{
client: client,
snapshotSharedLister: tu.NewFakeSharedLister(tt.existingPods, nodes),
podLister: podInformer.Lister(),
scheduleTimeout: &scheduleTimeout,
}
pgMgr := NewPodGroupManager(client, tu.NewFakeSharedLister(tt.existingPods, nodes), &scheduleTimeout, podInformer)

informerFactory.Start(ctx.Done())
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
t.Fatal("WaitForCacheSync failed")
}
addFunc := AddPodFactory(pgMgr)
for _, p := range tt.existingPods {
podInformer.Informer().GetStore().Add(p)
// we call add func here because we can not ensure existing pods are added before premit are called
addFunc(p)
}

if got := pgMgr.Permit(ctx, tt.pod); got != tt.want {
Expand Down
3 changes: 2 additions & 1 deletion pkg/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt

// This indicates there are already enough Pods satisfying the PodGroup,
// so don't bother to reject the whole PodGroup.
assigned := cs.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace)
assigned := cs.pgMgr.GetAssignedPodCount(pgName)
if assigned >= int(pg.Spec.MinMember) {
klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
Expand Down Expand Up @@ -247,6 +247,7 @@ func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleSta
if pg == nil {
return
}
cs.pgMgr.Unreserve(ctx, pod)
cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
if waitingPod.GetPod().Namespace == pod.Namespace && util.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name {
klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg))
Expand Down
19 changes: 11 additions & 8 deletions pkg/coscheduling/coscheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,24 +605,27 @@ func TestPostFilter(t *testing.T) {
cs := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(cs, 0)
podInformer := informerFactory.Core().V1().Pods()

pgMgr := core.NewPodGroupManager(
client,
tu.NewFakeSharedLister(tt.existingPods, nodes),
&scheduleTimeout,
podInformer,
)
pl := &Coscheduling{
frameworkHandler: f,
pgMgr: core.NewPodGroupManager(
client,
tu.NewFakeSharedLister(tt.existingPods, nodes),
&scheduleTimeout,
podInformer,
),
scheduleTimeout: &scheduleTimeout,
pgMgr: pgMgr,
scheduleTimeout: &scheduleTimeout,
}

informerFactory.Start(ctx.Done())
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
t.Fatal("WaitForCacheSync failed")
}
addFunc := core.AddPodFactory(pgMgr)
for _, p := range tt.existingPods {
podInformer.Informer().GetStore().Add(p)
// we call add func here because we can not ensure existing pods are added before premit are called
addFunc(p)
}

_, got := pl.PostFilter(ctx, framework.NewCycleState(), tt.pod, nodeStatusMap)
Expand Down
Loading