Skip to content

Commit

Permalink
Change to event cache and file cache
Browse files Browse the repository at this point in the history
Signed-off-by: Vitor Savian <vitor.savian@suse.com>
  • Loading branch information
vitorsavian committed Nov 26, 2024
1 parent 1de0713 commit d016f6f
Showing 1 changed file with 39 additions and 29 deletions.
68 changes: 39 additions & 29 deletions pkg/agent/containerd/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
)

type Watcher struct {
watcher *fsnotify.Watcher
filesMap map[string]fs.FileInfo
workqueue workqueue.TypedDelayingInterface[fsnotify.Event]
watcher *fsnotify.Watcher
filesCache map[string]fs.FileInfo
eventCache map[string]fsnotify.Event
workqueue workqueue.TypedDelayingInterface[string]
}

func CreateWatcher() (*Watcher, error) {
Expand All @@ -34,9 +35,10 @@ func CreateWatcher() (*Watcher, error) {
}

return &Watcher{
watcher: watcher,
filesMap: make(map[string]fs.FileInfo),
workqueue: workqueue.TypedNewDelayingQueue[fsnotify.Event](),
watcher: watcher,
filesCache: make(map[string]fs.FileInfo),
eventCache: make(map[string]fsnotify.Event),
workqueue: workqueue.TypedNewDelayingQueue[string](),
}, nil
}

Expand Down Expand Up @@ -85,15 +87,15 @@ func (w *Watcher) Populate(path string) error {

if isFileSupported(dirEntry.Name()) {
// insert the file into the state map that will have the state from the file
w.filesMap[filepath.Join(path, dirEntry.Name())] = fileInfo
w.filesCache[filepath.Join(path, dirEntry.Name())] = fileInfo
}
}

return merr.NewErrors(errs...)
}

func (w *Watcher) ClearMap() {
w.filesMap = make(map[string]fs.FileInfo)
w.filesCache = make(map[string]fs.FileInfo)
}

func (w *Watcher) runWorkerForImages(ctx context.Context, cfg *config.Node) {
Expand Down Expand Up @@ -123,44 +125,48 @@ func (w *Watcher) runWorkerForImages(ctx context.Context, cfg *config.Node) {
}

func (w *Watcher) processNextEventForImages(ctx context.Context, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) bool {
event, shutdown := w.workqueue.Get()
key, shutdown := w.workqueue.Get()

if shutdown {
return false
}

if err := w.processImageEvent(ctx, event, cfg, client, imageClient); err != nil {
if err := w.processImageEvent(ctx, key, cfg, client, imageClient); err != nil {
logrus.Errorf("Failed to process image event: %v", err)
}

return true
}

func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) error {
defer w.workqueue.Done(event)
func (w *Watcher) processImageEvent(ctx context.Context, key string, cfg *config.Node, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) error {
defer w.workqueue.Done(key)

event, ok := w.eventCache[key]
if !ok {
return nil
}

if event.Has(fsnotify.Write) {
newStateFile, err := os.Stat(event.Name)
if err != nil {
logrus.Errorf("Failed to get file %s info for image event WRITE: %s", event.Name, err.Error())
logrus.Errorf("Failed to get file %s info for image event WRITE: %v", key, err)
return err
}

// we do not want to handle directorys, only files
if newStateFile.IsDir() {
w.workqueue.Done(event)
w.workqueue.Done(key)
return nil
}

if !isFileSupported(event.Name) {
w.workqueue.Done(event)
w.workqueue.Done(key)
return nil
}

lastStateFile := w.filesMap[event.Name]
w.filesMap[event.Name] = newStateFile

if (newStateFile.Size() != lastStateFile.Size()) && newStateFile.ModTime().After(lastStateFile.ModTime()) {
lastStateFile := w.filesCache[event.Name]
w.filesCache[event.Name] = newStateFile
if lastStateFile == nil || (newStateFile.Size() != lastStateFile.Size()) && newStateFile.ModTime().After(lastStateFile.ModTime()) {
logrus.Debugf("File met the requirements for import to containerd image store: %s", event.Name)
start := time.Now()
if err := preloadFile(ctx, cfg, client, imageClient, event.Name); err != nil {
Expand All @@ -169,6 +175,8 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c
}
logrus.Infof("Imported images from %s in %s", event.Name, time.Since(start))
}

w.workqueue.Done(key)
}

if event.Has(fsnotify.Create) {
Expand Down Expand Up @@ -214,23 +222,24 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c
}
}

w.workqueue.Done(event)
w.workqueue.Done(key)
return nil
}

if !isFileSupported(event.Name) {
w.workqueue.Done(event)
w.workqueue.Done(key)
return nil
}

w.filesMap[event.Name] = info
w.filesCache[event.Name] = info
logrus.Debugf("File added to watcher controller: %s", event.Name)
start := time.Now()
if err := preloadFile(ctx, cfg, client, imageClient, event.Name); err != nil {
logrus.Errorf("Error encountered while importing %s: %v", event.Name, err)
return err
}
logrus.Infof("Imported images from %s in %s", event.Name, time.Since(start))
w.workqueue.Done(key)
}

if event.Has(fsnotify.Remove) {
Expand All @@ -242,12 +251,12 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c
}

if !isFileSupported(event.Name) {
w.workqueue.Done(event)
w.workqueue.Done(key)
return nil
}

// delete the file from the file map
delete(w.filesMap, event.Name)
// delete the file from the file map and the event to clean the caches
delete(w.filesCache, event.Name)
logrus.Debugf("Removed file from the image watcher controller: %s", event.Name)
}

Expand All @@ -260,13 +269,13 @@ func (w *Watcher) processImageEvent(ctx context.Context, event fsnotify.Event, c
}

if !isFileSupported(event.Name) {
w.workqueue.Done(event)
w.workqueue.Done(key)
return nil
}

// delete the file from the file map
delete(w.filesMap, event.Name)
logrus.Debugf("Removed file from the image watcher controller: %s", event.Name)
delete(w.filesCache, event.Name)
logrus.Debugf("Removed file from the image watcher controller: %s", key)
}

return nil
Expand Down Expand Up @@ -324,7 +333,8 @@ func watchImages(ctx context.Context, cfg *config.Node) {

// this part is to specify to only get events that were from /agent/images
if strings.Contains(event.Name, "/agent/images") {
w.workqueue.AddAfter(event, 2*time.Second)
w.eventCache[event.Name] = event
w.workqueue.AddAfter(event.Name, 2*time.Second)
}

case err, ok := <-w.watcher.Errors:
Expand Down

0 comments on commit d016f6f

Please sign in to comment.