Skip to content

Commit

Permalink
chore: flush idle dataobjects after X seconds (#16348)
Browse files Browse the repository at this point in the history
  • Loading branch information
fcjack authored Feb 18, 2025
1 parent 879168b commit a51e5a2
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 17 deletions.
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,11 @@ dataobj:
# CLI flag: -dataobj-consumer.sha-prefix-size
[shaprefixsize: <int> | default = 2]

# The maximum amount of time to wait before flushing an object
# that is no longer receiving new writes
# CLI flag: -dataobj-consumer.idle-flush-timeout
[idle_flush_timeout: <duration> | default = 1h]

querier:
# Enable the dataobj querier.
# CLI flag: -dataobj-querier-enabled
Expand Down
10 changes: 9 additions & 1 deletion pkg/dataobj/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package consumer

import (
"flag"
"time"

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
)

// Config configures the dataobj consumer. It embeds the builder settings and
// adds uploader configuration plus the idle-flush timeout.
type Config struct {
	dataobj.BuilderConfig
	// UploaderConfig controls how flushed data objects are uploaded.
	UploaderConfig uploader.Config `yaml:"uploader"`
	// IdleFlushTimeout is the maximum time a builder may go without
	// receiving new writes before its contents are flushed.
	IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"`
}

func (cfg *Config) Validate() error {
Expand All @@ -27,4 +29,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers the consumer's flags on f, prefixing each
// flag name with prefix. If IdleFlushTimeout was not already set to a positive
// value, the flag's default falls back to one hour.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
	cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)

	// Preserve a caller-supplied positive value as the flag default;
	// otherwise default to one hour.
	if cfg.IdleFlushTimeout <= 0 {
		cfg.IdleFlushTimeout = time.Hour
	}

	// Note: this is a duration flag (accepts values like "30m" or "1h"),
	// not a bare number of seconds, so the help text must not say "in seconds".
	f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait before flushing an object that is no longer receiving new writes")
}
92 changes: 77 additions & 15 deletions pkg/dataobj/consumer/partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type partitionProcessor struct {
bucket objstore.Bucket
bufPool *sync.Pool

// Idle stream handling
idleFlushTimout time.Duration
lastFlush time.Time

// Metrics
metrics *partitionOffsetMetrics

Expand All @@ -50,7 +54,21 @@ type partitionProcessor struct {
logger log.Logger
}

func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, uploaderCfg uploader.Config, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer, bufPool *sync.Pool) *partitionProcessor {
func newPartitionProcessor(
ctx context.Context,
client *kgo.Client,
builderCfg dataobj.BuilderConfig,
uploaderCfg uploader.Config,
bucket objstore.Bucket,
tenantID string,
virtualShard int32,
topic string,
partition int32,
logger log.Logger,
reg prometheus.Registerer,
bufPool *sync.Pool,
idleFlushTimeout time.Duration,
) *partitionProcessor {
ctx, cancel := context.WithCancel(ctx)
decoder, err := kafka.NewDecoder()
if err != nil {
Expand Down Expand Up @@ -95,6 +113,8 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d
uploader: uploader,
metastoreManager: metastoreManager,
bufPool: bufPool,
idleFlushTimout: idleFlushTimeout,
lastFlush: time.Now(),
}
}

Expand All @@ -115,6 +135,9 @@ func (p *partitionProcessor) start() {
return
}
p.processRecord(record)

case <-time.After(p.idleFlushTimout):
p.idleFlush()
}
}
}()
Expand Down Expand Up @@ -163,6 +186,29 @@ func (p *partitionProcessor) initBuilder() error {
return initErr
}

// flushStream serializes the builder's current contents into flushBuffer,
// uploads the resulting object, and records it in the metastore. On success
// the processor's last-flush timestamp is refreshed. Each failure is logged
// here and the error is returned to the caller.
func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error {
	stats, err := p.builder.Flush(flushBuffer)
	if err != nil {
		level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
		return err
	}

	path, err := p.uploader.Upload(p.ctx, flushBuffer)
	if err != nil {
		level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
		return err
	}

	if err = p.metastoreManager.UpdateMetastore(p.ctx, path, stats); err != nil {
		level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
		return err
	}

	// Mark this moment as the most recent successful flush so idle detection
	// measures from here.
	p.lastFlush = time.Now()
	return nil
}

func (p *partitionProcessor) processRecord(record *kgo.Record) {
// Update offset metric at the end of processing
defer p.metrics.updateOffset(record.Offset)
Expand Down Expand Up @@ -200,20 +246,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {

flushBuffer.Reset()

flushedDataobjStats, err := p.builder.Flush(flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
return
}

objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
return
}

if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
if err := p.flushStream(flushBuffer); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}
}()
Expand Down Expand Up @@ -251,3 +285,31 @@ func (p *partitionProcessor) commitRecords(record *kgo.Record) error {
}
return lastErr
}

// idleFlush flushes the builder's buffered data if no flush has happened for
// at least the configured idle timeout, so memory held by an idle builder is
// released. It is a no-op when the builder was never initialized or when the
// most recent flush is still recent enough (the timer in start() can fire
// before a full idle period has elapsed since the last flush).
//
// NOTE(review): this appears to flush (and upload) even when the builder has
// received no data since the last flush — confirm Flush on an empty builder
// is a no-op, or guard on buffered size.
func (p *partitionProcessor) idleFlush() {
	if p.builder == nil {
		return // nothing has ever been buffered for this partition
	}

	// Field name carries a pre-existing typo ("Timout"); kept for
	// compatibility with the struct definition.
	if time.Since(p.lastFlush) < p.idleFlushTimout {
		return // not idle long enough yet
	}

	// Borrow a scratch buffer from the shared pool for the flush.
	flushBuffer := p.bufPool.Get().(*bytes.Buffer)
	defer p.bufPool.Put(flushBuffer)
	flushBuffer.Reset()

	// flushStream refreshes p.lastFlush on success, so no extra bookkeeping
	// is needed here (the original re-set it to a timestamp captured before
	// the flush, back-dating it by the flush duration).
	if err := p.flushStream(flushBuffer); err != nil {
		level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
	}
}
2 changes: 1 addition & 1 deletion pkg/dataobj/consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie
}

for _, partition := range parts {
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool)
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool, s.cfg.IdleFlushTimeout)
s.partitionHandlers[topic][partition] = processor
processor.start()
}
Expand Down

0 comments on commit a51e5a2

Please sign in to comment.