From 51f4772bd95ac8824d961cd386ab77cad72c0ce5 Mon Sep 17 00:00:00 2001 From: Franco Batastini <66926111+Bata340@users.noreply.github.com> Date: Tue, 21 Nov 2023 19:09:18 -0300 Subject: [PATCH 1/3] [FIX] Saver 4 Rewrite & Healthcheck dying on restart Co-Authored-By: Bruno Grassano <53836177+brunograssano@users.noreply.github.com> --- .../{UDPClient.go => udp_client.go} | 33 +++++- common/leader/leader_election_service_test.go | 88 --------------- ...on_service.go => leaderelectionservice.go} | 15 ++- common/leader/leaderelectionservice_test.go | 87 +++++++++++++++ common/leader/replica_state.go | 4 +- .../generated/docker-compose-dev.yaml | 100 +++++++++++------- .../templates/docker-compose-template.yaml | 8 +- docker-compose-dev.yaml | 100 +++++++++++------- ex4_journey_saver/journeysaver.go | 8 +- healthchecker/healthcheck_config.go | 38 ++----- simple_saver/saver/saver.go | 4 + 11 files changed, 269 insertions(+), 216 deletions(-) rename common/communication/{UDPClient.go => udp_client.go} (74%) delete mode 100644 common/leader/leader_election_service_test.go rename common/leader/{leader_election_service.go => leaderelectionservice.go} (92%) create mode 100644 common/leader/leaderelectionservice_test.go diff --git a/common/communication/UDPClient.go b/common/communication/udp_client.go similarity index 74% rename from common/communication/UDPClient.go rename to common/communication/udp_client.go index 9322ffb..9b77a04 100644 --- a/common/communication/UDPClient.go +++ b/common/communication/udp_client.go @@ -8,24 +8,39 @@ import ( ) type UdpClient struct { - conn net.Conn + conn net.Conn + address string } const UdpReadTimeout = 400 func NewUdpClient(address string) (*UdpClient, error) { + conn, err := connectUDP(address) + return &UdpClient{ + conn: conn, + address: address, + }, + err +} + +func connectUDP(address string) (net.Conn, error) { conn, err := net.Dial("udp", address) if err != nil { log.Errorf("UdpClient | Error trying to create | %v", err) return nil, err } - return &UdpClient{ - conn: conn, - }, - nil + return conn, nil } func (u *UdpClient) Receive(sizeToRecv uint) ([]byte, *net.UDPAddr, error) { + if u.conn == nil { + conn, err := connectUDP(u.address) + if err != nil { + log.Errorf("UdpClient | Error trying to create | %v", err) + return []byte{}, nil, err + } + u.conn = conn + } err := u.conn.SetReadDeadline(time.Now().Add(UdpReadTimeout * time.Millisecond)) if err != nil { log.Errorf("UdpClient | Error setting read timeout | %v", err) @@ -46,6 +61,14 @@ func (u *UdpClient) Receive(sizeToRecv uint) ([]byte, *net.UDPAddr, error) { } func (u *UdpClient) Send(message []byte, _ *net.UDPAddr) (int, error) { + if u.conn == nil { + conn, err := connectUDP(u.address) + if err != nil { + log.Errorf("UdpClient | Error trying to create | %v", err) + return 0, err + } + u.conn = conn + } sizeSent, err := u.conn.Write(message) if err != nil { log.Errorf("UdpServer | Error trying to Write | %v", err) diff --git a/common/leader/leader_election_service_test.go b/common/leader/leader_election_service_test.go deleted file mode 100644 index b26e64b..0000000 --- a/common/leader/leader_election_service_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package leader - -import ( - "github.com/stretchr/testify/assert" - "net" - "testing" - "time" -) - -func TestReceivingElectionFromMinorIDShouldRetransmitToGreaterID(t *testing.T) { - addressMap := make(map[uint8]*net.UDPAddr) - addressMap[2] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50012} - addressMap[3] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50013} - leaderService1 := NewLeaderElectionService(1, addressMap, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50011}) - defer leaderService1.Close() - - addressMap2 := make(map[uint8]*net.UDPAddr) - addressMap2[1] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50011} - addressMap2[3] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50013} - leaderService2 := NewLeaderElectionService(2, addressMap2, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50012}) - defer leaderService2.Close() - - addressMap3 := make(map[uint8]*net.UDPAddr) - addressMap3[1] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50011} - addressMap3[2] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50012} - leaderService3 := NewLeaderElectionService(3, addressMap3, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50013}) - defer leaderService3.Close() - - go leaderService1.ReceiveNetMessages() - go leaderService2.ReceiveNetMessages() - go leaderService3.ReceiveNetMessages() - - time.Sleep(2 * time.Second) - - assert.Falsef(t, leaderService1.AmILeader(), "LeaderService1 should not be leader") - assert.Falsef(t, leaderService2.AmILeader(), "LeaderService2 should not be leader") - assert.Truef(t, leaderService3.AmILeader(), "LeaderService3 should be leader") -} - -func TestShouldProclaimItselfAsTheLeaderIfItIsTheOnlyNode(t *testing.T) { - addressMap := make(map[uint8]*net.UDPAddr) - addressMap[2] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50022} - addressMap[3] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50023} - leaderService1 := NewLeaderElectionService(1, addressMap, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50021}) - defer leaderService1.Close() - - go leaderService1.ReceiveNetMessages() - - time.Sleep(5 * time.Second) - - assert.Truef(t, leaderService1.AmILeader(), "LeaderService1 should be leader") -} - -func TestLeaderFallsDownAndNewLeaderIsElected(t *testing.T) { - addressMap := make(map[uint8]*net.UDPAddr) - addressMap[2] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50042} - addressMap[3] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50043} - leaderService1 := NewLeaderElectionService(1, addressMap, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50041}) - defer leaderService1.Close() - - addressMap2 := make(map[uint8]*net.UDPAddr) - addressMap2[1] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50041} - addressMap2[3] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50043} - leaderService2 := NewLeaderElectionService(2, addressMap2, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50042}) - defer leaderService2.Close() - - addressMap3 := make(map[uint8]*net.UDPAddr) - addressMap3[1] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50041} - addressMap3[2] = &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50042} - leaderService3 := NewLeaderElectionService(3, addressMap3, &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50043}) - - go leaderService1.ReceiveNetMessages() - go leaderService2.ReceiveNetMessages() - go leaderService3.ReceiveNetMessages() - - time.Sleep(2 * time.Second) - - assert.Falsef(t, leaderService1.AmILeader(), "LeaderService1 should not be leader") - assert.Falsef(t, leaderService2.AmILeader(), "LeaderService2 should not be leader") - assert.Truef(t, leaderService3.AmILeader(), "LeaderService3 should be leader") - leaderService3.Close() - - time.Sleep(5 * time.Second) - - assert.Falsef(t, leaderService1.AmILeader(), "LeaderService1 should not be the new leader") - assert.Truef(t, leaderService2.AmILeader(), "LeaderService2 should be the new leader") - -} diff --git a/common/leader/leader_election_service.go b/common/leader/leaderelectionservice.go similarity index 92% rename from common/leader/leader_election_service.go rename to common/leader/leaderelectionservice.go index 761e63b..2845e58 100644 --- a/common/leader/leader_election_service.go +++ b/common/leader/leaderelectionservice.go @@ -7,6 +7,7 @@ import ( "github.com/brunograssano/Distribuidos-TP1/common/protocol/sockets" log "github.com/sirupsen/logrus" "net" + "strconv" ) type ElectionService interface { @@ -18,22 +19,26 @@ type ElectionService interface { type LeaderElectionService struct { currentState BullyState id uint8 - address *net.UDPAddr + address []string leaderID uint8 listener *sockets.UdpProtocolhandler - netAddresses map[uint8]*net.UDPAddr + netAddresses map[uint8][]string netClientSockets map[uint8]*sockets.UdpProtocolhandler leaderDown chan bool } -func NewLeaderElectionService(id uint8, networkNodes map[uint8]*net.UDPAddr, myAddr *net.UDPAddr) *LeaderElectionService { +func NewLeaderElectionService(id uint8, networkNodes map[uint8][]string, myAddr []string) *LeaderElectionService { netClientSockets := make(map[uint8]*sockets.UdpProtocolhandler) - listenerSocketUdp, err := communication.NewUdpServer(myAddr.IP.String(), myAddr.Port) + port, err := strconv.Atoi(myAddr[1]) + if err != nil { + log.Errorf("LeaderElectionService | Error converting port from string to int | %v", err) + } + listenerSocketUdp, err := communication.NewUdpServer(myAddr[0], port) if err != nil { log.Fatalf("LeaderElectionService %v | Error trying to create UDP Server Socket | %v", id, err) } for idNode, udpAddr := range networkNodes { - udpCli, err := communication.NewUdpClient(fmt.Sprintf("%v:%v", udpAddr.IP.String(), udpAddr.Port)) + udpCli, err := communication.NewUdpClient(fmt.Sprintf("%v:%v", udpAddr[0], udpAddr[1])) if err != nil { log.Errorf("LeaderElectionService | Error trying to create UDP Client Socket | %v", err) } diff --git a/common/leader/leaderelectionservice_test.go b/common/leader/leaderelectionservice_test.go new file mode 100644 index 0000000..0ff521e --- /dev/null +++ b/common/leader/leaderelectionservice_test.go @@ -0,0 +1,87 @@ +package leader + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestReceivingElectionFromMinorIDShouldRetransmitToGreaterID(t *testing.T) { + addressMap := make(map[uint8][]string) + addressMap[2] = []string{"127.0.0.1", "50012"} + addressMap[3] = []string{"127.0.0.1", "50013"} + leaderService1 := NewLeaderElectionService(1, addressMap, []string{"127.0.0.1", "50011"}) + defer leaderService1.Close() + + addressMap2 := make(map[uint8][]string) + addressMap2[1] = []string{"127.0.0.1", "50011"} + addressMap2[3] = []string{"127.0.0.1", "50013"} + leaderService2 := NewLeaderElectionService(2, addressMap2, []string{"127.0.0.1", "50012"}) + defer leaderService2.Close() + + addressMap3 := make(map[uint8][]string) + addressMap3[1] = []string{"127.0.0.1", "50011"} + addressMap3[2] = []string{"127.0.0.1", "50012"} + leaderService3 := NewLeaderElectionService(3, addressMap3, []string{"127.0.0.1", "50013"}) + defer leaderService3.Close() + + go leaderService1.ReceiveNetMessages() + go leaderService2.ReceiveNetMessages() + go leaderService3.ReceiveNetMessages() + + time.Sleep(2 * time.Second) + + assert.Falsef(t, leaderService1.AmILeader(), "LeaderService1 should not be leader") + assert.Falsef(t, leaderService2.AmILeader(), "LeaderService2 should not be leader") + assert.Truef(t, leaderService3.AmILeader(), "LeaderService3 should be leader") +} + +func TestShouldProclaimItselfAsTheLeaderIfItIsTheOnlyNode(t *testing.T) { + addressMap := make(map[uint8][]string) + addressMap[2] = []string{"127.0.0.1", "50022"} + addressMap[3] = []string{"127.0.0.1", "50023"} + leaderService1 := NewLeaderElectionService(1, addressMap, []string{"127.0.0.1", "50021"}) + defer leaderService1.Close() + + go leaderService1.ReceiveNetMessages() + + time.Sleep(5 * time.Second) + + assert.Truef(t, leaderService1.AmILeader(), "LeaderService1 should be leader") +} + +func TestLeaderFallsDownAndNewLeaderIsElected(t *testing.T) { + addressMap := make(map[uint8][]string) + addressMap[2] = []string{"127.0.0.1", "50042"} + addressMap[3] = []string{"127.0.0.1", "50043"} + leaderService1 := NewLeaderElectionService(1, addressMap, []string{"127.0.0.1", "50041"}) + defer leaderService1.Close() + + addressMap2 := make(map[uint8][]string) + addressMap2[1] = []string{"127.0.0.1", "50041"} + addressMap2[3] = []string{"127.0.0.1", "50043"} + leaderService2 := NewLeaderElectionService(2, addressMap2, []string{"127.0.0.1", "50042"}) + defer leaderService2.Close() + + addressMap3 := make(map[uint8][]string) + addressMap3[1] = []string{"127.0.0.1", "50041"} + addressMap3[2] = []string{"127.0.0.1", "50042"} + leaderService3 := NewLeaderElectionService(3, addressMap3, []string{"127.0.0.1", "50043"}) + + go leaderService1.ReceiveNetMessages() + go leaderService2.ReceiveNetMessages() + go leaderService3.ReceiveNetMessages() + + time.Sleep(2 * time.Second) + + assert.Falsef(t, leaderService1.AmILeader(), "LeaderService1 should not be leader") + assert.Falsef(t, leaderService2.AmILeader(), "LeaderService2 should not be leader") + assert.Truef(t, leaderService3.AmILeader(), "LeaderService3 should be leader") + leaderService3.Close() + + time.Sleep(5 * time.Second) + + assert.Falsef(t, leaderService1.AmILeader(), "LeaderService1 should not be the new leader") + assert.Truef(t, leaderService2.AmILeader(), "LeaderService2 should be the new leader") + +} diff --git a/common/leader/replica_state.go b/common/leader/replica_state.go index 64a8a94..e0e7c32 100644 --- a/common/leader/replica_state.go +++ b/common/leader/replica_state.go @@ -19,8 +19,8 @@ type ReplicaState struct { leaderDown chan bool } -func NewReplicaState(leaderAddress *net.UDPAddr, nodeId uint8, leaderDown chan bool) (*ReplicaState, error) { - udpCli, err := communication.NewUdpClient(fmt.Sprintf("%v:%v", leaderAddress.IP.String(), leaderAddress.Port)) +func NewReplicaState(leaderAddress []string, nodeId uint8, leaderDown chan bool) (*ReplicaState, error) { + udpCli, err := communication.NewUdpClient(fmt.Sprintf("%v:%v", leaderAddress[0], leaderAddress[1])) if err != nil { log.Errorf("ReplicaState | Error instantiating udp Client | %v", err) return nil, err diff --git a/compose-generator/generated/docker-compose-dev.yaml b/compose-generator/generated/docker-compose-dev.yaml index a92b465..101154b 100644 --- a/compose-generator/generated/docker-compose-dev.yaml +++ b/compose-generator/generated/docker-compose-dev.yaml @@ -29,7 +29,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_1 - CLI_RABBITMQ_QUEUE_OUTPUT=saver1_queue - CLI_REDUCER_COLUMNS=legId,route,totalFare - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -48,7 +48,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_1 - CLI_RABBITMQ_QUEUE_OUTPUT=saver1_queue - CLI_REDUCER_COLUMNS=legId,route,totalFare - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -69,7 +69,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_2 - CLI_RABBITMQ_QUEUE_OUTPUT=saver2_queue - CLI_REDUCER_COLUMNS=legId,route - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -88,7 +88,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_2 - CLI_RABBITMQ_QUEUE_OUTPUT=saver2_queue - CLI_REDUCER_COLUMNS=legId,route - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -110,7 +110,7 @@ services: - CLI_RABBITMQ_QUEUES_INPUT=filters_stopovers - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_1 - CLI_RABBITMQ_EXCHANGE_OUTPUTS=saver_3 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -129,7 +129,7 @@ services: - CLI_RABBITMQ_QUEUES_INPUT=filters_stopovers - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_1 - CLI_RABBITMQ_EXCHANGE_OUTPUTS=saver_3 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -150,7 +150,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=flight_row_processor - CLI_RABBITMQ_QUEUE_OUTPUT_EX123=filters_stopovers,distance_calculator - CLI_RABBITMQ_QUEUE_OUTPUT_EX4=ex4_solver - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -169,7 +169,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=flight_row_processor - CLI_RABBITMQ_QUEUE_OUTPUT_EX123=filters_stopovers,distance_calculator - CLI_RABBITMQ_QUEUE_OUTPUT_EX4=ex4_solver - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -189,7 +189,7 @@ services: - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUES_INPUT=filters_distances - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_2 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -207,7 +207,7 @@ services: - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUES_INPUT=filters_distances - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_2 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -231,8 +231,8 @@ services: - CLI_QUEUES_AIRPORTS_EXCHANGE_TYPE=fanout - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTEXCHANGE=AirportsExchange - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTROUTINGKEY=airports - - CLI_COMPLETER_FILENAME=flightrows.csv - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_COMPLETER_FILENAME=flightrows + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -254,8 +254,8 @@ services: - CLI_QUEUES_AIRPORTS_EXCHANGE_TYPE=fanout - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTEXCHANGE=AirportsExchange - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTROUTINGKEY=airports - - CLI_COMPLETER_FILENAME=flightrows.csv - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_COMPLETER_FILENAME=flightrows + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -278,7 +278,7 @@ services: - CLI_RABBITMQ_QUEUE_OUTPUTS_SAVER=sink_ex4_queue - CLI_INTERNAL_SAVERS_COUNT=6 - CLI_RABBITMQ_RK_INPUT=0 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -299,7 +299,7 @@ services: - CLI_RABBITMQ_QUEUE_OUTPUTS_SAVER=sink_ex4_queue - CLI_INTERNAL_SAVERS_COUNT=6 - CLI_RABBITMQ_RK_INPUT=6 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -320,7 +320,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=ex4_solver - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -346,7 +346,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=ex4_solver - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -374,7 +374,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=accum_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -393,7 +393,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=accum_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -414,7 +414,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=sink_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=saver4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -433,7 +433,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=sink_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=saver4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -452,10 +452,10 @@ services: - CLI_NAME=saver-ex1-1 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver1_queue - - CLI_SAVER_OUTPUT=results_ex1.csv + - CLI_SAVER_OUTPUT=results_ex1 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex1-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -472,10 +472,10 @@ services: - CLI_NAME=saver-ex1-2 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver1_queue - - CLI_SAVER_OUTPUT=results_ex1.csv + - CLI_SAVER_OUTPUT=results_ex1 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex1-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -494,10 +494,10 @@ services: - CLI_NAME=saver-ex2-1 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver2_queue - - CLI_SAVER_OUTPUT=results_ex2.csv + - CLI_SAVER_OUTPUT=results_ex2 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex2-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -514,10 +514,10 @@ services: - CLI_NAME=saver-ex2-2 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver2_queue - - CLI_SAVER_OUTPUT=results_ex2.csv + - CLI_SAVER_OUTPUT=results_ex2 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex2-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -539,7 +539,7 @@ services: - CLI_SAVER_OUTPUT=results_ex3 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex3-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -559,7 +559,7 @@ services: - CLI_SAVER_OUTPUT=results_ex3 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex3-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -578,10 +578,10 @@ services: - CLI_NAME=saver-ex4-1 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver4_queue - - CLI_SAVER_OUTPUT=results_ex4.csv + - CLI_SAVER_OUTPUT=results_ex4 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex4-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -598,10 +598,10 @@ services: - CLI_NAME=saver-ex4-2 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver4_queue - - CLI_SAVER_OUTPUT=results_ex4.csv + - CLI_SAVER_OUTPUT=results_ex4 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex4-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -626,7 +626,7 @@ services: - CLI_QUEUES_AIRPORTS_EXCHANGE_NAME=AirportsExchange - CLI_QUEUES_AIRPORTS_EXCHANGE_ROUTINGKEY=airports - CLI_QUEUES_FLIGHTROWS=flight_row_processor - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -687,10 +687,10 @@ services: - CLI_LOG_LEVEL=INFO - CLI_NAME=healthchecker-1 - CLI_HEALTHCHECKER_ADDRESS=healthchecker-1:8080 - - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=2:healthchecker-2:8081 + - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=2:healthchecker-2:8081,3:healthchecker-3:8081 - CLI_HEALTHCHECKER_ELECTION_UDP_ADDRESS=healthchecker-1:8081 - CLI_HEALTHCHECKER_ELECTION_ID=1 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -705,10 +705,28 @@ services: - CLI_LOG_LEVEL=INFO - CLI_NAME=healthchecker-2 - CLI_HEALTHCHECKER_ADDRESS=healthchecker-2:8080 - - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=1:healthchecker-1:8081 + - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=1:healthchecker-1:8081,3:healthchecker-3:8081 - CLI_HEALTHCHECKER_ELECTION_UDP_ADDRESS=healthchecker-2:8081 - CLI_HEALTHCHECKER_ELECTION_ID=2 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-3:8080 + networks: + - testing_net + volumes: + - ./healthchecker/config.yaml:/config.yaml + - /var/run/docker.sock:/var/run/docker.sock + + healthchecker-3: + container_name: healthchecker-3 + image: healthchecker:latest + environment: + - CLI_ID=33 + - CLI_LOG_LEVEL=INFO + - CLI_NAME=healthchecker-3 + - CLI_HEALTHCHECKER_ADDRESS=healthchecker-3:8080 + - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=1:healthchecker-1:8081,2:healthchecker-2:8081 + - CLI_HEALTHCHECKER_ELECTION_UDP_ADDRESS=healthchecker-3:8081 + - CLI_HEALTHCHECKER_ELECTION_ID=3 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 networks: - testing_net volumes: diff --git a/compose-generator/templates/docker-compose-template.yaml b/compose-generator/templates/docker-compose-template.yaml index af0ad0e..d809169 100644 --- a/compose-generator/templates/docker-compose-template.yaml +++ b/compose-generator/templates/docker-compose-template.yaml @@ -137,7 +137,7 @@ services: - CLI_QUEUES_AIRPORTS_EXCHANGE_TYPE=fanout - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTEXCHANGE=AirportsExchange - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTROUTINGKEY=airports - - CLI_COMPLETER_FILENAME=flightrows.csv + - CLI_COMPLETER_FILENAME=flightrows - CLI_HEALTHCHECKER_ADDRESSES={% for hc in range(1,healthcheckers + 1) %}healthchecker-{{hc}}:8080{{ "," if not loop.last else "" }}{% endfor %} networks: - testing_net @@ -247,7 +247,7 @@ services: - CLI_NAME=saver-ex1-{{i}} - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver1_queue - - CLI_SAVER_OUTPUT=results_ex1.csv + - CLI_SAVER_OUTPUT=results_ex1 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex1-{{i}}:8080 - CLI_HEALTHCHECKER_ADDRESSES={% for hc in range(1,healthcheckers + 1) %}healthchecker-{{hc}}:8080{{ "," if not loop.last else "" }}{% endfor %} @@ -269,7 +269,7 @@ services: - CLI_NAME=saver-ex2-{{i}} - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver2_queue - - CLI_SAVER_OUTPUT=results_ex2.csv + - CLI_SAVER_OUTPUT=results_ex2 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex2-{{i}}:8080 - CLI_HEALTHCHECKER_ADDRESSES={% for hc in range(1,healthcheckers + 1) %}healthchecker-{{hc}}:8080{{ "," if not loop.last else "" }}{% endfor %} @@ -313,7 +313,7 @@ services: - CLI_NAME=saver-ex4-{{i}} - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver4_queue - - CLI_SAVER_OUTPUT=results_ex4.csv + - CLI_SAVER_OUTPUT=results_ex4 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex4-{{i}}:8080 - CLI_HEALTHCHECKER_ADDRESSES={% for hc in range(1,healthcheckers + 1) %}healthchecker-{{hc}}:8080{{ "," if not loop.last else "" }}{% endfor %} diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml index a92b465..101154b 100644 --- a/docker-compose-dev.yaml +++ b/docker-compose-dev.yaml @@ -29,7 +29,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_1 - CLI_RABBITMQ_QUEUE_OUTPUT=saver1_queue - CLI_REDUCER_COLUMNS=legId,route,totalFare - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -48,7 +48,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_1 - CLI_RABBITMQ_QUEUE_OUTPUT=saver1_queue - CLI_REDUCER_COLUMNS=legId,route,totalFare - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -69,7 +69,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_2 - CLI_RABBITMQ_QUEUE_OUTPUT=saver2_queue - CLI_REDUCER_COLUMNS=legId,route - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -88,7 +88,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=dim_reducer_saver_2 - CLI_RABBITMQ_QUEUE_OUTPUT=saver2_queue - CLI_REDUCER_COLUMNS=legId,route - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -110,7 +110,7 @@ services: - CLI_RABBITMQ_QUEUES_INPUT=filters_stopovers - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_1 - CLI_RABBITMQ_EXCHANGE_OUTPUTS=saver_3 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -129,7 +129,7 @@ services: - CLI_RABBITMQ_QUEUES_INPUT=filters_stopovers - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_1 - CLI_RABBITMQ_EXCHANGE_OUTPUTS=saver_3 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -150,7 +150,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=flight_row_processor - CLI_RABBITMQ_QUEUE_OUTPUT_EX123=filters_stopovers,distance_calculator - CLI_RABBITMQ_QUEUE_OUTPUT_EX4=ex4_solver - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -169,7 +169,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=flight_row_processor - CLI_RABBITMQ_QUEUE_OUTPUT_EX123=filters_stopovers,distance_calculator - CLI_RABBITMQ_QUEUE_OUTPUT_EX4=ex4_solver - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -189,7 +189,7 @@ services: - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUES_INPUT=filters_distances - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_2 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -207,7 +207,7 @@ services: - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUES_INPUT=filters_distances - CLI_RABBITMQ_QUEUES_OUTPUT=dim_reducer_saver_2 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -231,8 +231,8 @@ services: - CLI_QUEUES_AIRPORTS_EXCHANGE_TYPE=fanout - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTEXCHANGE=AirportsExchange - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTROUTINGKEY=airports - - CLI_COMPLETER_FILENAME=flightrows.csv - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_COMPLETER_FILENAME=flightrows + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -254,8 +254,8 @@ services: - CLI_QUEUES_AIRPORTS_EXCHANGE_TYPE=fanout - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTEXCHANGE=AirportsExchange - CLI_RABBITMQ_QUEUE_INPUT_AIRPORTROUTINGKEY=airports - - CLI_COMPLETER_FILENAME=flightrows.csv - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_COMPLETER_FILENAME=flightrows + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -278,7 +278,7 @@ services: - CLI_RABBITMQ_QUEUE_OUTPUTS_SAVER=sink_ex4_queue - CLI_INTERNAL_SAVERS_COUNT=6 - CLI_RABBITMQ_RK_INPUT=0 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -299,7 +299,7 @@ services: - CLI_RABBITMQ_QUEUE_OUTPUTS_SAVER=sink_ex4_queue - CLI_INTERNAL_SAVERS_COUNT=6 - CLI_RABBITMQ_RK_INPUT=6 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -320,7 +320,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=ex4_solver - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -346,7 +346,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=ex4_solver - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -374,7 +374,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=accum_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -393,7 +393,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=accum_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=journey_savers_ex4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -414,7 +414,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=sink_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=saver4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -433,7 +433,7 @@ services: - CLI_RABBITMQ_QUEUE_INPUT=sink_ex4_queue - CLI_RABBITMQ_QUEUE_OUTPUT=saver4_queue - CLI_SAVERS_COUNT=12 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -452,10 +452,10 @@ services: - CLI_NAME=saver-ex1-1 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver1_queue - - CLI_SAVER_OUTPUT=results_ex1.csv + - CLI_SAVER_OUTPUT=results_ex1 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex1-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -472,10 +472,10 @@ services: - CLI_NAME=saver-ex1-2 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver1_queue - - CLI_SAVER_OUTPUT=results_ex1.csv + - CLI_SAVER_OUTPUT=results_ex1 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex1-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -494,10 +494,10 @@ services: - CLI_NAME=saver-ex2-1 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver2_queue - - CLI_SAVER_OUTPUT=results_ex2.csv + - CLI_SAVER_OUTPUT=results_ex2 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex2-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -514,10 +514,10 @@ services: - CLI_NAME=saver-ex2-2 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver2_queue - - CLI_SAVER_OUTPUT=results_ex2.csv + - CLI_SAVER_OUTPUT=results_ex2 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex2-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -539,7 +539,7 @@ services: - CLI_SAVER_OUTPUT=results_ex3 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex3-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -559,7 +559,7 @@ services: - CLI_SAVER_OUTPUT=results_ex3 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex3-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -578,10 +578,10 @@ services: - CLI_NAME=saver-ex4-1 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver4_queue - - CLI_SAVER_OUTPUT=results_ex4.csv + - CLI_SAVER_OUTPUT=results_ex4 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex4-1:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -598,10 +598,10 @@ services: - CLI_NAME=saver-ex4-2 - CLI_LOG_LEVEL=INFO - CLI_RABBITMQ_QUEUE_INPUT=saver4_queue - - CLI_SAVER_OUTPUT=results_ex4.csv + - CLI_SAVER_OUTPUT=results_ex4 - CLI_GETTER_BATCH_LINES=100 - CLI_GETTER_ADDRESS=saver-ex4-2:8080 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -626,7 +626,7 @@ services: - CLI_QUEUES_AIRPORTS_EXCHANGE_NAME=AirportsExchange - CLI_QUEUES_AIRPORTS_EXCHANGE_ROUTINGKEY=airports - CLI_QUEUES_FLIGHTROWS=flight_row_processor - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -687,10 +687,10 @@ services: - CLI_LOG_LEVEL=INFO - CLI_NAME=healthchecker-1 - CLI_HEALTHCHECKER_ADDRESS=healthchecker-1:8080 - - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=2:healthchecker-2:8081 + - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=2:healthchecker-2:8081,3:healthchecker-3:8081 - CLI_HEALTHCHECKER_ELECTION_UDP_ADDRESS=healthchecker-1:8081 - CLI_HEALTHCHECKER_ELECTION_ID=1 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-2:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-2:8080,healthchecker-3:8080 networks: - testing_net volumes: @@ -705,10 +705,28 @@ services: - CLI_LOG_LEVEL=INFO - CLI_NAME=healthchecker-2 - CLI_HEALTHCHECKER_ADDRESS=healthchecker-2:8080 - - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=1:healthchecker-1:8081 + - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=1:healthchecker-1:8081,3:healthchecker-3:8081 - CLI_HEALTHCHECKER_ELECTION_UDP_ADDRESS=healthchecker-2:8081 - CLI_HEALTHCHECKER_ELECTION_ID=2 - - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-3:8080 + networks: + - testing_net + volumes: + - ./healthchecker/config.yaml:/config.yaml + - /var/run/docker.sock:/var/run/docker.sock + + healthchecker-3: + container_name: healthchecker-3 + image: healthchecker:latest + environment: + - CLI_ID=33 + - CLI_LOG_LEVEL=INFO + - CLI_NAME=healthchecker-3 + - CLI_HEALTHCHECKER_ADDRESS=healthchecker-3:8080 + - CLI_HEALTHCHECKER_ELECTION_ID_ADDRESSES=1:healthchecker-1:8081,2:healthchecker-2:8081 + - CLI_HEALTHCHECKER_ELECTION_UDP_ADDRESS=healthchecker-3:8081 + - CLI_HEALTHCHECKER_ELECTION_ID=3 + - CLI_HEALTHCHECKER_ADDRESSES=healthchecker-1:8080,healthchecker-2:8080 networks: - testing_net volumes: diff --git a/ex4_journey_saver/journeysaver.go b/ex4_journey_saver/journeysaver.go index fc01993..5c379b1 100644 --- a/ex4_journey_saver/journeysaver.go +++ b/ex4_journey_saver/journeysaver.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "slices" "strconv" + "strings" ) // JourneySaver Handles the prices of the assigned journeys @@ -184,7 +185,7 @@ func (js *JourneySaver) sendAverageForJourneys(finalAvg float32, clientId string dynMap := make(map[string][]byte) dynMap[utils.Avg] = serializer.SerializeFloat(journeyAverage) dynMap[utils.Max] = serializer.SerializeFloat(journeyMax) - dynMap[utils.Journey] = serializer.SerializeString(fileStr) + dynMap[utils.Journey] = serializer.SerializeString(strings.Split(fileStr, "_")[0]) data := []*dataStructure.DynamicMap{dataStructure.NewDynamicMap(dynMap)} msg := &dataStructure.Message{ TypeMessage: dataStructure.FlightRows, @@ -233,10 +234,13 @@ func (js *JourneySaver) SavePricesForJourneys() { if err != nil { log.Errorf("JourneySaver | Error getting finalAvg") } - log.Infof("JourneySaver | Received Final Avg: %v. Now sending Average for Journeys...", finalAvg) + log.Infof("JourneySaver | Received Final Avg: %v | Client %v | Checking if it was already processed...", finalAvg, msg.ClientId) _, exists := js.processedClients[msg.ClientId] if !exists { + log.Infof("JourneySaver | It was not processed | Client %v | Now sending Average for Journeys...", msg.ClientId) js.sendAverageForJourneys(finalAvg, msg.ClientId) + } else { + log.Infof("JourneySaver | Message was duplicated | Client %v | Discarding it... ", msg.ClientId) } } diff --git a/healthchecker/healthcheck_config.go b/healthchecker/healthcheck_config.go index 11e7bd3..5eb80ea 100644 --- a/healthchecker/healthcheck_config.go +++ b/healthchecker/healthcheck_config.go @@ -6,7 +6,6 @@ import ( "github.com/brunograssano/Distribuidos-TP1/common/config" log "github.com/sirupsen/logrus" "github.com/spf13/viper" - "net" "strconv" "strings" ) @@ -17,8 +16,8 @@ type Config struct { Address string RestartTime uint CheckTime uint - UdpAddress *net.UDPAddr - NetAddresses map[uint8]*net.UDPAddr + UdpAddress []string + NetAddresses map[uint8][]string ElectionId uint8 HealthCheckers []string Name string @@ -94,37 +93,20 @@ func GetConfig(env *viper.Viper) (*Config, error) { if udpAddressString == "" { return nil, errors.New("missing udpAddress") } - ipAndPort := strings.Split(udpAddressString, ":") - port, err := strconv.Atoi(ipAndPort[1]) - if err != nil { - return nil, errors.New(fmt.Sprintf("port error when converting to int: %v", err)) - } - ip, err := net.LookupIP(ipAndPort[0]) - if err != nil { - return nil, errors.New(fmt.Sprintf("ip error when doing lookup: %v", err)) - } - udpAddress := &net.UDPAddr{IP: ip[0], Port: port} + udpAddress := strings.Split(udpAddressString, ":") electionParticipantsIdsAndAddresses := env.GetString("healthchecker.election.id.addresses") - electionParticipants := make(map[uint8]*net.UDPAddr) + electionParticipants := make(map[uint8][]string) if electionParticipantsIdsAndAddresses != "" { idsWithAddresses := strings.Split(electionParticipantsIdsAndAddresses, ",") for _, idAndAddress := range idsWithAddresses { idIpPort := strings.Split(idAndAddress, ":") - port, err := strconv.Atoi(idIpPort[2]) - if err != nil { - return nil, errors.New(fmt.Sprintf("error converting port to int: %v", err)) - } - ipUDP, err := net.LookupIP(idIpPort[1]) - if err != nil { - return nil, errors.New(fmt.Sprintf("ip error when doing lookup: %v", err)) - } - udpAddr := &net.UDPAddr{IP: ipUDP[0], Port: port} - nodeId, err := strconv.Atoi(idIpPort[0]) - if err != nil { - return nil, errors.New(fmt.Sprintf("error converting id to int: %v", err)) + idNodeStr := idIpPort[0] + idNode, err := strconv.Atoi(idNodeStr) + if err != nil || idNode > 255 { + return nil, errors.New(fmt.Sprintf("election node id error when converting to int8: %v", err)) } - electionParticipants[uint8(nodeId)] = udpAddr + electionParticipants[uint8(idNode)] = idIpPort[1:] } } else { log.Warnf("HealthChecker Config | There is only one healthchecker") @@ -150,7 +132,7 @@ func GetConfig(env *viper.Viper) (*Config, error) { restartTime, checkTime, myElectionId, - udpAddressString, + udpAddress, electionParticipantsIdsAndAddresses, hcAddresses, name, diff --git a/simple_saver/saver/saver.go b/simple_saver/saver/saver.go index bd49379..7a4cf47 100644 --- a/simple_saver/saver/saver.go +++ b/simple_saver/saver/saver.go @@ -49,6 +49,10 @@ func (s *SimpleSaver) SaveData() { } func (s *SimpleSaver) handleFlightRows(msg *dataStructures.Message) error { + if filemanager.DirectoryExists(msg.ClientId) { + log.Warnf("SimpleSaver | Already processed client %v, discarding message...", msg.ClientId) + return nil + } file := fmt.Sprintf("%v_%v.csv", s.c.OutputFileName, msg.ClientId) writer, err := filemanager.NewFileWriter(file) if err != nil { From c2c27302e6ed4260507edcf6f0e861a77448f83c Mon Sep 17 00:00:00 2001 From: Franco Batastini <66926111+Bata340@users.noreply.github.com> Date: Tue, 21 Nov 2023 19:20:09 -0300 Subject: [PATCH 2/3] [EDIT] Change channels for rabbit queues in Ex3 Saver Co-Authored-By: Bruno Grassano <53836177+brunograssano@users.noreply.github.com> --- saver_ex_3/ex3/ex3handler.go | 32 ++++++++++---------------------- saver_ex_3/main.go | 2 +- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/saver_ex_3/ex3/ex3handler.go b/saver_ex_3/ex3/ex3handler.go index 523c41c..87ad601 100644 --- a/saver_ex_3/ex3/ex3handler.go +++ b/saver_ex_3/ex3/ex3handler.go @@ -2,13 +2,11 @@ package ex3 import ( "fmt" - dataStructures "github.com/brunograssano/Distribuidos-TP1/common/data_structures" "github.com/brunograssano/Distribuidos-TP1/common/dispatcher" "github.com/brunograssano/Distribuidos-TP1/common/filemanager" "github.com/brunograssano/Distribuidos-TP1/common/getters" queueProtocol "github.com/brunograssano/Distribuidos-TP1/common/protocol/queues" "github.com/brunograssano/Distribuidos-TP1/common/queuefactory" - "github.com/brunograssano/Distribuidos-TP1/common/utils" log "github.com/sirupsen/logrus" ) @@ -17,33 +15,29 @@ type Ex3Handler struct { journeyDispatcher []*dispatcher.JourneyDispatcher savers []*SaverForEx3 getter *getters.Getter - channels []chan *dataStructures.Message finishedSignals chan string outputFilenames []string quantityFinishedByClient map[string]uint } // NewEx3Handler Creates a new exercise 3 handler -func NewEx3Handler(c *SaverConfig, qFactory queuefactory.QueueProtocolFactory) *Ex3Handler { - var channels []chan *dataStructures.Message +func NewEx3Handler(c *SaverConfig, dispatchersQFactory queuefactory.QueueProtocolFactory, internalQFactory queuefactory.QueueProtocolFactory) *Ex3Handler { // Creation of the JourneySavers, they handle the prices per journey - var internalSavers []*SaverForEx3 + var internalSaversConsumers []*SaverForEx3 var outputFileNames []string finishSignal := make(chan string, c.InternalSaversCount) - var toInternalSaversChannels []queueProtocol.ProducerProtocolInterface + var toInternalSavers []queueProtocol.ProducerProtocolInterface log.Infof("Ex3Handler | Creating %v savers...", int(c.InternalSaversCount)) for i := 0; i < int(c.InternalSaversCount); i++ { - internalSaverChannel := make(chan *dataStructures.Message, utils.BufferSizeChannels) - channels = append(channels, internalSaverChannel) - internalSavers = append(internalSavers, NewSaverForEx3( - queueProtocol.NewConsumerChannel(internalSaverChannel), + internalSaversConsumers = append(internalSaversConsumers, NewSaverForEx3( + internalQFactory.CreateConsumer(fmt.Sprintf("saver3-internal-%v-%v", c.ID, i)), c, finishSignal, i, )) outputFileNames = append(outputFileNames, fmt.Sprintf("%v_%v", c.OutputFilePrefix, i)) - toInternalSaversChannels = append(toInternalSaversChannels, queueProtocol.NewProducerChannel(internalSaverChannel)) + toInternalSavers = append(toInternalSavers, internalQFactory.CreateProducer(fmt.Sprintf("saver3-internal-%v-%v", c.ID, i))) log.Infof("Ex3Handler | Created Saver #%v correctly...", i) } @@ -52,9 +46,9 @@ func NewEx3Handler(c *SaverConfig, qFactory queuefactory.QueueProtocolFactory) * var jds []*dispatcher.JourneyDispatcher for i := uint(0); i < c.DispatchersCount; i++ { // We create the input queue to the EX3 service - inputQueue := qFactory.CreateConsumer(fmt.Sprintf("%v-%v", c.InputQueueName, c.ID)) - prodToInput := qFactory.CreateProducer(c.ID) - jds = append(jds, dispatcher.NewJourneyDispatcher(inputQueue, prodToInput, toInternalSaversChannels)) + inputQueue := dispatchersQFactory.CreateConsumer(fmt.Sprintf("%v-%v", c.InputQueueName, c.ID)) + prodToInput := dispatchersQFactory.CreateProducer(c.ID) + jds = append(jds, dispatcher.NewJourneyDispatcher(inputQueue, prodToInput, toInternalSavers)) } getterConf := getters.NewGetterConfig(c.ID, outputFileNames, c.GetterAddress, c.GetterBatchLines) @@ -66,8 +60,7 @@ func NewEx3Handler(c *SaverConfig, qFactory queuefactory.QueueProtocolFactory) * return &Ex3Handler{ c: c, journeyDispatcher: jds, - channels: channels, - savers: internalSavers, + savers: internalSaversConsumers, getter: getter, finishedSignals: finishSignal, outputFilenames: outputFileNames, @@ -122,11 +115,6 @@ func (se3 *Ex3Handler) StartHandler() { // Close Closes the handler of the exercise 4 func (se3 *Ex3Handler) Close() { log.Infof("Ex3Handler | Closing resources...") - log.Infof("Ex3Handler | Starting channel closing...") - for idx, channel := range se3.channels { - log.Infof("Ex3Handler | Closing channel #%v", idx) - close(channel) - } close(se3.finishedSignals) log.Infof("Ex3Handler | Closing Getter") se3.getter.Close() diff --git a/saver_ex_3/main.go b/saver_ex_3/main.go index ec0f9dd..da68dcb 100644 --- a/saver_ex_3/main.go +++ b/saver_ex_3/main.go @@ -22,7 +22,7 @@ func main() { } qMiddleware := middleware.NewQueueMiddleware(config.RabbitAddress) qFactory := queuefactory.NewTopicFactory(qMiddleware, []string{"", config.ID}, config.InputQueueName) - saverEx3 := ex3.NewEx3Handler(config, qFactory) + saverEx3 := ex3.NewEx3Handler(config, qFactory, queuefactory.NewSimpleQueueFactory(qMiddleware)) go saverEx3.StartHandler() endSigHB := heartbeat.StartHeartbeat(config.AddressesHealthCheckers, config.ServiceName) <-sigs From aad2ddb27a97002db1abec18ffb12e5c673dbee8 Mon Sep 17 00:00:00 2001 From: Franco Batastini <66926111+Bata340@users.noreply.github.com> Date: Tue, 21 Nov 2023 19:28:01 -0300 Subject: [PATCH 3/3] [ADD] Container Killer in Python by names separeted with comma Co-Authored-By: Bruno Grassano <53836177+brunograssano@users.noreply.github.com> --- container_killer/killer.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 container_killer/killer.py diff --git a/container_killer/killer.py b/container_killer/killer.py new file mode 100644 index 0000000..32ec2a7 --- /dev/null +++ b/container_killer/killer.py @@ -0,0 +1,12 @@ +import subprocess +import os + +def kill_container(): + containers = input("Container names [Comma Separated Container Names]: ") + containers = containers.split(",") + for container in containers: + result = subprocess.run(['docker','kill',container], check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + print(f"Kill executed on {container}. \nResult={result.returncode}. Output={result.stdout}. Error={result.stderr}") + +if __name__ == '__main__': + kill_container() \ No newline at end of file