From 69005053b8bff30f96bc44adb7c2406c97d1e457 Mon Sep 17 00:00:00 2001 From: Alex Rodin Date: Fri, 4 Oct 2024 08:29:23 -0300 Subject: [PATCH] fix data race --- chunk.go | 31 +++++++++++++++++-------------- ingester.go | 18 +++++++++++++----- ingester_test.go | 31 ++++++++++++++----------------- 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/chunk.go b/chunk.go index 2a66afb..832e988 100644 --- a/chunk.go +++ b/chunk.go @@ -15,18 +15,18 @@ type Entry struct { // Chunk stores up to 900 log entries that will be persisted in the storage type Chunk struct { - mu sync.Mutex - id int32 // The identifier of this chunk - cap int32 // Configured batch size - book int32 // Number of scheduled writes in this chunk - write int32 // Number of writes completed - size int64 // Size of content (in bytes) - epochStart int64 // First epoch - epochEnd int64 // Last epoch - retries int // Number of attempts to persist in the storage - locked atomic.Bool // Indicates if this chunk no longer accepts writes - next *Chunk // Pointer to the next chunk - entries [900]*Entry // The log entries in this chunk + mu sync.RWMutex // (next, Depth()) only + id int32 // The identifier of this chunk + cap int32 // Configured batch size + book int32 // Number of scheduled writes in this chunk + write int32 // Number of writes completed + size int64 // Size of content (in bytes) + epochStart int64 // First epoch + epochEnd int64 // Last epoch + retries int32 // Number of attempts to persist in the storage + locked atomic.Bool // Indicates if this chunk no longer accepts writes + next *Chunk // Pointer to the next chunk + entries [900]*Entry // The log entries in this chunk } // NewChunk creates a new chunk with the specified capacity @@ -77,10 +77,13 @@ func (c *Chunk) Depth() int { if c.Empty() { return 0 } - if c.next == nil { + c.mu.RLock() + n := c.next + c.mu.RUnlock() + if n == nil { return 1 } - return 1 + c.next.Depth() + return 1 + n.Depth() } // First retrieves the epoch of the first entry in this chunk diff --git a/ingester.go b/ingester.go index 4e97a2e..e40a821 100644 --- a/ingester.go +++ b/ingester.go @@ -30,7 +30,7 @@ type IngesterConfig struct { MaxDirtyChunks int // MaxFlushRetry defines the number of retry attempts to persist a chunk in case of failure (default 3). - MaxFlushRetry int + MaxFlushRetry int32 // FlushAfterSec defines how long a chunk can remain inactive before being sent to storage (default 3 seconds). FlushAfterSec int @@ -53,6 +53,7 @@ type Ingester interface { type ingester struct { flushChunk *Chunk // The chunk that will be saved to the database writeChunk *Chunk // The chunk currently receiving log entries + flushChunkId int32 // ID of the currently active flush chunk writeChunkId int32 // ID of the currently active write chunk config *IngesterConfig // Configuration options for the ingester storage Storage // The storage backend used to persist chunks @@ -157,11 +158,11 @@ func (i *ingester) routineCheck() { if chunk.Ready() { // If the chunk is ready to be written to storage, flush it if err := i.storage.Flush(chunk); err != nil { - chunk.retries++ + retries := atomic.AddInt32(&chunk.retries, 1) slog.Error("[sqlog] error writing chunk", slog.Any("error", err)) // If retries exceed the limit, move to the next chunk - if chunk.retries > i.config.MaxFlushRetry { + if retries > i.config.MaxFlushRetry { chunk = chunk.Next() chunk.Lock() } else { @@ -186,6 +187,7 @@ func (i *ingester) routineCheck() { } i.flushChunk = chunk + atomic.StoreInt32(&i.flushChunkId, chunk.id) // Close the storage after flushing all logs if err := i.storage.Close(); err != nil { @@ -200,6 +202,10 @@ func (i *ingester) routineCheck() { } } +func (i *ingester) getFlushChunk() { + +} + // doRoutineCheck handles the periodic maintenance of chunks, flushing them if they // meet the conditions for size or age, and ensuring memory usage stays within limits. func (i *ingester) doRoutineCheck() { @@ -212,10 +218,10 @@ func (i *ingester) doRoutineCheck() { // Flush the chunk if it's ready to be persisted if chunk.Ready() { if err := i.storage.Flush(chunk); err != nil { - chunk.retries++ + retries := atomic.AddInt32(&chunk.retries, 1) slog.Error("[sqlog] error writing chunk", slog.Any("error", err)) - if chunk.retries > i.config.MaxFlushRetry { + if retries > i.config.MaxFlushRetry { chunk.Init(i.config.Chunks + 1) } else { break @@ -235,6 +241,7 @@ func (i *ingester) doRoutineCheck() { break } i.flushChunk = i.flushChunk.Next() + atomic.StoreInt32(&i.flushChunkId, i.flushChunk.id) } // Limit memory consumption by discarding old chunks if necessary @@ -242,6 +249,7 @@ func (i *ingester) doRoutineCheck() { for { if i.flushChunk.Depth() > i.config.MaxDirtyChunks { i.flushChunk = i.flushChunk.Next() + atomic.StoreInt32(&i.flushChunkId, i.flushChunk.id) } else { break } diff --git a/ingester_test.go b/ingester_test.go index 5d7f341..dbac1e6 100644 --- a/ingester_test.go +++ b/ingester_test.go @@ -3,6 +3,7 @@ package sqlog import ( "errors" "sync" + "sync/atomic" "testing" "time" @@ -101,10 +102,15 @@ func Test_Ingester_MaxChunkSizeBytes(t *testing.T) { func Test_Ingester_MaxFlushRetry(t *testing.T) { - var chunk *Chunk + var ( + mu sync.Mutex + chunk *Chunk + ) storage := &testMockStorage{ flush: func(c *Chunk) error { + mu.Lock() + defer mu.Unlock() chunk = c return errors.New("test") }, @@ -122,25 +128,18 @@ func Test_Ingester_MaxFlushRetry(t *testing.T) { ingester.Ingest(time.Now(), 0, []byte(`{"msg":"test"}`)) waitMax(5*time.Second, func() bool { - return (chunk != nil && chunk.retries > 1) + mu.Lock() + defer mu.Unlock() + return (chunk != nil && atomic.LoadInt32(&chunk.retries) > 1) }) - assert.Equal(t, chunk.retries, 2) + assert.Equal(t, int32(2), atomic.LoadInt32(&chunk.retries)) } func Test_Ingester_MaxDirtyChunks(t *testing.T) { - var ( - lastChunk *Chunk - chunks []*Chunk - ) - storage := &testMockStorage{ flush: func(c *Chunk) error { - lastChunk = c - if c != lastChunk { - chunks = append(chunks, c) - } return errors.New("test") }, } @@ -162,15 +161,13 @@ func Test_Ingester_MaxDirtyChunks(t *testing.T) { } waitMax(5*time.Second, func() bool { - return ingester.flushChunk.id == 10 + return atomic.LoadInt32(&ingester.flushChunkId) == 10 }) ingester.Close() - lastChunk = ingester.flushChunk - // lastChunk.id = 10 = (numEntries/ChunkSize) - assert.Equal(t, int32(10), lastChunk.id) + assert.Equal(t, int32(10), atomic.LoadInt32(&ingester.flushChunkId)) } func Test_Ingester_Close(t *testing.T) { @@ -251,7 +248,7 @@ func Test_Ingester_Close_MaxFlushRetry(t *testing.T) { assert.NoError(t, err) assert.True(t, closed, "Storage.Close not called") - assert.Equal(t, chunk.retries, 2) + assert.Equal(t, int32(2), atomic.LoadInt32(&chunk.retries)) } func waitMax(max time.Duration, condition func() bool) {