Skip to content

Commit

Permalink
feat: cluster cache should expose synchronization error (argoproj#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Matyushentsev authored May 18, 2020
1 parent 8430dc0 commit 9163758
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
6 changes: 6 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
coverage:
status:
patch: off
project:
default:
threshold: 2
46 changes: 42 additions & 4 deletions pkg/utils/kube/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@ type apiMeta struct {
watchCancel context.CancelFunc
}

// ClusterInfo holds cluster cache stats
type ClusterInfo struct {
Server string
K8SVersion string
ResourcesCount int
APIsCount int
// Server holds cluster API server URL
Server string
// K8SVersion holds Kubernetes version
K8SVersion string
// ResourcesCount holds number of observed Kubernetes resources
ResourcesCount int
// APIsCount holds number of observed Kubernetes API count
APIsCount int
// LastCacheSyncTime holds time of most recent cache synchronization
LastCacheSyncTime *time.Time
// SyncError holds most recent cache synchronization error
SyncError error
}

type Settings struct {
Expand All @@ -59,20 +67,35 @@ type OnResourceUpdatedHandler func(newRes *Resource, oldRes *Resource, namespace
type Unsubscribe func()

type ClusterCache interface {
// EnsureSynced checks cache state and synchronizes it if necessary
EnsureSynced() error
// GetServerVersion returns observed cluster version
GetServerVersion() string
// GetAPIGroups returns information about observed API groups
GetAPIGroups() []metav1.APIGroup
// Invalidate cache and executes callback that optionally might update cache settings
Invalidate(settingsCallback func(*rest.Config, []string, Settings) (*rest.Config, []string, Settings))
// GetNamespaceTopLevelResources returns top level resources in the specified namespace
GetNamespaceTopLevelResources(namespace string) map[kube.ResourceKey]*Resource
// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree
IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource))
// IsNamespaced answers if specified group/kind is a namespaced resource API or not
IsNamespaced(gk schema.GroupKind) (bool, error)
// GetManagedLiveObjs helps finding matching live K8S resources for a given resources list.
// The function returns all resources from cache for those `isManaged` function returns true and resources
// specified in targetObjs list.
GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(r *Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error)
// GetClusterInfo returns cluster cache statistics
GetClusterInfo() ClusterInfo
// SetPopulateResourceInfoHandler sets callback that populates resource information in the cache
SetPopulateResourceInfoHandler(handler OnPopulateResourceInfoHandler)
// OnResourceUpdated register event handler that is executed every time when resource get's updated in the cache
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
// OnEvent register event handler that is executed every time when new K8S event received
OnEvent(handler OnEventHandler) Unsubscribe
}

// NewClusterCache creates new instance of cluster cache
func NewClusterCache(settings Settings, config *rest.Config, namespaces []string, kubectl kube.Kubectl) *clusterCache {
return &clusterCache{
settings: settings,
Expand Down Expand Up @@ -115,12 +138,14 @@ type clusterCache struct {
eventHandlers map[uint64]OnEventHandler
}

// SetPopulateResourceInfoHandler sets callback that populates resource information in the cache
func (c *clusterCache) SetPopulateResourceInfoHandler(handler OnPopulateResourceInfoHandler) {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
c.populateResourceInfoHandler = handler
}

// OnResourceUpdated register event handler that is executed every time when resource get's updated in the cache
func (c *clusterCache) OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
Expand All @@ -143,6 +168,7 @@ func (c *clusterCache) getResourceUpdatedHandlers() []OnResourceUpdatedHandler {
return handlers
}

// OnEvent register event handler that is executed every time when new K8S event received
func (c *clusterCache) OnEvent(handler OnEventHandler) Unsubscribe {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
Expand All @@ -165,10 +191,12 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
return handlers
}

// GetServerVersion returns observed cluster version
func (c *clusterCache) GetServerVersion() string {
return c.serverVersion
}

// GetAPIGroups returns information about observed API groups
func (c *clusterCache) GetAPIGroups() []metav1.APIGroup {
return c.apiGroups
}
Expand Down Expand Up @@ -285,6 +313,7 @@ func (c *clusterCache) setNode(n *Resource) {
ns[key] = n
}

// Invalidate cache and executes callback that optionally might update cache settings
func (c *clusterCache) Invalidate(settingsCallback func(*rest.Config, []string, Settings) (*rest.Config, []string, Settings)) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -553,6 +582,7 @@ func (c *clusterCache) sync() (err error) {
return nil
}

// EnsureSynced checks cache state and synchronizes it if necessary
func (c *clusterCache) EnsureSynced() error {
// first check if cluster is synced *without lock*
if c.synced() {
Expand All @@ -573,6 +603,7 @@ func (c *clusterCache) EnsureSynced() error {
return c.syncError
}

// GetNamespaceTopLevelResources returns top level resources in the specified namespace
func (c *clusterCache) GetNamespaceTopLevelResources(namespace string) map[kube.ResourceKey]*Resource {
c.lock.RLock()
defer c.lock.RUnlock()
Expand All @@ -585,6 +616,7 @@ func (c *clusterCache) GetNamespaceTopLevelResources(namespace string) map[kube.
return resources
}

// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree
func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource)) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -615,13 +647,17 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour
}
}

// IsNamespaced answers if specified group/kind is a namespaced resource API or not
func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) {
if isNamespaced, ok := c.namespacedResources[gk]; ok {
return isNamespaced, nil
}
return false, errors.NewNotFound(schema.GroupResource{Group: gk.Group}, "")
}

// GetManagedLiveObjs helps finding matching live K8S resources for a given resources list.
// The function returns all resources from cache for those `isManaged` function returns true and resources
// specified in targetObjs list.
func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(r *Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
c.lock.RLock()
defer c.lock.RUnlock()
Expand Down Expand Up @@ -748,6 +784,7 @@ var (
}
)

// GetClusterInfo returns cluster cache statistics
func (c *clusterCache) GetClusterInfo() ClusterInfo {
c.lock.RLock()
defer c.lock.RUnlock()
Expand All @@ -757,6 +794,7 @@ func (c *clusterCache) GetClusterInfo() ClusterInfo {
ResourcesCount: len(c.resources),
Server: c.config.Host,
LastCacheSyncTime: c.syncTime,
SyncError: c.syncError,
}
}

Expand Down

0 comments on commit 9163758

Please sign in to comment.