From 0386051d978b50d07735d5825e80a21cb7240711 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 27 Jun 2023 18:58:15 +0200 Subject: [PATCH] nrt: cache: add support for dedicated informer Add an option to create and use a separate informer to get unfiltered pod listing from the apiserver. Due to mismatched view with respect to the kubelet, the plugin needs access to pod in terminal phase which are not deleted to make sure the reconciliation is done correctly. xref: https://github.com/kubernetes-sigs/scheduler-plugins/issues/598 xref: https://github.com/kubernetes/kubernetes/issues/119423 Signed-off-by: Francesco Romani --- apis/config/types.go | 13 ++++ apis/config/v1/defaults.go | 5 ++ apis/config/v1/defaults_test.go | 1 + apis/config/v1/types.go | 13 ++++ apis/config/v1/zz_generated.conversion.go | 2 + apis/config/v1/zz_generated.deepcopy.go | 5 ++ apis/config/v1beta3/defaults.go | 5 ++ apis/config/v1beta3/defaults_test.go | 1 + apis/config/v1beta3/types.go | 13 ++++ .../config/v1beta3/zz_generated.conversion.go | 2 + apis/config/v1beta3/zz_generated.deepcopy.go | 5 ++ apis/config/zz_generated.deepcopy.go | 5 ++ pkg/noderesourcetopology/cache/overreserve.go | 7 --- pkg/noderesourcetopology/cache/store.go | 4 -- pkg/noderesourcetopology/cache/store_test.go | 12 +++- pkg/noderesourcetopology/pluginhelpers.go | 3 +- .../podprovider/podprovider.go | 63 +++++++++++++++++++ 17 files changed, 146 insertions(+), 13 deletions(-) create mode 100644 pkg/noderesourcetopology/podprovider/podprovider.go diff --git a/apis/config/types.go b/apis/config/types.go index 959c80fd8a..3a702ee4dd 100644 --- a/apis/config/types.go +++ b/apis/config/types.go @@ -176,6 +176,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -192,6 +200,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Shared" + InformerMode *CacheInformerMode } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/defaults.go b/apis/config/v1/defaults.go index 6f1b4df5b1..ec42590821 100644 --- a/apis/config/v1/defaults.go +++ b/apis/config/v1/defaults.go @@ -89,6 +89,8 @@ var ( defaultResyncMethod = CacheResyncAutodetect + defaultInformerMode = CacheInformerShared + // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } + if obj.Cache.InformerMode == nil { + obj.Cache.InformerMode = &defaultInformerMode + } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1/defaults_test.go b/apis/config/v1/defaults_test.go index f964eab8d7..6f42a78db2 100644 --- a/apis/config/v1/defaults_test.go +++ b/apis/config/v1/defaults_test.go @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, + InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1/types.go b/apis/config/v1/types.go index a0e92d4bbf..c85fa7a20d 100644 --- a/apis/config/v1/types.go +++ b/apis/config/v1/types.go @@ -174,6 +174,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Shared" + InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/zz_generated.conversion.go b/apis/config/v1/zz_generated.conversion.go index 4c8388b74f..90f58e7915 100644 --- a/apis/config/v1/zz_generated.conversion.go +++ b/apis/config/v1/zz_generated.conversion.go @@ -344,6 +344,7 @@ func Convert_config_NetworkOverheadArgs_To_v1_NetworkOverheadArgs(in *config.Net func autoConvert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -355,6 +356,7 @@ func Convert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in func autoConvert_config_NodeResourceTopologyCache_To_v1_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1/zz_generated.deepcopy.go b/apis/config/v1/zz_generated.deepcopy.go index be48756989..0553952b48 100644 --- a/apis/config/v1/zz_generated.deepcopy.go +++ b/apis/config/v1/zz_generated.deepcopy.go @@ -220,6 +220,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/apis/config/v1beta3/defaults.go b/apis/config/v1beta3/defaults.go index eed0492ff8..1bdeb4e520 100644 --- a/apis/config/v1beta3/defaults.go +++ b/apis/config/v1beta3/defaults.go @@ -89,6 +89,8 @@ var ( defaultResyncMethod = CacheResyncAutodetect + defaultInformerMode = CacheInformerShared + // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } + if obj.Cache.InformerMode == nil { + obj.Cache.InformerMode = &defaultInformerMode + } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1beta3/defaults_test.go b/apis/config/v1beta3/defaults_test.go index 9813b416d2..90aa95df7b 100644 --- a/apis/config/v1beta3/defaults_test.go +++ b/apis/config/v1beta3/defaults_test.go @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, + InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1beta3/types.go b/apis/config/v1beta3/types.go index 4c3b2fc9fc..9c23b2ab2a 100644 --- a/apis/config/v1beta3/types.go +++ b/apis/config/v1beta3/types.go @@ -174,6 +174,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Shared" + InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1beta3/zz_generated.conversion.go b/apis/config/v1beta3/zz_generated.conversion.go index 5ed3b6a64e..2a7c9eb8bf 100644 --- a/apis/config/v1beta3/zz_generated.conversion.go +++ b/apis/config/v1beta3/zz_generated.conversion.go @@ -344,6 +344,7 @@ func Convert_config_NetworkOverheadArgs_To_v1beta3_NetworkOverheadArgs(in *confi func autoConvert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -355,6 +356,7 @@ func Convert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCac func autoConvert_config_NodeResourceTopologyCache_To_v1beta3_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1beta3/zz_generated.deepcopy.go b/apis/config/v1beta3/zz_generated.deepcopy.go index 377aef4c9f..3773eb0837 100644 --- a/apis/config/v1beta3/zz_generated.deepcopy.go +++ b/apis/config/v1beta3/zz_generated.deepcopy.go @@ -220,6 +220,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/apis/config/zz_generated.deepcopy.go b/apis/config/zz_generated.deepcopy.go index a783ada931..c2db32ee06 100644 --- a/apis/config/zz_generated.deepcopy.go +++ b/apis/config/zz_generated.deepcopy.go @@ -170,6 +170,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/pkg/noderesourcetopology/cache/overreserve.go b/pkg/noderesourcetopology/cache/overreserve.go index 68f754f558..f32925be00 100644 --- a/pkg/noderesourcetopology/cache/overreserve.go +++ b/pkg/noderesourcetopology/cache/overreserve.go @@ -29,9 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" podlisterv1 "k8s.io/client-go/listers/core/v1" - k8scache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/scheduler/framework" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -267,11 +265,6 @@ func (ov *OverReserve) FlushNodes(logID string, nrts ...*topologyv1alpha2.NodeRe } } -func InformerFromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) { - podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut - return podHandle.Informer(), podHandle.Lister() -} - // to be used only in tests func (ov *OverReserve) Store() *nrtStore { return ov.nrts diff --git a/pkg/noderesourcetopology/cache/store.go b/pkg/noderesourcetopology/cache/store.go index 74534c7dbd..96dc9e346e 100644 --- a/pkg/noderesourcetopology/cache/store.go +++ b/pkg/noderesourcetopology/cache/store.go @@ -253,10 +253,6 @@ func makeNodeToPodDataMap(podLister podlisterv1.PodLister, logID string) (map[st return nodeToObjsMap, err } for _, pod := range pods { - if pod.Status.Phase != corev1.PodRunning { - // we are interested only about nodes which consume resources - continue - } nodeObjs := nodeToObjsMap[pod.Spec.NodeName] nodeObjs = append(nodeObjs, podData{ Namespace: pod.Namespace, diff --git a/pkg/noderesourcetopology/cache/store_test.go b/pkg/noderesourcetopology/cache/store_test.go index 7f915af45b..6626a10f86 100644 --- a/pkg/noderesourcetopology/cache/store_test.go +++ b/pkg/noderesourcetopology/cache/store_test.go @@ -588,9 +588,19 @@ func TestMakeNodeToPodDataMap(t *testing.T) { Spec: corev1.PodSpec{ NodeName: "node1", }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + }, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, }, }, - expected: make(map[string][]podData), }, { description: "single pod running", diff --git a/pkg/noderesourcetopology/pluginhelpers.go b/pkg/noderesourcetopology/pluginhelpers.go index c4db883761..a8e5ef84a4 100644 --- a/pkg/noderesourcetopology/pluginhelpers.go +++ b/pkg/noderesourcetopology/pluginhelpers.go @@ -34,6 +34,7 @@ import ( apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" ) @@ -56,7 +57,7 @@ func initNodeTopologyInformer(tcfg *apiconfig.NodeResourceTopologyMatchArgs, han return nrtcache.NewPassthrough(client), nil } - podSharedInformer, podLister := nrtcache.InformerFromHandle(handle) + podSharedInformer, podLister := podprovider.NewFromHandle(handle, tcfg.Cache) nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister) if err != nil { diff --git a/pkg/noderesourcetopology/podprovider/podprovider.go b/pkg/noderesourcetopology/podprovider/podprovider.go new file mode 100644 index 0000000000..75347b0c39 --- /dev/null +++ b/pkg/noderesourcetopology/podprovider/podprovider.go @@ -0,0 +1,63 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprovider + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + podlisterv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + k8scache "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + + apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" +) + +func WantsDedicatedInformer(cacheConf *apiconfig.NodeResourceTopologyCache) bool { + if cacheConf == nil { + return false + } + if cacheConf.InformerMode == nil { + return false + } + infMode := *cacheConf.InformerMode + return infMode == apiconfig.CacheInformerDedicated +} + +func NewFromHandle(handle framework.Handle, cacheConf *apiconfig.NodeResourceTopologyCache) (k8scache.SharedIndexInformer, podlisterv1.PodLister) { + dedicated := WantsDedicatedInformer(cacheConf) + if !dedicated { + podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut + return podHandle.Informer(), podHandle.Lister() + } + + podInformer := coreinformers.NewFilteredPodInformer(handle.ClientSet(), metav1.NamespaceAll, 0, cache.Indexers{}, nil) + podLister := podlisterv1.NewPodLister(podInformer.GetIndexer()) + + klog.V(5).InfoS("Start custom pod informer") + ctx := context.Background() + go podInformer.Run(ctx.Done()) + + klog.V(5).InfoS("Syncing custom pod informer") + cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) + klog.V(5).InfoS("Synced custom pod informer") + + return podInformer, podLister +}