Skip to content

Commit

Permalink
Handle multiple clients
Browse files Browse the repository at this point in the history
Co-Authored-By: Franco Batastini <66926111+Bata340@users.noreply.github.com>
  • Loading branch information
brunograssano and Bata340 committed Nov 7, 2023
1 parent 686deb8 commit 3c74b7b
Show file tree
Hide file tree
Showing 31 changed files with 679 additions and 473 deletions.
100 changes: 61 additions & 39 deletions avg_calculator_ex4/avgcalculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,69 +9,84 @@ import (
)

type AvgCalculator struct {
toJourneySavers []queueProtocol.ProducerProtocolInterface
pricesConsumer queueProtocol.ConsumerProtocolInterface
c *AvgCalculatorConfig
toJourneySavers []queueProtocol.ProducerProtocolInterface
pricesConsumer queueProtocol.ConsumerProtocolInterface
c *AvgCalculatorConfig
valuesReceivedByClient map[string]PartialSum
}

func NewAvgCalculator(toJourneySavers []queueProtocol.ProducerProtocolInterface, pricesConsumer queueProtocol.ConsumerProtocolInterface, c *AvgCalculatorConfig) *AvgCalculator {
return &AvgCalculator{toJourneySavers: toJourneySavers, pricesConsumer: pricesConsumer, c: c}
return &AvgCalculator{
toJourneySavers: toJourneySavers,
pricesConsumer: pricesConsumer,
c: c,
valuesReceivedByClient: make(map[string]PartialSum),
}
}

// CalculateAvgLoop Waits for the final results from the journey savers,
// performs the calculation, and sends the results back
func (a *AvgCalculator) CalculateAvgLoop() {
log.Infof("AvgCalculator | Started Avg Calculator loop")
log.Infof("AvgCalculator | Starting await of internalSavers | Now waiting for %v savers...", len(a.toJourneySavers))
for {
sumOfPrices := float32(0)
sumOfRows := 0
log.Infof("AvgCalculator | Starting await of internalSavers | Now waiting for %v savers...", len(a.toJourneySavers))
msg, ok := a.pricesConsumer.Pop()
if !ok {
log.Errorf("AvgCalculator | Consumer closed when not expected, exiting average calculator")
return
}
log.Debugf("AvgCalculator | Received message from saver")

for sentResults := uint(0); sentResults < a.c.SaversCount; sentResults++ {
msg, ok := a.pricesConsumer.Pop()
if !ok {
log.Errorf("AvgCalculator | Consumer closed when not expected, exiting average calculator")
return
}
log.Debugf("AvgCalculator | Received message from saver")
if msg.TypeMessage != dataStructure.EOFFlightRows {
log.Warnf("AvgCalculator | Warning Message | Received a message that was not expected | Skipping...")
continue
}
a.handleEofMsg(msg)
}
}

if msg.TypeMessage != dataStructure.EOFFlightRows {
log.Warnf("AvgCalculator | Warning Message | Received a message that was not expected | Skipping...")
continue
}
func (a *AvgCalculator) handleEofMsg(msg *dataStructure.Message) {
currPartialSum := a.getPartialSumOfClient(msg)

prices, err := msg.DynMaps[0].GetAsFloat(utils.LocalPrice)
if err != nil {
log.Errorf("AvgCalculator | Error getting localPrice | %v", err)
continue
}
sumOfPrices += prices
prices, err := msg.DynMaps[0].GetAsFloat(utils.LocalPrice)
if err != nil {
log.Errorf("AvgCalculator | Error getting localPrice | %v", err)
return
}

rows, err := msg.DynMaps[0].GetAsInt(utils.LocalQuantity)
if err != nil {
log.Errorf("AvgCalculator | Error getting localQuantity | %v", err)
continue
}
sumOfRows += rows
currPartialSum.sumOfPrices += prices
rows, err := msg.DynMaps[0].GetAsInt(utils.LocalQuantity)
if err != nil {
log.Errorf("AvgCalculator | Error getting localQuantity | %v", err)
return
}
currPartialSum.sumOfRows += rows
currPartialSum.numOfSavers++
a.valuesReceivedByClient[msg.ClientId] = currPartialSum

log.Debugf("AvgCalculator | New Accum Price: %v | New Accum Count: %v", sumOfPrices, sumOfRows)
}
log.Infof("AvgCalculator | Received all local JourneySaver values, calculating average")
avg := a.calculateAvg(sumOfRows, sumOfPrices)
log.Infof("AvgCalculator | General Average is: %v | Now sending to journey savers...", avg)
a.sendToJourneySavers(avg)
log.Debugf("AvgCalculator | New Accum Price: %v | New Accum Count: %v", a.valuesReceivedByClient[msg.ClientId].sumOfPrices, a.valuesReceivedByClient[msg.ClientId].sumOfRows)
if a.valuesReceivedByClient[msg.ClientId].numOfSavers == len(a.toJourneySavers) {
a.onFinishedClientId(msg.ClientId)
}
}

func (a *AvgCalculator) getPartialSumOfClient(msg *dataStructure.Message) PartialSum {
_, exists := a.valuesReceivedByClient[msg.ClientId]
if !exists {
a.valuesReceivedByClient[msg.ClientId] = PartialSum{sumOfRows: 0, sumOfPrices: 0, numOfSavers: 0}
}
return a.valuesReceivedByClient[msg.ClientId]
}

// sendToJourneySavers Sends the average to the journey savers
func (a *AvgCalculator) sendToJourneySavers(avg float32) {
func (a *AvgCalculator) sendToJourneySavers(avg float32, clientId string) {
avgBytes := serializer.SerializeFloat(avg)
dynMap := make(map[string][]byte)
dynMap[utils.FinalAvg] = avgBytes
data := []*dataStructure.DynamicMap{dataStructure.NewDynamicMap(dynMap)}
msg := &dataStructure.Message{TypeMessage: dataStructure.FinalAvgMsg, DynMaps: data}
msg := &dataStructure.Message{TypeMessage: dataStructure.FinalAvgMsg, DynMaps: data, ClientId: clientId}
for i, channel := range a.toJourneySavers {
log.Infof("AvgCalculator | Sending average to saver %v", i)
log.Infof("AvgCalculator | Sending average for client %v to saver %v", clientId, i)
err := channel.Send(msg)
if err != nil {
log.Errorf("AvgCalculator | Error sending avg | %v", err)
Expand All @@ -90,3 +105,10 @@ func (a *AvgCalculator) calculateAvg(sumOfRows int, sumOfPrices float32) float32
log.Infof("AvgCalculator | Sum of prices: %v | Total rows: %v | Avg: %v", sumOfPrices, sumOfRows, avg)
return avg
}

func (a *AvgCalculator) onFinishedClientId(clientId string) {
log.Infof("AvgCalculator | Received all local JourneySaver values, calculating average")
avg := a.calculateAvg(a.valuesReceivedByClient[clientId].sumOfRows, a.valuesReceivedByClient[clientId].sumOfPrices)
log.Infof("AvgCalculator | General Average is: %v | Now sending to journey savers...", avg)
a.sendToJourneySavers(avg, clientId)
}
3 changes: 1 addition & 2 deletions avg_calculator_ex4/avgcalculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ func assertAvgFromChannel(t *testing.T, expectedAvg float32, consumer chan *data
select {
case msg := <-consumer:
avg, err := msg.DynMaps[0].GetAsFloat(utils.FinalAvg)

assert.Nilf(t, err, "Got error: %v", err)
assert.Equalf(t, expectedAvg, avg, "Different avg: %v", avg)
case <-time.After(1 * time.Second):
Expand All @@ -40,7 +39,7 @@ func TestShouldSendTheAverageToTheConsumers(t *testing.T) {
chan2 := make(chan *dataStructures.Message, 1)
avgCalculator := &AvgCalculator{toJourneySavers: []queueProtocol.ProducerProtocolInterface{queueProtocol.NewProducerChannel(chan1), queueProtocol.NewProducerChannel(chan2)}}

go avgCalculator.sendToJourneySavers(5)
go avgCalculator.sendToJourneySavers(5, "1")

assertAvgFromChannel(t, 5, chan1)
assertAvgFromChannel(t, 5, chan2)
Expand Down
7 changes: 7 additions & 0 deletions avg_calculator_ex4/partial_sum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

type PartialSum struct {
sumOfPrices float32
sumOfRows int
numOfSavers int
}
42 changes: 0 additions & 42 deletions common/filemanager/filemover.go

This file was deleted.

43 changes: 43 additions & 0 deletions common/filemanager/fileutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package filemanager

import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"os"
)

func MoveFiles(files []string, folderName string) error {

if _, err := os.Stat(folderName); os.IsNotExist(err) {
err := os.Mkdir(folderName, os.ModePerm)
if err != nil && !os.IsExist(err) {
log.Errorf("FileMover | Error creating directory %v | %v", folderName, err)
return err
}
}
for _, file := range files {
err := os.Rename(file, fmt.Sprintf("%v/%v", folderName, file))
if err != nil {
log.Errorf("FileMover | Error moving file to '%v/%v' | %v", folderName, file, err)
return err
}
}
return nil
}

func RenameFile(file string, newName string) error {
err := os.Rename(file, newName)
if err != nil {
log.Errorf("FileRenamer | Error renaming file | %v", err)
return err
}
return nil
}

func DirectoryExists(file string) bool {
if _, err := os.Stat(file); errors.Is(err, os.ErrNotExist) {
return false
}
return true
}
120 changes: 120 additions & 0 deletions common/getters/client_getter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package getters

import (
"fmt"
"github.com/brunograssano/Distribuidos-TP1/common/communication"
dataStructures "github.com/brunograssano/Distribuidos-TP1/common/data_structures"
"github.com/brunograssano/Distribuidos-TP1/common/filemanager"
socketsProtocol "github.com/brunograssano/Distribuidos-TP1/common/protocol/sockets"
"github.com/brunograssano/Distribuidos-TP1/common/serializer"
"github.com/brunograssano/Distribuidos-TP1/common/utils"
log "github.com/sirupsen/logrus"
)

type ClientGetter struct {
config *GetterConfig
sph *socketsProtocol.SocketProtocolHandler
clientId string
stop chan bool
join chan bool
}

func NewClientGetter(clientSocket *communication.TCPSocket, config *GetterConfig, stop chan bool, join chan bool) *ClientGetter {
return &ClientGetter{
config: config,
sph: socketsProtocol.NewSocketProtocolHandler(clientSocket),
clientId: "",
stop: stop,
join: join,
}
}

func (c *ClientGetter) HandleClientGetter() {
msg, err := c.sph.Read()
if err != nil {
log.Errorf("Client Getter | Error getting first message with client id")
return
}
defer c.sph.Close()

c.clientId = msg.ClientId
if filemanager.DirectoryExists(msg.ClientId) {
c.sendResults()
} else {
c.askLaterForResults()
}
c.join <- true
close(c.join)
}

// askLaterForResults Tells the client to wait and finishes the connection
func (c *ClientGetter) askLaterForResults() {
log.Infof("Client Getter %v | Client asked for results when they are not ready. Answer 'Later'", c.clientId)
err := c.sph.Write(&dataStructures.Message{
TypeMessage: dataStructures.Later,
DynMaps: make([]*dataStructures.DynamicMap, 0),
})
if err != nil {
log.Errorf("Client Getter %v | Error trying to send 'Later' Message to socket...", c.clientId)
}
}

// sendResults Sends the saved results to the client
func (c *ClientGetter) sendResults() {
log.Infof("Client Getter %v | Sending results to client", c.clientId)
var currBatch []*dataStructures.DynamicMap
curLengthOfBatch := 0
for _, filename := range c.config.FileNames {
reader, err := filemanager.NewFileReader(fmt.Sprintf("%v/%v_%v.csv", c.clientId, filename, c.clientId))
if err != nil {
log.Errorf("Client Getter %v | Error trying to open file: %v | %v | Skipping it...", c.clientId, filename, err)
continue
}
for reader.CanRead() {
select {
case <-c.stop:
log.Warnf("Client Getter %v | Received signal while sending file, stopping transfer", c.clientId)
return
default:
}
line := reader.ReadLine()
currBatch = append(currBatch, serializer.DeserializeFromString(line))
curLengthOfBatch++
if uint(curLengthOfBatch) >= c.config.MaxLinesPerSend {
c.sendBatch(currBatch)
currBatch = make([]*dataStructures.DynamicMap, 0)
}
}
err = reader.Err()
if err != nil {
log.Errorf("Client Getter %v| Error reading file | %v", c.clientId, err)
}
utils.CloseFileAndNotifyError(reader.FileManager)
}
if curLengthOfBatch > 0 {
c.sendBatch(currBatch)
}
c.sendEOF()
}

func (c *ClientGetter) sendEOF() {
log.Infof("Client Getter %v | Sending EOF to client...", c.clientId)
err := c.sph.Write(&dataStructures.Message{
TypeMessage: dataStructures.EOFGetter,
DynMaps: []*dataStructures.DynamicMap{},
})
if err != nil {
log.Errorf("Client Getter %v | Error trying to send EOF | %v", c.clientId, err)
}
}

// sendBatch sends a specific batch of DynamicMaps via protocol handler
func (c *ClientGetter) sendBatch(batch []*dataStructures.DynamicMap) {
err := c.sph.Write(&dataStructures.Message{
TypeMessage: dataStructures.FlightRows,
DynMaps: batch,
})
if err != nil {
log.Errorf("Client Getter %v | Error sending batch from getter | %v", c.clientId, err)
}
}
Loading

0 comments on commit 3c74b7b

Please sign in to comment.