Commit

Do not /start pods which are not supposed to be part of the cluster (#753)

* Do not /start pods which are not supposed to be part of the cluster (see the sketch below)

* Modify the scale_unbalanced test to check the initContainerStatus instead of the containerStatus (since the extra pod is not going to be started)

* Do not check the HostID of a pod that has not been started
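
The idea behind the fix, as a minimal self-contained Go sketch rather than the actual operator code (the type and function names below are hypothetical; the real change in pkg/reconciliation/reconcile_racks.go appears in the diff further down): pod starts are bounded by the node count the CassandraDatacenter asks for per rack, not by the StatefulSet's replica count, so a pod added by scaling the StatefulSet by hand is never started.

```go
package main

import "fmt"

// rackInfo is a trimmed-down stand-in for the operator's RackInformation type
// (hypothetical, for illustration only).
type rackInfo struct {
	RackName  string
	NodeCount int // nodes the CassandraDatacenter wants in this rack
}

// podsToStart returns, per rack, the pod ordinals the operator may start.
// stsReplicas is what each rack's StatefulSet currently has; any ordinal at or
// above NodeCount exists only because the StatefulSet was modified outside the
// operator and must not be started.
func podsToStart(racks []rackInfo, stsReplicas map[string]int) map[string][]int {
	out := map[string][]int{}
	for _, r := range racks {
		for ordinal := 0; ordinal < stsReplicas[r.RackName]; ordinal++ {
			if ordinal <= r.NodeCount-1 {
				out[r.RackName] = append(out[r.RackName], ordinal)
			}
		}
	}
	return out
}

func main() {
	racks := []rackInfo{{RackName: "r2", NodeCount: 2}}
	// The StatefulSet was scaled to 3 by hand; ordinal 2 is skipped.
	fmt.Println(podsToStart(racks, map[string]int{"r2": 3})) // map[r2:[0 1]]
}
```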
burmanm authored Feb 14, 2025
1 parent ab95ae1 commit 7b13601
Showing 4 changed files with 180 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
* [CHANGE] [#527](https://github.com/k8ssandra/cass-operator/issues/527) Migrate the Kustomize configuration to Kustomize 5 only. Support for using Kustomize 4.x to generate config is no longer supported.
* [ENHANCEMENT] [#729](https://github.com/k8ssandra/cass-operator/issues/729) Modify NewMgmtClient to support additional transport option for the http.Client
* [ENHANCEMENT] [#737](https://github.com/k8ssandra/cass-operator/issues/737) Before issuing PVC deletion when deleting a datacenter, verify the PVCs that match the labels are not actually used by any pods.
* [BUGFIX] [#744](https://github.com/k8ssandra/cass-operator/issues/744) If StatefulSet was manually modified outside CassandraDatacenter, do not start such pods as they would need to be decommissioned instantly and could have IP conflict issues when doing so.

## v1.23.0

23 changes: 14 additions & 9 deletions pkg/reconciliation/reconcile_racks.go
@@ -1998,15 +1998,18 @@ func (rc *ReconciliationContext) startOneNodePerRack(endpointData httphelper.Cas
func (rc *ReconciliationContext) startAllNodes(endpointData httphelper.CassMetadataEndpoints) (bool, error) {
rc.ReqLogger.Info("reconcile_racks::startAllNodes")

for podRankWithinRack := int32(0); ; podRankWithinRack++ {
for podRankWithinRack := 0; ; podRankWithinRack++ {

done := true
for _, statefulSet := range rc.statefulSets {

maxPodRankInThisRack := *statefulSet.Spec.Replicas - 1
for idx := range rc.desiredRackInformation {
rackInfo := rc.desiredRackInformation[idx]
statefulSet := rc.statefulSets[idx]

maxPodRankInThisRack := rackInfo.NodeCount - 1
if podRankWithinRack <= maxPodRankInThisRack {

podName := getStatefulSetPodNameForIdx(statefulSet, podRankWithinRack)
podName := getStatefulSetPodNameForIdx(statefulSet, int32(podRankWithinRack))
pod := rc.getDCPodByName(podName)
notReady, err := rc.startNode(pod, false, endpointData)
if notReady || err != nil {
@@ -2266,12 +2269,14 @@ func (rc *ReconciliationContext) CheckCassandraNodeStatuses() result.ReconcileRe
dc := rc.Datacenter
logger := rc.ReqLogger

// Check that we have a HostID for every pod in the datacenter
// Check that we have a HostID for every pod in the datacenter that has started
for _, pod := range rc.dcPods {
nodeStatus, ok := dc.Status.NodeStatuses[pod.Name]
if !ok || nodeStatus.HostID == "" {
logger.Info("Missing host id", "pod", pod.Name)
return result.RequeueSoon(2)
if isServerStarted(pod) {
nodeStatus, ok := dc.Status.NodeStatuses[pod.Name]
if !ok || nodeStatus.HostID == "" {
logger.Info("Missing host id", "pod", pod.Name)
return result.RequeueSoon(2)
}
}
}

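The new HostID check above guards on isServerStarted, a helper that is not shown in this diff. A minimal sketch of what such a check could look like, assuming it keys off the api.CassNodeState pod label that the test code in this commit sets to "Started" for started pods (the constant values below are assumptions, and the snippet needs the k8s.io/api dependency):

```go
package sketch

import corev1 "k8s.io/api/core/v1"

// These constants only approximate the operator's api.CassNodeState label and
// its "Started" value; the real isServerStarted helper is not part of this
// diff, so treat this as an assumed, simplified equivalent.
const (
	cassNodeStateLabel = "cassandra.datastax.com/node-state"
	stateStarted       = "Started"
)

// isServerStartedSketch reports whether the operator has already started
// Cassandra in this pod, based on the node-state label that the tests in this
// commit also set on started pods.
func isServerStartedSketch(pod *corev1.Pod) bool {
	return pod != nil && pod.Labels[cassNodeStateLabel] == stateStarted
}
```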
163 changes: 163 additions & 0 deletions pkg/reconciliation/reconcile_racks_test.go
@@ -2187,13 +2187,172 @@ func TestReconciliationContext_startAllNodes(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rc, _, _ := setupTest()
rc.statefulSets = nil
for _, rackName := range []string{"rack1", "rack2", "rack3"} {
rackPods := tt.racks[rackName]
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: rackName},
Spec: appsv1.StatefulSetSpec{Replicas: ptr.To(int32(len(rackPods)))},
}
rc.statefulSets = append(rc.statefulSets, sts)
rc.desiredRackInformation = append(rc.desiredRackInformation, &RackInformation{
NodeCount: len(rackPods),
RackName: rackName,
})
for i, started := range rackPods {
p := &corev1.Pod{}
p.Name = getStatefulSetPodNameForIdx(sts, int32(i))
p.Labels = map[string]string{}
p.Status.ContainerStatuses = []corev1.ContainerStatus{
{
Name: "cassandra",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now().Add(-time.Minute)},
},
},
Ready: bool(started),
},
}
p.Status.PodIP = "127.0.0.1"
if started {
p.Labels[api.CassNodeState] = stateStarted
} else {
p.Labels[api.CassNodeState] = stateReadyToStart
}
rc.dcPods = append(rc.dcPods, p)
}
}

mockClient := mocks.NewClient(t)
rc.Client = mockClient

done := make(chan struct{})
if tt.wantNotReady {
// mock the calls in labelServerPodStarting:
// patch the pod: pod.Labels[api.CassNodeState] = stateStarting
k8sMockClientPatch(mockClient, nil)
// patch the dc status: dc.Status.LastServerNodeStarted = metav1.Now()
k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil)

res := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("OK")),
}

mockHttpClient := mocks.NewHttpClient(t)
mockHttpClient.On("Do",
mock.MatchedBy(
func(req *http.Request) bool {
return req != nil
})).
Return(res, nil).
Once().
Run(func(mock.Arguments) { close(done) })

client := httphelper.NodeMgmtClient{
Client: mockHttpClient,
Log: rc.ReqLogger,
Protocol: "http",
}
rc.NodeMgmtClient = client

}

epData := httphelper.CassMetadataEndpoints{
Entity: []httphelper.EndpointState{},
}

gotNotReady, err := rc.startAllNodes(epData)

assert.NoError(t, err)
assert.Equalf(t, tt.wantNotReady, gotNotReady, "expected not ready to be %v", tt.wantNotReady)

if tt.wantNotReady {
select {
case <-done:
case <-time.After(2 * time.Second):
assert.Fail(t, "No pod start occurred")
}
}

fakeRecorder := rc.Recorder.(*record.FakeRecorder)
close(fakeRecorder.Events)
if assert.Lenf(t, fakeRecorder.Events, len(tt.wantEvents), "expected %d events, got %d", len(tt.wantEvents), len(fakeRecorder.Events)) {
var gotEvents []string
for i := range fakeRecorder.Events {
gotEvents = append(gotEvents, i)
}
assert.Equal(t, tt.wantEvents, gotEvents)
}

mockClient.AssertExpectations(t)
})
}
}

func TestReconciliationContext_startAllNodes_onlyRackInformation(t *testing.T) {

// A boolean representing the state of a pod (started or not).
type pod bool

// racks is a map of rack names to a list of pods in that rack.
type racks map[string][]pod

tests := []struct {
name string
racks racks
wantNotReady bool
wantEvents []string
}{
{
name: "balanced racks, all started",
racks: racks{
"rack1": {true, true, true},
"rack2": {true, true, true},
"rack3": {true, true, true, false},
},
wantNotReady: false,
},
{
name: "unbalanced racks, all started",
racks: racks{
"rack1": {true, true},
"rack2": {true},
"rack3": {true, true, true, false},
},
wantNotReady: false,
},
{
name: "unbalanced racks, part of decommission",
racks: racks{
"rack1": {},
"rack2": {true},
"rack3": {true, false},
},
wantNotReady: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rc, _, _ := setupTest()
rc.statefulSets = nil
for _, rackName := range []string{"rack1", "rack2", "rack3"} {
rackPods := tt.racks[rackName]
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: rackName},
Spec: appsv1.StatefulSetSpec{Replicas: ptr.To(int32(len(rackPods)))},
}
rc.statefulSets = append(rc.statefulSets, sts)
podCount := len(rackPods)
if rackName == "rack3" {
// rack3 has a node that was created by modifying the StS directly. We do not want to start it
podCount--
}
rc.desiredRackInformation = append(rc.desiredRackInformation, &RackInformation{
NodeCount: podCount,
RackName: rackName,
})
for i, started := range rackPods {
p := &corev1.Pod{}
p.Name = getStatefulSetPodNameForIdx(sts, int32(i))
@@ -2349,6 +2508,10 @@ func TestStartOneNodePerRack(t *testing.T) {
Spec: appsv1.StatefulSetSpec{Replicas: ptr.To(int32(len(rackPods)))},
}
rc.statefulSets = append(rc.statefulSets, sts)
rc.desiredRackInformation = append(rc.desiredRackInformation, &RackInformation{
NodeCount: len(rackPods),
RackName: rackName,
})
for i, started := range rackPods {
p := &corev1.Pod{}
p.Name = getStatefulSetPodNameForIdx(sts, int32(i))
@@ -73,8 +73,8 @@ var _ = Describe(testName, func() {

extraPod := "cluster1-dc1-r2-sts-2"

step = "check that the extra pod is ready"
json = "jsonpath={.items[*].status.containerStatuses[0].ready}"
step = "check that the extra pod is created"
json = "jsonpath={.items[*].status.initContainerStatuses[0].ready}"
k = kubectl.Get("pod").
WithFlag("field-selector", fmt.Sprintf("metadata.name=%s", extraPod)).
FormatOutput(json)
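Per the commit note, the extra pod is never started, so its cassandra container never becomes ready; the adjusted e2e assertion instead waits on the first init container, which still runs when the pod is created. A small sketch of the field the new jsonpath expression reads, using the standard k8s.io/api types (the helper name is made up for illustration):

```go
package sketch

import corev1 "k8s.io/api/core/v1"

// extraPodCreated mirrors what the adjusted assertion looks at: the first init
// container being ready rather than the cassandra container, because the extra
// pod is never started by the operator.
func extraPodCreated(pod corev1.Pod) bool {
	statuses := pod.Status.InitContainerStatuses
	return len(statuses) > 0 && statuses[0].Ready
}
```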
