Merge pull request #44 from Venafi/scheduler
Scheduler
arykalin authored Mar 31, 2020
2 parents f6fba7c + 87ba3a4 commit 627a060
Showing 4 changed files with 299 additions and 99 deletions.
4 changes: 3 additions & 1 deletion plugin/pki/backend.go
@@ -98,7 +98,8 @@ func Backend(conf *logical.BackendConfig) *backend {
if b.storage == nil {
log.Println("Can't start queue when storage is nil")
} else {
go b.importToTPP(b.storage, conf)
b.taskStorage.init()
b.importToTPP(b.storage, conf)
}

return &b
@@ -111,6 +112,7 @@ type backend struct {
crlLifetime time.Duration
revokeStorageLock sync.RWMutex
tidyCASGuard *uint32
taskStorage taskStorageStruct
}

const backendHelp = `
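
For context: this hunk replaces the old long-running go b.importToTPP(...) goroutine with the new task scheduler. taskStorage.init() starts the scheduler loop once, and importToTPP now only registers a recurring task and returns. A minimal sketch of the resulting startup flow, reusing the backend and taskStorageStruct types from this diff (the real Backend() carries more setup than shown):

package pki

import (
	"log"

	"github.com/hashicorp/vault/sdk/logical"
)

// newBackendSketch mirrors the wiring above: start the scheduler loop
// once, then let importToTPP register its recurring controller task.
func newBackendSketch(conf *logical.BackendConfig) *backend {
	var b backend
	b.storage = conf.StorageView
	if b.storage == nil {
		log.Println("Can't start queue when storage is nil")
	} else {
		b.taskStorage.init()           // spawns the scheduler goroutine
		b.importToTPP(b.storage, conf) // registers a task and returns immediately
	}
	return &b
}
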
182 changes: 84 additions & 98 deletions plugin/pki/path_import_queue.go
@@ -12,6 +12,7 @@ import (
hconsts "github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/logical"
"log"
"strings"
"sync"
"time"
)
@@ -27,12 +28,6 @@ type Job struct {
storage logical.Storage
}

//Result structure for import queue worker
type Result struct {
job Job
result string
}

// This returns the list of certificates queued for import to TPP
func pathImportQueue(b *backend) *framework.Path {
ret := &framework.Path{
@@ -98,105 +93,46 @@ func (b *backend) pathUpdateImportQueue(ctx context.Context, req *logical.Reques
return logical.ListResponse(entries), nil
}

func (b *backend) importToTPP(storage logical.Storage, conf *logical.BackendConfig) {
func (b *backend) fillImportQueueTask(roleName string, noOfWorkers int, storage logical.Storage, conf *logical.BackendConfig) {
ctx := context.Background()
jobs := make(chan Job, 100)
replicationState := conf.System.ReplicationState()
//Checking if we are on master or on the standby Vault server
isSlave := !(conf.System.LocalMount() || !replicationState.HasState(hconsts.ReplicationPerformanceSecondary)) ||
replicationState.HasState(hconsts.ReplicationDRSecondary) ||
replicationState.HasState(hconsts.ReplicationPerformanceStandby)
if isSlave {
log.Println("We're on slave. Skipping")
return
}
log.Println("We're on master. Starting to import certificates")
//var err error
importPath := "import-queue/" + roleName + "/"

log.Println("Starting new import routine")
for {
replicationState := conf.System.ReplicationState()
//Checking if we are on master or on the standby Vault server
if !(conf.System.LocalMount() || !replicationState.HasState(hconsts.ReplicationPerformanceSecondary)) ||
replicationState.HasState(hconsts.ReplicationDRSecondary) ||
replicationState.HasState(hconsts.ReplicationPerformanceStandby) {
log.Println("We're on slave. Sleeping")
time.Sleep(10 * time.Second)
continue
}
log.Println("We're on master. Starting to import certificates")
roles, err := storage.List(ctx, "role/")
if err != nil {
log.Printf("Couldn't get list of roles: %s", err)
time.Sleep(time.Second)
continue
}

if len(roles) == 0 {
log.Printf("Role list is empty. Sleeping.")
time.Sleep(time.Second)
continue
}

var wg sync.WaitGroup
for _, roleName := range roles {
//Firing go routine for each role
wg.Add(1)
go func(roleName string) {
defer wg.Done()
log.Println("Started routine for role", roleName)
//var err error
importPath := "import-queue/" + roleName + "/"

entries, err := storage.List(ctx, importPath)
if err != nil {
log.Printf("Could not get queue list from path %s: %s", importPath, err)
time.Sleep(3 * time.Second)
return
}
log.Printf("Queue list on path %s has length %v", importPath, len(entries))

//Update role since its settings may have changed
role, err := b.getRole(ctx, storage, roleName)
if err != nil {
log.Printf("Error getting role %s: %s\n Exiting.", roleName, err)
time.Sleep(3 * time.Second)
return
}
if role == nil {
log.Printf("Unknown role %s\n Exiting for path %s.", roleName, importPath)
time.Sleep(3 * time.Second)
return
}

noOfWorkers := role.TPPImportWorkers
if len(entries) > 0 {
log.Printf("Creating %d jobs for %d workers.\n", len(entries), noOfWorkers)
var jobs = make(chan Job, len(entries))
var results = make(chan Result, len(entries))
startTime := time.Now()
go b.createWorkerPool(noOfWorkers, results, jobs)
go allocate(jobs, entries, ctx, storage, roleName, importPath)
for result := range results {
log.Printf("Job id: %d ### Processed entry: %s , result:\n %v\n", result.job.id, result.job.entry, result.result)
}
log.Printf("Total time taken %v seconds.\n", time.Since(startTime))
}
log.Println("Waiting for next turn")
time.Sleep(time.Duration(role.TPPImportTimeout) * time.Second) //todo: maybe subtract the processing time above from this sleep
}(roleName)
}
wg.Wait()
entries, err := storage.List(ctx, importPath)
if err != nil {
log.Printf("Could not get queue list from path %s: %s", importPath, err)
return
}
}
log.Printf("Queue list on path %s has length %v", importPath, len(entries))

func (b *backend) createWorkerPool(noOfWorkers int, results chan Result, jobs chan Job) {
var wg sync.WaitGroup
wg.Add(noOfWorkers)
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go b.worker(&wg, results, jobs)
}
wg.Wait()
close(results)
}

func (b *backend) worker(wg *sync.WaitGroup, results chan Result, jobs chan Job) {
for job := range jobs {
output := Result{job, b.processImportToTPP(job)}
results <- output
go func() {
defer func() {
r := recover()
if r != nil {
log.Println("recover", r)
}
wg.Done()
}()
for job := range jobs {
result := b.processImportToTPP(job)
log.Printf("Job id: %d ### Processed entry: %s , result:\n %v\n", job.id, job.entry, result)
}
}()
}
wg.Done()
}

func allocate(jobs chan Job, entries []string, ctx context.Context, storage logical.Storage, roleName string, importPath string) {
for i, entry := range entries {
log.Printf("Allocating job for entry %s", entry)
job := Job{
@@ -210,6 +146,56 @@ func allocate(jobs chan Job, entries []string, ctx context.Context, storage logi
jobs <- job
}
close(jobs)
wg.Wait()
}
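
The reworked pool above drops the old Result channel: allocate still feeds a buffered jobs channel, but each worker now logs its own result and recovers from panics. Stripped of the TPP specifics, the pattern is the following (a generic sketch with assumed names, not code from this commit):

package pki

import (
	"log"
	"sync"
)

// runPoolSketch fans entries out to n workers and blocks until all of
// them have been processed.
func runPoolSketch(n int, entries []string, process func(string) string) {
	jobs := make(chan string, len(entries))
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer func() {
				if r := recover(); r != nil {
					log.Println("recover", r) // a panic kills one job, not the pool
				}
				wg.Done()
			}()
			for job := range jobs {
				log.Printf("processed %s: %s", job, process(job))
			}
		}()
	}
	for _, e := range entries {
		jobs <- e // the channel is buffered to len(entries), so this never blocks
	}
	close(jobs) // lets the range loops in the workers terminate
	wg.Wait()
}
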

func (b *backend) importToTPP(storage logical.Storage, conf *logical.BackendConfig) {
b.taskStorage.register("importcontroller", func() {
b.controlImportQueue(storage, conf)
}, 1, time.Second)
}

func (b *backend) controlImportQueue(storage logical.Storage, conf *logical.BackendConfig) {
ctx := context.Background()
const fillQueuePrefix = "fillqueue-"
roles, err := storage.List(ctx, "role/")
if err != nil {
log.Printf("Couldn't get list of roles: %s", err)
return
}

for i := range roles {
roleName := roles[i]
//Update role since its settings may have changed
role, err := b.getRole(ctx, storage, roleName)
if err != nil {
log.Printf("Error getting role %s: %s\n Skipping.", roleName, err)
continue
}
if role == nil {
log.Printf("Unknown role %s\n", roleName)
continue
}

b.taskStorage.register(fillQueuePrefix+roleName, func() {
log.Printf("run queue filler %s", roleName)
b.fillImportQueueTask(roleName, role.TPPImportWorkers, storage, conf)
}, 1, time.Duration(role.TPPImportTimeout)*time.Second)

}
stringInSlice := func(s string, sl []string) bool {
for i := range sl {
if sl[i] == s {
return true
}
}
return false
}
for _, taskName := range b.taskStorage.getTasksNames() {
if strings.HasPrefix(taskName, fillQueuePrefix) && !stringInSlice(strings.TrimPrefix(taskName, fillQueuePrefix), roles) {
b.taskStorage.del(taskName)
}
}
}
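
One subtlety worth pulling out of fillImportQueueTask above is the standby guard: the import queue must only run on the active node, so the task bails out on performance secondaries (unless the mount is local), on DR secondaries, and on performance standbys. Factored into a helper for readability (a sketch only; the commit keeps the inline isSlave expression):

package pki

import (
	hconsts "github.com/hashicorp/vault/sdk/helper/consts"
	"github.com/hashicorp/vault/sdk/logical"
)

// isStandbyNode mirrors the inline isSlave check: true when this node
// should skip background import work.
func isStandbyNode(sys logical.SystemView) bool {
	state := sys.ReplicationState()
	onPerfSecondary := !(sys.LocalMount() || !state.HasState(hconsts.ReplicationPerformanceSecondary))
	return onPerfSecondary ||
		state.HasState(hconsts.ReplicationDRSecondary) ||
		state.HasState(hconsts.ReplicationPerformanceStandby)
}

Called as isStandbyNode(conf.System), it gives the same answer as the expression at the top of fillImportQueueTask.
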

func (b *backend) processImportToTPP(job Job) string {
95 changes: 95 additions & 0 deletions plugin/pki/scheduler.go
@@ -0,0 +1,95 @@
package pki

import (
"log"
"sync"
"sync/atomic"
"time"
)

type backgroundTask struct {
name string
f func()
workers int64
currentWorkers int64
interval time.Duration
lastRun time.Time
}

type taskStorageStruct struct {
tasks []backgroundTask
sync.RWMutex
}

func (task *backgroundTask) cancel() {

}

func (s *taskStorageStruct) getTasksNames() []string {
s.RLock()
defer s.RUnlock()
l := make([]string, len(s.tasks))
for i := range s.tasks {
l[i] = s.tasks[i].name
}
return l
}

func (s *taskStorageStruct) register(name string, f func(), count int, interval time.Duration) {
s.Lock()
defer s.Unlock()
task := backgroundTask{name: name, f: f, workers: int64(count), interval: interval}
for i := range s.tasks {
if s.tasks[i].name == task.name {
log.Printf("duplicated task %v", name)
return
}
}
s.tasks = append(s.tasks, task)
}

func (s *taskStorageStruct) del(taskName string) {
s.Lock()
defer s.Unlock()
for i := range s.tasks {
if s.tasks[i].name == taskName {
s.tasks[i].cancel()
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
return
}
}
}

func (s *taskStorageStruct) init() {
go s.loop()
}

func (s *taskStorageStruct) loop() {
for {
s.RLock()
for i := range s.tasks {
if s.tasks[i].currentWorkers >= s.tasks[i].workers {
continue
}
if time.Since(s.tasks[i].lastRun) < s.tasks[i].interval {
continue
}
currentTask := &s.tasks[i]
atomic.AddInt64(&currentTask.currentWorkers, 1)
go func(counter *int64) {
defer func(counter *int64) {
r := recover()
if r != nil {
log.Printf("job failed. recover: %v\n", r)
//todo: better log
}
atomic.AddInt64(counter, -1)
}(counter)
currentTask.f()
}(&currentTask.currentWorkers)
currentTask.lastRun = time.Now()
}
s.RUnlock()
time.Sleep(time.Second)
}
}
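
Taken together, this file is a small cooperative scheduler: loop() wakes once per second and, for each registered task whose interval has elapsed and whose worker quota is free, starts another run in a goroutine guarded by recover. A minimal usage sketch with assumed names (the "tick" task is illustrative, not part of the plugin):

package pki

import (
	"log"
	"time"
)

func schedulerUsageSketch() {
	var s taskStorageStruct
	s.init() // start the background loop

	// At most one concurrent run, at most once every 5 seconds.
	s.register("tick", func() { log.Println("tick") }, 1, 5*time.Second)
	log.Println("tasks:", s.getTasksNames())

	time.Sleep(12 * time.Second) // let the task fire a few times
	s.del("tick")                // unregister; no further runs start
}

Note that backgroundTask.cancel is an empty stub in this commit, so del only prevents future runs; a run already in flight finishes on its own.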