Skip to content

Commit

Permalink
Simplify work processor logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Brandon Berhent committed Aug 12, 2022
1 parent ed1d061 commit 354cfea
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 33 deletions.
10 changes: 9 additions & 1 deletion apps/client/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ func (ws *WebsocketService) StartWSClient(ctx context.Context, workQueueChan cha
continue
}

// Queue
// If the backlog is too large, no-op
if queue.Len() > 99 {
fmt.Printf("\nBacklog is too large, skipping hash %s", serverMsg.Hash)
}

// Queue this work
queue.Put(serverMsg)

// Signal channel that we have work to do
workQueueChan <- &serverMsg
} else if serverMsg.MessageType == serializableModels.WorkCancel {
// Delete pending work from queue
Expand Down
46 changes: 17 additions & 29 deletions apps/client/work/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,34 @@ type WorkProcessor struct {
Queue *models.RandomAccessQueue
// WorkQueueChan is where we write requests from the websocket
WorkQueueChan chan *serializableModels.ClientMessage
// WorkProcessChan is where we actually read from the queue and compute work
WorkProcessChan chan bool
WSService *websocket.WebsocketService
WorkPool *WorkPool
WSService *websocket.WebsocketService
WorkPool *WorkPool
}

func NewWorkProcessor(ws *websocket.WebsocketService, gpuOnly bool) *WorkProcessor {
wp := NewWorkPool(gpuOnly)
return &WorkProcessor{
Queue: models.NewRandomAccessQueue(),
WorkQueueChan: make(chan *serializableModels.ClientMessage, 100),
WorkProcessChan: make(chan bool),
WSService: ws,
WorkPool: wp,
Queue: models.NewRandomAccessQueue(),
WorkQueueChan: make(chan *serializableModels.ClientMessage, 100),
WSService: ws,
WorkPool: wp,
}
}

// RequestQueueWorker - is a worker that receives work requests directly from the websocket, adds them to the queue, and determines what should be worked on next
func (wp *WorkProcessor) StartRequestQueueWorker() {
for c := range wp.WorkQueueChan {
// If the backlog is too large, no-op
if wp.Queue.Len() > 100 {
continue
}
// Add to queue
wp.Queue.Put(*c)
// Add to work processing channel
wp.WorkProcessChan <- true
}
}

// WorkProcessor - is a worker that actually generates work and sends the result back over the websocket
func (wp *WorkProcessor) StartWorkProcessor() {
for range wp.WorkProcessChan {
// Get random work item
for range wp.WorkQueueChan {
// Pop random unit of work from queue, begin computation
workItem := wp.Queue.PopRandom()
if workItem != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Generate work
ch := make(chan string, 1)
// Generate work with timeout
ch := make(chan string)

// Benchmark
startT := time.Now()

go func() {
result, err := wp.WorkPool.WorkGenerate(workItem)
if err != nil {
Expand All @@ -65,13 +52,15 @@ func (wp *WorkProcessor) StartWorkProcessor() {
default:
ch <- result
case <-ctx.Done():
fmt.Printf("\n❌ Error: took longer than 10s to generate work for %s", workItem.Hash)
}
}()

select {
case result := <-ch:
if result != "" {
endT := time.Now()
delta := endT.Sub(startT).Seconds()
fmt.Printf(" Work result: %s in %.2fs", result, delta)
// Send result back to server
clientWorkResult := serializableModels.ClientWorkResponse{
RequestID: workItem.RequestID,
Expand All @@ -92,5 +81,4 @@ func (wp *WorkProcessor) StartWorkProcessor() {
// Start both workers
func (wp *WorkProcessor) StartAsync() {
go wp.StartRequestQueueWorker()
go wp.StartWorkProcessor()
}
7 changes: 4 additions & 3 deletions services/moneybags/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,20 @@ func main() {

for _, payment := range payments {
if !*dryRun {
// Keep original to update the database
origPaymentID := strings.Clone(payment.ID)
// Ensure ID is not longer than 64 chars
payment.ID = Sha256(payment.ID)
payment.AmountRaw = "100000000000000000000000000000"
res, err := rppClient.MakeSendRequest(payment)
if err != nil {
fmt.Printf("\n❌ Error sending payment, ID %s, %v", payment.ID, err)
fmt.Printf("\n❌ Error sending payment, ID %s, %v", origPaymentID, err)
fmt.Printf("\nContinuing tho...")
continue
}
fmt.Printf("\n💸 Sent payment, ID %s, %v", origPaymentID, res.Block)
err = paymentRepo.SetBlockHash(tx, origPaymentID, res.Block)
if err != nil {
fmt.Printf("\n❌ Error setting payment block hash, ID %s, hash %s, %v", payment.ID, res.Block, err)
fmt.Printf("\n❌ Error setting payment block hash, ID %s, hash %s, %v", origPaymentID, res.Block, err)
fmt.Printf("\nContinuing tho...")
}
} else {
Expand Down

0 comments on commit 354cfea

Please sign in to comment.