Skip to content

Commit

Permalink
fix data race
Browse files Browse the repository at this point in the history
  • Loading branch information
nidorx committed Oct 4, 2024
1 parent 2fc41eb commit 6900505
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 36 deletions.
31 changes: 17 additions & 14 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ type Entry struct {

// Chunk stores up to 900 log entries that will be persisted in the storage
type Chunk struct {
mu sync.Mutex
id int32 // The identifier of this chunk
cap int32 // Configured batch size
book int32 // Number of scheduled writes in this chunk
write int32 // Number of writes completed
size int64 // Size of content (in bytes)
epochStart int64 // First epoch
epochEnd int64 // Last epoch
retries int // Number of attempts to persist in the storage
locked atomic.Bool // Indicates if this chunk no longer accepts writes
next *Chunk // Pointer to the next chunk
entries [900]*Entry // The log entries in this chunk
mu sync.RWMutex // Guards next only; Depth() takes the read lock to load it
id int32 // The identifier of this chunk
cap int32 // Configured batch size
book int32 // Number of scheduled writes in this chunk
write int32 // Number of writes completed
size int64 // Size of content (in bytes)
epochStart int64 // First epoch
epochEnd int64 // Last epoch
retries int32 // Number of attempts to persist in the storage
locked atomic.Bool // Indicates if this chunk no longer accepts writes
next *Chunk // Pointer to the next chunk
entries [900]*Entry // The log entries in this chunk
}

// NewChunk creates a new chunk with the specified capacity
Expand Down Expand Up @@ -77,10 +77,13 @@ func (c *Chunk) Depth() int {
if c.Empty() {
return 0
}
if c.next == nil {
c.mu.RLock()
n := c.next
c.mu.RUnlock()
if n == nil {
return 1
}
return 1 + c.next.Depth()
return 1 + n.Depth()
}

// First retrieves the epoch of the first entry in this chunk
Expand Down
18 changes: 13 additions & 5 deletions ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type IngesterConfig struct {
MaxDirtyChunks int

// MaxFlushRetry defines the number of retry attempts to persist a chunk in case of failure (default 3).
MaxFlushRetry int
MaxFlushRetry int32

// FlushAfterSec defines how long a chunk can remain inactive before being sent to storage (default 3 seconds).
FlushAfterSec int
Expand All @@ -53,6 +53,7 @@ type Ingester interface {
type ingester struct {
flushChunk *Chunk // The chunk that will be saved to the database
writeChunk *Chunk // The chunk currently receiving log entries
flushChunkId int32 // ID of the currently active flush chunk
writeChunkId int32 // ID of the currently active write chunk
config *IngesterConfig // Configuration options for the ingester
storage Storage // The storage backend used to persist chunks
Expand Down Expand Up @@ -157,11 +158,11 @@ func (i *ingester) routineCheck() {
if chunk.Ready() {
// If the chunk is ready to be written to storage, flush it
if err := i.storage.Flush(chunk); err != nil {
chunk.retries++
retries := atomic.AddInt32(&chunk.retries, 1)
slog.Error("[sqlog] error writing chunk", slog.Any("error", err))

// If retries exceed the limit, move to the next chunk
if chunk.retries > i.config.MaxFlushRetry {
if retries > i.config.MaxFlushRetry {
chunk = chunk.Next()
chunk.Lock()
} else {
Expand All @@ -186,6 +187,7 @@ func (i *ingester) routineCheck() {
}

i.flushChunk = chunk
atomic.StoreInt32(&i.flushChunkId, chunk.id)

// Close the storage after flushing all logs
if err := i.storage.Close(); err != nil {
Expand All @@ -200,6 +202,10 @@ func (i *ingester) routineCheck() {
}
}

// getFlushChunk is an empty placeholder: it takes no arguments, returns
// nothing and has no body.
// NOTE(review): no caller is visible in this diff — implement it or remove it.
func (i *ingester) getFlushChunk() {

}

// doRoutineCheck handles the periodic maintenance of chunks, flushing them if they
// meet the conditions for size or age, and ensuring memory usage stays within limits.
func (i *ingester) doRoutineCheck() {
Expand All @@ -212,10 +218,10 @@ func (i *ingester) doRoutineCheck() {
// Flush the chunk if it's ready to be persisted
if chunk.Ready() {
if err := i.storage.Flush(chunk); err != nil {
chunk.retries++
retries := atomic.AddInt32(&chunk.retries, 1)
slog.Error("[sqlog] error writing chunk", slog.Any("error", err))

if chunk.retries > i.config.MaxFlushRetry {
if retries > i.config.MaxFlushRetry {
chunk.Init(i.config.Chunks + 1)
} else {
break
Expand All @@ -235,13 +241,15 @@ func (i *ingester) doRoutineCheck() {
break
}
i.flushChunk = i.flushChunk.Next()
atomic.StoreInt32(&i.flushChunkId, i.flushChunk.id)
}

// Limit memory consumption by discarding old chunks if necessary
if !i.flushChunk.Empty() && i.flushChunk.Depth() > i.config.MaxDirtyChunks {
for {
if i.flushChunk.Depth() > i.config.MaxDirtyChunks {
i.flushChunk = i.flushChunk.Next()
atomic.StoreInt32(&i.flushChunkId, i.flushChunk.id)
} else {
break
}
Expand Down
31 changes: 14 additions & 17 deletions ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqlog
import (
"errors"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -101,10 +102,15 @@ func Test_Ingester_MaxChunkSizeBytes(t *testing.T) {

func Test_Ingester_MaxFlushRetry(t *testing.T) {

var chunk *Chunk
var (
mu sync.Mutex
chunk *Chunk
)

storage := &testMockStorage{
flush: func(c *Chunk) error {
mu.Lock()
defer mu.Unlock()
chunk = c
return errors.New("test")
},
Expand All @@ -122,25 +128,18 @@ func Test_Ingester_MaxFlushRetry(t *testing.T) {
ingester.Ingest(time.Now(), 0, []byte(`{"msg":"test"}`))

waitMax(5*time.Second, func() bool {
return (chunk != nil && chunk.retries > 1)
mu.Lock()
defer mu.Unlock()
return (chunk != nil && atomic.LoadInt32(&chunk.retries) > 1)
})

assert.Equal(t, chunk.retries, 2)
assert.Equal(t, int32(2), atomic.LoadInt32(&chunk.retries))
}

func Test_Ingester_MaxDirtyChunks(t *testing.T) {

var (
lastChunk *Chunk
chunks []*Chunk
)

storage := &testMockStorage{
flush: func(c *Chunk) error {
lastChunk = c
if c != lastChunk {
chunks = append(chunks, c)
}
return errors.New("test")
},
}
Expand All @@ -162,15 +161,13 @@ func Test_Ingester_MaxDirtyChunks(t *testing.T) {
}

waitMax(5*time.Second, func() bool {
return ingester.flushChunk.id == 10
return atomic.LoadInt32(&ingester.flushChunkId) == 10
})

ingester.Close()

lastChunk = ingester.flushChunk

// lastChunk.id = 10 = (numEntries/ChunkSize)
assert.Equal(t, int32(10), lastChunk.id)
assert.Equal(t, int32(10), atomic.LoadInt32(&ingester.flushChunkId))
}

func Test_Ingester_Close(t *testing.T) {
Expand Down Expand Up @@ -251,7 +248,7 @@ func Test_Ingester_Close_MaxFlushRetry(t *testing.T) {
assert.NoError(t, err)

assert.True(t, closed, "Storage.Close not called")
assert.Equal(t, chunk.retries, 2)
assert.Equal(t, int32(2), atomic.LoadInt32(&chunk.retries))
}

func waitMax(max time.Duration, condition func() bool) {
Expand Down

0 comments on commit 6900505

Please sign in to comment.