Skip to content

Commit

Permalink
Atomic fix (#669)
Browse files Browse the repository at this point in the history
* Replace atomic call in roundrobin
* Healthcheck atomic fix, plus license headers
* pcf, switch, point atomic fixed
* Fixed string test
* Fix static int assignment to atomic in test
* Respected the Gorilla Authors

---------

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>
  • Loading branch information
jnichols-git authored May 17, 2023
1 parent a13a322 commit bb292d1
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 56 deletions.
3 changes: 2 additions & 1 deletion pkg/backends/alb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"net/http"
"sync"
"sync/atomic"

"github.com/trickstercache/trickster/v2/pkg/backends/healthcheck"
)
Expand Down Expand Up @@ -76,7 +77,7 @@ type pool struct {
targets []*Target
healthy []http.Handler
healthyFloor int
pos uint64
pos atomic.Uint64
mtx sync.RWMutex
ctx context.Context
ch chan bool
Expand Down
3 changes: 1 addition & 2 deletions pkg/backends/alb/pool/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package pool

import (
"net/http"
"sync/atomic"
)

func nextRoundRobin(p *pool) []http.Handler {
Expand All @@ -28,6 +27,6 @@ func nextRoundRobin(p *pool) []http.Handler {
if len(t) == 0 {
return nil
}
i := atomic.AddUint64(&p.pos, 1) % uint64(len(t))
i := p.pos.Add(1) % uint64(len(t))
return []http.Handler{t[i]}
}
16 changes: 8 additions & 8 deletions pkg/backends/healthcheck/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
type Status struct {
name string
description string
status int32
status atomic.Int32
detail string
failingSince time.Time
subscribers []chan bool
Expand All @@ -45,11 +45,11 @@ type StatusLookup map[string]*Status

func (s *Status) String() string {
sb := strings.Builder{}
sb.WriteString(fmt.Sprintf("target: %s\nstatus: %d\n", s.name, s.status))
if s.status < 1 {
sb.WriteString(fmt.Sprintf("target: %s\nstatus: %d\n", s.name, s.status.Load()))
if s.status.Load() < 1 {
sb.WriteString(fmt.Sprintf("detail: %s\n", s.detail))
}
if s.status < 0 {
if s.status.Load() < 0 {
sb.WriteString(fmt.Sprintf("since: %d", s.failingSince.Unix()))
}
return sb.String()
Expand All @@ -58,16 +58,16 @@ func (s *Status) String() string {
// Headers returns a header set indicating the Status
func (s *Status) Headers() http.Header {
h := http.Header{}
h.Set(headers.NameTrkHCStatus, strconv.Itoa(int(s.status)))
if s.status < 1 {
h.Set(headers.NameTrkHCStatus, strconv.Itoa(int(s.status.Load())))
if s.status.Load() < 1 {
h.Set(headers.NameTrkHCDetail, s.detail)
}
return h
}

// Set updates the status
func (s *Status) Set(i int32) {
atomic.StoreInt32(&s.status, i)
s.status.Store(i)
for _, ch := range s.subscribers {
ch <- i == i
}
Expand All @@ -80,7 +80,7 @@ func (s *Status) Prober() func(http.ResponseWriter) {

// Get provides the current status
func (s *Status) Get() int {
return int(atomic.LoadInt32(&s.status))
return int(s.status.Load())
}

// Detail provides the current detail
Expand Down
5 changes: 3 additions & 2 deletions pkg/backends/healthcheck/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func TestString(t *testing.T) {
status := &Status{
name: "test",
description: "test-description",
status: -1,
detail: "status-detail",
failingSince: tm,
}
status.status.Store(-1)
const expected = "target: test\nstatus: -1\ndetail: status-detail\nsince: 0"
s := status.String()
if s != expected {
Expand Down Expand Up @@ -68,7 +68,8 @@ func TestProber(t *testing.T) {

func TestGet(t *testing.T) {

status := &Status{status: 8480}
status := &Status{}
status.status.Store(8480)
if status.Get() != 8480 {
t.Error("expected 8480 got", status.Get())
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/backends/healthcheck/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type target struct {
status *Status
failureThreshold int
recoveryThreshold int
failConsecutiveCnt int32
successConsecutiveCnt int32
failConsecutiveCnt atomic.Int32
successConsecutiveCnt atomic.Int32
ks int // used internally and is not thread safe, do not expose
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -210,15 +210,15 @@ func (t *target) probe() {
var passed bool
if err != nil || resp == nil {
t.status.detail = fmt.Sprintf("error probing target: %v", err)
errCnt = int(atomic.AddInt32(&t.failConsecutiveCnt, 1))
atomic.StoreInt32(&t.successConsecutiveCnt, 0)
errCnt = int(t.failConsecutiveCnt.Add(1))
t.successConsecutiveCnt.Store(0)
} else if !t.isGoodCode(resp.StatusCode) || !t.isGoodHeader(resp.Header) || !t.isGoodBody(resp.Body) {
errCnt = int(atomic.AddInt32(&t.failConsecutiveCnt, 1))
atomic.StoreInt32(&t.successConsecutiveCnt, 0)
errCnt = int(t.failConsecutiveCnt.Add(1))
t.successConsecutiveCnt.Store(0)
} else {
resp.Body.Close()
successCnt = int(atomic.AddInt32(&t.successConsecutiveCnt, 1))
atomic.StoreInt32(&t.failConsecutiveCnt, 0)
successCnt = int(t.successConsecutiveCnt.Add(1))
t.failConsecutiveCnt.Store(0)
passed = true
}
if !passed && t.ks != -1 && (errCnt == t.failureThreshold || t.ks == 0) {
Expand All @@ -244,7 +244,7 @@ func (t *target) demandProbe(w http.ResponseWriter) {
resp, err := t.httpClient.Do(r)
h := w.Header()
if err != nil {
if t.status != nil && t.status.status != 0 {
if t.status != nil && t.status.Get() != 0 {
sh := t.status.Headers()
for k := range sh {
h.Set(k, sh.Get(k))
Expand All @@ -257,7 +257,7 @@ func (t *target) demandProbe(w http.ResponseWriter) {
for k := range resp.Header {
h.Set(k, resp.Header.Get(k))
}
if t.status != nil && t.status.status != 0 {
if t.status != nil && t.status.Get() != 0 {
sh := t.status.Headers()
for k := range sh {
h.Set(k, sh.Get(k))
Expand Down
10 changes: 5 additions & 5 deletions pkg/backends/healthcheck/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,15 @@ func TestProbe(t *testing.T) {
ec: []int{200},
}
target.probe()
if target.successConsecutiveCnt != 1 {
if target.successConsecutiveCnt.Load() != 1 {
t.Error("expected 1 got ", target.successConsecutiveCnt)
}
target.ec[0] = 404
target.probe()
if target.successConsecutiveCnt != 0 {
if target.successConsecutiveCnt.Load() != 0 {
t.Error("expected 0 got ", target.successConsecutiveCnt)
}
if target.failConsecutiveCnt != 1 {
if target.failConsecutiveCnt.Load() != 1 {
t.Error("expected 1 got ", target.failConsecutiveCnt)
}

Expand Down Expand Up @@ -262,7 +262,7 @@ func TestDemandProbe(t *testing.T) {

// simulate a failed probe (bad response)
w = httptest.NewRecorder()
target.status.status = -1
target.status.status.Store(-1)
target.demandProbe(w)

if w.Code != 200 {
Expand All @@ -272,7 +272,7 @@ func TestDemandProbe(t *testing.T) {
// simulate a failed probe (unreachable)
ts.Close()
w = httptest.NewRecorder()
target.status.status = -1
target.status.status.Store(-1)
target.demandProbe(w)

if w.Code != 500 {
Expand Down
16 changes: 16 additions & 0 deletions pkg/backends/influxdb/flux/errors_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2018 The Trickster Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package flux

import "testing"
Expand Down
4 changes: 2 additions & 2 deletions pkg/encoding/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type nopReadCloserResetter struct {
// readCloserResetter implements ReadCloserResetter
type readCloserResetter struct {
io.Reader
closeCnt int32
closeCnt atomic.Int32
resetter Resetter
}

Expand All @@ -53,7 +53,7 @@ type readCloserResetter struct {
func (rc *readCloserResetter) Close() error {
// This gracefully handles when Close is called more than once and ensures
// only the first caller, even in a burst, is able to proceed.
if atomic.AddInt32(&rc.closeCnt, 1) != 1 {
if rc.closeCnt.Add(1) != 1 {
return nil
}
// if the underlying io.Reader is actually itself an io.ReadCloser, call
Expand Down
8 changes: 4 additions & 4 deletions pkg/locks/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ func newNamedLock(name string, locker *namedLocker) *namedLock {
type namedLock struct {
sync.RWMutex
name string
queueSize int32
queueSize atomic.Int32
locker *namedLocker
subsequentWriter bool
}

func (nl *namedLock) release(unlockFunc func()) {
qs := atomic.AddInt32(&nl.queueSize, -1)
qs := nl.queueSize.Add(-1)
if qs == 0 {
nl.locker.mapLock.Lock()
// recheck queue size after getting the lock since another client
// might have joined since the map lock was acquired
if nl.queueSize == 0 {
if nl.queueSize.Load() == 0 {
delete(nl.locker.locks, nl.name)
}
nl.locker.mapLock.Unlock()
Expand Down Expand Up @@ -121,7 +121,7 @@ func (lk *namedLocker) acquire(lockName string, isWrite bool) (NamedLock, error)
}
lk.locks[lockName] = nl
}
atomic.AddInt32(&nl.queueSize, 1)
nl.queueSize.Add(1)
mapUnlockFunc()

if isWrite {
Expand Down
6 changes: 3 additions & 3 deletions pkg/proxy/engines/deltaproxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
client backends.TimeseriesBackend, pr *proxyRequest, wur timeseries.UnmarshalerReaderFunc,
span trace.Span) ([]timeseries.Timeseries, int64, *http.Response, error) {

var uncachedValueCount int64
var uncachedValueCount atomic.Int64
var wg sync.WaitGroup
var appendLock, respLock sync.Mutex
var err error
Expand Down Expand Up @@ -616,7 +616,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
appendLock.Unlock()
return
}
atomic.AddInt64(&uncachedValueCount, nts.ValueCount())
uncachedValueCount.Add(nts.ValueCount())
nts.SetTimeRangeQuery(rsc.TimeRangeQuery)
nts.SetExtents([]timeseries.Extent{*e})
appendLock.Lock()
Expand Down Expand Up @@ -654,5 +654,5 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
}(&el[i], pr.Clone())
}
wg.Wait()
return mts, uncachedValueCount, mresp, err
return mts, uncachedValueCount.Load(), mresp, err
}
22 changes: 11 additions & 11 deletions pkg/proxy/engines/progressive_collapse_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ type ProgressiveCollapseForwarder interface {

type progressiveCollapseForwarder struct {
resp *http.Response
rIndex uint64
rIndex atomic.Uint64
dataIndex uint64
data [][]byte
dataLen uint64
dataStore []byte
dataStoreLen uint64
readCond *sync.Cond
serverReadDone int32
serverReadDone atomic.Int32
clientWaitgroup *sync.WaitGroup
serverWaitCond *sync.Cond
}
Expand All @@ -75,17 +75,17 @@ func NewPCF(resp *http.Response, contentLength int64) ProgressiveCollapseForward

pcf := &progressiveCollapseForwarder{
resp: resp,
rIndex: 0,
dataIndex: 0,
data: refs,
dataLen: uint64(len(refs)),
dataStore: dataStore,
dataStoreLen: uint64(contentLength),
readCond: rc,
serverReadDone: 0,
clientWaitgroup: &wg,
serverWaitCond: sd,
}
pcf.rIndex.Store(0)
pcf.serverReadDone.Store(0)

return pcf
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (pcf *progressiveCollapseForwarder) AddClient(w io.Writer) error {
// WaitServerComplete blocks until the object has been retrieved from the origin server
// Need to get payload before can send to actual cache
func (pcf *progressiveCollapseForwarder) WaitServerComplete() {
if atomic.LoadInt32(&pcf.serverReadDone) != 0 {
if pcf.serverReadDone.Load() != 0 {
return
}
pcf.serverWaitCond.L.Lock()
Expand All @@ -149,7 +149,7 @@ func (pcf *progressiveCollapseForwarder) WaitAllComplete() {

// GetBody returns the underlying body of the data written into a PCF
func (pcf *progressiveCollapseForwarder) GetBody() ([]byte, error) {
if atomic.LoadInt32(&pcf.serverReadDone) == 0 {
if pcf.serverReadDone.Load() == 0 {
return nil, errors.ErrServerRequestNotCompleted
}
return pcf.dataStore[0:pcf.dataIndex], nil
Expand All @@ -163,35 +163,35 @@ func (pcf *progressiveCollapseForwarder) GetResp() *http.Response {
// Write writes the data in b to the ProgressiveCollapseForwarders data store,
// adds a reference to that data, and increments the read index.
func (pcf *progressiveCollapseForwarder) Write(b []byte) (int, error) {
n := atomic.LoadUint64(&pcf.rIndex)
n := pcf.rIndex.Load()
if pcf.dataIndex+uint64(len(b)) > pcf.dataStoreLen || n > pcf.dataLen {
return 0, io.ErrShortWrite
}
pcf.data[n] = pcf.dataStore[pcf.dataIndex : pcf.dataIndex+uint64(len(b))]
copy(pcf.data[n], b)
pcf.dataIndex += uint64(len(b))
atomic.AddUint64(&pcf.rIndex, 1)
pcf.rIndex.Add(1)
pcf.readCond.Broadcast()
return len(b), nil
}

// Close signals all things waiting on the server response body to complete.
// This should be triggered by the client io.EOF
func (pcf *progressiveCollapseForwarder) Close() {
atomic.AddInt32(&pcf.serverReadDone, 1)
pcf.serverReadDone.Add(1)
pcf.serverWaitCond.Broadcast()
pcf.readCond.Broadcast()
}

// Read will return the given index data requested by the read is behind the PCF readindex,
// else blocks and waits for the data
func (pcf *progressiveCollapseForwarder) IndexRead(index uint64, b []byte) (int, error) {
i := atomic.LoadUint64(&pcf.rIndex)
i := pcf.rIndex.Load()
if index >= i {
// need to check completion and return io.EOF
if index > pcf.dataLen {
return 0, errors.ErrReadIndexTooLarge
} else if atomic.LoadInt32(&pcf.serverReadDone) != 0 {
} else if pcf.serverReadDone.Load() != 0 {
return 0, io.EOF
}
pcf.readCond.L.Lock()
Expand Down
Loading

0 comments on commit bb292d1

Please sign in to comment.