Fix deadlock on multiple kill events (#111)
After the first kill event is received from the Mesos agent, the
executor stops the goroutine responsible for listening on the events
channel. This commit changes the events channel to a buffered one, so
sending a second event will not lock the whole executor process. The
kill logic is now also based on the task ID received from the Mesos
agent, so even if the executor missed the launch event it can still
return valid TaskStatus messages.
medzin authored Apr 23, 2018
1 parent f506fde commit a27cc69
Showing 3 changed files with 42 additions and 17 deletions.
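
The heart of the fix is the switch from an unbuffered to a buffered
events channel: a send on an unbuffered channel blocks until a receiver
is ready, so once the event-loop goroutine returns after the first kill,
any later send blocks forever. A minimal sketch of the failure mode and
the mitigation (names are illustrative, not taken from this repository):

package main

import (
	"fmt"
	"time"
)

type event struct{ kind string }

func main() {
	// Unbuffered: the receiver exits after the first kill, so a second
	// send would block the sender forever - the deadlock fixed here.
	unbuffered := make(chan event)
	go func() { <-unbuffered }() // handle first kill, then stop listening
	unbuffered <- event{"kill"}  // delivered

	select {
	case unbuffered <- event{"kill"}: // nobody is listening anymore
		fmt.Println("delivered (unexpected)")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("second send would block forever: the deadlock")
	}

	// Buffered: sends succeed up to the buffer size even with no
	// receiver, mirroring events: make(chan Event, 128) below.
	buffered := make(chan event, 128)
	buffered <- event{"kill"}
	buffered <- event{"kill"} // does not block
	fmt.Println("buffered sends completed without a receiver")
}
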
51 changes: 37 additions & 14 deletions executor.go
@@ -86,7 +86,8 @@ type Event struct {
// additional debug message.
Message string

//TODO(janisz): Create domain objects for this.
// TODO(medzin): remove these abstractions, as they only obscure the communication
kill executor.Event_Kill
subscribed executor.Event_Subscribed
launch executor.Event_Launch
}
@@ -115,6 +116,9 @@ const (
// Kill means command should be killed and executor exit.
Kill

// Shutdown means executor should kill all tasks and exit.
Shutdown

// Subscribed means executor attached to the Mesos agent.
Subscribed
// Launch means executor should start a task.
@@ -143,11 +147,14 @@ func NewExecutor(cfg Config, hooks ...hook.Hook) *Executor {
config: cfg,
context: ctx,
contextCancel: ctxCancel,
events: make(chan Event),
hookManager: hook.Manager{Hooks: hooks},
stateUpdater: state.BufferedUpdater(cfg.MesosConfig, cfg.StateUpdateBufferSize),
clock: systemClock{},
random: newRandom(),
// Workaround for the problem where the Mesos agent sends many KILL
// events to the executor and the executor deadlocks on this channel,
// because after the first kill nobody is listening on it anymore.
events: make(chan Event, 128),
hookManager: hook.Manager{Hooks: hooks},
stateUpdater: state.BufferedUpdater(cfg.MesosConfig, cfg.StateUpdateBufferSize),
clock: systemClock{},
random: newRandom(),
}
}

@@ -208,7 +215,7 @@ SUBSCRIBE_LOOP:
case <-recoveryTimeout.C:
return fmt.Errorf("failed to re-establish subscription with agent within %v, aborting", e.config.MesosConfig.RecoveryTimeout)
case <-e.context.Done():
log.Debug("Executor context cancelled, breaking subscribe loop")
log.Info("Executor context cancelled, breaking subscribe loop")
break SUBSCRIBE_LOOP
case <-shouldConnect:
subscribe := calls.Subscribe(nil, e.stateUpdater.GetUnacknowledged()).With(callOptions...)
@@ -231,6 +238,7 @@
}
}

log.Infof("Trying to send remaining state updates with %s timeout", e.config.StateUpdateWaitTimeout)
if err := e.stateUpdater.Wait(e.config.StateUpdateWaitTimeout); err != nil { // try to send remaining state updates
log.WithError(err).Error("Unable to send remaining state updates to Mesos agent")
}
@@ -264,8 +272,10 @@ func (e *Executor) handleMesosEvent(event executor.Event) error {
e.events <- Event{Type: Subscribed, subscribed: *event.GetSubscribed()}
case executor.Event_LAUNCH:
e.events <- Event{Type: Launch, launch: *event.GetLaunch()}
case executor.Event_KILL, executor.Event_SHUTDOWN:
e.events <- Event{Type: Kill}
case executor.Event_KILL:
e.events <- Event{Type: Kill, kill: *event.GetKill()}
case executor.Event_SHUTDOWN:
e.events <- Event{Type: Shutdown}
case executor.Event_ERROR:
return errMustAbort
case executor.Event_ACKNOWLEDGED:
@@ -351,11 +361,11 @@ func (e *Executor) taskEventLoop() {
return
case Kill:
e.shutDown(taskInfo, cmd)
message := "Task killed due to receiving an event from Mesos agent"
taskID := mesos.TaskID{Value: "MISSING"}
if taskInfo != nil {
taskID = taskInfo.GetTaskID()
}
// Relying on TaskInfo can be tricky here: the launch event may have
// been lost, in which case we will not have it, yet the agent still
// expects a TaskStatus with a valid task ID.
taskID := event.kill.GetTaskID()
message := "Task killed due to receiving a kill event from Mesos agent"
e.stateUpdater.UpdateWithOptions(
taskID,
mesos.TASK_KILLED,
@@ -364,6 +374,19 @@
},
)
return
case Shutdown:
e.shutDown(taskInfo, cmd)
// It is possible to receive a shutdown event without a preceding launch.
if taskInfo != nil {
message := "Task killed due to receiving a shutdown event from Mesos agent"
e.stateUpdater.UpdateWithOptions(
taskInfo.GetTaskID(),
mesos.TASK_KILLED,
state.OptionalInfo{
Message: &message,
},
)
}
}
}
}
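
The behavioral change in taskEventLoop is worth spelling out: the kill
path used to fall back to a placeholder "MISSING" task ID when no launch
event had been seen, which the agent cannot match to any task; it now
takes the ID carried by the kill event itself. A self-contained sketch
of old versus new behavior (the types are simplified stand-ins, not the
mesos-go ones):

package main

import "fmt"

// Simplified stand-ins for mesos.TaskID and executor.Event_Kill.
type taskID struct{ Value string }
type killEvent struct{ TaskID taskID }

// Old behavior: fall back to a placeholder when the launch event (and
// therefore taskInfo) was never received.
func oldKilledID(taskInfo *taskID) taskID {
	if taskInfo != nil {
		return *taskInfo
	}
	return taskID{Value: "MISSING"} // the agent cannot match this status
}

// New behavior: trust the ID the agent itself put into the kill event,
// so the TASK_KILLED status is valid even without a preceding launch.
func newKilledID(kill killEvent) taskID {
	return kill.TaskID
}

func main() {
	kill := killEvent{TaskID: taskID{Value: "task-1"}}
	fmt.Println(oldKilledID(nil).Value)  // MISSING
	fmt.Println(newKilledID(kill).Value) // task-1
}
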
6 changes: 4 additions & 2 deletions executor_test.go
@@ -363,7 +363,9 @@ func TestCertificateCheckScheduleTaskKillBeforeCertificateExpires(t *testing.T)
func TestIfNotPanicsWhenKillWithoutLaunch(t *testing.T) {
stateUpdater := new(mockUpdater)
stateUpdater.On("UpdateWithOptions",
mock.AnythingOfType("mesos.TaskID"),
mock.MatchedBy(func(taskID mesos.TaskID) bool {
return taskID.GetValue() == "taskID"
}),
mesos.TASK_KILLED,
mock.AnythingOfType("state.OptionalInfo")).Once()
events := make(chan Event, 1)
@@ -375,7 +377,7 @@ func TestIfNotPanicsWhenKillWithoutLaunch(t *testing.T) {
}

assert.NotPanics(t, func() {
events <- Event{Type: Kill}
events <- Event{Type: Kill, kill: executor.Event_Kill{TaskID: mesos.TaskID{Value: "taskID"}}}
exec.taskEventLoop()
stateUpdater.AssertExpectations(t)
})
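
The test change swaps mock.AnythingOfType for testify's mock.MatchedBy,
which wraps a one-argument predicate and matches a call only when the
predicate returns true, so the expectation now pins the exact task ID
rather than merely its type. A standalone sketch of the pattern (the
mockRecorder type and its Record method are invented for illustration):

package example

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// mockRecorder is a hypothetical dependency mock, used here purely to
// demonstrate the matcher.
type mockRecorder struct{ mock.Mock }

func (m *mockRecorder) Record(id string) { m.Called(id) }

func TestMatchedByExample(t *testing.T) {
	rec := new(mockRecorder)
	// MatchedBy takes a predicate; the expectation matches a call only
	// when the predicate returns true, i.e. for this exact ID.
	rec.On("Record", mock.MatchedBy(func(id string) bool {
		return id == "taskID"
	})).Once()

	rec.Record("taskID")
	rec.AssertExpectations(t)
}
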
2 changes: 1 addition & 1 deletion gometalinter.json
@@ -1,7 +1,7 @@
{
"Aggregate": true,
"Concurrency": 2,
"Cyclo": 14,
"Cyclo": 15,
"Deadline": "300s",
"DisableAll": true,
"Enable": [
