Skip to content

Commit

Permalink
Add External Priority Multipliers (#4174)
Browse files Browse the repository at this point in the history
* Create auction.proto

* wip

* wip

* wip

* wip

* test

* test

* lint

* log msg

* default disable priority multipliers

* add missing file
  • Loading branch information
d80tb7 authored Jan 29, 2025
1 parent a5afc8a commit fcee91c
Show file tree
Hide file tree
Showing 25 changed files with 1,325 additions and 17 deletions.
3 changes: 2 additions & 1 deletion config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pulsar:
armadaApi:
armadaUrl: "server:50051"
forceNoTls: true
priorityMultiplier:
enabled: false
postgres:
connection:
host: postgres
Expand Down Expand Up @@ -118,4 +120,3 @@ scheduling:
experimentalIndicativePricing:
basePrice: 100.0
basePriority: 500.0

9 changes: 9 additions & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Configuration struct {
DatabaseFetchSize int `validate:"required"`
// Frequency at which queues will be fetched from the API
QueueRefreshPeriod time.Duration `validate:"required"`
// Allows queue priority multipliers to be fetched from an external source
PriorityMultiplier PriorityMultiplierConfig
}

type LeaderConfig struct {
Expand Down Expand Up @@ -295,3 +297,10 @@ type ExperimentalIndicativePricing struct {
BasePrice float64
BasePriority float64
}

// PriorityMultiplierConfig holds the settings for fetching queue priority multipliers
// from an external service.
type PriorityMultiplierConfig struct {
// Enabled presumably toggles the external priority multiplier integration; the shipped
// scheduler config sets it to false. NOTE(review): confirm against the code that reads it.
Enabled bool
// UpdateFrequency is presumably the interval between multiplier refreshes.
// NOTE(review): confirm it is what gets passed to the provider's update ticker.
UpdateFrequency time.Duration
// ServiceUrl is the target address used when dialling the priority multiplier gRPC service.
ServiceUrl string
// ForceNoTls makes the client use insecure (non-TLS) transport credentials when true.
ForceNoTls bool
}
28 changes: 28 additions & 0 deletions internal/scheduler/metrics/cycle_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type cycleMetrics struct {
fairnessError *prometheus.GaugeVec
demand *prometheus.GaugeVec
cappedDemand *prometheus.GaugeVec
queueWeight *prometheus.GaugeVec
rawQueueWeight *prometheus.GaugeVec
scheduleCycleTime prometheus.Histogram
reconciliationCycleTime prometheus.Histogram
gangsConsidered *prometheus.GaugeVec
Expand Down Expand Up @@ -106,6 +108,22 @@ func newCycleMetrics() *cycleMetrics {
poolAndQueueLabels,
)

queueWeight := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "queue_weight",
Help: "Weight of the queue after multipliers have been applied",
},
poolAndQueueLabels,
)

rawQueueWeight := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "raw_queue_weight",
Help: "Weight of the queue before multipliers have been applied",
},
poolAndQueueLabels,
)

fairnessError := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "fairness_error",
Expand Down Expand Up @@ -212,6 +230,8 @@ func newCycleMetrics() *cycleMetrics {
actualShare: actualShare,
demand: demand,
cappedDemand: cappedDemand,
queueWeight: queueWeight,
rawQueueWeight: rawQueueWeight,
fairnessError: fairnessError,
scheduleCycleTime: scheduleCycleTime,
gangsConsidered: gangsConsidered,
Expand Down Expand Up @@ -242,6 +262,8 @@ func newCycleMetrics() *cycleMetrics {
evictedJobs,
evictedResources,
spotPrice,
queueWeight,
rawQueueWeight,
},
reconciliationCycleTime: reconciliationCycleTime,
}
Expand Down Expand Up @@ -286,6 +308,8 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
m.actualShare.WithLabelValues(pool, queue).Set(actualShare)
m.demand.WithLabelValues(pool, queue).Set(demand)
m.cappedDemand.WithLabelValues(pool, queue).Set(cappedDemand)
m.queueWeight.WithLabelValues(pool, queue).Set(queueContext.Weight)
m.rawQueueWeight.WithLabelValues(pool, queue).Set(queueContext.RawWeight)
}
m.fairnessError.WithLabelValues(pool).Set(schedContext.FairnessError())
m.spotPrice.WithLabelValues(pool).Set(schedContext.SpotPrice)
Expand Down Expand Up @@ -331,6 +355,8 @@ func (m *cycleMetrics) describe(ch chan<- *prometheus.Desc) {
m.fairnessError.Describe(ch)
m.demand.Describe(ch)
m.cappedDemand.Describe(ch)
m.queueWeight.Describe(ch)
m.rawQueueWeight.Describe(ch)
m.scheduleCycleTime.Describe(ch)
m.gangsConsidered.Describe(ch)
m.gangsScheduled.Describe(ch)
Expand All @@ -357,6 +383,8 @@ func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
m.fairnessError.Collect(ch)
m.demand.Collect(ch)
m.cappedDemand.Collect(ch)
m.rawQueueWeight.Collect(ch)
m.queueWeight.Collect(ch)
m.scheduleCycleTime.Collect(ch)
m.gangsConsidered.Collect(ch)
m.gangsScheduled.Collect(ch)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/scheduler/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package schedulermocks
//go:generate mockgen -destination=./leases_getter.go -package=schedulermocks "k8s.io/client-go/kubernetes/typed/coordination/v1" LeasesGetter,LeaseInterface
//go:generate mockgen -destination=./job_repository.go -package=schedulermocks "github.com/armadaproject/armada/internal/scheduler/database" JobRepository
//go:generate mockgen -destination=./executor_repository.go -package=schedulermocks "github.com/armadaproject/armada/internal/scheduler/database" ExecutorRepository
//go:generate mockgen -destination=./grpc.go -package=schedulermocks "github.com/armadaproject/armada/pkg/executorapi" ExecutorApi_LeaseJobRunsServer
//go:generate mockgen -destination=./executor_api.go -package=schedulermocks "github.com/armadaproject/armada/pkg/executorapi" ExecutorApi_LeaseJobRunsServer
//go:generate mockgen -destination=./queue_cache.go -package=schedulermocks "github.com/armadaproject/armada/internal/scheduler/queue" QueueCache
//go:generate mockgen -destination=./api.go -package=schedulermocks "github.com/armadaproject/armada/pkg/api" SubmitClient,Submit_GetQueuesClient
//go:generate mockgen -destination=./priority_override.go -package=schedulermocks "github.com/armadaproject/armada/pkg/priorityoverride" PriorityMultiplierServiceClient
63 changes: 63 additions & 0 deletions internal/scheduler/mocks/priority_override.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 92 additions & 0 deletions internal/scheduler/prioritymultiplier/service_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package prioritymultiplier

import (
"fmt"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/armadaproject/armada/internal/common/armadacontext"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/pkg/priorityoverride"
)

// NewServiceClient builds a gRPC client for the Priority Multiplier Service located at
// config.ServiceUrl.
//
// Transport security is TLS (built via credentials.NewClientTLSFromCert with a nil
// certificate pool) unless config.ForceNoTls is set, in which case insecure credentials
// are used. Any error from grpc.NewClient is returned unchanged.
func NewServiceClient(config schedulerconfig.PriorityMultiplierConfig) (priorityoverride.PriorityMultiplierServiceClient, error) {
	var transportCreds credentials.TransportCredentials
	if config.ForceNoTls {
		transportCreds = insecure.NewCredentials()
	} else {
		transportCreds = credentials.NewClientTLSFromCert(nil, "")
	}

	dialOption := grpc.WithTransportCredentials(transportCreds)
	conn, err := grpc.NewClient(config.ServiceUrl, dialOption)
	if err != nil {
		return nil, err
	}

	serviceClient := priorityoverride.NewPriorityMultiplierServiceClient(conn)
	return serviceClient, nil
}

// ServiceProvider is an implementation of Provider that fetches priority multipliers from the Priority Multiplier Service.
// We cache the multipliers in memory so that we can continue scheduling even if the API is unavailable.
type ServiceProvider struct {
// updateFrequency is the interval between the periodic refreshes performed by Run.
updateFrequency time.Duration
// apiClient is the client used to request multipliers from the service.
apiClient priorityoverride.PriorityMultiplierServiceClient
// multipliers points at the most recently fetched multipliers, keyed by pool and queue.
// It is nil until a fetch has succeeded, and is replaced wholesale on each successful fetch.
multipliers atomic.Pointer[map[multiplierKey]float64]
}

// NewServiceProvider returns a ServiceProvider that will query apiClient for multipliers
// and, once Run is started, refresh them every updateFrequency.
//
// The returned provider holds no multipliers yet: Ready reports false and Multiplier
// returns an error until a fetch has succeeded.
func NewServiceProvider(apiClient priorityoverride.PriorityMultiplierServiceClient, updateFrequency time.Duration) *ServiceProvider {
	// The multipliers field is deliberately left at its zero value (a nil pointer),
	// which is how "nothing fetched yet" is represented.
	provider := new(ServiceProvider)
	provider.apiClient = apiClient
	provider.updateFrequency = updateFrequency
	return provider
}

// Run keeps the cached priority multipliers up to date until ctx is cancelled.
//
// A fetch is attempted immediately, and then again on every tick of a ticker with period
// updateFrequency. A failed fetch is logged as a warning and otherwise ignored, so
// whatever multipliers were stored previously (if any) stay in place. Run blocks until
// ctx is done and then returns nil; it never returns a non-nil error.
//
// Note that time.NewTicker panics if updateFrequency is not positive.
func (p *ServiceProvider) Run(ctx *armadacontext.Context) error {
	if err := p.fetchMultipliers(ctx); err != nil {
		ctx.Warnf("Error fetching multipliers: %v", err)
	}

	ticker := time.NewTicker(p.updateFrequency)
	// Stop the ticker on return so its resources are released once the context is cancelled.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := p.fetchMultipliers(ctx); err != nil {
				ctx.Warnf("Error fetching multipliers: %v", err)
			}
		}
	}
}

// Ready reports whether a set of multipliers has been stored, i.e. whether at least one
// fetch has completed successfully.
func (p *ServiceProvider) Ready() bool {
	snapshot := p.multipliers.Load()
	return snapshot != nil
}

// Multiplier looks up the cached priority multiplier for the given pool and queue.
//
// It returns an error if no multipliers have been stored yet. When multipliers are
// available but contain no entry for the pool/queue pair, the neutral value 1.0 is
// returned.
func (p *ServiceProvider) Multiplier(pool, queue string) (float64, error) {
	snapshot := p.multipliers.Load()
	if snapshot == nil {
		return 0, fmt.Errorf("no multipliers available")
	}

	lookup := multiplierKey{pool: pool, queue: queue}
	if value, found := (*snapshot)[lookup]; found {
		return value, nil
	}
	// No explicit entry for this pool/queue: fall back to a multiplier of one.
	return 1.0, nil
}

// fetchMultipliers requests the current multipliers from the service and, on success,
// replaces the cached map with a freshly built one.
//
// If the request fails the error is returned as-is and the cache is left untouched.
func (p *ServiceProvider) fetchMultipliers(ctx *armadacontext.Context) error {
	response, err := p.apiClient.GetPriorityMultipliers(ctx, &priorityoverride.PriorityMultiplierRequest{})
	if err != nil {
		return err
	}

	// Flatten the per-pool response into a single map keyed by (pool, queue).
	snapshot := map[multiplierKey]float64{}
	for _, perPool := range response.PoolPriorityMultipliers {
		for queueName, value := range perPool.Multipliers {
			snapshot[multiplierKey{pool: perPool.Pool, queue: queueName}] = value
		}
	}

	// Publish the new map in one atomic store.
	p.multipliers.Store(&snapshot)
	return nil
}
Loading

0 comments on commit fcee91c

Please sign in to comment.