Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor coscheduling to use controller-runtime client #652

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 14 additions & 30 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ import (
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
pginformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions/scheduling/v1alpha1"
pglister "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/util"
)

Expand All @@ -56,7 +54,7 @@ const (
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *corev1.Pod) Status
GetPodGroup(*corev1.Pod) (string, *v1alpha1.PodGroup)
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
CalculateAssignedPods(string, string) int
Expand All @@ -66,8 +64,8 @@ type Manager interface {

// PodGroupManager defines the scheduling operation called
type PodGroupManager struct {
// pgClient is a podGroup client
pgClient pgclientset.Interface
// client is a generic controller-runtime client to manipulate both core resources and PodGroups.
client client.Client
// snapshotSharedLister is pod shared list
snapshotSharedLister framework.SharedLister
// scheduleTimeout is the default timeout for podgroup scheduling.
Expand All @@ -77,21 +75,17 @@ type PodGroupManager struct {
permittedPG *gochache.Cache
// backedOffPG stores the podgorup name which failed scheudling recently.
backedOffPG *gochache.Cache
// pgLister is podgroup lister
pgLister pglister.PodGroupLister
// podLister is pod lister
podLister listerv1.PodLister
sync.RWMutex
}

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(pgClient pgclientset.Interface, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration,
pgInformer pginformer.PodGroupInformer, podInformer informerv1.PodInformer) *PodGroupManager {
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
pgMgr := &PodGroupManager{
pgClient: pgClient,
client: client,
snapshotSharedLister: snapshotSharedLister,
scheduleTimeout: scheduleTimeout,
pgLister: pgInformer.Lister(),
podLister: podInformer.Lister(),
permittedPG: gochache.New(3*time.Second, 3*time.Second),
backedOffPG: gochache.New(10*time.Second, 10*time.Second),
Expand Down Expand Up @@ -149,7 +143,7 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
pgFullName, pg := pgMgr.GetPodGroup(pod)
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pg == nil {
return nil
}
Expand Down Expand Up @@ -200,7 +194,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er

// Permit permits a pod to run, if the minMember match, it would send a signal to chan.
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
pgFullName, pg := pgMgr.GetPodGroup(pod)
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pgFullName == "" {
return PodGroupNotSpecified
}
Expand All @@ -224,8 +218,8 @@ func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time
if len(pgName) == 0 {
return ts
}
pg, err := pgMgr.pgLister.PodGroups(pod.Namespace).Get(pgName)
if err != nil {
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
return ts
}
return pg.CreationTimestamp.Time
Expand All @@ -236,27 +230,17 @@ func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) {
pgMgr.permittedPG.Delete(pgFullName)
}

// PatchPodGroup patches a podGroup.
func (pgMgr *PodGroupManager) PatchPodGroup(pgName string, namespace string, patch []byte) error {
if len(patch) == 0 {
return nil
}
_, err := pgMgr.pgClient.SchedulingV1alpha1().PodGroups(namespace).Patch(context.TODO(), pgName,
types.MergePatchType, patch, metav1.PatchOptions{})
return err
}

// GetPodGroup returns the PodGroup that a Pod belongs to in cache.
func (pgMgr *PodGroupManager) GetPodGroup(pod *corev1.Pod) (string, *v1alpha1.PodGroup) {
func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) {
pgName := util.GetPodGroupLabel(pod)
if len(pgName) == 0 {
return "", nil
}
pg, err := pgMgr.pgLister.PodGroups(pod.Namespace).Get(pgName)
if err != nil {
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), nil
}
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), pg
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
Expand Down
Loading