

Merge pull request #2287 from Thor-wl/0606-1.6
[cherry-pick]fix some bugs about rescheduling plugin
volcano-sh-bot authored Jun 9, 2022
2 parents b2f6fe7 + b80e86a commit 44f32b0
Showing 20 changed files with 2,229 additions and 120 deletions.
25 changes: 25 additions & 0 deletions LICENSES/vendor/github.com/mitchellh/mapstructure/LICENSE

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions go.mod
@@ -9,6 +9,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/go-multierror v1.0.0
github.com/imdario/mergo v0.3.5
github.com/mitchellh/mapstructure v1.5.0
github.com/onsi/ginkgo/v2 v2.0.0
github.com/onsi/gomega v1.17.0
github.com/prometheus/client_golang v1.12.0
2 changes: 2 additions & 0 deletions go.sum
@@ -436,6 +436,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/ipvs v1.0.1/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hxQ=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/moby/sys/mountinfo v0.4.1/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A=
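
The new dependency above is github.com/mitchellh/mapstructure v1.5.0, which decodes loosely typed maps into Go structs; it presumably supports the rescheduling plugin's configuration handling. The sketch below only illustrates the general pattern — the struct tags, field names, and values are assumptions, not code from this commit.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// lowNodeUtilizationConf mirrors the shape of the plugin's threshold
// configuration; the field names and tags here are illustrative only.
type lowNodeUtilizationConf struct {
	Thresholds       map[string]int `mapstructure:"thresholds"`
	TargetThresholds map[string]int `mapstructure:"targetThresholds"`
}

func main() {
	// Loosely typed input, as a YAML or JSON decoder would produce it.
	raw := map[string]interface{}{
		"thresholds":       map[string]interface{}{"cpu": 20, "memory": 20},
		"targetThresholds": map[string]interface{}{"cpu": 50, "memory": 50},
	}

	var conf lowNodeUtilizationConf
	if err := mapstructure.Decode(raw, &conf); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", conf)
	// Output: {Thresholds:map[cpu:20 memory:20] TargetThresholds:map[cpu:50 memory:50]}
}
```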
45 changes: 25 additions & 20 deletions pkg/controllers/job/plugins/distributed-framework/mpi/mpi.go
@@ -28,14 +28,19 @@ import (
)

const (
MpiPluginName = "mpi"
DefaultPort = 22
// MPIPluginName is the name of the plugin
MPIPluginName = "mpi"
// DefaultPort is the default port for ssh
DefaultPort = 22
// DefaultMaster is the default task name of master host
DefaultMaster = "master"
// DefaultWorker is the default task name of worker host
DefaultWorker = "worker"
MpiHost = "MPI_HOST"
// MPIHost is the environment variable key of MPI host
MPIHost = "MPI_HOST"
)

type mpiPlugin struct {
type MPIPlugin struct {
mpiArguments []string
clientset pluginsinterface.PluginClientset
masterName string
@@ -45,18 +50,18 @@ type mpiPlugin struct {

// New creates mpi plugin.
func New(client pluginsinterface.PluginClientset, arguments []string) pluginsinterface.PluginInterface {
mp := mpiPlugin{mpiArguments: arguments, clientset: client}
mp := MPIPlugin{mpiArguments: arguments, clientset: client}
mp.addFlags()
return &mp
}

func NewInstance(arguments []string) mpiPlugin {
mp := mpiPlugin{mpiArguments: arguments}
func NewInstance(arguments []string) MPIPlugin {
mp := MPIPlugin{mpiArguments: arguments}
mp.addFlags()
return mp
}

func (mp *mpiPlugin) addFlags() {
func (mp *MPIPlugin) addFlags() {
flagSet := flag.NewFlagSet(mp.Name(), flag.ContinueOnError)
flagSet.StringVar(&mp.masterName, "master", DefaultMaster, "name of master role task")
flagSet.StringVar(&mp.workerName, "worker", DefaultWorker, "name of worker role task")
@@ -66,18 +71,18 @@ func (mp *mpiPlugin) addFlags() {
}
}

func (mp *mpiPlugin) Name() string {
return MpiPluginName
func (mp *MPIPlugin) Name() string {
return MPIPluginName
}

func (mp *mpiPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
func (mp *MPIPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
isMaster := false
workerHosts := ""
env := v1.EnvVar{}
if helpers.GetTaskKey(pod) == mp.masterName {
workerHosts = mp.generateTaskHosts(job.Spec.Tasks[helpers.GetTasklndexUnderJob(mp.workerName, job)], job.Name)
env = v1.EnvVar{
Name: MpiHost,
Name: MPIHost,
Value: workerHosts,
}

@@ -102,7 +107,7 @@ func (mp *mpiPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
return nil
}

func (mp *mpiPlugin) generateTaskHosts(task batch.TaskSpec, jobName string) string {
func (mp *MPIPlugin) generateTaskHosts(task batch.TaskSpec, jobName string) string {
hosts := ""
for i := 0; i < int(task.Replicas); i++ {
hostName := task.Template.Spec.Hostname
@@ -121,7 +126,7 @@ func (mp *mpiPlugin) generateTaskHosts(task batch.TaskSpec, jobName string) stri
return hosts[:len(hosts)-1]
}

func (mp *mpiPlugin) openContainerPort(c *v1.Container, index int, pod *v1.Pod, isInitContainer bool) {
func (mp *MPIPlugin) openContainerPort(c *v1.Container, index int, pod *v1.Pod, isInitContainer bool) {
SSHPortRight := false
for _, p := range c.Ports {
if p.ContainerPort == int32(mp.port) {
@@ -142,34 +147,34 @@ func (mp *mpiPlugin) openContainerPort(c *v1.Container, index int, pod *v1.Pod,
}
}

func (mp *mpiPlugin) OnJobAdd(job *batch.Job) error {
func (mp *MPIPlugin) OnJobAdd(job *batch.Job) error {
if job.Status.ControlledResources["plugin-"+mp.Name()] == mp.Name() {
return nil
}
job.Status.ControlledResources["plugin-"+mp.Name()] = mp.Name()
return nil
}

func (mp *mpiPlugin) OnJobDelete(job *batch.Job) error {
func (mp *MPIPlugin) OnJobDelete(job *batch.Job) error {
if job.Status.ControlledResources["plugin-"+mp.Name()] != mp.Name() {
return nil
}
delete(job.Status.ControlledResources, "plugin-"+mp.Name())
return nil
}

func (mp *mpiPlugin) OnJobUpdate(job *batch.Job) error {
func (mp *MPIPlugin) OnJobUpdate(job *batch.Job) error {
return nil
}

func (mp *mpiPlugin) GetMasterName() string {
func (mp *MPIPlugin) GetMasterName() string {
return mp.masterName
}

func (mp *mpiPlugin) GetWorkerName() string {
func (mp *MPIPlugin) GetWorkerName() string {
return mp.workerName
}

func (mp *mpiPlugin) GetMpiArguments() []string {
func (mp *MPIPlugin) GetMpiArguments() []string {
return mp.mpiArguments
}
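
The diff above renames the unexported mpiPlugin type to the exported MPIPlugin (and MpiPluginName/MpiHost to MPIPluginName/MPIHost), so code outside the package can work with the concrete type returned by NewInstance. A minimal, hypothetical caller, assuming the standard volcano.sh/volcano module path:

```go
package main

import (
	"fmt"

	"volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/mpi"
)

func main() {
	// NewInstance parses the plugin flags and returns the now-exported
	// MPIPlugin value, so its accessors can be called directly by callers.
	mp := mpi.NewInstance([]string{"--port=5000"})
	fmt.Println(mp.GetMasterName(), mp.GetWorkerName(), mp.GetMpiArguments())
	// Expected output with the default flag values: master worker [--port=5000]
}
```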
@@ -28,7 +28,7 @@ import (

func TestMpi(t *testing.T) {
plugins := make(map[string][]string)
plugins[MpiPluginName] = []string{"--port=5000"}
plugins[MPIPluginName] = []string{"--port=5000"}
testjob5000 := &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "test-mpi-1"},
Spec: v1alpha1.JobSpec{
@@ -108,7 +108,7 @@ func TestMpi(t *testing.T) {

for index, testcase := range testcases {
t.Run(testcase.Name, func(t *testing.T) {
mp := New(pluginsinterface.PluginClientset{}, testcase.Job.Spec.Plugins[MpiPluginName])
mp := New(pluginsinterface.PluginClientset{}, testcase.Job.Spec.Plugins[MPIPluginName])
if err := mp.OnPodCreate(testcase.Pod, testcase.Job); err != nil {
t.Errorf("Case %d (%s): expect no error, but got error %v", index, testcase.Name, err)
}
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/shuffle/shuffle.go
@@ -61,7 +61,7 @@ func (shuffle *Action) Execute(ssn *framework.Session) {
// Evict target workloads
victims := ssn.VictimTasks(tasks)
for victim := range victims {
klog.V(5).Infof("Victim %s: [ns: %s, job: %s]\n", victim.Name, victim.Namespace, victim.Job)
klog.V(3).Infof("pod %s from namespace %s and job %s will be evicted.\n", victim.Name, victim.Namespace, string(victim.Job))
if err := ssn.Evict(victim, "shuffle"); err != nil {
klog.Errorf("Failed to evict Task <%s/%s>: %v\n", victim.Namespace, victim.Name, err)
continue
62 changes: 36 additions & 26 deletions pkg/scheduler/plugins/rescheduling/low_node_utilization.go
@@ -59,13 +59,29 @@ func (lnuc *LowNodeUtilizationConf) parse(configs map[string]interface{}) {
}
lowThresholdsConfigs, ok := configs["thresholds"]
if ok {
lowConfigs, _ := lowThresholdsConfigs.(map[string]int)
parseThreshold(lowConfigs, lnuc, "Thresholds")
lowConfigs, ok := lowThresholdsConfigs.(map[interface{}]interface{})
if !ok {
klog.Warningln("Assert lowThresholdsConfigs to map error, abort the configuration parse.")
return
}
config := make(map[string]int)
for k, v := range lowConfigs {
config[k.(string)] = v.(int)
}
parseThreshold(config, lnuc, "Thresholds")
}
targetThresholdsConfigs, ok := configs["targetThresholds"]
if ok {
targetConfigs, _ := targetThresholdsConfigs.(map[string]int)
parseThreshold(targetConfigs, lnuc, "TargetThresholds")
targetConfigs, ok := targetThresholdsConfigs.(map[interface{}]interface{})
if !ok {
klog.Warningln("Assert targetThresholdsConfigs to map error, abort the configuration parse.")
return
}
config := make(map[string]int)
for k, v := range targetConfigs {
config[k.(string)] = v.(int)
}
parseThreshold(config, lnuc, "TargetThresholds")
}
}
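
The bug addressed here is that the threshold maps never decode as map[string]int: a YAML decoder such as gopkg.in/yaml.v2 represents nested mappings as map[interface{}]interface{}, so the old single-value assertion produced a nil map and the configured thresholds were effectively dropped. The fixed code asserts the generic map form and converts each key/value pair. A standalone sketch of that decoder behaviour (the YAML snippet is illustrative, not the plugin's actual configuration file):

```go
package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

func main() {
	// Illustrative plugin arguments; the real scheduler configuration
	// layout is not reproduced here.
	raw := []byte(`
thresholds:
  cpu: 20
  memory: 20
targetThresholds:
  cpu: 50
  memory: 50
`)

	var configs map[string]interface{}
	if err := yaml.Unmarshal(raw, &configs); err != nil {
		panic(err)
	}

	// yaml.v2 decodes nested mappings as map[interface{}]interface{} ...
	fmt.Printf("%T\n", configs["thresholds"]) // map[interface {}]interface {}

	// ... so the old assertion fails and returns a nil map.
	if _, ok := configs["thresholds"].(map[string]int); !ok {
		fmt.Println("assertion to map[string]int fails")
	}

	// Mirroring the fix above: assert the generic form, then convert pairs.
	thresholds := make(map[string]int)
	for k, v := range configs["thresholds"].(map[interface{}]interface{}) {
		thresholds[k.(string)] = v.(int)
	}
	fmt.Println(thresholds) // map[cpu:20 memory:20]
}
```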

@@ -98,7 +114,7 @@ var victimsFnForLnu = func(tasks []*api.TaskInfo) []*api.TaskInfo {
var config map[string]interface{}
config, ok := parametersConfig.(map[string]interface{})
if !ok {
klog.Error("parameters parse error for lowNodeUtilization")
klog.Errorln("parameters parse error for lowNodeUtilization")
return victims
}
utilizationConfig.parse(config)
@@ -125,21 +141,19 @@ var victimsFnForLnu = func(tasks []*api.TaskInfo) []*api.TaskInfo {
}

// lowThresholdFilter filter nodes which all resource dimensions are under the low utilization threshold
func lowThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface{}) bool {
func lowThresholdFilter(usage *NodeUtilization, config interface{}) bool {
utilizationConfig := parseArgToConfig(config)
if utilizationConfig == nil {
klog.V(4).Infof("lack of LowNodeUtilizationConf pointer parameter")
klog.V(4).Infoln("lack of LowNodeUtilizationConf pointer parameter")
return false
}

if node.Spec.Unschedulable {
if usage.nodeInfo.Spec.Unschedulable {
return false
}
nodeCapacity := getNodeCapacity(node)
for rName, usage := range usage.utilization {
if thresholdPercent, ok := utilizationConfig.Thresholds[string(rName)]; ok {
threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity)
if usage.Cmp(*threshold) == 1 {
for rName, usagePercent := range usage.utilization {
if threshold, ok := utilizationConfig.Thresholds[string(rName)]; ok {
if usagePercent >= threshold {
return false
}
}
@@ -148,18 +162,16 @@ func lowThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface{
}

// highThresholdFilter filter nodes which at least one resource dimension above the target utilization threshold
func highThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface{}) bool {
func highThresholdFilter(usage *NodeUtilization, config interface{}) bool {
utilizationConfig := parseArgToConfig(config)
if utilizationConfig == nil {
klog.V(4).Infof("lack of LowNodeUtilizationConf pointer parameter")
return false
}

nodeCapacity := getNodeCapacity(node)
for rName, usage := range usage.utilization {
if thresholdPercent, ok := utilizationConfig.TargetThresholds[string(rName)]; ok {
threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity)
if usage.Cmp(*threshold) == 1 {
for rName, usagePercent := range usage.utilization {
if threshold, ok := utilizationConfig.TargetThresholds[string(rName)]; ok {
if usagePercent > threshold {
return true
}
}
@@ -168,14 +180,12 @@ func highThresholdFilter(node *v1.Node, usage *NodeUtilization, config interface
}

// isContinueEvictPods judges whether continue to select victim pods
func isContinueEvictPods(node *v1.Node, usage *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, config interface{}) bool {
func isContinueEvictPods(usage *NodeUtilization, totalAllocatableResource map[v1.ResourceName]*resource.Quantity, config interface{}) bool {
var isNodeOverused bool
utilizationConfig := parseArgToConfig(config)
nodeCapacity := getNodeCapacity(node)
for rName, usage := range usage.utilization {
if thresholdPercent, ok := utilizationConfig.TargetThresholds[string(rName)]; ok {
threshold := getThresholdForNode(rName, thresholdPercent, nodeCapacity)
if usage.Cmp(*threshold) == 1 {
if threshold, ok := utilizationConfig.TargetThresholds[string(rName)]; ok {
if usage >= threshold {
isNodeOverused = true
break
}
@@ -185,8 +195,8 @@ func isContinueEvictPods(node *v1.Node, usage *NodeUtilization, totalAllocatable
return false
}

for rName := range totalAllocatableResource {
if totalAllocatableResource[rName].CmpInt64(0) == 0 {
for _, amount := range totalAllocatableResource {
if amount.CmpInt64(0) == 0 {
return false
}
}
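
The refactored filters above compare utilization percentages against the configured thresholds directly, rather than converting threshold percentages into per-node resource quantities, which is why the node parameter and the getNodeCapacity/getThresholdForNode calls disappear. A simplified sketch of the comparison semantics, with the types reduced to plain maps for illustration (the real code iterates NodeUtilization and the parsed LowNodeUtilizationConf):

```go
package main

import "fmt"

// underLowThreshold reports whether every configured resource dimension is
// below its low-utilization threshold, mirroring the comparison in the
// refactored lowThresholdFilter.
func underLowThreshold(usagePercent, thresholds map[string]int) bool {
	for name, used := range usagePercent {
		if threshold, ok := thresholds[name]; ok {
			if used >= threshold {
				return false
			}
		}
	}
	return true
}

// overTargetThreshold reports whether at least one resource dimension is
// above its target threshold, mirroring highThresholdFilter.
func overTargetThreshold(usagePercent, targetThresholds map[string]int) bool {
	for name, used := range usagePercent {
		if threshold, ok := targetThresholds[name]; ok {
			if used > threshold {
				return true
			}
		}
	}
	return false
}

func main() {
	usage := map[string]int{"cpu": 15, "memory": 60}
	fmt.Println(underLowThreshold(usage, map[string]int{"cpu": 20, "memory": 20}))   // false: memory is at or above 20
	fmt.Println(overTargetThreshold(usage, map[string]int{"cpu": 50, "memory": 50})) // true: memory exceeds 50
}
```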
(The remaining changed files are not shown.)
