Skip to content

Commit

Permalink
Add a --timeout flag to allow retrying operations for a longer time (#…
Browse files Browse the repository at this point in the history
…830)

* Initialize and propagate a global context with timeout

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>

* Propagate context deeper into Each/ParallelEach

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>

* Abort Each/ParallelEach early on context errors

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>

* Introduce retry.AdaptiveTimeout and deepen propagation

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>

* Abort phase manager on dead context

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>

* Use context timeout for joined check

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>

* Test context cancel in manager and fix cleanup bug

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>

---------

Signed-off-by: Kimmo Lehto <klehto@mirantis.com>
  • Loading branch information
kke authored Feb 19, 2025
1 parent c8ec0d1 commit a4af9bb
Show file tree
Hide file tree
Showing 55 changed files with 361 additions and 174 deletions.
5 changes: 3 additions & 2 deletions action/apply.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -103,7 +104,7 @@ func NewApply(opts ApplyOptions) *Apply {
}

// Run the Apply action
func (a Apply) Run() error {
func (a Apply) Run(ctx context.Context) error {
if len(a.Phases) == 0 {
// for backwards compatibility with the old Apply struct without NewApply(..)
tmpApply := NewApply(a.ApplyOptions)
Expand All @@ -118,7 +119,7 @@ func (a Apply) Run() error {

var result error

if result = a.Manager.Run(); result != nil {
if result = a.Manager.Run(ctx); result != nil {
log.Info(phase.Colorize.Red("==> Apply failed").String())
return result
}
Expand Down
5 changes: 3 additions & 2 deletions action/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"time"

Expand All @@ -13,7 +14,7 @@ type Backup struct {
Manager *phase.Manager
}

func (b Backup) Run() error {
func (b Backup) Run(ctx context.Context) error {
start := time.Now()

lockPhase := &phase.Lock{}
Expand All @@ -32,7 +33,7 @@ func (b Backup) Run() error {
&phase.Disconnect{},
)

if err := b.Manager.Run(); err != nil {
if err := b.Manager.Run(ctx); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion action/config_edit.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"io"
"os"
Expand All @@ -20,7 +21,7 @@ type ConfigEdit struct {
Stdin io.Reader
}

func (c ConfigEdit) Run() error {
func (c ConfigEdit) Run(_ context.Context) error {
stdoutFile, ok := c.Stdout.(*os.File)

if !ok || !isatty.IsTerminal(stdoutFile.Fd()) {
Expand Down
3 changes: 2 additions & 1 deletion action/config_status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"io"

Expand All @@ -15,7 +16,7 @@ type ConfigStatus struct {
Writer io.Writer
}

func (c ConfigStatus) Run() error {
func (c ConfigStatus) Run(_ context.Context) error {
h := c.Config.Spec.K0sLeader()

if err := h.Connect(); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions action/kubeconfig.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package action

import (
"context"

"github.com/k0sproject/k0sctl/phase"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
)
Expand All @@ -15,7 +17,7 @@ type Kubeconfig struct {
Kubeconfig string
}

func (k *Kubeconfig) Run() error {
func (k *Kubeconfig) Run(ctx context.Context) error {
// Change so that the internal config has only single controller host as we
// do not need to connect to all nodes
k.Manager.Config.Spec.Hosts = cluster.Hosts{k.Manager.Config.Spec.K0sLeader()}
Expand All @@ -27,5 +29,5 @@ func (k *Kubeconfig) Run() error {
&phase.Disconnect{},
)

return k.Manager.Run()
return k.Manager.Run(ctx)
}
5 changes: 3 additions & 2 deletions action/reset.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"io"
"os"
Expand All @@ -20,7 +21,7 @@ type Reset struct {
Force bool
}

func (r Reset) Run() error {
func (r Reset) Run(ctx context.Context) error {
if !r.Force {
if stdoutFile, ok := r.Stdout.(*os.File); ok && !isatty.IsTerminal(stdoutFile.Fd()) {
return fmt.Errorf("reset requires --force")
Expand Down Expand Up @@ -66,7 +67,7 @@ func (r Reset) Run() error {
&phase.Disconnect{},
)

if err := r.Manager.Run(); err != nil {
if err := r.Manager.Run(ctx); err != nil {
return err
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ var applyCommand = &cli.Command{
redactFlag,
retryIntervalFlag,
retryTimeoutFlag,
timeoutFlag,
},
Before: actions(initLogging, initConfig, initManager, displayLogo, displayCopyright, warnOldCache),
After: actions(cancelTimeout),
Action: func(ctx *cli.Context) error {
var kubeconfigOut io.Writer

Expand Down Expand Up @@ -95,7 +97,7 @@ var applyCommand = &cli.Command{

applyAction := action.NewApply(applyOpts)

if err := applyAction.Run(); err != nil {
if err := applyAction.Run(ctx.Context); err != nil {
return fmt.Errorf("apply failed - log file saved to %s: %w", ctx.Context.Value(ctxLogFileKey{}).(string), err)
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ var backupCommand = &cli.Command{
debugFlag,
traceFlag,
redactFlag,
timeoutFlag,
retryIntervalFlag,
retryTimeoutFlag,
},
Before: actions(initLogging, initConfig, initManager, displayLogo, displayCopyright),
After: actions(cancelTimeout),
Action: func(ctx *cli.Context) error {
backupAction := action.Backup{
Manager: ctx.Context.Value(ctxManagerKey{}).(*phase.Manager),
}

if err := backupAction.Run(); err != nil {
if err := backupAction.Run(ctx.Context); err != nil {
return fmt.Errorf("backup failed - log file saved to %s: %w", ctx.Context.Value(ctxLogFileKey{}).(string), err)
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/config_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ var configEditCommand = &cli.Command{
debugFlag,
traceFlag,
redactFlag,
timeoutFlag,
},
Before: actions(initLogging, initConfig),
After: actions(cancelTimeout),
Action: func(ctx *cli.Context) error {
configEditAction := action.ConfigEdit{
Config: ctx.Context.Value(ctxConfigsKey{}).(*v1beta1.Cluster),
Expand All @@ -25,6 +27,6 @@ var configEditCommand = &cli.Command{
Stdin: ctx.App.Reader,
}

return configEditAction.Run()
return configEditAction.Run(ctx.Context)
},
}
4 changes: 3 additions & 1 deletion cmd/config_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ var configStatusCommand = &cli.Command{
debugFlag,
traceFlag,
redactFlag,
timeoutFlag,
&cli.StringFlag{
Name: "output",
Usage: "kubectl output formatting",
Aliases: []string{"o"},
},
},
Before: actions(initLogging, initConfig),
After: actions(cancelTimeout),
Action: func(ctx *cli.Context) error {
cfg, err := readConfig(ctx)
if err != nil {
Expand All @@ -33,6 +35,6 @@ var configStatusCommand = &cli.Command{
Writer: ctx.App.Writer,
}

return configStatusAction.Run()
return configStatusAction.Run(ctx.Context)
},
}
21 changes: 21 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type (
)

var (
globalCancel context.CancelFunc

debugFlag = &cli.BoolFlag{
Name: "debug",
Usage: "Enable debug logging",
Expand Down Expand Up @@ -81,6 +83,18 @@ var (
Value: 5,
}

timeoutFlag = &cli.DurationFlag{
Name: "timeout",
Usage: "Timeout for the whole operation",
Value: 0,
Action: func(ctx *cli.Context, d time.Duration) error {
timeoutCtx, cancel := context.WithTimeout(ctx.Context, d)
ctx.Context = timeoutCtx
globalCancel = cancel
return nil
},
}

retryTimeoutFlag = &cli.DurationFlag{
Name: "default-timeout",
Usage: "Default timeout when waiting for node state changes",
Expand All @@ -104,6 +118,13 @@ var (
Colorize = aurora.NewAurora(false)
)

func cancelTimeout(_ *cli.Context) error {
if globalCancel != nil {
globalCancel()
}
return nil
}

// actions can be used to chain action functions (for urfave/cli's Before, After, etc)
func actions(funcs ...func(*cli.Context) error) func(*cli.Context) error {
return func(ctx *cli.Context) error {
Expand Down
4 changes: 3 additions & 1 deletion cmd/kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ var kubeconfigCommand = &cli.Command{
debugFlag,
traceFlag,
redactFlag,
timeoutFlag,
retryIntervalFlag,
retryTimeoutFlag,
},
Before: actions(initSilentLogging, initConfig, initManager),
After: actions(cancelTimeout),
Action: func(ctx *cli.Context) error {
kubeconfigAction := action.Kubeconfig{
Manager: ctx.Context.Value(ctxManagerKey{}).(*phase.Manager),
Expand All @@ -46,7 +48,7 @@ var kubeconfigCommand = &cli.Command{
KubeconfigCluster: ctx.String("cluster"),
}

if err := kubeconfigAction.Run(); err != nil {
if err := kubeconfigAction.Run(ctx.Context); err != nil {
return fmt.Errorf("getting kubeconfig failed - log file saved to %s: %w", ctx.Context.Value(ctxLogFileKey{}).(string), err)
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var resetCommand = &cli.Command{
debugFlag,
traceFlag,
redactFlag,
timeoutFlag,
retryIntervalFlag,
retryTimeoutFlag,
&cli.BoolFlag{
Expand All @@ -28,14 +29,15 @@ var resetCommand = &cli.Command{
},
},
Before: actions(initLogging, initConfig, initManager, displayCopyright),
After: actions(cancelTimeout),
Action: func(ctx *cli.Context) error {
resetAction := action.Reset{
Manager: ctx.Context.Value(ctxManagerKey{}).(*phase.Manager),
Force: ctx.Bool("force"),
Stdout: ctx.App.Writer,
}

if err := resetAction.Run(); err != nil {
if err := resetAction.Run(ctx.Context); err != nil {
return fmt.Errorf("reset failed - log file saved to %s: %w", ctx.Context.Value(ctxLogFileKey{}).(string), err)
}

Expand Down
3 changes: 2 additions & 1 deletion phase/apply_manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package phase

import (
"bytes"
"context"
"fmt"
"io"

Expand Down Expand Up @@ -36,7 +37,7 @@ func (p *ApplyManifests) ShouldRun() bool {
}

// Run the phase
func (p *ApplyManifests) Run() error {
func (p *ApplyManifests) Run(_ context.Context) error {
for name, content := range p.Config.Metadata.Manifests {
if err := p.apply(name, content); err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions phase/arm_prepare.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package phase

import (
"context"
"strings"

"github.com/k0sproject/version"
Expand Down Expand Up @@ -62,11 +63,11 @@ func (p *PrepareArm) ShouldRun() bool {
}

// Run the phase
func (p *PrepareArm) Run() error {
return p.parallelDo(p.hosts, p.etcdUnsupportedArch)
func (p *PrepareArm) Run(ctx context.Context) error {
return p.parallelDo(ctx, p.hosts, p.etcdUnsupportedArch)
}

func (p *PrepareArm) etcdUnsupportedArch(h *cluster.Host) error {
func (p *PrepareArm) etcdUnsupportedArch(_ context.Context, h *cluster.Host) error {
log.Warnf("%s: enabling ETCD_UNSUPPORTED_ARCH=%s override - you may encounter problems with etcd", h, h.Metadata.Arch)
h.Environment["ETCD_UNSUPPORTED_ARCH"] = h.Metadata.Arch

Expand Down
3 changes: 2 additions & 1 deletion phase/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package phase

import (
"context"
"fmt"
"os"
"path"
Expand Down Expand Up @@ -54,7 +55,7 @@ func (p *Backup) ShouldRun() bool {
}

// Run the phase
func (p *Backup) Run() error {
func (p *Backup) Run(_ context.Context) error {
h := p.leader

log.Infof("%s: backing up", h)
Expand Down
8 changes: 4 additions & 4 deletions phase/configure_k0s.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ func (p *ConfigureK0s) generateDefaultConfig() (string, error) {
}

// Run the phase
func (p *ConfigureK0s) Run() error {
func (p *ConfigureK0s) Run(ctx context.Context) error {
controllers := p.Config.Spec.Hosts.Controllers().Filter(func(h *cluster.Host) bool {
return !h.Reset && len(h.Metadata.K0sNewConfig) > 0
})
return p.parallelDo(controllers, p.configureK0s)
return p.parallelDo(ctx, controllers, p.configureK0s)
}

func (p *ConfigureK0s) validateConfig(h *cluster.Host, configPath string) error {
Expand Down Expand Up @@ -258,7 +258,7 @@ func (p *ConfigureK0s) validateConfig(h *cluster.Host, configPath string) error
return nil
}

func (p *ConfigureK0s) configureK0s(h *cluster.Host) error {
func (p *ConfigureK0s) configureK0s(ctx context.Context, h *cluster.Host) error {
path := h.K0sConfigPath()
if h.Configurer.FileExist(h, path) {
if !h.Configurer.FileContains(h, path, " generated-by-k0sctl") {
Expand Down Expand Up @@ -301,7 +301,7 @@ func (p *ConfigureK0s) configureK0s(h *cluster.Host) error {
}

log.Infof("%s: waiting for k0s service to start", h)
return retry.Timeout(context.TODO(), retry.DefaultTimeout, node.ServiceRunningFunc(h, h.K0sServiceName()))
return retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.ServiceRunningFunc(h, h.K0sServiceName()))
}

return nil
Expand Down
Loading

0 comments on commit a4af9bb

Please sign in to comment.