diff --git a/pkg/backends/alb/pool/pool.go b/pkg/backends/alb/pool/pool.go index 0713ac4b2..e19462de5 100644 --- a/pkg/backends/alb/pool/pool.go +++ b/pkg/backends/alb/pool/pool.go @@ -21,6 +21,7 @@ import ( "context" "net/http" "sync" + "sync/atomic" "github.com/trickstercache/trickster/v2/pkg/backends/healthcheck" ) @@ -76,7 +77,7 @@ type pool struct { targets []*Target healthy []http.Handler healthyFloor int - pos uint64 + pos atomic.Uint64 mtx sync.RWMutex ctx context.Context ch chan bool diff --git a/pkg/backends/alb/pool/round_robin.go b/pkg/backends/alb/pool/round_robin.go index d074050df..1ae09e45f 100644 --- a/pkg/backends/alb/pool/round_robin.go +++ b/pkg/backends/alb/pool/round_robin.go @@ -18,7 +18,6 @@ package pool import ( "net/http" - "sync/atomic" ) func nextRoundRobin(p *pool) []http.Handler { @@ -28,6 +27,6 @@ func nextRoundRobin(p *pool) []http.Handler { if len(t) == 0 { return nil } - i := atomic.AddUint64(&p.pos, 1) % uint64(len(t)) + i := p.pos.Add(1) % uint64(len(t)) return []http.Handler{t[i]} } diff --git a/pkg/backends/healthcheck/status.go b/pkg/backends/healthcheck/status.go index f15d90694..0f0f88d5c 100644 --- a/pkg/backends/healthcheck/status.go +++ b/pkg/backends/healthcheck/status.go @@ -32,7 +32,7 @@ import ( type Status struct { name string description string - status int32 + status atomic.Int32 detail string failingSince time.Time subscribers []chan bool @@ -45,11 +45,11 @@ type StatusLookup map[string]*Status func (s *Status) String() string { sb := strings.Builder{} - sb.WriteString(fmt.Sprintf("target: %s\nstatus: %d\n", s.name, s.status)) - if s.status < 1 { + sb.WriteString(fmt.Sprintf("target: %s\nstatus: %d\n", s.name, s.status.Load())) + if s.status.Load() < 1 { sb.WriteString(fmt.Sprintf("detail: %s\n", s.detail)) } - if s.status < 0 { + if s.status.Load() < 0 { sb.WriteString(fmt.Sprintf("since: %d", s.failingSince.Unix())) } return sb.String() @@ -58,8 +58,8 @@ func (s *Status) String() string { // Headers returns a header set indicating the Status func (s *Status) Headers() http.Header { h := http.Header{} - h.Set(headers.NameTrkHCStatus, strconv.Itoa(int(s.status))) - if s.status < 1 { + h.Set(headers.NameTrkHCStatus, strconv.Itoa(int(s.status.Load()))) + if s.status.Load() < 1 { h.Set(headers.NameTrkHCDetail, s.detail) } return h @@ -67,7 +67,7 @@ func (s *Status) Headers() http.Header { // Set updates the status func (s *Status) Set(i int32) { - atomic.StoreInt32(&s.status, i) + s.status.Store(i) for _, ch := range s.subscribers { ch <- i == i } @@ -80,7 +80,7 @@ func (s *Status) Prober() func(http.ResponseWriter) { // Get provides the current status func (s *Status) Get() int { - return int(atomic.LoadInt32(&s.status)) + return int(s.status.Load()) } // Detail provides the current detail diff --git a/pkg/backends/healthcheck/status_test.go b/pkg/backends/healthcheck/status_test.go index 41a8543fa..ca43ce244 100644 --- a/pkg/backends/healthcheck/status_test.go +++ b/pkg/backends/healthcheck/status_test.go @@ -28,10 +28,10 @@ func TestString(t *testing.T) { status := &Status{ name: "test", description: "test-description", - status: -1, detail: "status-detail", failingSince: tm, } + status.status.Store(-1) const expected = "target: test\nstatus: -1\ndetail: status-detail\nsince: 0" s := status.String() if s != expected { @@ -68,7 +68,8 @@ func TestProber(t *testing.T) { func TestGet(t *testing.T) { - status := &Status{status: 8480} + status := &Status{} + status.status.Store(8480) if status.Get() != 8480 { t.Error("expected 8480 got", status.Get()) } diff --git a/pkg/backends/healthcheck/target.go b/pkg/backends/healthcheck/target.go index 0a6d12fbf..bbf722487 100644 --- a/pkg/backends/healthcheck/target.go +++ b/pkg/backends/healthcheck/target.go @@ -44,8 +44,8 @@ type target struct { status *Status failureThreshold int recoveryThreshold int - failConsecutiveCnt int32 - successConsecutiveCnt int32 + failConsecutiveCnt atomic.Int32 + successConsecutiveCnt atomic.Int32 ks int // used internally and is not thread safe, do not expose ctx context.Context cancel context.CancelFunc @@ -210,15 +210,15 @@ func (t *target) probe() { var passed bool if err != nil || resp == nil { t.status.detail = fmt.Sprintf("error probing target: %v", err) - errCnt = int(atomic.AddInt32(&t.failConsecutiveCnt, 1)) - atomic.StoreInt32(&t.successConsecutiveCnt, 0) + errCnt = int(t.failConsecutiveCnt.Add(1)) + t.successConsecutiveCnt.Store(0) } else if !t.isGoodCode(resp.StatusCode) || !t.isGoodHeader(resp.Header) || !t.isGoodBody(resp.Body) { - errCnt = int(atomic.AddInt32(&t.failConsecutiveCnt, 1)) - atomic.StoreInt32(&t.successConsecutiveCnt, 0) + errCnt = int(t.failConsecutiveCnt.Add(1)) + t.successConsecutiveCnt.Store(0) } else { resp.Body.Close() - successCnt = int(atomic.AddInt32(&t.successConsecutiveCnt, 1)) - atomic.StoreInt32(&t.failConsecutiveCnt, 0) + successCnt = int(t.successConsecutiveCnt.Add(1)) + t.failConsecutiveCnt.Store(0) passed = true } if !passed && t.ks != -1 && (errCnt == t.failureThreshold || t.ks == 0) { @@ -244,7 +244,7 @@ func (t *target) demandProbe(w http.ResponseWriter) { resp, err := t.httpClient.Do(r) h := w.Header() if err != nil { - if t.status != nil && t.status.status != 0 { + if t.status != nil && t.status.Get() != 0 { sh := t.status.Headers() for k := range sh { h.Set(k, sh.Get(k)) @@ -257,7 +257,7 @@ func (t *target) demandProbe(w http.ResponseWriter) { for k := range resp.Header { h.Set(k, resp.Header.Get(k)) } - if t.status != nil && t.status.status != 0 { + if t.status != nil && t.status.Get() != 0 { sh := t.status.Headers() for k := range sh { h.Set(k, sh.Get(k)) diff --git a/pkg/backends/healthcheck/target_test.go b/pkg/backends/healthcheck/target_test.go index c373fb407..12e9cadba 100644 --- a/pkg/backends/healthcheck/target_test.go +++ b/pkg/backends/healthcheck/target_test.go @@ -226,15 +226,15 @@ func TestProbe(t *testing.T) { ec: []int{200}, } target.probe() - if target.successConsecutiveCnt != 1 { + if target.successConsecutiveCnt.Load() != 1 { t.Error("expected 1 got ", target.successConsecutiveCnt) } target.ec[0] = 404 target.probe() - if target.successConsecutiveCnt != 0 { + if target.successConsecutiveCnt.Load() != 0 { t.Error("expected 0 got ", target.successConsecutiveCnt) } - if target.failConsecutiveCnt != 1 { + if target.failConsecutiveCnt.Load() != 1 { t.Error("expected 1 got ", target.failConsecutiveCnt) } @@ -262,7 +262,7 @@ func TestDemandProbe(t *testing.T) { // simulate a failed probe (bad response) w = httptest.NewRecorder() - target.status.status = -1 + target.status.status.Store(-1) target.demandProbe(w) if w.Code != 200 { @@ -272,7 +272,7 @@ func TestDemandProbe(t *testing.T) { // simulate a failed probe (unreachable) ts.Close() w = httptest.NewRecorder() - target.status.status = -1 + target.status.status.Store(-1) target.demandProbe(w) if w.Code != 500 { diff --git a/pkg/backends/influxdb/flux/errors_test.go b/pkg/backends/influxdb/flux/errors_test.go index d33e998d8..20d506cfa 100644 --- a/pkg/backends/influxdb/flux/errors_test.go +++ b/pkg/backends/influxdb/flux/errors_test.go @@ -1,3 +1,19 @@ +/* + * Copyright 2018 The Trickster Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package flux import "testing" diff --git a/pkg/encoding/reader/reader.go b/pkg/encoding/reader/reader.go index 012c48d1c..693a147fb 100644 --- a/pkg/encoding/reader/reader.go +++ b/pkg/encoding/reader/reader.go @@ -44,7 +44,7 @@ type nopReadCloserResetter struct { // readCloserResetter implements ReadCloserResetter type readCloserResetter struct { io.Reader - closeCnt int32 + closeCnt atomic.Int32 resetter Resetter } @@ -53,7 +53,7 @@ type readCloserResetter struct { func (rc *readCloserResetter) Close() error { // This gracefully handles when Close is called more than once and ensures // only the first caller, even in a burst, is able to proceed. - if atomic.AddInt32(&rc.closeCnt, 1) != 1 { + if rc.closeCnt.Add(1) != 1 { return nil } // if the underlying io.Reader is actually itself an io.ReadCloser, call diff --git a/pkg/locks/locks.go b/pkg/locks/locks.go index 138064614..23bf5e988 100644 --- a/pkg/locks/locks.go +++ b/pkg/locks/locks.go @@ -56,18 +56,18 @@ func newNamedLock(name string, locker *namedLocker) *namedLock { type namedLock struct { sync.RWMutex name string - queueSize int32 + queueSize atomic.Int32 locker *namedLocker subsequentWriter bool } func (nl *namedLock) release(unlockFunc func()) { - qs := atomic.AddInt32(&nl.queueSize, -1) + qs := nl.queueSize.Add(-1) if qs == 0 { nl.locker.mapLock.Lock() // recheck queue size after getting the lock since another client // might have joined since the map lock was acquired - if nl.queueSize == 0 { + if nl.queueSize.Load() == 0 { delete(nl.locker.locks, nl.name) } nl.locker.mapLock.Unlock() @@ -121,7 +121,7 @@ func (lk *namedLocker) acquire(lockName string, isWrite bool) (NamedLock, error) } lk.locks[lockName] = nl } - atomic.AddInt32(&nl.queueSize, 1) + nl.queueSize.Add(1) mapUnlockFunc() if isWrite { diff --git a/pkg/proxy/engines/deltaproxycache.go b/pkg/proxy/engines/deltaproxycache.go index 4ade6c06b..b848bdfce 100644 --- a/pkg/proxy/engines/deltaproxycache.go +++ b/pkg/proxy/engines/deltaproxycache.go @@ -565,7 +565,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade client backends.TimeseriesBackend, pr *proxyRequest, wur timeseries.UnmarshalerReaderFunc, span trace.Span) ([]timeseries.Timeseries, int64, *http.Response, error) { - var uncachedValueCount int64 + var uncachedValueCount atomic.Int64 var wg sync.WaitGroup var appendLock, respLock sync.Mutex var err error @@ -616,7 +616,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade appendLock.Unlock() return } - atomic.AddInt64(&uncachedValueCount, nts.ValueCount()) + uncachedValueCount.Add(nts.ValueCount()) nts.SetTimeRangeQuery(rsc.TimeRangeQuery) nts.SetExtents([]timeseries.Extent{*e}) appendLock.Lock() @@ -654,5 +654,5 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade }(&el[i], pr.Clone()) } wg.Wait() - return mts, uncachedValueCount, mresp, err + return mts, uncachedValueCount.Load(), mresp, err } diff --git a/pkg/proxy/engines/progressive_collapse_forwarder.go b/pkg/proxy/engines/progressive_collapse_forwarder.go index c7017060c..96b998a5e 100644 --- a/pkg/proxy/engines/progressive_collapse_forwarder.go +++ b/pkg/proxy/engines/progressive_collapse_forwarder.go @@ -46,14 +46,14 @@ type ProgressiveCollapseForwarder interface { type progressiveCollapseForwarder struct { resp *http.Response - rIndex uint64 + rIndex atomic.Uint64 dataIndex uint64 data [][]byte dataLen uint64 dataStore []byte dataStoreLen uint64 readCond *sync.Cond - serverReadDone int32 + serverReadDone atomic.Int32 clientWaitgroup *sync.WaitGroup serverWaitCond *sync.Cond } @@ -75,17 +75,17 @@ func NewPCF(resp *http.Response, contentLength int64) ProgressiveCollapseForward pcf := &progressiveCollapseForwarder{ resp: resp, - rIndex: 0, dataIndex: 0, data: refs, dataLen: uint64(len(refs)), dataStore: dataStore, dataStoreLen: uint64(contentLength), readCond: rc, - serverReadDone: 0, clientWaitgroup: &wg, serverWaitCond: sd, } + pcf.rIndex.Store(0) + pcf.serverReadDone.Store(0) return pcf } @@ -133,7 +133,7 @@ func (pcf *progressiveCollapseForwarder) AddClient(w io.Writer) error { // WaitServerComplete blocks until the object has been retrieved from the origin server // Need to get payload before can send to actual cache func (pcf *progressiveCollapseForwarder) WaitServerComplete() { - if atomic.LoadInt32(&pcf.serverReadDone) != 0 { + if pcf.serverReadDone.Load() != 0 { return } pcf.serverWaitCond.L.Lock() @@ -149,7 +149,7 @@ func (pcf *progressiveCollapseForwarder) WaitAllComplete() { // GetBody returns the underlying body of the data written into a PCF func (pcf *progressiveCollapseForwarder) GetBody() ([]byte, error) { - if atomic.LoadInt32(&pcf.serverReadDone) == 0 { + if pcf.serverReadDone.Load() == 0 { return nil, errors.ErrServerRequestNotCompleted } return pcf.dataStore[0:pcf.dataIndex], nil @@ -163,14 +163,14 @@ func (pcf *progressiveCollapseForwarder) GetResp() *http.Response { // Write writes the data in b to the ProgressiveCollapseForwarders data store, // adds a reference to that data, and increments the read index. func (pcf *progressiveCollapseForwarder) Write(b []byte) (int, error) { - n := atomic.LoadUint64(&pcf.rIndex) + n := pcf.rIndex.Load() if pcf.dataIndex+uint64(len(b)) > pcf.dataStoreLen || n > pcf.dataLen { return 0, io.ErrShortWrite } pcf.data[n] = pcf.dataStore[pcf.dataIndex : pcf.dataIndex+uint64(len(b))] copy(pcf.data[n], b) pcf.dataIndex += uint64(len(b)) - atomic.AddUint64(&pcf.rIndex, 1) + pcf.rIndex.Add(1) pcf.readCond.Broadcast() return len(b), nil } @@ -178,7 +178,7 @@ func (pcf *progressiveCollapseForwarder) Write(b []byte) (int, error) { // Close signals all things waiting on the server response body to complete. // This should be triggered by the client io.EOF func (pcf *progressiveCollapseForwarder) Close() { - atomic.AddInt32(&pcf.serverReadDone, 1) + pcf.serverReadDone.Add(1) pcf.serverWaitCond.Broadcast() pcf.readCond.Broadcast() } @@ -186,12 +186,12 @@ func (pcf *progressiveCollapseForwarder) Close() { // Read will return the given index data requested by the read is behind the PCF readindex, // else blocks and waits for the data func (pcf *progressiveCollapseForwarder) IndexRead(index uint64, b []byte) (int, error) { - i := atomic.LoadUint64(&pcf.rIndex) + i := pcf.rIndex.Load() if index >= i { // need to check completion and return io.EOF if index > pcf.dataLen { return 0, errors.ErrReadIndexTooLarge - } else if atomic.LoadInt32(&pcf.serverReadDone) != 0 { + } else if pcf.serverReadDone.Load() != 0 { return 0, io.EOF } pcf.readCond.L.Lock() diff --git a/pkg/proxy/handlers/switch.go b/pkg/proxy/handlers/switch.go index 969c40b6b..7db950396 100644 --- a/pkg/proxy/handlers/switch.go +++ b/pkg/proxy/handlers/switch.go @@ -26,7 +26,7 @@ import ( type SwitchHandler struct { router http.Handler oldRouter http.Handler - reloading int32 + reloading atomic.Int32 } // NewSwitchHandler returns a New *SwitchHandler @@ -60,13 +60,13 @@ func (s *SwitchHandler) Handler() http.Handler { } func (s *SwitchHandler) isReloading() bool { - return atomic.LoadInt32(&s.reloading) != 0 + return s.reloading.Load() != 0 } func (s *SwitchHandler) setReloading(isReloading bool) { if isReloading { - atomic.StoreInt32(&s.reloading, 1) + s.reloading.Store(1) return } - atomic.StoreInt32(&s.reloading, 0) + s.reloading.Store(0) } diff --git a/pkg/proxy/handlers/switch_test.go b/pkg/proxy/handlers/switch_test.go index aad6c63e5..86ad34221 100644 --- a/pkg/proxy/handlers/switch_test.go +++ b/pkg/proxy/handlers/switch_test.go @@ -57,7 +57,7 @@ func TestHandler(t *testing.T) { t.Error("router mismatch") } - sh.reloading = 1 + sh.reloading.Store(1) x = sh.Handler() if x != router { t.Error("router mismatch") diff --git a/pkg/testutil/readers/bad_reader.go b/pkg/testutil/readers/bad_reader.go index 2dbe0cccf..1c02c6100 100644 --- a/pkg/testutil/readers/bad_reader.go +++ b/pkg/testutil/readers/bad_reader.go @@ -1,3 +1,19 @@ +/* + * Copyright 2018 The Trickster Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package readers import "errors" diff --git a/pkg/testutil/readers/bad_reader_test.go b/pkg/testutil/readers/bad_reader_test.go index ae0063bbc..a87987b49 100644 --- a/pkg/testutil/readers/bad_reader_test.go +++ b/pkg/testutil/readers/bad_reader_test.go @@ -1,3 +1,19 @@ +/* + * Copyright 2018 The Trickster Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package readers import "testing" diff --git a/pkg/timeseries/dataset/point.go b/pkg/timeseries/dataset/point.go index 5d788c345..2554a092d 100644 --- a/pkg/timeseries/dataset/point.go +++ b/pkg/timeseries/dataset/point.go @@ -50,17 +50,18 @@ func (p *Point) Clone() Point { // Size returns the memory utilization of the Points in bytes func (p Points) Size() int64 { - var c int64 = 16 + var c atomic.Int64 + c.Store(16) var wg sync.WaitGroup for i, pt := range p { wg.Add(1) go func(s, e int64, j int) { - atomic.AddInt64(&c, s) + c.Add(s) wg.Done() }(int64(pt.Size), int64(pt.Epoch), i) } wg.Wait() - return c + return c.Load() } // Clone returns a perfect copy of the Points