From a51e5a21e210e1d131bfc1bf4bca7e68db3bc8ad Mon Sep 17 00:00:00 2001
From: Jackson Coelho
Date: Tue, 18 Feb 2025 17:37:57 +0100
Subject: [PATCH] chore: flush idle dataobjects after X seconds (#16348)

---
 docs/sources/shared/configuration.md        |  5 ++
 pkg/dataobj/consumer/config.go              | 10 ++-
 pkg/dataobj/consumer/partition_processor.go | 92 +++++++++++++++++----
 pkg/dataobj/consumer/service.go             |  2 +-
 4 files changed, 92 insertions(+), 17 deletions(-)

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index d75d6a4307ae8..4cb8c9b108f3e 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -847,6 +847,11 @@ dataobj:
   # CLI flag: -dataobj-consumer.sha-prefix-size
   [shaprefixsize: <int> | default = 2]
 
+  # The maximum amount of time to wait before flushing an object that is
+  # no longer receiving new writes
+  # CLI flag: -dataobj-consumer.idle-flush-timeout
+  [idle_flush_timeout: <duration> | default = 1h]
+
   querier:
     # Enable the dataobj querier.
     # CLI flag: -dataobj-querier-enabled
diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go
index 3998e28d21d14..b76f9b6672d12 100644
--- a/pkg/dataobj/consumer/config.go
+++ b/pkg/dataobj/consumer/config.go
@@ -2,6 +2,7 @@ package consumer
 
 import (
 	"flag"
+	"time"
 
 	"github.com/grafana/loki/v3/pkg/dataobj"
 	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
 )
@@ -9,7 +10,8 @@
 type Config struct {
 	dataobj.BuilderConfig
-	UploaderConfig uploader.Config `yaml:"uploader"`
+	UploaderConfig   uploader.Config `yaml:"uploader"`
+	IdleFlushTimeout time.Duration   `yaml:"idle_flush_timeout"`
 }
 
 func (cfg *Config) Validate() error {
@@ -27,4 +29,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
 	cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)
+
+	if cfg.IdleFlushTimeout <= 0 {
+		cfg.IdleFlushTimeout = time.Hour // default to 1 hour
+	}
+
+	f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait before flushing an object that is no longer receiving new writes")
 }
diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go
index cf8345364b7ba..b9030b354ed85 100644
--- a/pkg/dataobj/consumer/partition_processor.go
+++ b/pkg/dataobj/consumer/partition_processor.go
@@ -39,6 +39,10 @@ type partitionProcessor struct {
 	bucket  objstore.Bucket
 	bufPool *sync.Pool
 
+	// Idle stream handling
+	idleFlushTimeout time.Duration
+	lastFlush        time.Time
+
 	// Metrics
 	metrics *partitionOffsetMetrics
 
@@ -50,7 +54,21 @@ type partitionProcessor struct {
 	logger log.Logger
 }
 
-func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, uploaderCfg uploader.Config, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer, bufPool *sync.Pool) *partitionProcessor {
+func newPartitionProcessor(
+	ctx context.Context,
+	client *kgo.Client,
+	builderCfg dataobj.BuilderConfig,
+	uploaderCfg uploader.Config,
+	bucket objstore.Bucket,
+	tenantID string,
+	virtualShard int32,
+	topic string,
+	partition int32,
+	logger log.Logger,
+	reg prometheus.Registerer,
+	bufPool *sync.Pool,
+	idleFlushTimeout time.Duration,
+) *partitionProcessor {
 	ctx, cancel := context.WithCancel(ctx)
 	decoder, err := kafka.NewDecoder()
 	if err != nil {
@@ -95,6 +113,8 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d
 		uploader:         uploader,
 		metastoreManager: metastoreManager,
 		bufPool:          bufPool,
+		idleFlushTimeout: idleFlushTimeout,
+		lastFlush:        time.Now(),
 	}
 }
 
@@ -115,6 +135,9 @@ func (p *partitionProcessor) start() {
 					return
 				}
 				p.processRecord(record)
+
+			case <-time.After(p.idleFlushTimeout):
+				p.idleFlush()
 			}
 		}
 	}()
@@ -163,6 +186,29 @@ func (p *partitionProcessor) initBuilder() error {
 	return initErr
 }
 
+func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error {
+	flushedDataobjStats, err := p.builder.Flush(flushBuffer)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
+		return err
+	}
+
+	objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
+		return err
+	}
+
+	if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
+		level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
+		return err
+	}
+
+	p.lastFlush = time.Now()
+
+	return nil
+}
+
 func (p *partitionProcessor) processRecord(record *kgo.Record) {
 	// Update offset metric at the end of processing
 	defer p.metrics.updateOffset(record.Offset)
@@ -200,20 +246,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
 
 		flushBuffer.Reset()
 
-		flushedDataobjStats, err := p.builder.Flush(flushBuffer)
-		if err != nil {
-			level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
-			return
-		}
-
-		objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
-		if err != nil {
-			level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
-			return
-		}
-
-		if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
-			level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
+		if err := p.flushStream(flushBuffer); err != nil {
+			level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
 			return
 		}
 	}()
@@ -251,3 +285,31 @@ func (p *partitionProcessor) commitRecords(record *kgo.Record) error {
 	}
 	return lastErr
 }
+
+// idleFlush flushes the builder if it has been idle for too long.
+// This is used to avoid holding on to memory for too long.
+// We compare the current time with the last flush time to determine if the builder has been idle.
+func (p *partitionProcessor) idleFlush() {
+	if p.builder == nil {
+		return
+	}
+
+	now := time.Now()
+	if now.Sub(p.lastFlush) < p.idleFlushTimeout {
+		return // not idle for long enough since the last flush
+	}
+
+	func() {
+		flushBuffer := p.bufPool.Get().(*bytes.Buffer)
+		defer p.bufPool.Put(flushBuffer)
+
+		flushBuffer.Reset()
+
+		if err := p.flushStream(flushBuffer); err != nil {
+			level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
+			return
+		}
+
+		p.lastFlush = now
+	}()
+}
diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go
index 1c36bf2057040..08769051437a1 100644
--- a/pkg/dataobj/consumer/service.go
+++ b/pkg/dataobj/consumer/service.go
@@ -100,7 +100,7 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie
 		}
 
 		for _, partition := range parts {
-			processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool)
+			processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool, s.cfg.IdleFlushTimeout)
 			s.partitionHandlers[topic][partition] = processor
 			processor.start()
 		}
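
Note on the timer mechanics: the new case <-time.After(p.idleFlushTimeout) arm works because time.After arms a fresh timer each time the select statement is re-entered, so that case only fires once no record has arrived for a full idleFlushTimeout. idleFlush then also compares the time elapsed since lastFlush, and flushStream updates lastFlush on every successful flush, so a flush triggered by normal record processing also resets the idle clock. The standalone Go program below is a minimal sketch of that select/time.After pattern only; the records channel, the sample values, and the 500ms timeout are invented for illustration and are not part of the patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	records := make(chan string)
	idleFlushTimeout := 500 * time.Millisecond // illustrative value; the consumer defaults to 1h

	// Producer: emits two records, then goes quiet.
	go func() {
		records <- "record-1"
		time.Sleep(100 * time.Millisecond)
		records <- "record-2"
	}()

	for {
		select {
		case rec := <-records:
			fmt.Println("processing", rec)

		// time.After arms a fresh timer on every loop iteration, so this
		// case only fires once no record has arrived for idleFlushTimeout.
		case <-time.After(idleFlushTimeout):
			fmt.Println("idle timeout reached, flushing and exiting")
			return
		}
	}
}

Run as-is, the sketch prints both records and then takes the idle branch roughly half a second after the last one, which mirrors how the consumer only performs an idle flush after a quiet period on the partition.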