Skip to content

Commit

Permalink
fix session open close handling bug (#150)
Browse files Browse the repository at this point in the history
* handle session open and close failure

* handle session open and close failure

* amend

* amend
  • Loading branch information
guowenjian90 authored Dec 2, 2022
1 parent cfa5d41 commit a5dfb1b
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 204 deletions.
2 changes: 1 addition & 1 deletion cmd/fornaxtest/app/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ var (
"cpu": util.ResourceQuantity(0.5*1000, v1.ResourceCPU),
},
Requests: map[v1.ResourceName]resource.Quantity{
"memory": util.ResourceQuantity(100*1024*1024, v1.ResourceMemory),
"memory": util.ResourceQuantity(50*1024*1024, v1.ResourceMemory),
"cpu": util.ResourceQuantity(0.01*1000, v1.ResourceCPU),
},
},
Expand Down
24 changes: 13 additions & 11 deletions pkg/fornaxcore/application/application_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,30 +366,32 @@ func (am *ApplicationManager) syncApplication(ctx context.Context, applicationKe
// 2, find how many more pods required for remaining pending sessions
if syncErr == nil {
sessionSummary := pool.summarySession()
numOfOccupiedPod, numOfPendingPod, numOfIdlePod := pool.activePodNums()
numOfUnoccupiedPod := numOfPendingPod + numOfIdlePod
numOfAllocatedPod, numOfPendingPod, numOfIdlePod := pool.activePodNums()
numOfUnAllocatedPod := numOfPendingPod + numOfIdlePod
numOfPendingSession := sessionSummary.pendingCount
numOfDesiredUnoccupiedPod := am.calculateDesiredIdlePods(application, numOfOccupiedPod, numOfUnoccupiedPod, numOfPendingSession)
numOfDesiredPod = numOfOccupiedPod + numOfDesiredUnoccupiedPod
klog.InfoS("Syncing application pod", "application", applicationKey, "pending-sessions", numOfPendingSession, "active-pods", numOfOccupiedPod+numOfUnoccupiedPod, "pending-pods", numOfPendingPod, "idle-pods", numOfIdlePod, "desired-pending+idle-pods", numOfDesiredUnoccupiedPod)
if numOfDesiredUnoccupiedPod > numOfUnoccupiedPod {
numOfDesiredUnAllocatedPod := am.calculateDesiredIdlePods(application, numOfAllocatedPod, numOfUnAllocatedPod, numOfPendingSession)
numOfDesiredPod = numOfAllocatedPod + numOfDesiredUnAllocatedPod
klog.InfoS("Syncing application pod", "application", applicationKey, "pending-sessions", numOfPendingSession, "active-pods", numOfAllocatedPod+numOfUnAllocatedPod, "pending-pods", numOfPendingPod, "idle-pods", numOfIdlePod, "desired-pending+idle-pods", numOfDesiredUnAllocatedPod)
if numOfDesiredUnAllocatedPod > numOfUnAllocatedPod {
action = fornaxv1.DeploymentActionCreateInstance
} else if numOfDesiredUnoccupiedPod < numOfUnoccupiedPod {
} else if numOfDesiredUnAllocatedPod < numOfUnAllocatedPod {
action = fornaxv1.DeploymentActionDeleteInstance
}
// pending session will need pods immediately, the rest of pods can be created as a standby pod
desiredAddition := numOfDesiredUnoccupiedPod - numOfUnoccupiedPod
desiredAddition := numOfDesiredUnAllocatedPod - numOfUnAllocatedPod
syncErr = am.deployApplicationPods(pool, application, desiredAddition)

// take care of timeout and deleting pods
am.pruneDeadPods(pool)
}
} else {
numOfDesiredPod = 0
action = fornaxv1.DeploymentActionDeleteInstance
syncErr = am.cleanupDeletedApplication(pool)
// numOfAllocatedPod, numOfPendingPod, numOfIdlePod := pool.activePodNums()
// desiredAddition := 0 - (numOfIdlePod + numOfPendingPod + numOfAllocatedPod)
// syncErr = am.deployApplicationPods(pool, application, desiredAddition)
}

// take care of timeout and deleting pods
am.pruneDeadPods(pool)
newStatus := am.calculateStatus(pool, application, numOfDesiredPod, action, syncErr)
am.applicationStatusManager.UpdateApplicationStatus(application, newStatus)
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/fornaxcore/application/application_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (am *ApplicationManager) onPodEventFromNode(podEvent *ie.PodEvent) {
}

// When a pod is created or updated, add this pod reference to app pods pool
// set pod state from pending => idle if pod is running successfully on node
// set pod state from idle => allocated if pod has a session on node
// request node to terminate pod if application not found
// retry deleting pod if pod was already moved to the deleting queue; in this case, the node did not receive the termination request somehow
func (am *ApplicationManager) handlePodAddUpdateFromNode(pod *v1.Pod) {
podName := util.Name(pod)
applicationKey, err := am.getPodApplicationKey(pod)
Expand All @@ -92,20 +96,17 @@ func (am *ApplicationManager) handlePodAddUpdateFromNode(pod *v1.Pod) {
}

if len(applicationKey) == 0 {
klog.InfoS("Pod does not belong to any application, terminated it", "pod", podName, "labels", pod.GetLabels())
klog.InfoS("Pod does not belong to any application, terminate it", "pod", podName, "labels", pod.GetLabels())
am.podManager.TerminatePod(podName)
return
} else {
pool := am.getOrCreateApplicationPool(applicationKey)
ap := pool.getPod(podName)
if ap != nil && ap.state == PodStateDeleting {
// this pod was requested to terminate, and node did not receive termination or failed to do it, try it again
am.deleteApplicationPod(pool, ap.podName)
return
}
if ap != nil && ap.state == PodStateAllocated {
// this pod is assigned to a session by FornaxCore, but the node has not reported back yet, or the message was lost, skip
// after session setup timeout, this pod will be released
return
}
if util.PodIsPending(pod) {
Expand All @@ -127,7 +128,8 @@ func (am *ApplicationManager) handlePodAddUpdateFromNode(pod *v1.Pod) {
am.enqueueApplication(applicationKey)
}

// When a pod is deleted, find application that manages it and remove pod reference from its pod pool
// When a pod is deleted on node, find application that manages it and remove pod reference from its pod pool
// if pod has a session associated, cleanup sessions
func (am *ApplicationManager) handlePodDeleteFromNode(pod *v1.Pod) {
podName := util.Name(pod)
if pod.DeletionTimestamp == nil {
Expand Down Expand Up @@ -155,27 +157,28 @@ func (am *ApplicationManager) handlePodDeleteFromNode(pod *v1.Pod) {
am.cleanupSessionOnDeletedPod(pool, podName)
pool.deletePod(podName)
}
// enqueue application to evaluate application status
am.enqueueApplication(applicationKey)
}

// move pod to deleting state and request node to terminate pod
func (am *ApplicationManager) deleteApplicationPod(pool *ApplicationPool, podName string) error {
podState := pool.getPod(podName)
if podState == nil {
return nil
}

// reset pod deletiontimestamp and retry if deletion timeout
if podState.state == PodStateDeleting {
pod := am.podManager.FindPod(podName)
if pod != nil && pod.DeletionTimestamp != nil && pod.DeletionTimestamp.Time.Before(time.Now().Add(-1*DefaultPodDeletingTimeoutDuration)) {
// reset pod deletiontimestamp and retry if deletion timeout
pod.DeletionTimestamp = nil
} else {
return nil
}
} else {
pool.addOrUpdatePod(podName, PodStateDeleting, []string{})
}

pool.addOrUpdatePod(podName, PodStateDeleting, []string{})
err := am.podManager.TerminatePod(podName)
if err != nil {
if err == fornaxpod.PodNotFoundError {
Expand Down
129 changes: 64 additions & 65 deletions pkg/fornaxcore/application/application_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,47 +77,57 @@ func (pool *ApplicationPool) _getPodNoLock(podName string) *ApplicationPod {
return nil
}

// podStateTransitionAllowed reports whether a pod may move from oldState to newState.
//   - staying in the same state is always allowed
//   - a pending pod may move to any state
//   - an idle or allocated pod may move to any state except pending
//   - any other transition (including leaving the deleting state) is rejected
func (pool *ApplicationPool) podStateTransitionAllowed(oldState, newState ApplicationPodState) bool {
	// a no-op transition is always fine, this also covers deleting => deleting
	if oldState == newState {
		return true
	}

	switch oldState {
	case PodStatePending:
		return true
	case PodStateIdle, PodStateAllocated:
		// a pod which already left pending state must never go back to it
		return newState != PodStatePending
	default:
		return false
	}
}

// find pod in a state map, move it to different state map and add session bundle on it
func (pool *ApplicationPool) addOrUpdatePod(podName string, podState ApplicationPodState, sessionIds []string) *ApplicationPod {
func (pool *ApplicationPool) addOrUpdatePod(podName string, podState ApplicationPodState, sessionNames []string) *ApplicationPod {
pool.mu.Lock()
defer pool.mu.Unlock()
if p := pool._getPodNoLock(podName); p != nil {
// pod state should not be reverted to avoid race condition
if p.state == PodStateDeleting && podState != PodStateDeleting {
// do not change a pod state already marked as deleting pods
return p
}
if p.state != PodStatePending && podState == PodStatePending {
// do not change a pod state to pending if pod is not pending anymore
if !pool.podStateTransitionAllowed(p.state, podState) {
return p
}
}
return pool._addOrUpdatePodNoLock(podName, podState, sessionIds)
return pool._addOrUpdatePodNoLock(podName, podState, sessionNames)
}

func (pool *ApplicationPool) _addOrUpdatePodNoLock(podName string, podState ApplicationPodState, sessionIds []string) *ApplicationPod {
// move pod from a state bucket to new state bucket and update its session map
func (pool *ApplicationPool) _addOrUpdatePodNoLock(podName string, podNewState ApplicationPodState, sessionNames []string) *ApplicationPod {
for _, pods := range pool.podsByState {
if p, f := pods[podName]; f {
for _, v := range sessionIds {
for _, v := range sessionNames {
p.sessions[v] = true
}
if p.state == podState {
if p.state == podNewState {
return p
} else {
p.state = podState
pool.podsByState[podState][podName] = p
p.state = podNewState
pool.podsByState[podNewState][podName] = p
delete(pods, podName)
return p
}
}
}

// not found, add it
p := NewApplicationPod(podName, podState)
for _, v := range sessionIds {
p := NewApplicationPod(podName, podNewState)
for _, v := range sessionNames {
p.sessions[v] = true
}
pool.podsByState[podState][podName] = p
pool.podsByState[podNewState][podName] = p
return p
}

Expand Down Expand Up @@ -220,32 +230,8 @@ func (pool *ApplicationPool) summarySession() ApplicationSessionSummary {
summary.deletingCount = len(pool.sessions[SessionStateDeleting])
summary.runningCount = len(pool.sessions[SessionStateRunning])
summary.startingCount = len(pool.sessions[SessionStateStarting])
summary.pendingCount = len(pool.sessions[SessionStatePending])

for _, s := range pool.sessions[SessionStatePending] {
timeoutDuration := DefaultSessionOpenTimeoutDuration
if s.session.Spec.OpenTimeoutSeconds > 0 {
timeoutDuration = time.Duration(s.session.Spec.OpenTimeoutSeconds) * time.Second
}
pendingTimeoutTimeStamp := time.Now().Add(-1 * timeoutDuration)
if s.session.CreationTimestamp.Time.Before(pendingTimeoutTimeStamp) {
summary.timeoutCount += 1
} else {
summary.pendingCount += 1
}
}

for _, s := range pool.sessions[SessionStateStarting] {
timeoutDuration := DefaultSessionOpenTimeoutDuration
if s.session.Spec.OpenTimeoutSeconds > 0 {
timeoutDuration = time.Duration(s.session.Spec.OpenTimeoutSeconds) * time.Second
}
pendingTimeoutTimeStamp := time.Now().Add(-1 * timeoutDuration)
if s.session.CreationTimestamp.Time.Before(pendingTimeoutTimeStamp) {
summary.timeoutCount += 1
} else {
summary.startingCount += 1
}
}
return summary
}

Expand All @@ -265,59 +251,74 @@ func (pool *ApplicationPool) _getSessionNoLock(key string) *ApplicationSession {
return nil
}

func (pool *ApplicationPool) addSession(sessionId string, session *fornaxv1.ApplicationSession) {
func (pool *ApplicationPool) addSession(sessionName string, session *fornaxv1.ApplicationSession) {
pool.mu.Lock()
defer pool.mu.Unlock()
newState := SessionStatePending
if session.DeletionTimestamp != nil {
if session.DeletionTimestamp != nil || util.SessionIsClosing(session) {
newState = SessionStateDeleting
} else if util.SessionIsOpen(session) {
newState = SessionStateRunning
} else if util.SessionIsStarting(session) {
newState = SessionStateStarting
} else if util.SessionIsPending(session) {
newState = SessionStatePending
} else if util.SessionIsClosing(session) {
newState = SessionStateDeleting
} else if util.SessionIsOpen(session) {
newState = SessionStateRunning
} else {
// do not add a session in a terminal state; delete it from the pool and return instead
pool._deleteSessionNoLock(session)
return
}

s := pool._getSessionNoLock(sessionId)
s := pool._getSessionNoLock(sessionName)
if s != nil {
if newState != s.state {
delete(pool.sessions[s.state], sessionId)
if pool.sessionStateTransitionAllowed(s.state, newState) {
delete(pool.sessions[s.state], sessionName)
} else {
return
}
}

// update pool with new state
pool.sessions[newState][sessionId] = &ApplicationSession{
pool.sessions[newState][sessionName] = &ApplicationSession{
session: session,
state: newState,
}
if session.Status.PodReference != nil {
podName := session.Status.PodReference.Name
pool._addOrUpdatePodNoLock(podName, PodStateAllocated, []string{string(session.GetUID())})
pool._addOrUpdatePodNoLock(podName, PodStateAllocated, []string{sessionName})
}
}

// sessionStateTransitionAllowed reports whether a session may move from oldState to newState.
// Staying in the same state is always allowed; otherwise a session must not move back to a
// state listed earlier in the order pending, starting, running, deleting:
//   - pending may move to any state
//   - starting may move to any state except pending
//   - running may move to any state except pending and starting
//   - deleting may move to any state except pending, starting and running
//
// Any other oldState is rejected.
func (pool *ApplicationPool) sessionStateTransitionAllowed(oldState, newState ApplicationSessionState) bool {
	if oldState == newState {
		return true
	}

	switch oldState {
	case SessionStatePending:
		return true
	case SessionStateStarting:
		return newState != SessionStatePending
	case SessionStateRunning:
		return newState != SessionStatePending &&
			newState != SessionStateStarting
	case SessionStateDeleting:
		return newState != SessionStatePending &&
			newState != SessionStateStarting &&
			newState != SessionStateRunning
	}
	return false
}

// deleteSession takes the pool's write lock and delegates to _deleteSessionNoLock
// to remove the given session from the pool; the lock is released on return.
func (pool *ApplicationPool) deleteSession(session *fornaxv1.ApplicationSession) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool._deleteSessionNoLock(session)
}

// delete a session from application pool, and delete it from referenced pod's session map, and change pod state back to idle state,
// only allow from allocated => idle when delete a session from this pod, pod is in pending/deleting state should keep its state
func (pool *ApplicationPool) _deleteSessionNoLock(session *fornaxv1.ApplicationSession) {
sessionId := string(session.GetUID())
sessionName := util.Name(session)
if session.Status.PodReference != nil {
podName := session.Status.PodReference.Name
for _, podsOfState := range pool.podsByState {
if pod, found := podsOfState[podName]; found {
delete(pod.sessions, sessionId)
delete(pod.sessions, sessionName)
if len(pod.sessions) == 0 && pod.state == PodStateAllocated {
// only allow from allocated => idle when delete a session from this pod, pod is in pending/deleting state should keep its state
delete(podsOfState, podName)
pod.state = PodStateIdle
pool.podsByState[PodStateIdle][podName] = pod
Expand All @@ -327,14 +328,14 @@ func (pool *ApplicationPool) _deleteSessionNoLock(session *fornaxv1.ApplicationS
}
}
for _, v := range pool.sessions {
delete(v, sessionId)
delete(v, sessionName)
}
}

// getNonRunningSessions return a list of session of different states,
// pending, not assigned to pod yet
// deleting, delete requested
// timeout, session timedout to get a pod, or session assigned to node, but timeout to get session state from node
// 1/ pending, not assigned to pod yet
// 2/ deleting, delete requested
// 3/ timeout, session timed out waiting for a pod, or session was assigned to a node but timed out waiting for session state from the node
func (pool *ApplicationPool) getNonRunningSessions() (pendingSessions, deletingSessions, timeoutSessions []*ApplicationSession) {
pool.mu.RLock()
defer pool.mu.RUnlock()
Expand All @@ -360,8 +361,6 @@ func (pool *ApplicationPool) getNonRunningSessions() (pendingSessions, deletingS
pendingTimeoutTimeStamp := time.Now().Add(-1 * timeoutDuration)
if s.session.CreationTimestamp.Time.Before(pendingTimeoutTimeStamp) {
timeoutSessions = append(timeoutSessions, s)
} else {
pendingSessions = append(pendingSessions, s)
}
}

Expand All @@ -375,13 +374,13 @@ func (pool *ApplicationPool) getNonRunningSessions() (pendingSessions, deletingS
// add active session into application's session pool and delete terminal session from pool
// add session/delete session will update pod state according pod's session usage
func updateSessionPool(pool *ApplicationPool, session *fornaxv1.ApplicationSession) {
sessionId := string(session.GetUID())
sessionName := util.Name(session)
if util.SessionInTerminalState(session) {
pool.deleteSession(session)
} else {
// a trick to make sure pending sessions are sorted with microsecond precision, since the api server truncates the creation timestamp to seconds
session.CreationTimestamp = *util.NewCurrentMetaTime()
pool.addSession(sessionId, session)
pool.addSession(sessionName, session)
}
}

Expand Down
Loading

0 comments on commit a5dfb1b

Please sign in to comment.