Skip to content

Commit

Permalink
updated async requests handler
Browse files Browse the repository at this point in the history
  • Loading branch information
adranwit committed Jun 4, 2024
1 parent eed2a20 commit 11becc0
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions gateway/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/viant/afs/url"
"github.com/viant/xdatly/handler/async"
"log"
"sync"
"time"
)

Expand All @@ -19,6 +20,8 @@ func (s *Service) watchAsyncJob(ctx context.Context) {
if s.Config.MaxJobs > 0 {
limiter = make(chan bool, s.Config.MaxJobs)
}
pending := make(map[string]bool)
var mux sync.RWMutex
for {
objects, _ := s.fs.List(ctx, s.Config.JobURL, option.NewRecursive(true))
objectCount := 0
Expand All @@ -37,6 +40,16 @@ func (s *Service) watchAsyncJob(ctx context.Context) {
if object.IsDir() {
continue
}
mux.RLock()
isPending := pending[object.URL()]
mux.RUnlock()
if isPending {
continue
}
mux.Lock()
pending[object.URL()] = true
mux.Unlock()

if limiter != nil {
limiter <- true
}
Expand All @@ -46,6 +59,9 @@ func (s *Service) watchAsyncJob(ctx context.Context) {
if limiter != nil {
<-limiter
}
mux.Lock()
delete(pending, object.URL())
mux.Unlock()
}()
router, _ := s.Router()
if router != nil {
Expand Down

0 comments on commit 11becc0

Please sign in to comment.