Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and container killer script #28

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[EDIT] Change channels for rabbit queues in Ex3 Saver
Co-Authored-By: Bruno Grassano <53836177+brunograssano@users.noreply.github.com>
  • Loading branch information
Bata340 and brunograssano committed Nov 21, 2023
commit c2c27302e6ed4260507edcf6f0e861a77448f83c
32 changes: 10 additions & 22 deletions saver_ex_3/ex3/ex3handler.go
Original file line number Diff line number Diff line change
@@ -2,13 +2,11 @@ package ex3

import (
"fmt"
dataStructures "github.com/brunograssano/Distribuidos-TP1/common/data_structures"
"github.com/brunograssano/Distribuidos-TP1/common/dispatcher"
"github.com/brunograssano/Distribuidos-TP1/common/filemanager"
"github.com/brunograssano/Distribuidos-TP1/common/getters"
queueProtocol "github.com/brunograssano/Distribuidos-TP1/common/protocol/queues"
"github.com/brunograssano/Distribuidos-TP1/common/queuefactory"
"github.com/brunograssano/Distribuidos-TP1/common/utils"
log "github.com/sirupsen/logrus"
)

@@ -17,33 +15,29 @@ type Ex3Handler struct {
journeyDispatcher []*dispatcher.JourneyDispatcher
savers []*SaverForEx3
getter *getters.Getter
channels []chan *dataStructures.Message
finishedSignals chan string
outputFilenames []string
quantityFinishedByClient map[string]uint
}

// NewEx3Handler Creates a new exercise 3 handler
func NewEx3Handler(c *SaverConfig, qFactory queuefactory.QueueProtocolFactory) *Ex3Handler {
var channels []chan *dataStructures.Message
func NewEx3Handler(c *SaverConfig, dispatchersQFactory queuefactory.QueueProtocolFactory, internalQFactory queuefactory.QueueProtocolFactory) *Ex3Handler {

// Creation of the JourneySavers, they handle the prices per journey
var internalSavers []*SaverForEx3
var internalSaversConsumers []*SaverForEx3
var outputFileNames []string
finishSignal := make(chan string, c.InternalSaversCount)
var toInternalSaversChannels []queueProtocol.ProducerProtocolInterface
var toInternalSavers []queueProtocol.ProducerProtocolInterface
log.Infof("Ex3Handler | Creating %v savers...", int(c.InternalSaversCount))
for i := 0; i < int(c.InternalSaversCount); i++ {
internalSaverChannel := make(chan *dataStructures.Message, utils.BufferSizeChannels)
channels = append(channels, internalSaverChannel)
internalSavers = append(internalSavers, NewSaverForEx3(
queueProtocol.NewConsumerChannel(internalSaverChannel),
internalSaversConsumers = append(internalSaversConsumers, NewSaverForEx3(
internalQFactory.CreateConsumer(fmt.Sprintf("saver3-internal-%v-%v", c.ID, i)),
c,
finishSignal,
i,
))
outputFileNames = append(outputFileNames, fmt.Sprintf("%v_%v", c.OutputFilePrefix, i))
toInternalSaversChannels = append(toInternalSaversChannels, queueProtocol.NewProducerChannel(internalSaverChannel))
toInternalSavers = append(toInternalSavers, internalQFactory.CreateProducer(fmt.Sprintf("saver3-internal-%v-%v", c.ID, i)))
log.Infof("Ex3Handler | Created Saver #%v correctly...", i)
}

@@ -52,9 +46,9 @@ func NewEx3Handler(c *SaverConfig, qFactory queuefactory.QueueProtocolFactory) *
var jds []*dispatcher.JourneyDispatcher
for i := uint(0); i < c.DispatchersCount; i++ {
// We create the input queue to the EX3 service
inputQueue := qFactory.CreateConsumer(fmt.Sprintf("%v-%v", c.InputQueueName, c.ID))
prodToInput := qFactory.CreateProducer(c.ID)
jds = append(jds, dispatcher.NewJourneyDispatcher(inputQueue, prodToInput, toInternalSaversChannels))
inputQueue := dispatchersQFactory.CreateConsumer(fmt.Sprintf("%v-%v", c.InputQueueName, c.ID))
prodToInput := dispatchersQFactory.CreateProducer(c.ID)
jds = append(jds, dispatcher.NewJourneyDispatcher(inputQueue, prodToInput, toInternalSavers))
}

getterConf := getters.NewGetterConfig(c.ID, outputFileNames, c.GetterAddress, c.GetterBatchLines)
@@ -66,8 +60,7 @@ func NewEx3Handler(c *SaverConfig, qFactory queuefactory.QueueProtocolFactory) *
return &Ex3Handler{
c: c,
journeyDispatcher: jds,
channels: channels,
savers: internalSavers,
savers: internalSaversConsumers,
getter: getter,
finishedSignals: finishSignal,
outputFilenames: outputFileNames,
@@ -122,11 +115,6 @@ func (se3 *Ex3Handler) StartHandler() {
// Close Closes the handler of the exercise 4
func (se3 *Ex3Handler) Close() {
log.Infof("Ex3Handler | Closing resources...")
log.Infof("Ex3Handler | Starting channel closing...")
for idx, channel := range se3.channels {
log.Infof("Ex3Handler | Closing channel #%v", idx)
close(channel)
}
close(se3.finishedSignals)
log.Infof("Ex3Handler | Closing Getter")
se3.getter.Close()
2 changes: 1 addition & 1 deletion saver_ex_3/main.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ func main() {
}
qMiddleware := middleware.NewQueueMiddleware(config.RabbitAddress)
qFactory := queuefactory.NewTopicFactory(qMiddleware, []string{"", config.ID}, config.InputQueueName)
saverEx3 := ex3.NewEx3Handler(config, qFactory)
saverEx3 := ex3.NewEx3Handler(config, qFactory, queuefactory.NewSimpleQueueFactory(qMiddleware))
go saverEx3.StartHandler()
endSigHB := heartbeat.StartHeartbeat(config.AddressesHealthCheckers, config.ServiceName)
<-sigs