Skip to content

Commit

Permalink
VReplication: Support ignoring the source Keyspace on MoveTables canc…
Browse files Browse the repository at this point in the history
…el/complete (#17729)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Feb 25, 2025
1 parent b72683d commit 96f4535
Show file tree
Hide file tree
Showing 19 changed files with 1,480 additions and 1,015 deletions.
22 changes: 12 additions & 10 deletions go/cmd/vtctldclient/command/vreplication/common/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

var CancelOptions = struct {
KeepData bool
KeepRoutingRules bool
Shards []string
DeleteBatchSize int64
KeepData bool
KeepRoutingRules bool
Shards []string
DeleteBatchSize int64
IgnoreSourceKeyspace bool
}{}

func GetCancelCommand(opts *SubCommandsOpts) *cobra.Command {
Expand All @@ -58,12 +59,13 @@ func commandCancel(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
DeleteBatchSize: CancelOptions.DeleteBatchSize,
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
DeleteBatchSize: CancelOptions.DeleteBatchSize,
IgnoreSourceKeyspace: CancelOptions.IgnoreSourceKeyspace,
}
resp, err := GetClient().WorkflowDelete(GetCommandCtx(), req)
if err != nil {
Expand Down
24 changes: 13 additions & 11 deletions go/cmd/vtctldclient/command/vreplication/common/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
)

var CompleteOptions = struct {
KeepData bool
KeepRoutingRules bool
RenameTables bool
DryRun bool
Shards []string
KeepData bool
KeepRoutingRules bool
RenameTables bool
DryRun bool
Shards []string
IgnoreSourceKeyspace bool
}{}

func GetCompleteCommand(opts *SubCommandsOpts) *cobra.Command {
Expand All @@ -41,12 +42,13 @@ func commandComplete(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.MoveTablesCompleteRequest{
Workflow: BaseOptions.Workflow,
TargetKeyspace: BaseOptions.TargetKeyspace,
KeepData: CompleteOptions.KeepData,
KeepRoutingRules: CompleteOptions.KeepRoutingRules,
RenameTables: CompleteOptions.RenameTables,
DryRun: CompleteOptions.DryRun,
Workflow: BaseOptions.Workflow,
TargetKeyspace: BaseOptions.TargetKeyspace,
KeepData: CompleteOptions.KeepData,
KeepRoutingRules: CompleteOptions.KeepRoutingRules,
RenameTables: CompleteOptions.RenameTables,
DryRun: CompleteOptions.DryRun,
IgnoreSourceKeyspace: CompleteOptions.IgnoreSourceKeyspace,
}
resp, err := GetClient().MoveTablesComplete(GetCommandCtx(), req)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ func registerCommands(root *cobra.Command) {
complete.Flags().BoolVar(&common.CompleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules in place that direct table traffic from the source keyspace to the target keyspace of the MoveTables workflow.")
complete.Flags().BoolVar(&common.CompleteOptions.RenameTables, "rename-tables", false, "Keep the original source table data that was copied by the MoveTables workflow, but rename each table to '_<tablename>_old'.")
complete.Flags().BoolVar(&common.CompleteOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")
complete.Flags().BoolVar(&common.CompleteOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is completed and cleaned up. This allows the workflow to be completed if the source keyspace has been deleted or is not currently available.")
common.AddShardSubsetFlag(complete, &common.CompleteOptions.Shards)
base.AddCommand(complete)

cancel := common.GetCancelCommand(opts)
cancel.Flags().BoolVar(&common.CancelOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the MoveTables workflow in the target keyspace.")
cancel.Flags().BoolVar(&common.CancelOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the MoveTables workflow.")
cancel.Flags().Int64Var(&common.CancelOptions.DeleteBatchSize, "delete-batch-size", DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant workflow, delete the records in batches of this size.")
cancel.Flags().BoolVar(&common.CancelOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is canceled and cleaned up. This allows the workflow to be canceled if the source keyspace has been deleted or is not currently available.")
common.AddShardSubsetFlag(cancel, &common.CancelOptions.Shards)
base.AddCommand(cancel)
}
Expand Down
20 changes: 11 additions & 9 deletions go/cmd/vtctldclient/command/vreplication/workflow/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

var (
deleteOptions = struct {
KeepData bool
KeepRoutingRules bool
DeleteBatchSize int64
KeepData bool
KeepRoutingRules bool
DeleteBatchSize int64
IgnoreSourceKeyspace bool
}{}

// delete makes a WorkflowDelete gRPC call to a vtctld.
Expand All @@ -51,12 +52,13 @@ func commandDelete(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
DeleteBatchSize: deleteOptions.DeleteBatchSize,
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
DeleteBatchSize: deleteOptions.DeleteBatchSize,
IgnoreSourceKeyspace: deleteOptions.IgnoreSourceKeyspace,
}
resp, err := common.GetClient().WorkflowDelete(common.GetCommandCtx(), req)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func registerCommands(root *cobra.Command) {
delete.Flags().BoolVar(&deleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.")
delete.Flags().BoolVar(&deleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.")
delete.Flags().Int64Var(&deleteOptions.DeleteBatchSize, "delete-batch-size", movetables.DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant MoveTables workflow, delete the records in batches of this size.")
delete.Flags().BoolVar(&deleteOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is deleted and cleaned up. This allows the workflow to be deleted if the source keyspace has been deleted or is not currently available. NOTE: this is only used with MoveTables.")
common.AddShardSubsetFlag(delete, &baseOptions.Shards)
base.AddCommand(delete)

Expand Down
61 changes: 35 additions & 26 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"path"
"runtime"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -415,10 +415,7 @@ func (vc *VitessCluster) setupVtctldClient() {
// CleanupDataroot deletes the vtdataroot directory. Since we run multiple tests sequentially in a single CI test shard,
// we can run out of disk space due to all the leftover artifacts from previous tests.
func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
// This is always set to "true" on GitHub Actions runners:
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
if !ok || strings.ToLower(ci) != "true" {
if debugMode {
// Leave the directory in place to support local debugging.
return
}
Expand Down Expand Up @@ -792,29 +789,9 @@ func (vc *VitessCluster) teardown() {
}
}

var wg sync.WaitGroup

for _, keyspace := range keyspaces {
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
wg.Add(1)
go func(tablet2 *Tablet) {
defer wg.Done()
if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 {
if err := tablet2.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process: %s", err.Error())
}
}
if err := tablet2.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s %s", tablet2.Name, err.Error())
} else {
log.Infof("Successfully stopped vttablet %s", tablet2.Name)
}
}(tablet)
}
}
_ = vc.TearDownKeyspace(keyspace)
}
wg.Wait()
if err := vc.Vtctld.TearDown(); err != nil {
log.Infof("Error stopping Vtctld: %s", err.Error())
} else {
Expand All @@ -836,6 +813,38 @@ func (vc *VitessCluster) teardown() {
}
}

func (vc *VitessCluster) TearDownKeyspace(ks *Keyspace) error {
eg := errgroup.Group{}
for _, shard := range ks.Shards {
for _, tablet := range shard.Tablets {
eg.Go(func() error {
if tablet.DbServer != nil && tablet.DbServer.TabletUID > 0 {
if err := tablet.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process associated with vttablet %s: %v", tablet.Name, err)
return err
}
}
if err := tablet.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s: %v", tablet.Name, err)
return err
} else {
log.Infof("Successfully stopped vttablet %s", tablet.Name)
}
return nil
})
}
}
return eg.Wait()
}

func (vc *VitessCluster) DeleteKeyspace(t testing.TB, ksName string) {
out, err := vc.VtctldClient.ExecuteCommandWithOutput("DeleteKeyspace", ksName, "--recursive")
if err != nil {
log.Error("DeleteKeyspace failed with error: , output: %s", err, out)
}
require.NoError(t, err)
}

// TearDown brings down a cluster, deleting processes, removing topo keys
func (vc *VitessCluster) TearDown() {
if debugMode {
Expand Down
7 changes: 7 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,13 @@ func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, tab
}
}

// confirmNoWorkflows confirms that there are no active workflows in the given keyspace.
func confirmNoWorkflows(t *testing.T, keyspace string) {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetWorkflows", keyspace, "--compact", "--include-logs=false")
require.NoError(t, err)
require.True(t, isEmptyWorkflowShowOutput(output))
}

// getVReplicationConfig returns the vreplication config for one random workflow for a given tablet. Currently, this is
// used when there is only one workflow, so we are using this simple method to get the config.
func getVReplicationConfig(t *testing.T, tab *cluster.VttabletProcess) map[string]string {
Expand Down
Loading

0 comments on commit 96f4535

Please sign in to comment.