Skip to content

Commit

Permalink
Executor Vaas Hook should register port with proper label (not first …
Browse files Browse the repository at this point in the history
…defined port)
  • Loading branch information
andrzejwaw authored and tomez committed Feb 1, 2019
1 parent 90f54a1 commit 0396532
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 9 deletions.
21 changes: 15 additions & 6 deletions hook/vaas/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
const vaasBackendIDKey = "vaas-backend-id"
const vaasDirectorLabelKey = "director"
const vaasAsyncLabelKey = "vaas-queue"
const vaasFrontendSyncPortLabelKey = "frontend-sync"

// vaasInitialWeight is an environment variable used to override initial weight.
const vaasInitialWeight = "VAAS_INITIAL_WEIGHT"
Expand Down Expand Up @@ -72,10 +73,18 @@ func (sh *Hook) RegisterBackend(taskInfo mesosutils.TaskInfo) error {
return err
}

ports := taskInfo.GetPorts()
portLabelKey := fmt.Sprintf("%s:%s", vaasFrontendSyncPortLabelKey, director)
port, err := taskInfo.GetFirstPortWithLabel(portLabelKey)

if len(ports) < 1 {
return errors.New("service has no ports available")
if port == nil {
ports := taskInfo.GetPorts()
if len(ports) < 1 {
return errors.New("service has no ports available")
}
port = &ports[0]
log.Warnf("Port with label %s not found, using first port %v", portLabelKey, port.GetNumber())
} else {
log.Infof("Port with label %s found, using port %v", portLabelKey, port.GetNumber())
}

var initialWeight *int
Expand Down Expand Up @@ -106,7 +115,7 @@ func (sh *Hook) RegisterBackend(taskInfo mesosutils.TaskInfo) error {
Director: fmt.Sprintf("%s%d/", apiDirectorPath, directorID),
Weight: initialWeight,
DC: *dc,
Port: int(ports[0].GetNumber()),
Port: int(port.GetNumber()),
InheritTimeProfile: true,
Tags: tags,
}
Expand All @@ -128,14 +137,14 @@ func (sh *Hook) RegisterBackend(taskInfo mesosutils.TaskInfo) error {
// DeregisterBackend deletes backend from VaaS.
func (sh *Hook) DeregisterBackend(_ mesosutils.TaskInfo) error {
if sh.backendID != nil {
log.WithField(vaasBackendIDKey, sh.backendID).
log.WithField(vaasBackendIDKey, *sh.backendID).
Info("backendID is set - scheduling backend for deletion via VaaS")

if err := sh.client.DeleteBackend(*sh.backendID); err != nil {
return err
}

log.WithField(vaasBackendIDKey, sh.backendID).
log.WithField(vaasBackendIDKey, *sh.backendID).
Info("Successfully scheduled backend for deletion via VaaS")
// we will not try to remove the same backend (and get an error) if this hook gets called again
sh.backendID = nil
Expand Down
37 changes: 34 additions & 3 deletions hook/vaas/hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,35 @@ func prepareTaskInfo() mesosutils.TaskInfo {
return mesosutils.TaskInfo{mesos.TaskInfo{Discovery: &discovery}}
}

func prepareTaskInfoWithMultiplePortsAndFrontendPortLabel(directorName string) mesosutils.TaskInfo {
tag := "tag"
frontLabel := mesos.Label{Key: fmt.Sprintf("%s:%s", "frontend-sync", directorName), Value: &tag}
labelList := []mesos.Label{frontLabel}
labels := mesos.Labels{Labels: labelList}

ports := mesos.Ports{
Ports: []mesos.Port{{Number: uint32(8080)}, {Number: uint32(8081), Labels: &labels}},
}
discovery := mesos.DiscoveryInfo{Ports: &ports}

return mesosutils.TaskInfo{mesos.TaskInfo{Discovery: &discovery}}
}

func prepareTaskInfoWithDirectorWithLabeledPort(directorName string, extraLabels ...mesos.Label) (taskInfo mesosutils.TaskInfo) {
taskInfo = prepareTaskInfoWithMultiplePortsAndFrontendPortLabel(directorName)
tag := "tag"
directorLabel := mesos.Label{Key: "director", Value: &directorName}
weightLabel := mesos.Label{Key: "weight:50", Value: &tag}
labelList := []mesos.Label{directorLabel, weightLabel}
labelList = append(labelList, extraLabels...)
labels := mesos.Labels{Labels: labelList}
taskInfo.TaskInfo.Labels = &labels

taskInfo.TaskInfo.Command = &mesos.CommandInfo{}

return taskInfo
}

func prepareTaskInfoWithDirector(directorName string, extraLabels ...mesos.Label) (taskInfo mesosutils.TaskInfo) {
taskInfo = prepareTaskInfo()
tag := "tag"
Expand Down Expand Up @@ -85,17 +114,19 @@ func TestIfBackendIDSetWhenBackendRegistrationSucceeds(t *testing.T) {
mockClient.On("GetDC", "dc6").Return(&mockDC, nil)
mockClient.On("FindDirectorID", "abc456").Return(456, nil)
weight := 50

mockClient.On("AddBackend", &Backend{
Address: runenv.IP().String(),
DC: mockDC,
Director: "/api/v0.1/director/456/",
InheritTimeProfile: true,
Port: 8080,
Port: 8081,
Weight: &weight,
Tags: []string(nil),
}).Return("/api/v0.1/backend/123/", nil)

serviceHook := Hook{client: mockClient}
err := serviceHook.RegisterBackend(prepareTaskInfoWithDirector("abc456"))
err := serviceHook.RegisterBackend(prepareTaskInfoWithDirectorWithLabeledPort("abc456"))

require.NoError(t, err)
expectedId := 123
Expand Down Expand Up @@ -275,4 +306,4 @@ func TestIfNewHookCreatesNoopHookWhenHookDisabled(t *testing.T) {

require.NoError(t, err)
assert.IsType(t, hook.NoopHook{}, h)
}
}
18 changes: 18 additions & 0 deletions mesosutils/taskinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ func (h TaskInfo) GetPorts() []mesos.Port {
return h.TaskInfo.GetDiscovery().GetPorts().GetPorts()
}

// GetFirstPortWithLabel returns port with specified label
func (h TaskInfo) GetFirstPortWithLabel(portLabel string) (*mesos.Port, error) {
ports := h.GetPorts()
if len(ports) < 1 {
return nil, fmt.Errorf("service has no ports available")
}
for i := range ports {
if ports[i].Labels != nil && ports[i].Labels.Labels != nil {
for _, label := range ports[i].Labels.Labels {
if portLabel == label.Key {
return &ports[i], nil
}
}
}
}
return nil, fmt.Errorf("No port with label %s", portLabel)
}

// FindEnvValue returns the value of an environment variable
func (h TaskInfo) FindEnvValue(key string) string {
for _, envVar := range h.TaskInfo.GetCommand().GetEnvironment().GetVariables() {
Expand Down

0 comments on commit 0396532

Please sign in to comment.