Skip to content

Commit

Permalink
Add leader transfer logic (#63)
Browse files Browse the repository at this point in the history
* Add leader transfer logic before defragmentation
  • Loading branch information
AmirAllahveran authored Jan 27, 2025
1 parent 7083dea commit 8241946
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ It adds the following extra flags,
| `--defrag-rule` | defragmentation rule (etcd-defrag will run defragmentation if the rule is empty or it is evaluated to true), defaults to empty. See more details below. |
| `--dry-run` | evaluate whether or not endpoints require defragmentation, but don't actually perform it, defaults to `false`. |
| `--exclude-localhost` | whether to exclude localhost endpoints, defaults to `false`. |
| `--move-leader` | whether to transfer leadership to another (non-leader) member before defragmenting the member that is currently the leader, defaults to `false`. |

See the complete flags below,
```
Expand Down Expand Up @@ -70,6 +71,7 @@ Flags:
--keepalive-time duration keepalive time for client connections (default 2s)
--keepalive-timeout duration keepalive timeout for client connections (default 6s)
--key string identify secure client using this TLS key file
--move-leader whether to move the leader to a randomly picked non-leader ID and make it the new leader
--password string password for authentication (if this option is used, --user option shouldn't include password)
--user string username[:password] for authentication (prompt if password is not supplied)
--version print the version and exit
Expand Down
22 changes: 22 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,25 @@ func defragment(gcfg globalConfig, ep string) error {
_, err = c.Defragment(ctx, ep)
return err
}

// transferLeadership asks the member reached at leaderEp to hand leadership
// over to the member identified by transfereeID.
//
// A dedicated client is built from gcfg with leaderEp as its only endpoint,
// and the MoveLeader call runs under the context returned by
// commandCtx(gcfg.commandTimeout). Progress is reported on stdout.
//
// It returns a wrapped error if the client cannot be created or if the
// MoveLeader request fails, and nil once the request has succeeded.
func transferLeadership(gcfg globalConfig, leaderEp string, transfereeID uint64) error {
	cfgSpec := clientConfigWithoutEndpoints(gcfg)
	// Pin the client to the leader's endpoint so the request goes there only.
	cfgSpec.Endpoints = []string{leaderEp}
	c, err := createClient(cfgSpec)
	if err != nil {
		return fmt.Errorf("failed to create client for leader endpoint %s: %w", leaderEp, err)
	}
	defer c.Close()

	ctx, cancel := commandCtx(gcfg.commandTimeout)
	defer cancel()

	fmt.Printf("Requesting leader at %s to transfer leadership to member ID %d...\n", leaderEp, transfereeID)
	if _, err = c.MoveLeader(ctx, transfereeID); err != nil {
		return fmt.Errorf("failed to move leader: %w", err)
	}

	// Printf rather than Println: Println already inserts a space between
	// operands, so a trailing space in a literal produced a double space.
	fmt.Printf("successfully transferred leadership from %s to member ID %d\n", leaderEp, transfereeID)
	return nil
}
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type globalConfig struct {
endpoints []string
useClusterEndpoints bool
excludeLocalhost bool
moveLeader bool

dialTimeout time.Duration
commandTimeout time.Duration
Expand Down
37 changes: 37 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func newDefragCommand() *cobra.Command {
defragCmd.Flags().StringSliceVar(&globalCfg.endpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd endpoints")
defragCmd.Flags().BoolVar(&globalCfg.useClusterEndpoints, "cluster", false, "use all endpoints from the cluster member list")
defragCmd.Flags().BoolVar(&globalCfg.excludeLocalhost, "exclude-localhost", false, "whether to exclude localhost endpoints")
defragCmd.Flags().BoolVar(&globalCfg.moveLeader, "move-leader", false, "whether to move the leader to a randomly picked non-leader ID and make it the new leader")

defragCmd.Flags().DurationVar(&globalCfg.dialTimeout, "dial-timeout", 2*time.Second, "dial timeout for client connections")
defragCmd.Flags().DurationVar(&globalCfg.commandTimeout, "command-timeout", 30*time.Second, "command timeout (excluding dial timeout)")
Expand Down Expand Up @@ -154,6 +155,42 @@ func defragCommandFunc(cmd *cobra.Command, args []string) {
continue
}

// Check if the member is a leader and move the leader if necessary
if globalCfg.moveLeader {
if status.Resp.Leader == status.Resp.Header.MemberId {
fmt.Printf("Member %q is the leader. Attempting to move the leader...\n", ep)

// Identify a non-leader member to transfer the leadership
newLeaderID := uint64(0)
for _, memberStatus := range statusList {
if memberStatus.Resp.Header.MemberId != status.Resp.Header.MemberId {
newLeaderID = memberStatus.Resp.Header.MemberId
break
}
}

if newLeaderID == 0 {
failures++
fmt.Fprintf(os.Stderr, "Failed to find a non-leader member to transfer leadership from %q.\n", ep)
if !globalCfg.continueOnError {
break
}
continue
}

// Perform the leader transfer
err = transferLeadership(globalCfg, status.Ep, newLeaderID)
if err != nil {
failures++
fmt.Fprintf(os.Stderr, "Failed to move leader from %s to member ID %d: %v\n", status.Ep, newLeaderID, err)
if !globalCfg.continueOnError {
break
}
continue
}
}
}

fmt.Printf("Defragmenting endpoint %q\n", ep)
startTs := time.Now()
err = defragment(globalCfg, ep)
Expand Down

0 comments on commit 8241946

Please sign in to comment.