Skip to content

Commit

Permalink
feat: added methods for using different part of planning based on cac…
Browse files Browse the repository at this point in the history
…hed and uncached plan, moved prepare path to it

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Feb 24, 2025
1 parent 62636f9 commit 476e038
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 64 deletions.
17 changes: 10 additions & 7 deletions go/vt/sqlparser/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ type (
// RewriteASTResult holds the result of rewriting the AST, including bind variable needs.
RewriteASTResult struct {
*BindVarNeeds
AST Statement // The rewritten AST
UpdateQueryFromAST bool
AST Statement // The rewritten AST
UpdateQueryFromAST bool
ForeignKeyCheckState *bool
}
// VSchemaViews provides access to view definitions within the VSchema.
VSchemaViews interface {
Expand Down Expand Up @@ -230,19 +231,21 @@ func (nz *normalizer) noteAliasedExprName(node *AliasedExpr) {
// It finalizes normalization logic based on node types.
func (nz *normalizer) walkUp(cursor *Cursor) bool {
// Add SET_VAR comments if applicable.
if supportOptimizerHint, supports := cursor.Node().(SupportOptimizerHint); supports {
if stmt, supports := cursor.Node().(SupportOptimizerHint); supports {
if nz.setVarComment != "" {
newComments, err := supportOptimizerHint.GetParsedComments().AddQueryHint(nz.setVarComment)
newComments, err := stmt.GetParsedComments().AddQueryHint(nz.setVarComment)
if err != nil {
nz.err = err
return false
}
supportOptimizerHint.SetComments(newComments)
stmt.SetComments(newComments)
nz.useASTQuery = true
}

// use foreign key checks of normalizer and set the query hint in the query.
if nz.fkChecksState != nil {
newComments := supportOptimizerHint.GetParsedComments().SetMySQLSetVarValue(sysvars.ForeignKeyChecks, FkChecksStateString(nz.fkChecksState))
supportOptimizerHint.SetComments(newComments)
newComments := stmt.GetParsedComments().SetMySQLSetVarValue(sysvars.ForeignKeyChecks, FkChecksStateString(nz.fkChecksState))
stmt.SetComments(newComments)
nz.useASTQuery = true
}
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type Plan struct {
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query
QueryHints sqlparser.QueryHints
QueryHints sqlparser.QueryHints // QueryHints are the SET_VAR hints that were used to generate this plan
ParamsCount int // ParameterCount is the number of parameters in the query

ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
Expand Down
183 changes: 144 additions & 39 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,127 @@ func (e *Executor) ParseDestinationTarget(targetString string) (string, topodata
return econtext.ParseDestinationTarget(targetString, defaultTabletType, e.VSchema())
}

func (e *Executor) fetchOrCreatePlan(
ctx context.Context,
safeSession *econtext.SafeSession,
sql string,
bindVars map[string]*querypb.BindVariable,
parameterize bool,
preparedPlan bool,
logStats *logstats.LogStats,
) (
plan *engine.Plan, vcursor *econtext.VCursorImpl, err error) {
if e.VSchema() == nil {
return nil, nil, vterrors.VT13001("vschema not initialized")
}

query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ = econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, nullResultsObserver{}, e.vConfig)

var setVarComment string
if e.vConfig.SetVarEnabled {
setVarComment = vcursor.PrepareSetVarComment()
}

if preparedPlan {
planKey := buildPlanKey(ctx, vcursor, query, setVarComment)
plan, logStats.CachedPlan = e.plans.Get(planKey.Hash(), e.epoch.Load())
}

if plan == nil {
plan, logStats.CachedPlan, err = e.getCachedOrBuildPlan(ctx, vcursor, query, bindVars, setVarComment, parameterize, preparedPlan)
if err != nil {
return nil, nil, err
}
}

qh := plan.QueryHints
vcursor.SetIgnoreMaxMemoryRows(qh.IgnoreMaxMemoryRows)
vcursor.SetConsolidator(qh.Consolidator)
vcursor.SetWorkloadName(qh.Workload)
vcursor.SetPriority(qh.Priority)
vcursor.SetExecQueryTimeout(qh.Timeout)

logStats.SQL = comments.Leading + plan.Original + comments.Trailing
logStats.BindVariables = sqltypes.CopyBindVariables(bindVars)

return plan, vcursor, nil
}

func (e *Executor) getCachedOrBuildPlan(
ctx context.Context,
vcursor *econtext.VCursorImpl,
query string,
bindVars map[string]*querypb.BindVariable,
setVarComment string,
parameterize bool,
preparedPlan bool,
) (plan *engine.Plan, cached bool, err error) {
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
return nil, false, err
}

defer func() {
if err == nil {
vcursor.CheckForReservedConnection(setVarComment, stmt)
}
}()

qh, err := sqlparser.BuildQueryHints(stmt)
if err != nil {
return nil, false, err
}

if qh.ForeignKeyChecks == nil {
qh.ForeignKeyChecks = vcursor.SafeSession.ForeignKeyChecks()
}
vcursor.SetForeignKeyCheckState(qh.ForeignKeyChecks)

paramsCount := 0
if preparedPlan {
// We need to count the number of arguments in the statement before we plan the query.
// Planning could add additional arguments to the statement.
paramsCount = countArguments(stmt)
if bindVars == nil {
bindVars = make(map[string]*querypb.BindVariable)
}
}

rewriteASTResult, err := sqlparser.Normalize(
stmt,
reservedVars,
bindVars,
parameterize,
vcursor.GetKeyspace(),
vcursor.SafeSession.GetSelectLimit(),
setVarComment,
vcursor.GetSystemVariablesCopy(),
qh.ForeignKeyChecks,
vcursor,
)
if err != nil {
return nil, false, err
}
stmt = rewriteASTResult.AST
bindVarNeeds := rewriteASTResult.BindVarNeeds
if rewriteASTResult.UpdateQueryFromAST && !preparedPlan {
query = sqlparser.String(stmt)
}

planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan()
if planCachable {
// build Plan key
planKey := buildPlanKey(ctx, vcursor, query, setVarComment)
plan, cached, err = e.plans.GetOrLoad(planKey.Hash(), e.epoch.Load(), func() (*engine.Plan, error) {
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh, paramsCount)
})
return plan, cached, err
}
plan, err = e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh, paramsCount)
return plan, false, err
}

// getPlan computes the plan for the given query. If one is in
// the cache, it reuses it.
func (e *Executor) getPlan(
Expand All @@ -1090,7 +1211,7 @@ func (e *Executor) getPlan(
comments sqlparser.MarginComments,
bindVars map[string]*querypb.BindVariable,
reservedVars *sqlparser.ReservedVars,
usePreparedPlan, allowParameterization bool,
allowParameterization bool,
logStats *logstats.LogStats,
) (plan *engine.Plan, err error) {
if e.VSchema() == nil {
Expand All @@ -1101,23 +1222,15 @@ func (e *Executor) getPlan(
setVarComment = vcursor.PrepareSetVarComment()
}

if usePreparedPlan {
planKey := createPlanKey(ctx, vcursor, query, setVarComment)
plan, logStats.CachedPlan = e.plans.Get(planKey.Hash(), e.epoch.Load())
}

if plan == nil {
plan, err = e.getCachedOrBuild(ctx, vcursor, query, stmt, reservedVars, bindVars, usePreparedPlan, allowParameterization, comments, logStats, setVarComment)
if err != nil {
return nil, err
}
plan, err = e.getCachedOrBuild(ctx, vcursor, query, stmt, reservedVars, bindVars, allowParameterization, comments, logStats, setVarComment)
if err != nil {
return nil, err
}

qh := plan.QueryHints
vcursor.SetIgnoreMaxMemoryRows(qh.IgnoreMaxMemoryRows)
vcursor.SetConsolidator(qh.Consolidator)
vcursor.SetWorkloadName(qh.Workload)
vcursor.UpdateForeignKeyChecksState(qh.ForeignKeyChecks)
vcursor.SetPriority(qh.Priority)
vcursor.SetExecQueryTimeout(qh.Timeout)

Expand All @@ -1141,7 +1254,6 @@ func (e *Executor) getCachedOrBuild(
stmt sqlparser.Statement,
reservedVars *sqlparser.ReservedVars,
bindVars map[string]*querypb.BindVariable,
useOriginalQuery bool,
allowParameterization bool,
comments sqlparser.MarginComments,
logStats *logstats.LogStats,
Expand All @@ -1152,7 +1264,11 @@ func (e *Executor) getCachedOrBuild(
if err != nil {
return nil, err
}
vcursor.UpdateForeignKeyChecksState(qh.ForeignKeyChecks)

if qh.ForeignKeyChecks == nil {
qh.ForeignKeyChecks = vcursor.SafeSession.ForeignKeyChecks()
}
vcursor.SetForeignKeyCheckState(qh.ForeignKeyChecks)

rewriteASTResult, err := sqlparser.Normalize(
stmt,
Expand All @@ -1163,15 +1279,15 @@ func (e *Executor) getCachedOrBuild(
vcursor.SafeSession.GetSelectLimit(),
setVarComment,
vcursor.GetSystemVariablesCopy(),
vcursor.GetForeignKeyChecksState(),
qh.ForeignKeyChecks,
vcursor,
)
if err != nil {
return nil, err
}
stmt = rewriteASTResult.AST
bindVarNeeds := rewriteASTResult.BindVarNeeds
if rewriteASTResult.UpdateQueryFromAST && !useOriginalQuery {
if rewriteASTResult.UpdateQueryFromAST {
query = sqlparser.String(stmt)
}

Expand All @@ -1181,19 +1297,19 @@ func (e *Executor) getCachedOrBuild(
planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan()
if planCachable {
// build Plan key
planKey := createPlanKey(ctx, vcursor, query, setVarComment)
planKey := buildPlanKey(ctx, vcursor, query, setVarComment)

var plan *engine.Plan
var err error
plan, logStats.CachedPlan, err = e.plans.GetOrLoad(planKey.Hash(), e.epoch.Load(), func() (*engine.Plan, error) {
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh)
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh, -1)
})
return plan, err
}
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh)
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh, -1)
}

func createPlanKey(ctx context.Context, vcursor *econtext.VCursorImpl, query string, setVarComment string) engine.PlanKey {
func buildPlanKey(ctx context.Context, vcursor *econtext.VCursorImpl, query string, setVarComment string) engine.PlanKey {
allDest := getDestinations(ctx, vcursor)

return engine.PlanKey{
Expand Down Expand Up @@ -1240,12 +1356,14 @@ func (e *Executor) buildStatement(
reservedVars *sqlparser.ReservedVars,
bindVarNeeds *sqlparser.BindVarNeeds,
qh sqlparser.QueryHints,
paramsCount int,
) (*engine.Plan, error) {
plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, e.ddlConfig)
if err != nil {
return nil, err
}

plan.ParamsCount = paramsCount
plan.Warnings = vcursor.GetAndEmptyWarnings()
plan.QueryHints = qh

Expand Down Expand Up @@ -1448,7 +1566,7 @@ func (e *Executor) initVConfig(warnOnShardedOnly bool, pv plancontext.PlannerVer
}
}

func countArguments(statement sqlparser.Statement) (paramsCount uint16) {
func countArguments(statement sqlparser.Statement) (paramsCount int) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
switch node := node.(type) {
case *sqlparser.Argument:
Expand All @@ -1461,7 +1579,7 @@ func countArguments(statement sqlparser.Statement) (paramsCount uint16) {
return
}

func prepareBindVars(paramsCount uint16) map[string]*querypb.BindVariable {
func prepareBindVars(paramsCount int) map[string]*querypb.BindVariable {
bindVars := make(map[string]*querypb.BindVariable, paramsCount)
for i := range paramsCount {
parameterID := fmt.Sprintf("v%d", i+1)
Expand All @@ -1471,21 +1589,7 @@ func prepareBindVars(paramsCount uint16) map[string]*querypb.BindVariable {
}

func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.SafeSession, sql string, logStats *logstats.LogStats) ([]*querypb.Field, uint16, error) {
query, comments := sqlparser.SplitMarginComments(sql)

vcursor, _ := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, nullResultsObserver{}, e.vConfig)

stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
return nil, 0, err
}

// We need to count the number of arguments in the statement before we plan the query.
// Planning could add additional arguments to the statement.
paramsCount := countArguments(stmt)
bindVars := prepareBindVars(paramsCount)

plan, err := e.getPlan(ctx, vcursor, sql, stmt, comments, bindVars, reservedVars /* usePreparedPlan */, true /* allowParameterization */, false, logStats)
plan, vcursor, err := e.fetchOrCreatePlan(ctx, safeSession, sql, nil, false, true, logStats)
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)

Expand All @@ -1494,6 +1598,7 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.Safe
return nil, 0, err
}

bindVars := prepareBindVars(plan.ParamsCount)
err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession)
if err != nil {
logStats.Error = err
Expand All @@ -1512,7 +1617,7 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.Safe

plan.AddStats(1, time.Since(logStats.StartTime), logStats.ShardQueries, qr.RowsAffected, uint64(len(qr.Rows)), errCount)

return qr.Fields, paramsCount, err
return qr.Fields, uint16(plan.ParamsCount), err
}

func parseAndValidateQuery(query string, parser *sqlparser.Parser) (sqlparser.Statement, *sqlparser.ReservedVars, error) {
Expand Down Expand Up @@ -1661,7 +1766,7 @@ func (e *Executor) PlanPrepareStmt(ctx context.Context, vcursor *econtext.VCurso

// creating this log stats to not interfere with the original log stats.
lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.Session().GetSessionUUID(), nil, streamlog.GetQueryLogConfig())
plan, err := e.getPlan(ctx, vcursor, query, sqlparser.Clone(stmt), vcursor.GetMarginComments(), map[string]*querypb.BindVariable{}, reservedVars, false, false, lStats)
plan, err := e.getPlan(ctx, vcursor, query, sqlparser.Clone(stmt), vcursor.GetMarginComments(), map[string]*querypb.BindVariable{}, reservedVars, false, lStats)
if err != nil {
return nil, nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestPlanKey(t *testing.T) {
ss := econtext.NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString})
resolver := &fakeResolver{resolveShards: tc.resolvedShard}
vc, _ := econtext.NewVCursorImpl(ss, makeComments(""), e, nil, e.vm, e.VSchema(), resolver, nil, nullResultsObserver{}, cfg)
key := createPlanKey(ctx, vc, "SELECT 1", tc.setVarComment)
key := buildPlanKey(ctx, vc, "SELECT 1", tc.setVarComment)
require.Equal(t, tc.expectedPlanPrefixKey, key.DebugString(), "test case %d", i)
})
}
Expand Down Expand Up @@ -1666,7 +1666,7 @@ func assertCacheContains(t *testing.T, e *Executor, vc *econtext.VCursorImpl, sq
return true
})
} else {
h := createPlanKey(context.Background(), vc, sql, "")
h := buildPlanKey(context.Background(), vc, sql, "")
plan, _ = e.plans.Get(h.Hash(), e.epoch.Load())
}
require.Truef(t, plan != nil, "plan not found for query: %s", sql)
Expand All @@ -1682,7 +1682,7 @@ func getPlanCached(t *testing.T, ctx context.Context, e *Executor, vcursor *econ

stmt, reservedVars, err := parseAndValidateQuery(sql, sqlparser.NewTestParser())
require.NoError(t, err)
plan, err := e.getPlan(context.Background(), vcursor, sql, stmt, comments, bindVars, reservedVars, false, e.config.Normalize, logStats)
plan, err := e.getPlan(context.Background(), vcursor, sql, stmt, comments, bindVars, reservedVars, e.config.Normalize, logStats)
require.NoError(t, err)

// Wait for cache to settle
Expand Down Expand Up @@ -1846,7 +1846,7 @@ func TestGetPlanPriority(t *testing.T) {
qh, _ := sqlparser.BuildQueryHints(stmt)
priorityFromStatement := qh.Priority

_, err = r.getPlan(context.Background(), vCursor, testCase.sql, stmt, makeComments("/* some comment */"), map[string]*querypb.BindVariable{}, nil, false, true, logStats)
_, err = r.getPlan(context.Background(), vCursor, testCase.sql, stmt, makeComments("/* some comment */"), map[string]*querypb.BindVariable{}, nil, true, logStats)
if testCase.expectedError != nil {
assert.ErrorIs(t, err, testCase.expectedError)
} else {
Expand Down
Loading

0 comments on commit 476e038

Please sign in to comment.