diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index 17842fa1..bcd0d467 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -7,6 +7,7 @@ import ( "path/filepath" "slices" "strings" + "sync" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" @@ -37,20 +38,29 @@ type IngestorAPI struct { notifier notifier.Notifier Cfg *config.KubehoundConfig providers *providers.ProvidersFactoryConfig + + mu *sync.RWMutex // mutex to sync write to the runIDs map + runIDs map[string]bool // runIDs map to monitor and avoid concurrency processing on the same runID } var ( - _ API = (*IngestorAPI)(nil) - ErrAlreadyIngested = errors.New("ingestion already completed") + _ API = (*IngestorAPI)(nil) + ErrAlreadyIngested = errors.New("ingestion already completed") + ErrCurrentlyIngesting = errors.New("runID currently being processed skipping this request") ) func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notifier notifier.Notifier, p *providers.ProvidersFactoryConfig) *IngestorAPI { + var mu sync.RWMutex + var runIDs = make(map[string]bool) + return &IngestorAPI{ notifier: notifier, puller: puller, Cfg: cfg, providers: p, + mu: &mu, + runIDs: runIDs, } } @@ -137,6 +147,13 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { //nolint: clusterName := md.ClusterName runID := md.RunID + err = g.lockRunID(runID) + if err != nil { + return err + } + + defer g.unlockRunID(runID) + err = g.Cfg.ComputeDynamic(config.WithClusterName(clusterName), config.WithRunID(runID)) if err != nil { return err @@ -285,3 +302,28 @@ func (g *IngestorAPI) isAlreadyIngestedInDB(ctx context.Context, clusterName str func (g *IngestorAPI) Notify(ctx context.Context, clusterName string, runID string) error { return g.notifier.Notify(ctx, clusterName, runID) } + +// Using a map to monitor all runIDs being processed, +// Using a mutex to write/read data to the runIDs map +func (g *IngestorAPI) lockRunID(runID string) error { + g.mu.Lock() + defer g.mu.Unlock() + entry, ok := g.runIDs[runID] + + // If a runID is being processed, dropping the request + if ok && entry { + return fmt.Errorf("%w [runID:%s]", ErrCurrentlyIngesting, runID) + } + + // Locking the current runID + g.runIDs[runID] = true + + return nil +} + +// Delocking the runID +func (g *IngestorAPI) unlockRunID(runID string) { + g.mu.Lock() + g.runIDs[runID] = false + g.mu.Unlock() +}