Merge pull request #584 from 24sama/bug-fix
fix: the bug where the config file order differs from the original
ks-ci-bot authored Jul 23, 2021
2 parents 531ede4 + 56eb4b7 commit fb0ad3a
Showing 6 changed files with 123 additions and 103 deletions.
3 changes: 3 additions & 0 deletions apis/kubekey/v1alpha1/cluster_types.go
@@ -143,6 +143,9 @@ type HostCfg struct {
IsEtcd bool `json:"-"`
IsMaster bool `json:"-"`
IsWorker bool `json:"-"`

EtcdExist bool `json:"-"`
EtcdName string `json:"-"`
}

// RoleGroups defines the grouping of role for hosts (etcd / master / worker).
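
The two new fields let each host carry its etcd member identity across runs: EtcdName holds the member name recorded in /etc/etcd.env, and EtcdExist marks whether that file was found on the node. A minimal standalone sketch (not KubeKey code) of the name-extraction step that getEtcdStatus in pkg/etcd/etcd.go performs later in this diff; the sample input is illustrative:

package main

import (
	"fmt"
	"strings"
)

// extractEtcdName mimics the slice-after-first-'=' parsing in getEtcdStatus:
// it takes the output of `grep ETCD_NAME /etc/etcd.env`, e.g.
// "ETCD_NAME=etcd-node1", and returns the member name.
func extractEtcdName(grepOutput string) (string, bool) {
	i := strings.Index(grepOutput, "=")
	if i < 0 {
		return "", false
	}
	return strings.TrimSpace(grepOutput[i+1:]), true
}

func main() {
	name, ok := extractEtcdName("ETCD_NAME=etcd-node1")
	fmt.Println(name, ok) // etcd-node1 true
}
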
1 change: 1 addition & 0 deletions pkg/cluster/add/add.go
@@ -78,6 +78,7 @@ func ExecTasks(mgr *manager.Manager) error {
{Task: install.InitOS, ErrMsg: "Failed to init OS"},
{Task: docker.InstallerDocker, ErrMsg: "Failed to install docker", Skip: skipCondition1},
{Task: install.PrePullImages, ErrMsg: "Failed to pre-pull images", Skip: skipCondition1},
{Task: etcd.GetEtcdStatus, ErrMsg: "Failed to get etcd status"},
{Task: etcd.GenerateEtcdCerts, ErrMsg: "Failed to generate etcd certs"},
{Task: etcd.SyncEtcdCertsToMaster, ErrMsg: "Failed to sync etcd certs"},
{Task: etcd.GenerateEtcdService, ErrMsg: "Failed to create etcd service"},
1 change: 1 addition & 0 deletions pkg/cluster/install/install.go
@@ -90,6 +90,7 @@ func ExecTasks(mgr *manager.Manager) error {
{Task: InitOS, ErrMsg: "Failed to init OS"},
{Task: docker.InstallerDocker, ErrMsg: "Failed to install docker", Skip: isK3s},
{Task: PrePullImages, ErrMsg: "Failed to pre-pull images", Skip: isK3s},
{Task: etcd.GetEtcdStatus, ErrMsg: "Failed to get etcd status"},
{Task: etcd.GenerateEtcdCerts, ErrMsg: "Failed to generate etcd certs"},
{Task: etcd.SyncEtcdCertsToMaster, ErrMsg: "Failed to sync etcd certs"},
{Task: etcd.GenerateEtcdService, ErrMsg: "Failed to create etcd service"},
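
Both pipelines (pkg/cluster/add/add.go above and pkg/cluster/install/install.go here) now run etcd.GetEtcdStatus before any etcd cert, service, or config task, so EtcdName, EtcdExist, and the package-level peerAddresses/etcdStatus state are populated first. A minimal sketch of the Task/ErrMsg/Skip pipeline pattern these lists feed into; the field names come from the diff, but the runner loop is an assumption, not KubeKey's actual ExecTasks implementation:

package main

import (
	"errors"
	"fmt"
)

// Manager stands in for manager.Manager.
type Manager struct{}

// Task mirrors the {Task, ErrMsg, Skip} entries in the task lists above.
type Task struct {
	Task   func(*Manager) error
	ErrMsg string
	Skip   bool
}

// runTasks executes tasks in order, wrapping failures with their ErrMsg.
func runTasks(mgr *Manager, tasks []Task) error {
	for _, t := range tasks {
		if t.Skip {
			continue
		}
		if err := t.Task(mgr); err != nil {
			return fmt.Errorf("%s: %w", t.ErrMsg, err)
		}
	}
	return nil
}

func main() {
	getEtcdStatus := func(*Manager) error { return nil }
	generateEtcdCerts := func(*Manager) error { return errors.New("boom") }
	tasks := []Task{
		// Status detection runs first so later tasks see the discovered state.
		{Task: getEtcdStatus, ErrMsg: "Failed to get etcd status"},
		{Task: generateEtcdCerts, ErrMsg: "Failed to generate etcd certs"},
	}
	fmt.Println(runTasks(&Manager{}, tasks)) // Failed to generate etcd certs: boom
}
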
145 changes: 78 additions & 67 deletions pkg/etcd/etcd.go
@@ -42,6 +42,37 @@ var (
etcdStatus = ""
)

const (
NEWETCDCLUSTER = "new"
EXISTETCDCLUSTER = "existing"
)

// GetEtcdStatus gets the status of the etcd cluster and determines the etcd member information for the current node.
func GetEtcdStatus(mgr *manager.Manager) error {
mgr.Logger.Infoln("Getting etcd status")

return mgr.RunTaskOnEtcdNodes(getEtcdStatus, false)
}

func getEtcdStatus(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg) error {
output, _ := mgr.Runner.ExecuteCmd("sudo -E /bin/sh -c \"[ -f /etc/etcd.env ] && echo 'Configuration file already exists' || echo 'Configuration file will be created'\"", 0, true)
if strings.TrimSpace(output) == "Configuration file already exists" {
etcdEnv, _ := mgr.Runner.ExecuteCmd("sudo -E /bin/sh -c \"cat /etc/etcd.env | grep ETCD_NAME\"", 0, true)
node.EtcdName = etcdEnv[strings.Index(etcdEnv, "=")+1:]
node.EtcdExist = true
peerAddresses = append(peerAddresses, fmt.Sprintf("%s=https://%s:2380", node.EtcdName, node.InternalAddress))
etcdStatus = EXISTETCDCLUSTER
} else {
node.EtcdName = fmt.Sprintf("etcd-%s", node.Name)
node.EtcdExist = false
}

if len(peerAddresses) == 0 {
etcdStatus = NEWETCDCLUSTER
}
return nil
}

func GenerateEtcdCerts(mgr *manager.Manager) error {
if mgr.InCluster {
if err := kubekeycontroller.UpdateClusterConditions(mgr, "Init etcd cluster", metav1.Now(), metav1.Now(), false, 3); err != nil {
@@ -54,9 +85,9 @@ func GenerateEtcdCerts(mgr *manager.Manager) error {
return mgr.RunTaskOnEtcdNodes(generateCerts, true)
}

func generateCerts(mgr *manager.Manager, _ *kubekeyapiv1alpha1.HostCfg) error {
func generateCerts(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg) error {

if mgr.Runner.Index == 0 {
if (etcdStatus == NEWETCDCLUSTER && mgr.Runner.Index == 0) || (etcdStatus == EXISTETCDCLUSTER && strings.Contains(peerAddresses[0], node.InternalAddress)) {
certsScript, err := tmpl.GenerateEtcdSslScript(mgr)
if err != nil {
return err
@@ -161,7 +192,7 @@ func generateEtcdService(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg)
return err
}

etcdService, err := tmpl.GenerateEtcdService(mgr.Runner.Index, mgr.EtcdContainer)
etcdService, err := tmpl.GenerateEtcdService(node.EtcdName, mgr.EtcdContainer)
if err != nil {
return err
}
@@ -174,7 +205,7 @@ func generateEtcdService(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg)
if mgr.EtcdContainer {
checkEtcd, _ := mgr.Runner.ExecuteCmd("sudo -E /bin/sh -c \"[ -f /usr/local/bin/etcd ] && echo 'etcd already exists' || echo 'etcd will be installed'\"", 0, true)
if strings.TrimSpace(checkEtcd) == "etcd will be installed" {
etcdBin, err := tmpl.GenerateEtcdBinary(mgr, mgr.Runner.Index)
etcdBin, err := tmpl.GenerateEtcdBinary(mgr, node.EtcdName)
if err != nil {
return err
}
@@ -231,77 +262,57 @@ func installEtcdBinaries(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg)
func SetupEtcdCluster(mgr *manager.Manager) error {
mgr.Logger.Infoln("Starting etcd cluster")

return mgr.RunTaskOnEtcdNodes(setupEtcdCluster, false)
if err := mgr.RunTaskOnEtcdNodes(setupEtcdCluster, false); err != nil {
return err
}

return nil
}

// Configuring and starting etcd cluster.
func setupEtcdCluster(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg) error {
var localPeerAddresses []string
output, _ := mgr.Runner.ExecuteCmd("sudo -E /bin/sh -c \"[ -f /etc/etcd.env ] && echo 'Configuration file already exists' || echo 'Configuration file will be created'\"", 0, true)
if strings.TrimSpace(output) == "Configuration file already exists" {
outTmp, _ := mgr.Runner.ExecuteCmd("sudo cat /etc/etcd.env | awk 'NR==1{print $6}'", 0, true)
if outTmp != kubekeyapiv1alpha1.DefaultEtcdVersion {
if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "existing"); err != nil {
return err
}
}
if node.EtcdExist {
if err := healthCheck(mgr, node); err != nil {
return err
}
etcdStatus = "existing"
for i := 0; i <= mgr.Runner.Index; i++ {
localPeerAddresses = append(localPeerAddresses, fmt.Sprintf("etcd%d=https://%s:2380", i+1, mgr.EtcdNodes[i].InternalAddress))
return nil
}

peerAddresses = append(peerAddresses, fmt.Sprintf("%s=https://%s:2380", node.EtcdName, node.InternalAddress))

switch etcdStatus {
case NEWETCDCLUSTER:
if err := refreshConfig(mgr, node, peerAddresses, NEWETCDCLUSTER); err != nil {
return err
}
if mgr.Runner.Index == len(mgr.EtcdNodes)-1 {
peerAddresses = localPeerAddresses
case EXISTETCDCLUSTER:
if err := refreshConfig(mgr, node, peerAddresses, EXISTETCDCLUSTER); err != nil {
return err
}
} else {
for i := 0; i <= mgr.Runner.Index; i++ {
localPeerAddresses = append(localPeerAddresses, fmt.Sprintf("etcd%d=https://%s:2380", i+1, mgr.EtcdNodes[i].InternalAddress))
joinMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s\"", node.Name, node.Name, etcdBinDir, accessAddresses, node.EtcdName, fmt.Sprintf("https://%s:2380", node.InternalAddress))
fmt.Println(joinMemberCmd)
_, err := mgr.Runner.ExecuteCmd(joinMemberCmd, 2, true)
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to add etcd member")
}
if mgr.Runner.Index == len(mgr.EtcdNodes)-1 {
peerAddresses = localPeerAddresses
if err := restartEtcd(mgr); err != nil {
return err
}
if mgr.Runner.Index == 0 {
if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "new"); err != nil {
return err
}
etcdStatus = "new"
} else {
switch etcdStatus {
case "new":
if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "new"); err != nil {
return err
}
case "existing":
if err := refreshConfig(mgr, node, mgr.Runner.Index, localPeerAddresses, "existing"); err != nil {
return err
}
joinMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --endpoints=%s member add %s %s\"", node.Name, node.Name, etcdBinDir, accessAddresses, fmt.Sprintf("etcd%d", mgr.Runner.Index+1), fmt.Sprintf("https://%s:2380", node.InternalAddress))
_, err := mgr.Runner.ExecuteCmd(joinMemberCmd, 2, true)
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to add etcd member")
}
if err := restartEtcd(mgr); err != nil {
return err
}
if err := healthCheck(mgr, node); err != nil {
return err
}
checkMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list\"", node.Name, node.Name, etcdBinDir, accessAddresses)
memberList, err := mgr.Runner.ExecuteCmd(checkMemberCmd, 2, true)
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to list etcd member")
}
if !strings.Contains(memberList, fmt.Sprintf("https://%s:2379", node.InternalAddress)) {
return errors.Wrap(errors.WithStack(err), "Failed to add etcd member")
}
default:
return errors.New("Failed to get etcd cluster status")
}
if err := healthCheck(mgr, node); err != nil {
return err
}

checkMemberCmd := fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';%s/etcdctl --no-sync --endpoints=%s member list\"", node.Name, node.Name, etcdBinDir, accessAddresses)
memberList, err := mgr.Runner.ExecuteCmd(checkMemberCmd, 2, true)
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to list etcd member")
}
if !strings.Contains(memberList, fmt.Sprintf("https://%s:2379", node.InternalAddress)) {
return errors.Wrap(errors.WithStack(err), "Failed to add etcd member")
}
default:
return errors.New("Failed to get etcd cluster status")
}

return nil
}
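
When a node joins an existing cluster, setupEtcdCluster first registers it with etcdctl member add (etcd API v2, authenticated with the node's admin certs), then restarts etcd with state "existing" and verifies membership. A standalone sketch of how that join command is assembled, mirroring the joinMemberCmd format string above; the host values in main are illustrative:

package main

import "fmt"

// buildJoinMemberCmd reproduces the etcdctl v2 "member add" invocation used
// in setupEtcdCluster: the new member is announced to the running cluster at
// accessAddresses before its own etcd process starts.
func buildJoinMemberCmd(nodeName, etcdName, internalAddr, etcdBinDir, accessAddresses string) string {
	return fmt.Sprintf("sudo -E /bin/sh -c \"export ETCDCTL_API=2;"+
		"export ETCDCTL_CERT_FILE='/etc/ssl/etcd/ssl/admin-%s.pem';"+
		"export ETCDCTL_KEY_FILE='/etc/ssl/etcd/ssl/admin-%s-key.pem';"+
		"export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';"+
		"%s/etcdctl --endpoints=%s member add %s https://%s:2380\"",
		nodeName, nodeName, etcdBinDir, accessAddresses, etcdName, internalAddr)
}

func main() {
	fmt.Println(buildJoinMemberCmd("node2", "etcd-node2", "10.0.0.2",
		"/usr/local/bin", "https://10.0.0.1:2379"))
}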

@@ -349,8 +360,8 @@ func backupEtcd(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg) error {

func refreshEtcdConfig(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg) error {

if etcdStatus == "new" {
if err := refreshConfig(mgr, node, mgr.Runner.Index, peerAddresses, "new"); err != nil {
if etcdStatus == NEWETCDCLUSTER {
if err := refreshConfig(mgr, node, peerAddresses, NEWETCDCLUSTER); err != nil {
return err
}
if err := restartEtcd(mgr); err != nil {
@@ -361,7 +372,7 @@ func refreshEtcdConfig(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg) error {
}
}

if err := refreshConfig(mgr, node, mgr.Runner.Index, peerAddresses, "existing"); err != nil {
if err := refreshConfig(mgr, node, peerAddresses, EXISTETCDCLUSTER); err != nil {
return err
}

@@ -390,8 +401,8 @@ healthCheckLoop:
return nil
}

func refreshConfig(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg, index int, endpoints []string, state string) error {
etcdEnv, err := tmpl.GenerateEtcdEnv(node, index, endpoints, state)
func refreshConfig(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg, endpoints []string, state string) error {
etcdEnv, err := tmpl.GenerateEtcdEnv(node, endpoints, state)
if err != nil {
return err
}
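
The core fix in this file: peerAddresses entries are now keyed by node.EtcdName (recovered from each host's /etc/etcd.env) instead of the positional fmt.Sprintf("etcd%d", index+1), so a regenerated configuration keeps the member names and order of the original cluster. Presumably the env template joins these entries into ETCD_INITIAL_CLUSTER; the comma join below is an assumption, with illustrative addresses:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Entries in name=peerURL form, as built by getEtcdStatus/setupEtcdCluster.
	peerAddresses := []string{
		"etcd-node1=https://10.0.0.1:2380",
		"etcd-node2=https://10.0.0.2:2380",
	}
	// ETCD_INITIAL_CLUSTER expects a comma-separated name=peerURL list.
	fmt.Println(strings.Join(peerAddresses, ","))
	// etcd-node1=https://10.0.0.1:2380,etcd-node2=https://10.0.0.2:2380
}
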
13 changes: 6 additions & 7 deletions pkg/etcd/tmpl/etcd.go
@@ -17,7 +17,6 @@ limitations under the License.
package tmpl

import (
"fmt"
"strings"
"text/template"

@@ -126,30 +125,30 @@ ETCDCTL_CERT_FILE=/etc/ssl/etcd/ssl/admin-{{ .Hostname }}.pem
)

// GenerateEtcdBinary is used to generate etcd's container binary content.
func GenerateEtcdBinary(mgr *manager.Manager, index int) (string, error) {
func GenerateEtcdBinary(mgr *manager.Manager, name string) (string, error) {
return util.Render(EtcdTempl, util.Data{
"Name": fmt.Sprintf("etcd%d", index+1),
"Name": name,
"EtcdImage": preinstall.GetImage(mgr, "etcd").ImageName(),
})
}

// GenerateEtcdService is used to generate the etcd's service content for systemd.
func GenerateEtcdService(index int, etcdContainer bool) (string, error) {
func GenerateEtcdService(name string, etcdContainer bool) (string, error) {
return util.Render(EtcdServiceTempl, util.Data{
"Name": fmt.Sprintf("etcd%d", index+1),
"Name": name,
"EtcdContainer": etcdContainer,
})
}

// GenerateEtcdEnv is used to generate the etcd's env content.
func GenerateEtcdEnv(node *kubekeyapiv1alpha1.HostCfg, index int, endpoints []string, state string) (string, error) {
func GenerateEtcdEnv(node *kubekeyapiv1alpha1.HostCfg, endpoints []string, state string) (string, error) {
UnsupportedArch := false
if node.Arch != "amd64" {
UnsupportedArch = true
}
return util.Render(EtcdEnvTempl, util.Data{
"Tag": kubekeyapiv1alpha1.DefaultEtcdVersion,
"Name": fmt.Sprintf("etcd%d", index+1),
"Name": node.EtcdName,
"Ip": node.InternalAddress,
"Hostname": node.Name,
"State": state,
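
The template helpers now take the member name directly instead of deriving it from an index, which is what lets re-renders preserve existing names. util.Render is not shown in this diff, so the sketch below is a plain text/template equivalent; the unit-file template content is invented for illustration:

package main

import (
	"os"
	"text/template"
)

// A toy stand-in for EtcdServiceTempl; only the {{ .Name }} plumbing matters here.
const etcdServiceTempl = `[Unit]
Description=etcd ({{ .Name }})

[Service]
ExecStart=/usr/local/bin/etcd-{{ .Name }}
`

func main() {
	t := template.Must(template.New("etcdService").Parse(etcdServiceTempl))
	// Name-based rendering keeps unit content stable across re-runs even if
	// the node ordering in the cluster config changes.
	if err := t.Execute(os.Stdout, map[string]string{"Name": "etcd-node1"}); err != nil {
		panic(err)
	}
}
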
63 changes: 34 additions & 29 deletions pkg/kubernetes/master.go
@@ -46,37 +46,42 @@ var (

// GetClusterStatus is used to fetch status and info from cluster.
func GetClusterStatus(mgr *manager.Manager, _ *kubekeyapiv1alpha1.HostCfg) error {
if mgr.Runner.Index == 0 {
if clusterStatus["clusterInfo"] == "" {
output, err := mgr.Runner.ExecuteCmd("sudo -E /bin/sh -c \"[ -f /etc/kubernetes/admin.conf ] && echo 'Cluster already exists.' || echo 'Cluster will be created.'\"", 0, true)
if strings.Contains(output, "Cluster will be created") {
clusterIsExist = false
} else {
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to find /etc/kubernetes/admin.conf")
}
clusterIsExist = true
if output, err := mgr.Runner.ExecuteCmd("sudo cat /etc/kubernetes/manifests/kube-apiserver.yaml | grep 'image:' | awk -F '[:]' '{print $(NF-0)}'", 0, true); err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to find current version")
} else {
if !strings.Contains(output, "No such file or directory") {
clusterStatus["version"] = output
}
}
kubeCfgBase64Cmd := "cat /etc/kubernetes/admin.conf | base64 --wrap=0"
kubeConfigStr, err1 := mgr.Runner.ExecuteCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", kubeCfgBase64Cmd), 1, false)
if err1 != nil {
return errors.Wrap(errors.WithStack(err1), "Failed to get cluster kubeconfig")
}
clusterStatus["kubeconfig"] = kubeConfigStr
if err := loadKubeConfig(mgr); err != nil {
return err
}
if err := getJoinNodesCmd(mgr); err != nil {
return err
}
if clusterStatus["clusterInfo"] != "" {
return nil
}

output, err := mgr.Runner.ExecuteCmd("sudo -E /bin/sh -c \"[ -f /etc/kubernetes/admin.conf ] && echo 'Cluster already exists.' || echo 'Cluster will be created.'\"", 0, true)
if strings.Contains(output, "Cluster will be created") {
clusterIsExist = false
return nil
} else {
if err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to find /etc/kubernetes/admin.conf")
}

clusterIsExist = true
if output, err := mgr.Runner.ExecuteCmd("sudo cat /etc/kubernetes/manifests/kube-apiserver.yaml | grep 'image:' | awk -F '[:]' '{print $(NF-0)}'", 0, true); err != nil {
return errors.Wrap(errors.WithStack(err), "Failed to find current version")
} else {
if !strings.Contains(output, "No such file or directory") {
clusterStatus["version"] = output
}
}

kubeCfgBase64Cmd := "cat /etc/kubernetes/admin.conf | base64 --wrap=0"
kubeConfigStr, err1 := mgr.Runner.ExecuteCmd(fmt.Sprintf("sudo -E /bin/sh -c \"%s\"", kubeCfgBase64Cmd), 1, false)
if err1 != nil {
return errors.Wrap(errors.WithStack(err1), "Failed to get cluster kubeconfig")
}

clusterStatus["kubeconfig"] = kubeConfigStr

if err := loadKubeConfig(mgr); err != nil {
return err
}
if err := getJoinNodesCmd(mgr); err != nil {
return err
}
}
return nil
}
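
GetClusterStatus is flattened into guard clauses: it returns early when clusterInfo is already cached or when /etc/kubernetes/admin.conf is absent. The running version is still scraped from the kube-apiserver manifest with grep 'image:' | awk -F '[:]' '{print $(NF-0)}', i.e. the text after the last colon of the image line (the tag). A Go equivalent of that extraction; the manifest line is illustrative:

package main

import (
	"fmt"
	"strings"
)

// versionFromImageLine returns the tag after the last colon, matching what
// the awk pipeline above prints as its last colon-separated field.
func versionFromImageLine(line string) string {
	i := strings.LastIndex(line, ":")
	if i < 0 {
		return ""
	}
	return strings.TrimSpace(line[i+1:])
}

func main() {
	fmt.Println(versionFromImageLine("    image: k8s.gcr.io/kube-apiserver:v1.21.3")) // v1.21.3
}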
