From d016f6f4e02b4352bb97313e47609be9c4532770 Mon Sep 17 00:00:00 2001 From: Vitor Savian Date: Tue, 26 Nov 2024 01:14:39 -0300 Subject: [PATCH] Change to event cache and file cache Signed-off-by: Vitor Savian --- pkg/agent/containerd/watcher.go | 68 +++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/pkg/agent/containerd/watcher.go b/pkg/agent/containerd/watcher.go index c2e18f196764..9ea6e7a81d44 100644 --- a/pkg/agent/containerd/watcher.go +++ b/pkg/agent/containerd/watcher.go @@ -22,9 +22,10 @@ import ( ) type Watcher struct { - watcher *fsnotify.Watcher - filesMap map[string]fs.FileInfo - workqueue workqueue.TypedDelayingInterface[fsnotify.Event] + watcher *fsnotify.Watcher + filesCache map[string]fs.FileInfo + eventCache map[string]fsnotify.Event + workqueue workqueue.TypedDelayingInterface[string] } func CreateWatcher() (*Watcher, error) { @@ -34,9 +35,10 @@ func CreateWatcher() (*Watcher, error) { } return &Watcher{ - watcher: watcher, - filesMap: make(map[string]fs.FileInfo), - workqueue: workqueue.TypedNewDelayingQueue[fsnotify.Event](), + watcher: watcher, + filesCache: make(map[string]fs.FileInfo), + eventCache: make(map[string]fsnotify.Event), + workqueue: workqueue.TypedNewDelayingQueue[string](), }, nil } @@ -85,7 +87,7 @@ func (w *Watcher) Populate(path string) error { if isFileSupported(dirEntry.Name()) { // insert the file into the state map that will have the state from the file - w.filesMap[filepath.Join(path, dirEntry.Name())] = fileInfo + w.filesCache[filepath.Join(path, dirEntry.Name())] = fileInfo } } @@ -93,7 +95,7 @@ func (w *Watcher) Populate(path string) error { } func (w *Watcher) ClearMap() { - w.filesMap = make(map[string]fs.FileInfo) + w.filesCache = make(map[string]fs.FileInfo) } func (w *Watcher) runWorkerForImages(ctx context.Context, cfg *config.Node) { @@ -123,44 +125,48 @@ func (w *Watcher) runWorkerForImages(ctx context.Context, cfg *config.Node) { } func (w *Watcher) processNextEventForImages(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) bool { - event, shutdown := w.workqueue.Get() + key, shutdown := w.workqueue.Get() if shutdown { return false } - if err := w.processImageEvent(ctx, event, cfg, client, imageClient); err != nil { + if err := w.processImageEvent(ctx, key, cfg, client, imageClient); err != nil { logrus.Errorf("Failed to process image event: %v", err) } return true } -func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) error { - defer w.workqueue.Done(event) +func (w *Watcher) processImageEvent(ctx context.Context, key string, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) error { + defer w.workqueue.Done(key) + + event, ok := w.eventCache[key] + if !ok { + return nil + } if event.Has(fsnotify.Write) { newStateFile, err := os.Stat(event.Name) if err != nil { - logrus.Errorf("Failed to get file %s info for image event WRITE: %s", event.Name, err.Error()) + logrus.Errorf("Failed to get file %s info for image event WRITE: %v", key, err) return err } // we do not want to handle directorys, only files if newStateFile.IsDir() { - w.workqueue.Done(event) + w.workqueue.Done(key) return nil } if !isFileSupported(event.Name) { - w.workqueue.Done(event) + w.workqueue.Done(key) return nil } - lastStateFile := w.filesMap[event.Name] - w.filesMap[event.Name] = newStateFile - - if (newStateFile.Size() != lastStateFile.Size()) && newStateFile.ModTime().After(lastStateFile.ModTime()) { + lastStateFile := w.filesCache[event.Name] + w.filesCache[event.Name] = newStateFile + if lastStateFile == nil || (newStateFile.Size() != lastStateFile.Size()) && newStateFile.ModTime().After(lastStateFile.ModTime()) { logrus.Debugf("File met the requirements for import to containerd image store: %s", event.Name) start := time.Now() if err := preloadFile(ctx, cfg, client, imageClient, event.Name); err != nil { @@ -169,6 +175,8 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c } logrus.Infof("Imported images from %s in %s", event.Name, time.Since(start)) } + + w.workqueue.Done(key) } if event.Has(fsnotify.Create) { @@ -214,16 +222,16 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c } } - w.workqueue.Done(event) + w.workqueue.Done(key) return nil } if !isFileSupported(event.Name) { - w.workqueue.Done(event) + w.workqueue.Done(key) return nil } - w.filesMap[event.Name] = info + w.filesCache[event.Name] = info logrus.Debugf("File added to watcher controller: %s", event.Name) start := time.Now() if err := preloadFile(ctx, cfg, client, imageClient, event.Name); err != nil { @@ -231,6 +239,7 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c return err } logrus.Infof("Imported images from %s in %s", event.Name, time.Since(start)) + w.workqueue.Done(key) } if event.Has(fsnotify.Remove) { @@ -242,12 +251,12 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c } if !isFileSupported(event.Name) { - w.workqueue.Done(event) + w.workqueue.Done(key) return nil } - // delete the file from the file map - delete(w.filesMap, event.Name) + // delete the file from the file map and the event to clean the caches + delete(w.filesCache, event.Name) logrus.Debugf("Removed file from the image watcher controller: %s", event.Name) } @@ -260,13 +269,13 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c } if !isFileSupported(event.Name) { - w.workqueue.Done(event) + w.workqueue.Done(key) return nil } // delete the file from the file map - delete(w.filesMap, event.Name) - logrus.Debugf("Removed file from the image watcher controller: %s", event.Name) + delete(w.filesCache, event.Name) + logrus.Debugf("Removed file from the image watcher controller: %s", key) } return nil @@ -324,7 +333,8 @@ func watchImages(ctx context.Context, cfg *config.Node) { // this part is to specify to only get events that were from /agent/images if strings.Contains(event.Name, "/agent/images") { - w.workqueue.AddAfter(event, 2*time.Second) + w.eventCache[event.Name] = event + w.workqueue.AddAfter(event.Name, 2*time.Second) } case err, ok := <-w.watcher.Errors: