Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Filter informer events (#13)
Browse files Browse the repository at this point in the history
* Filter informer events

* PR comments
  • Loading branch information
EngHabu authored Sep 17, 2019
1 parent fcad091 commit 31b0fb2
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
39 changes: 23 additions & 16 deletions go/tasks/v1/flytek8s/mux_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"sync"
"time"

"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/lyft/flytestdlib/promutils"

"github.com/lyft/flytestdlib/logger"

"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
Expand Down Expand Up @@ -111,7 +116,7 @@ func Initialize(ctx context.Context, watchNamespace string, resyncPeriod time.Du
return nil
}

func RegisterResource(ctx context.Context, resourceToWatch runtime.Object, handler Handler) error {
func RegisterResource(_ context.Context, resourceToWatch runtime.Object, handler Handler, metricsScope promutils.Scope) error {
if instance == nil {
return fmt.Errorf("instance not initialized")
}
Expand All @@ -132,30 +137,32 @@ func RegisterResource(ctx context.Context, resourceToWatch runtime.Object, handl
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
resourceToWatch.GetObjectKind().GroupVersionKind().Kind)

updateCount := labeled.NewCounter("informer_update", "Update events from informer", metricsScope)
droppedUpdateCount := labeled.NewCounter("informer_update_dropped", "Update events from informer that have the same resource version", metricsScope)

err := src.Start(ctrlHandler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
err := handler.Handle(ctx, evt.Object)
if err != nil {
logger.Warnf(ctx, "Failed to handle Create event for object [%v]", evt.Object)
}
},
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
err := handler.Handle(ctx, evt.ObjectNew)
if err != nil {
logger.Warnf(ctx, "Failed to handle Update event for object [%v]", evt.ObjectNew)
if evt.MetaNew == nil {
logger.Warn(context.Background(), "Received an Update event with nil MetaNew.")
} else if evt.MetaOld == nil || evt.MetaOld.GetResourceVersion() != evt.MetaNew.GetResourceVersion() {
newCtx := contextutils.WithNamespace(context.Background(), evt.MetaNew.GetNamespace())
updateCount.Inc(newCtx)

logger.Debugf(newCtx, "Enqueueing owner for updated object [%v/%v]", evt.MetaNew.GetNamespace(), evt.MetaNew.GetName())
err := handler.Handle(newCtx, evt.ObjectNew)
if err != nil {
logger.Warnf(newCtx, "Failed to handle Update event for object [%v]", evt.ObjectNew)
}
} else {
newCtx := contextutils.WithNamespace(context.Background(), evt.MetaNew.GetNamespace())
droppedUpdateCount.Inc(newCtx)
}
},
DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
err := handler.Handle(ctx, evt.Object)
if err != nil {
logger.Warnf(ctx, "Failed to handle Delete event for object [%v]", evt.Object)
}
},
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
err := handler.Handle(ctx, evt.Object)
if err != nil {
logger.Warnf(ctx, "Failed to handle Generic event for object [%v]", evt.Object)
}
},
}, q)

Expand Down
3 changes: 2 additions & 1 deletion go/tasks/v1/flytek8s/plugin_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type K8sTaskExecutorMetrics struct {
type ownerRegisteringHandler struct {
ownerKind string
enqueueOwner types.EnqueueOwner
metricsScope promutils.Scope
}

// A common handle for all k8s-resource reliant task executors that push workflow id on the work queue.
Expand Down Expand Up @@ -113,7 +114,7 @@ func (e *K8sTaskExecutor) Initialize(ctx context.Context, params types.ExecutorI
return RegisterResource(ctx, e.resourceToWatch, ownerRegisteringHandler{
enqueueOwner: params.EnqueueOwner,
ownerKind: params.OwnerKind,
})
}, metricScope)
}

func (e K8sTaskExecutor) HandleTaskSuccess(ctx context.Context, taskCtx types.TaskContext) (types.TaskStatus, error) {
Expand Down

0 comments on commit 31b0fb2

Please sign in to comment.