vtorc: use golang.org/x/sync/semaphore, add flag for db concurrency #17837

Merged · 2 commits · Feb 28, 2025
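At a high level, the change swaps two fixed-size buffered channels that limited concurrent backend operations for golang.org/x/sync/semaphore weighted semaphores sized from the new --backend-read-concurrency and --backend-write-concurrency flags, and bounds how long a caller waits for a slot (maxBackendOpTime). Below is a minimal sketch of that pattern; doBackendWrite, opTimeout, and the literal limit are placeholders, not the actual VTOrc code.

```go
// Minimal sketch of the pattern this PR adopts: a weighted semaphore bounds
// concurrent backend operations, and callers stop waiting for a slot after a
// fixed timeout instead of blocking forever.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

// In the PR the weight comes from the new --backend-write-concurrency flag.
var writeSem = semaphore.NewWeighted(24)

const opTimeout = 5 * time.Second // mirrors maxBackendOpTime in instance_dao.go

func doBackendWrite(f func() error) error {
	ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
	defer cancel()

	// Acquire returns ctx.Err() if no slot frees up before the deadline.
	if err := writeSem.Acquire(ctx, 1); err != nil {
		return err
	}
	defer writeSem.Release(1)

	return f()
}

func main() {
	if err := doBackendWrite(func() error { return nil }); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("backend write slots are saturated")
		}
	}
}
```

Compared with the old `instanceWriteChan <- true` approach, a saturated backend now surfaces as an error after five seconds rather than an indefinitely blocked goroutine.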
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtorc.txt
@@ -21,6 +21,8 @@ Flags:
--audit-purge-duration duration Duration for which audit logs are held before being purged. Should be in multiples of days (default 168h0m0s)
--audit-to-backend Whether to store the audit log in the VTOrc database
--audit-to-syslog Whether to store the audit log in the syslog
--backend-read-concurrency int Maximum concurrency for reads to the backend (default 32)
--backend-write-concurrency int Maximum concurrency for writes to the backend (default 24)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
32 changes: 32 additions & 0 deletions go/vt/vtorc/config/config.go
@@ -121,6 +121,24 @@ var (
},
)

backendReadConcurrency = viperutil.Configure(
"backend-read-concurrency",
viperutil.Options[int64]{
FlagName: "backend-read-concurrency",
Default: 32,
Dynamic: false,
},
)

backendWriteConcurrency = viperutil.Configure(
"backend-write-concurrency",
viperutil.Options[int64]{
FlagName: "backend-write-concurrency",
Default: 24,
Dynamic: false,
},
)

waitReplicasTimeout = viperutil.Configure(
"wait-replicas-timeout",
viperutil.Options[time.Duration]{
@@ -199,6 +217,8 @@ func registerFlags(fs *pflag.FlagSet) {
fs.Bool("audit-to-backend", auditToBackend.Default(), "Whether to store the audit log in the VTOrc database")
fs.Bool("audit-to-syslog", auditToSyslog.Default(), "Whether to store the audit log in the syslog")
fs.Duration("audit-purge-duration", auditPurgeDuration.Default(), "Duration for which audit logs are held before being purged. Should be in multiples of days")
fs.Int64("backend-read-concurrency", backendReadConcurrency.Default(), "Maximum concurrency for reads to the backend")
fs.Int64("backend-write-concurrency", backendWriteConcurrency.Default(), "Maximum concurrency for writes to the backend")
fs.Bool("prevent-cross-cell-failover", preventCrossCellFailover.Default(), "Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover")
fs.Duration("wait-replicas-timeout", waitReplicasTimeout.Default(), "Duration for which to wait for replica's to respond when issuing RPCs")
fs.Duration("tolerable-replication-lag", tolerableReplicationLag.Default(), "Amount of replication lag that is considered acceptable for a tablet to be eligible for promotion when Vitess makes the choice of a new primary in PRS")
@@ -218,6 +238,8 @@ func registerFlags(fs *pflag.FlagSet) {
auditToBackend,
auditToSyslog,
auditPurgeDuration,
backendReadConcurrency,
backendWriteConcurrency,
waitReplicasTimeout,
tolerableReplicationLag,
topoInformationRefreshDuration,
@@ -303,6 +325,16 @@ func SetAuditPurgeDays(days int64) {
auditPurgeDuration.Set(time.Duration(days) * 24 * time.Hour)
}

// GetBackendReadConcurrency returns the max backend read concurrency.
func GetBackendReadConcurrency() int64 {
return backendReadConcurrency.Get()
}

// GetBackendWriteConcurrency returns the max backend write concurrency.
func GetBackendWriteConcurrency() int64 {
return backendWriteConcurrency.Get()
}

// GetWaitReplicasTimeout is a getter function.
func GetWaitReplicasTimeout() time.Duration {
return waitReplicasTimeout.Get()
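Both options are registered with `Dynamic: false`, and the semaphores in instance_dao.go below are package-level variables built from these getters, so the limits are effectively set once at startup. The following is an illustrative sketch of that wiring only; the names stand in for the real viperutil plumbing.

```go
// Illustrative only: a package-level semaphore sized from a startup-time
// config getter, mirroring how instance_dao.go consumes the new getters.
package example

import "golang.org/x/sync/semaphore"

// getBackendReadConcurrency stands in for config.GetBackendReadConcurrency().
func getBackendReadConcurrency() int64 { return 32 }

// Built once at package initialization; later changes to the underlying
// configuration value would not be picked up.
var readSem = semaphore.NewWeighted(getBackendReadConcurrency())
```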
33 changes: 22 additions & 11 deletions go/vt/vtorc/inst/instance_dao.go
@@ -17,6 +17,7 @@
package inst

import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -30,6 +31,7 @@ import (

"github.com/patrickmn/go-cache"
"github.com/sjmudd/stopwatch"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"
@@ -47,13 +49,11 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
backendDBConcurrency = 20
)
const maxBackendOpTime = time.Second * 5

var (
instanceReadChan = make(chan bool, backendDBConcurrency)
instanceWriteChan = make(chan bool, backendDBConcurrency)
instanceReadSem = semaphore.NewWeighted(config.GetBackendReadConcurrency())
instanceWriteSem = semaphore.NewWeighted(config.GetBackendWriteConcurrency())
)

var forgetAliases *cache.Cache
@@ -88,7 +88,12 @@ func initializeInstanceDao() {
func ExecDBWriteFunc(f func() error) error {
m := query.NewMetric()

instanceWriteChan <- true
ctx, cancel := context.WithTimeout(context.Background(), maxBackendOpTime)
defer cancel()

if err := instanceWriteSem.Acquire(ctx, 1); err != nil {
return err
Comment on lines +91 to +95

Member:
We didn't have a test before, but maybe it's worth adding one to check that these concurrency controls are respected.

timvaillancourt (Contributor, Author) — Feb 24, 2025:
@GuptaManan100 sounds good 👍

I've been optimizing code as I come across it lately, and only now do I see that ExecDBWriteFunc is used in only a handful of write operations and instanceReadChan/instanceReadSem is never used.

So some more cleanup is needed. Do we want all reads/writes to respect a limit? And considering a majority of reads/writes in VTOrc aren't respecting a limit, do we need one? 🤔

Member:
This is fine too, but if we want all the read and write calls to acquire this lock as well, that would technically be more correct.

timvaillancourt (Contributor, Author):
@GuptaManan100 I'm torn; I suspect this semaphore made more sense when the backend was remote rather than in-memory.

For sqlite3 there is a single writer, so I think it already constrains concurrency; potentially only a read limit will be useful 🤔

I merged this PR too soon when I saw it had 2 reviews, but I can follow up with whatever we decide.
}
m.WaitLatency = time.Since(m.Timestamp)

// catch the exec time and error if there is one
Expand All @@ -106,7 +111,7 @@ func ExecDBWriteFunc(f func() error) error {
}
m.ExecuteLatency = time.Since(m.Timestamp.Add(m.WaitLatency))
_ = backendWrites.Append(m)
<-instanceWriteChan // assume this takes no time
instanceWriteSem.Release(1)
}()
res := f()
return res
@@ -646,10 +651,16 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*Instance), error) {
}
return instances, err
}
instanceReadChan <- true
instances, err := readFunc()
<-instanceReadChan
return instances, err

ctx, cancel := context.WithTimeout(context.Background(), maxBackendOpTime)
defer cancel()

if err := instanceReadSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer instanceReadSem.Release(1)

return readFunc()
}

// ReadInstance reads an instance from the vtorc backend database
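Following up on the reviewer's suggestion above, here is a rough sketch of a test that checks the concurrency limit is actually respected. The helper execWithSemaphore and the limit of 4 are placeholders rather than VTOrc code; a real test would exercise ExecDBWriteFunc against the configured semaphore.

```go
// Sketch of a concurrency-bound test: 20 goroutines contend for 4 slots, and
// the test asserts that no more than 4 ever run the protected section at once.
package inst_test

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"golang.org/x/sync/semaphore"
)

func TestWriteConcurrencyIsBounded(t *testing.T) {
	const limit = 4
	sem := semaphore.NewWeighted(limit)

	// Placeholder for ExecDBWriteFunc: acquire a slot with a bounded wait,
	// run the callback, release the slot.
	execWithSemaphore := func(f func()) error {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}
		defer sem.Release(1)
		f()
		return nil
	}

	var inFlight, maxInFlight atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = execWithSemaphore(func() {
				n := inFlight.Add(1)
				// Record the highest concurrency observed.
				for {
					cur := maxInFlight.Load()
					if n <= cur || maxInFlight.CompareAndSwap(cur, n) {
						break
					}
				}
				time.Sleep(10 * time.Millisecond) // simulate a backend write
				inFlight.Add(-1)
			})
		}()
	}
	wg.Wait()

	if got := maxInFlight.Load(); got > limit {
		t.Fatalf("observed %d concurrent writes, want at most %d", got, limit)
	}
}
```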