Skip to content

Commit

Permalink
Merge pull request #51 from csaunders/worker-idle-notification
Browse files Browse the repository at this point in the history
Adds a flag to touch a file when workers go idle
  • Loading branch information
csaunders committed May 28, 2015
2 parents 9bb8a1f + 0b19f54 commit cdf14ea
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
4 changes: 3 additions & 1 deletion cmd/theme/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ func FileManipulationCommandParser(cmd string, args []string) (result map[string
func WatchCommandParser(cmd string, args []string) (result map[string]interface{}, set *flag.FlagSet) {
result = make(map[string]interface{})
currentDir, _ := os.Getwd()
var environment, directory string
var environment, directory, notifyFile string

set = makeFlagSet(cmd)
set.StringVar(&environment, "env", themekit.DefaultEnvironment, "environment to run command")
set.StringVar(&directory, "dir", currentDir, "directory that config.yml is located")
set.StringVar(&notifyFile, "notify", "", "file to touch when workers have gone idle")
set.Parse(args)

client, err := loadThemeClient(directory, environment)
Expand All @@ -152,6 +153,7 @@ func WatchCommandParser(cmd string, args []string) (result map[string]interface{
}

result["themeClient"] = client
result["notifyFile"] = notifyFile

return
}
Expand Down
14 changes: 12 additions & 2 deletions commands/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package commands

import (
"fmt"
"github.com/csaunders/themekit"
"os"
"time"

"github.com/csaunders/themekit"
)

type WatchOptions struct {
BasicOptions
Directory string
Directory string
NotifyFile string
}

func WatchCommand(args map[string]interface{}) chan bool {
Expand All @@ -18,6 +21,7 @@ func WatchCommand(args map[string]interface{}) chan bool {
extractThemeClient(&options.Client, args)
extractEventLog(&options.EventLog, args)
extractString(&options.Directory, "directory", args)
extractString(&options.NotifyFile, "notifyFile", args)

return Watch(options)
}
Expand All @@ -32,6 +36,12 @@ func Watch(options WatchOptions) chan bool {
bucket := themekit.NewLeakyBucket(config.BucketSize, config.RefillRate, 1)
bucket.TopUp()
foreman := themekit.NewForeman(bucket)
foreman.OnIdle = func() {
if len(options.NotifyFile) > 0 {
os.Create(options.NotifyFile)
os.Chtimes(options.NotifyFile, time.Now(), time.Now())
}
}
watcher := constructFileWatcher(options.Directory, config)
foreman.JobQueue = watcher
foreman.IssueWork()
Expand Down
11 changes: 11 additions & 0 deletions foreman.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package themekit

import "time"

type Foreman struct {
bucket *LeakyBucket
halt chan bool
JobQueue chan AssetEvent
WorkerQueue chan AssetEvent
OnIdle func()
}

func NewForeman(bucket *LeakyBucket) Foreman {
Expand All @@ -13,21 +16,29 @@ func NewForeman(bucket *LeakyBucket) Foreman {
halt: make(chan bool),
JobQueue: make(chan AssetEvent),
WorkerQueue: make(chan AssetEvent),
OnIdle: func() {},
}
}

func (f Foreman) IssueWork() {
f.bucket.StartDripping()
go func() {
notifyProcessed := false
for {
select {
case job := <-f.JobQueue:
f.bucket.GetDrop()
notifyProcessed = true
go func() {
f.WorkerQueue <- job
}()
case <-f.halt:
return
case <-time.Tick(1 * time.Second):
if notifyProcessed {
notifyProcessed = false
f.OnIdle()
}
}
}
}()
Expand Down

0 comments on commit cdf14ea

Please sign in to comment.