From a27cc697e456532fce4c81aa1a6970bb317d67c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adam=20Medzi=C5=84ski?= <medzin@users.noreply.github.com>
Date: Mon, 23 Apr 2018 11:49:21 +0200
Subject: [PATCH] Fix deadlock on multiple kill events (#111)

After the first kill event is received from the Mesos agent, the executor
stops the goroutine that is responsible for listening to the events
channel. This commit makes the events channel buffered, so sending a
second event will not block the whole executor process. Also, the kill
logic is now based on the task ID received from the Mesos agent, so if
the executor missed the launch event it can still return valid TaskStatus
messages.
---
 executor.go       | 51 ++++++++++++++++++++++++++++++++++-------------
 executor_test.go  |  6 ++++--
 gometalinter.json |  2 +-
 3 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/executor.go b/executor.go
index 85dc8967..d37e7296 100644
--- a/executor.go
+++ b/executor.go
@@ -86,7 +86,8 @@ type Event struct {
 	// additional debug message.
 	Message string
 
-	//TODO(janisz): Create domain objects for this.
+	// TODO(medzin): remove abstractions, because they only obscure all the communication
+	kill       executor.Event_Kill
 	subscribed executor.Event_Subscribed
 	launch     executor.Event_Launch
 }
@@ -115,6 +116,9 @@ const (
 	// Kill means command should be killed and executor exit.
 	Kill
 
+	// Shutdown means executor should kill all tasks and exit.
+	Shutdown
+
 	// Subscribed means executor attach to mesos Agent.
 	Subscribed
 	// Launch means executor should start a task.
@@ -143,11 +147,14 @@ func NewExecutor(cfg Config, hooks ...hook.Hook) *Executor {
 		config:        cfg,
 		context:       ctx,
 		contextCancel: ctxCancel,
-		events:        make(chan Event),
-		hookManager:   hook.Manager{Hooks: hooks},
-		stateUpdater:  state.BufferedUpdater(cfg.MesosConfig, cfg.StateUpdateBufferSize),
-		clock:         systemClock{},
-		random:        newRandom(),
+		// workaround for the problem when the Mesos agent sends many KILL events
+		// to the executor and it blocks itself on this channel, because after
+		// the first kill nobody is listening to it
+		events:       make(chan Event, 128),
+		hookManager:  hook.Manager{Hooks: hooks},
+		stateUpdater: state.BufferedUpdater(cfg.MesosConfig, cfg.StateUpdateBufferSize),
+		clock:        systemClock{},
+		random:       newRandom(),
 	}
 }
 
@@ -208,7 +215,7 @@ SUBSCRIBE_LOOP:
 		case <-recoveryTimeout.C:
 			return fmt.Errorf("failed to re-establish subscription with agent within %v, aborting", e.config.MesosConfig.RecoveryTimeout)
 		case <-e.context.Done():
-			log.Debug("Executor context cancelled, breaking subscribe loop")
+			log.Info("Executor context cancelled, breaking subscribe loop")
 			break SUBSCRIBE_LOOP
 		case <-shouldConnect:
 			subscribe := calls.Subscribe(nil, e.stateUpdater.GetUnacknowledged()).With(callOptions...)
@@ -231,6 +238,7 @@ SUBSCRIBE_LOOP:
 		}
 	}
 
+	log.Info("Trying to to send remaining state updates with %s timeout", e.config.StateUpdateWaitTimeout)
 	if err := e.stateUpdater.Wait(e.config.StateUpdateWaitTimeout); err != nil { // try to send remaining state updates
 		log.WithError(err).Error("Unable to send remaining state updates to Mesos agent")
 	}
@@ -264,8 +272,10 @@ func (e *Executor) handleMesosEvent(event executor.Event) error {
 		e.events <- Event{Type: Subscribed, subscribed: *event.GetSubscribed()}
 	case executor.Event_LAUNCH:
 		e.events <- Event{Type: Launch, launch: *event.GetLaunch()}
-	case executor.Event_KILL, executor.Event_SHUTDOWN:
-		e.events <- Event{Type: Kill}
+	case executor.Event_KILL:
+		e.events <- Event{Type: Kill, kill: *event.GetKill()}
+	case executor.Event_SHUTDOWN:
+		e.events <- Event{Type: Shutdown}
 	case executor.Event_ERROR:
 		return errMustAbort
 	case executor.Event_ACKNOWLEDGED:
@@ -351,11 +361,11 @@ func (e *Executor) taskEventLoop() {
 			return
 		case Kill:
 			e.shutDown(taskInfo, cmd)
-			message := "Task killed due to receiving an event from Mesos agent"
-			taskID := mesos.TaskID{Value: "MISSING"}
-			if taskInfo != nil {
-				taskID = taskInfo.GetTaskID()
-			}
+			// relying on TaskInfo can be tricky here, as the launch event may
+			// be lost, so we will not have it, and the agent still waits for a
+			// TaskStatus with a valid ID
+			taskID := event.kill.GetTaskID()
+			message := "Task killed due to receiving a kill event from Mesos agent"
 			e.stateUpdater.UpdateWithOptions(
 				taskID,
 				mesos.TASK_KILLED,
@@ -364,6 +374,19 @@ func (e *Executor) taskEventLoop() {
 				},
 			)
 			return
+		case Shutdown:
+			e.shutDown(taskInfo, cmd)
+			// it is possible to receive a shutdown without launch
+			if taskInfo != nil {
+				message := "Task killed due to receiving a shutdown event from Mesos agent"
+				e.stateUpdater.UpdateWithOptions(
+					event.kill.GetTaskID(),
+					mesos.TASK_KILLED,
+					state.OptionalInfo{
+						Message: &message,
+					},
+				)
+			}
 		}
 	}
 }
diff --git a/executor_test.go b/executor_test.go
index 9531e529..93f92cdd 100644
--- a/executor_test.go
+++ b/executor_test.go
@@ -363,7 +363,9 @@ func TestCertificateCheckScheduleTaskKillBeforeCertificateExpires(t *testing.T)
 func TestIfNotPanicsWhenKillWithoutLaunch(t *testing.T) {
 	stateUpdater := new(mockUpdater)
 	stateUpdater.On("UpdateWithOptions",
-		mock.AnythingOfType("mesos.TaskID"),
+		mock.MatchedBy(func(taskID mesos.TaskID) bool {
+			return taskID.GetValue() == "taskID"
+		}),
 		mesos.TASK_KILLED,
 		mock.AnythingOfType("state.OptionalInfo")).Once()
 	events := make(chan Event, 1)
@@ -375,7 +377,7 @@ func TestIfNotPanicsWhenKillWithoutLaunch(t *testing.T) {
 	}
 
 	assert.NotPanics(t, func() {
-		events <- Event{Type: Kill}
+		events <- Event{Type: Kill, kill: executor.Event_Kill{TaskID: mesos.TaskID{Value: "taskID"}}}
 		exec.taskEventLoop()
 		stateUpdater.AssertExpectations(t)
 	})
diff --git a/gometalinter.json b/gometalinter.json
index 2f9d3729..af6cc3a9 100644
--- a/gometalinter.json
+++ b/gometalinter.json
@@ -1,7 +1,7 @@
 {
   "Aggregate": true,
   "Concurrency": 2,
-  "Cyclo": 14,
+  "Cyclo": 15,
   "Deadline": "300s",
   "DisableAll": true,
   "Enable": [