Skip to content

Commit

Permalink
controller: migrate nrt to ctrl runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
zwpaper committed Nov 1, 2023
1 parent ca5d17b commit b1354b9
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 56 deletions.
4 changes: 3 additions & 1 deletion pkg/noderesourcetopology/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package cache

import (
"context"

corev1 "k8s.io/api/core/v1"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
Expand All @@ -30,7 +32,7 @@ type Interface interface {
// The pod argument is used only for logging purposes.
// Returns a boolean to signal the caller if the NRT data is clean. If false, then the node has foreign
// Pods detected - so it should be ignored or handled differently by the caller.
GetCachedNRTCopy(nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)
GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)

// NodeMaybeOverReserved declares a node was filtered out for not enough resources available.
// This means this node is eligible for a resync. When a node is marked discarded (dirty), it matters not
Expand Down
18 changes: 10 additions & 8 deletions pkg/noderesourcetopology/cache/discardreserved.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ limitations under the License.
package cache

import (
"context"
"sync"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
listerv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/listers/topology/v1alpha2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// DiscardReserved is intended to solve a similar problem as the Overreserve Cache,
Expand All @@ -42,17 +44,17 @@ import (
// DiscardReserved tracks in-flight pod reservations per node and otherwise
// serves NodeResourceTopology objects straight from the API server through
// the controller-runtime client.
type DiscardReserved struct {
	rMutex         sync.RWMutex
	reservationMap map[string]map[types.UID]bool // Key is NodeName, value is Pod UID : reserved status
	client         ctrlclient.Client
}

func NewDiscardReserved(lister listerv1alpha2.NodeResourceTopologyLister) Interface {
func NewDiscardReserved(client ctrlclient.Client) Interface {
return &DiscardReserved{
lister: lister,
client: client,
reservationMap: make(map[string]map[types.UID]bool),
}
}

func (pt *DiscardReserved) GetCachedNRTCopy(nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
pt.rMutex.RLock()
defer pt.rMutex.RUnlock()
if t, ok := pt.reservationMap[nodeName]; ok {
Expand All @@ -61,8 +63,8 @@ func (pt *DiscardReserved) GetCachedNRTCopy(nodeName string, _ *corev1.Pod) (*to
}
}

nrt, err := pt.lister.Get(nodeName)
if err != nil {
nrt := &topologyv1alpha2.NodeResourceTopology{}
if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
return nil, false
}
return nrt, true
Expand Down
33 changes: 17 additions & 16 deletions pkg/noderesourcetopology/cache/overreserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,66 +17,67 @@ limitations under the License.
package cache

import (
"context"
"errors"
"fmt"
"sync"
"time"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"github.com/k8stopologyawareschedwg/podfingerprint"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
podlisterv1 "k8s.io/client-go/listers/core/v1"
k8scache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
listerv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/listers/topology/v1alpha2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"

"github.com/k8stopologyawareschedwg/podfingerprint"
)

// OverReserve keeps a pessimistic, locally-updated view of the
// NodeResourceTopology objects: resources assumed (reserved) by in-flight
// pods are subtracted from the cached NRT data until a resync.
type OverReserve struct {
	ctrlclient.Client
	lock             sync.Mutex
	nrts             *nrtStore
	assumedResources map[string]*resourceStore // nodeName -> resourceStore
	// nodesMaybeOverreserved counts how many times a node is filtered out. This is used as trigger condition to try
	// to resync nodes. See The documentation of Resync() below for more details.
	nodesMaybeOverreserved counter
	nodesWithForeignPods   counter
	podLister              podlisterv1.PodLister
	resyncMethod           apiconfig.CacheResyncMethod
}

func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, nrtLister listerv1alpha2.NodeResourceTopologyLister, podLister podlisterv1.PodLister) (*OverReserve, error) {
if nrtLister == nil || podLister == nil {
func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister) (*OverReserve, error) {
if client == nil || podLister == nil {
return nil, fmt.Errorf("nrtcache: received nil references")
}

resyncMethod := getCacheResyncMethod(cfg)

nrtObjs, err := nrtLister.List(labels.Everything())
if err != nil {
nrtObjs := &topologyv1alpha2.NodeResourceTopologyList{}
if err := client.List(context.Background(), nrtObjs); err != nil {
return nil, err
}

klog.V(3).InfoS("nrtcache: initializing", "objects", len(nrtObjs), "method", resyncMethod)
klog.V(3).InfoS("nrtcache: initializing", "objects", len(nrtObjs.Items), "method", resyncMethod)
obj := &OverReserve{
nrts: newNrtStore(nrtObjs),
Client: client,
nrts: newNrtStore(nrtObjs.Items),
assumedResources: make(map[string]*resourceStore),
nodesMaybeOverreserved: newCounter(),
nodesWithForeignPods: newCounter(),
nrtLister: nrtLister,
podLister: podLister,
resyncMethod: resyncMethod,
}
return obj, nil
}

func (ov *OverReserve) GetCachedNRTCopy(nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
ov.lock.Lock()
defer ov.lock.Unlock()
if ov.nodesWithForeignPods.IsSet(nodeName) {
Expand Down Expand Up @@ -208,8 +209,8 @@ func (ov *OverReserve) Resync() {

var nrtUpdates []*topologyv1alpha2.NodeResourceTopology
for _, nodeName := range nodeNames {
nrtCandidate, err := ov.nrtLister.Get(nodeName)
if err != nil {
nrtCandidate := &topologyv1alpha2.NodeResourceTopology{}
if err := ov.Get(context.Background(), types.NamespacedName{Name: nodeName}, nrtCandidate); err != nil {
klog.V(3).InfoS("nrtcache: failed to get NodeTopology", "logID", logID, "node", nodeName, "error", err)
continue
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/noderesourcetopology/cache/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,31 @@ limitations under the License.
package cache

import (
"context"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
listerv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/listers/topology/v1alpha2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type Passthrough struct {
lister listerv1alpha2.NodeResourceTopologyLister
client ctrlclient.Client
}

func NewPassthrough(lister listerv1alpha2.NodeResourceTopologyLister) Interface {
func NewPassthrough(client ctrlclient.Client) Interface {
return Passthrough{
lister: lister,
client: client,
}
}

func (pt Passthrough) GetCachedNRTCopy(nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
klog.V(5).InfoS("Lister for nodeResTopoPlugin", "lister", pt.lister)
nrt, err := pt.lister.Get(nodeName)
if err != nil {
func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
klog.V(5).InfoS("Lister for nodeResTopoPlugin")
nrt := &topologyv1alpha2.NodeResourceTopology{}
if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
klog.V(5).ErrorS(err, "Cannot get NodeTopologies from NodeResourceTopologyLister")
return nil, true
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/noderesourcetopology/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
topologyv1alpha2attr "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2/helper/attribute"
"github.com/k8stopologyawareschedwg/podfingerprint"

apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
"sigs.k8s.io/scheduler-plugins/pkg/util"

"github.com/k8stopologyawareschedwg/podfingerprint"
)

// nrtStore maps the NRT data by node name. It is not thread safe and needs to be protected by a lock.
Expand All @@ -43,7 +42,7 @@ type nrtStore struct {
}

// newNrtStore creates a new nrtStore and initializes it with copies of the provided Node Resource Topology data.
func newNrtStore(nrts []*topologyv1alpha2.NodeResourceTopology) *nrtStore {
func newNrtStore(nrts []topologyv1alpha2.NodeResourceTopology) *nrtStore {
data := make(map[string]*topologyv1alpha2.NodeResourceTopology, len(nrts))
for _, nrt := range nrts {
data[nrt.Name] = nrt.DeepCopy()
Expand Down
2 changes: 1 addition & 1 deletion pkg/noderesourcetopology/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
}

nodeName := nodeInfo.Node().Name
nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(nodeName, pod)
nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
if !ok {
klog.V(2).InfoS("invalid topology data", "node", nodeName)
return framework.NewStatus(framework.Unschedulable, "invalid node topology data")
Expand Down
24 changes: 7 additions & 17 deletions pkg/noderesourcetopology/pluginhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package noderesourcetopology

import (
"context"
"fmt"
"strconv"
"strings"
Expand All @@ -30,8 +29,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
topoclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
topologyinformers "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/informers/externalversions"

ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
Expand All @@ -43,32 +42,23 @@ const (
)

func initNodeTopologyInformer(tcfg *apiconfig.NodeResourceTopologyMatchArgs, handle framework.Handle) (nrtcache.Interface, error) {
topoClient, err := topoclientset.NewForConfig(handle.KubeConfig())
client, err := ctrlclient.New(handle.KubeConfig(), ctrlclient.Options{})
if err != nil {
klog.ErrorS(err, "Cannot create clientset for NodeTopologyResource", "kubeConfig", handle.KubeConfig())
klog.ErrorS(err, "Cannot create client for NodeTopologyResource", "kubeConfig", handle.KubeConfig())
return nil, err
}

topologyInformerFactory := topologyinformers.NewSharedInformerFactory(topoClient, 0)
nodeTopologyInformer := topologyInformerFactory.Topology().V1alpha2().NodeResourceTopologies()
nodeTopologyLister := nodeTopologyInformer.Lister()

klog.V(5).InfoS("Start nodeTopologyInformer")
ctx := context.Background()
topologyInformerFactory.Start(ctx.Done())
topologyInformerFactory.WaitForCacheSync(ctx.Done())

if tcfg.DiscardReservedNodes {
return nrtcache.NewDiscardReserved(nodeTopologyLister), nil
return nrtcache.NewDiscardReserved(client), nil
}

if tcfg.CacheResyncPeriodSeconds <= 0 {
return nrtcache.NewPassthrough(nodeTopologyLister), nil
return nrtcache.NewPassthrough(client), nil
}

podSharedInformer, podLister := nrtcache.InformerFromHandle(handle)

nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, nodeTopologyLister, podLister)
nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/noderesourcetopology/score.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState,
return framework.MaxNodeScore, nil
}

nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(nodeName, pod)
nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)

if !ok {
klog.V(4).InfoS("noderesourcetopology is not valid for node", "node", nodeName)
Expand Down

0 comments on commit b1354b9

Please sign in to comment.