Skip to content

Commit

Permalink
refactor: use compression pool to reduce GC
Browse files Browse the repository at this point in the history
Signed-off-by: kaixuan xu <triumph_9431@qq.com>
  • Loading branch information
xkx9431 committed Dec 26, 2024
1 parent 8d58ae4 commit 330bfc5
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 55 deletions.
37 changes: 20 additions & 17 deletions lib/pool/compression_cache_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,51 +25,52 @@ import (
)

var (
gzipReaderPool = NewCachePool[*gzip.Reader](nil, 2*runtime.NumCPU())
gzipReaderPool = NewCachePool[*gzip.Reader](func() *gzip.Reader {
return new(gzip.Reader)
}, 2*runtime.NumCPU())

snappyReaderPool = NewCachePool[*snappy.Reader](func() *snappy.Reader {
return snappy.NewReader(bytes.NewReader(nil))
return snappy.NewReader(nil)
}, 2*runtime.NumCPU())

zstdDecoderPool = NewCachePool[*zstd.Decoder](func() *zstd.Decoder {
decoder, _ := zstd.NewReader(nil)
decoder, error := zstd.NewReader(nil)
if error != nil {
return nil
}
return decoder
}, 2*runtime.NumCPU())
)

func GetGzipReader(body []byte) (*gzip.Reader, error) {
// gzip reader not support new with nil writer
// so we need to create a new reader if pool is empty
if gzipReaderPool.AvailableOffers() == gzipReaderPool.Capacity() {
return gzip.NewReader(bytes.NewReader(body))
}
reader := gzipReaderPool.Get()
if reader == nil {
gzipReader := gzipReaderPool.Get()
if gzipReader == nil {
return nil, errors.New("failed to get gzip reader")
}
err := reader.Reset(bytes.NewReader(body))
err := gzipReader.Reset(bytes.NewReader(body))
if err != nil {
gzipReaderPool.Put(gzipReader) // Return the reader to the pool if reset fails
return nil, err
}
return reader, nil
return gzipReader, nil
}

func PutGzipReader(reader *gzip.Reader) {
reader.Close()
gzipReaderPool.Put(reader)
}

func GetSnappyReader(body []byte) (*snappy.Reader, error) {
reader := snappyReaderPool.Get()
if reader == nil {
snappyReader := snappyReaderPool.Get()
if snappyReader == nil {
return nil, errors.New("failed to get snappy reader")
}
reader.Reset(bytes.NewReader(body))

return reader, nil
snappyReader.Reset(bytes.NewReader(body))
return snappyReader, nil
}

func PutSnappyReader(reader *snappy.Reader) {
reader.Reset(nil)
snappyReaderPool.Put(reader)
}

Expand All @@ -78,8 +79,10 @@ func GetZstdDecoder(body []byte) (*zstd.Decoder, error) {
if decoder == nil {
return nil, errors.New("failed to get zstd decoder")
}

err := decoder.Reset(bytes.NewReader(body))
if err != nil {
zstdDecoderPool.Put(decoder) // Return the decoder to the pool if reset fails
return nil, err
}
return decoder, nil
Expand Down
24 changes: 5 additions & 19 deletions lib/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ func NewCachePool[T any](newFunc func() T, maxSize int) *CachePool[T] {
return &CachePool[T]{
pool: sync.Pool{
New: func() interface{} {
if newFunc != nil {
return newFunc()
}
return nil
return newFunc()
},
},
capacityChan: make(chan struct{}, maxSize),
Expand All @@ -41,31 +38,20 @@ func NewCachePool[T any](newFunc func() T, maxSize int) *CachePool[T] {

func (c *CachePool[T]) Get() T {
select {
case c.capacityChan <- struct{}{}:
case <-c.capacityChan:
item := c.pool.Get()
if item == nil && c.newFunc != nil {
return c.newFunc()
}

return item.(T)
default:
var zero T
return zero
return c.newFunc()
}
}

func (c *CachePool[T]) Put(x T) {
select {
case <-c.capacityChan:
case c.capacityChan <- struct{}{}:
c.pool.Put(x)
default:
// Pool is full, discard the item
}
}

func (c *CachePool[T]) AvailableOffers() int {
return cap(c.capacityChan) - len(c.capacityChan)
}

func (c *CachePool[T]) Capacity() int {
return cap(c.capacityChan)
}
43 changes: 24 additions & 19 deletions lib/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,34 @@ func TestCachePool(t *testing.T) {
t.Errorf("expected the same item, got different items")
}

if pool.AvailableOffers() != 1 {
t.Errorf("The expected remaining capacity of the pool is 1, got %d", pool.AvailableOffers())
}
pool.Put(item2)
}

item3 := pool.Get().(*struct{})
if item3 == nil {
t.Errorf("expected non-nil item, got nil")
}
func TestPoolDiscardWhenFull(t *testing.T) {
// Create a pool with a capacity of 1
pool := NewCachePool(func() interface{} {
return 1
}, 1)

item4 := pool.Get().(*struct{})
if item4 == nil {
t.Errorf("expected non-nil item, got nil")
}
// Get an item from the pool
item1 := pool.Get().(int)

if pool.AvailableOffers() != 0 {
t.Errorf("The expected remaining capacity of the pool is 0, got %d", pool.AvailableOffers())
}
// Put the item back into the pool
pool.Put(item1)

// Try to put another item into the pool, which should be discarded
item2 := 2
pool.Put(item2)

pool.Put(item3)
pool.Put(item4)
// Get an item from the pool
item3 := pool.Get().(int)

// Ensure the item is the same as the first one, meaning the second item was discarded
if item1 != item3 {
t.Errorf("expected the same item, got different items")
}

if pool.AvailableOffers() != 2 {
t.Errorf("The expected remaining capacity of the pool is 2, got %d", pool.AvailableOffers())
// Ensure the discarded item is not the same as the one in the pool
if item2 == item3 {
t.Errorf("expected different items, got the same item")
}
}

0 comments on commit 330bfc5

Please sign in to comment.