From 9ec2ea1efa97b7e7a5e9141401507b4bc457112d Mon Sep 17 00:00:00 2001 From: Dylan AP Date: Tue, 9 Jul 2019 19:01:41 +0200 Subject: [PATCH 01/31] communication: now use TAG_MSG --- include/communication.h | 10 ++++++++++ src/cli/cli.c | 3 ++- src/network/leader_election.c | 6 +++--- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/include/communication.h b/include/communication.h index e50aa6d..270c8ef 100644 --- a/include/communication.h +++ b/include/communication.h @@ -8,6 +8,9 @@ // Number of attempts failed before considering the node as dead. #define NB_ITER 60 +#define TAG_MSG 201 +#define TAG_DATA 4 + /* * Send the given message 'm' to the destination node * Return 1 if it's a success, 0 if the destination node didn't respond in time @@ -20,4 +23,11 @@ int send_safe_message(struct message *m_send, struct queue *queue); */ struct message *receive_message(struct queue *message_queue); +/* + * Recieve a specific data from a specific node. + * All others messages recieved are put in the queue. If needed, a IS_ALIVE is send. + * Return the wanted data, which was calloc. + */ +void *recieve_data(size_t size, struct queue *queue, unsigned source); + #endif /* !DISTRIBUTEDMALLOC_COMMUNICATION_H */ diff --git a/src/cli/cli.c b/src/cli/cli.c index a8a178a..1d0b6ae 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -4,6 +4,7 @@ #include #include #include "cli.h" +#include "communication.h" char *read_cmd() { ssize_t buffer_size = 256; @@ -277,7 +278,7 @@ void execute(char **args, unsigned short leader) { unsigned short get_leader() { struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); - MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, 201, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("Got Leader !", DEF_NODE_USER); return m->id_o; } diff --git a/src/network/leader_election.c b/src/network/leader_election.c index d10b63e..dcde3c9 100644 --- a/src/network/leader_election.c +++ b/src/network/leader_election.c @@ -26,7 +26,7 @@ unsigned leader_election(unsigned id, unsigned network_size) { debug("LEADER IS DEAD, STARTING AGAIN", id); for (unsigned i = 1; i < network_size; ++i) { struct message *m_again = generate_message(id, i, leader, 0, 0, OP_LEADER_AGAIN); - MPI_Send(m_again, sizeof(*m_again), MPI_BYTE, i, 201, MPI_COMM_WORLD); + MPI_Send(m_again, sizeof(*m_again), MPI_BYTE, i, TAG_MSG, MPI_COMM_WORLD); free(m_again); } return leader_election(id, network_size); // TODO: Check if it works + fix leaks @@ -57,13 +57,13 @@ unsigned leader_election(unsigned id, unsigned network_size) { // send LEADER OK for (unsigned i = 1; i < network_size; ++i) { struct message *m_ok = generate_message(id, i, leader, 0, 0, OP_LEADER_OK); - MPI_Send(m_ok, sizeof(*m_ok), MPI_BYTE, i, 201, MPI_COMM_WORLD); + MPI_Send(m_ok, sizeof(*m_ok), MPI_BYTE, i, TAG_MSG, MPI_COMM_WORLD); free(m_ok); } // send leader to user struct message *m_user = generate_message(id, 0, leader, 0, 0, OP_OK); - MPI_Send(m_user, sizeof(*m_user), MPI_BYTE, 0, 201, MPI_COMM_WORLD); + MPI_Send(m_user, sizeof(*m_user), MPI_BYTE, 0, TAG_MSG, MPI_COMM_WORLD); free(m_user); return leader; } From 1c312218d919ae90ec2dc165911ac5f3c976f5f2 Mon Sep 17 00:00:00 2001 From: Dylan AP Date: Tue, 9 Jul 2019 19:02:05 +0200 Subject: [PATCH 02/31] communication: now use TAG_MSG --- src/network/communication.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/network/communication.c b/src/network/communication.c index c2abc75..d1f6a7c 100644 --- a/src/network/communication.c +++ b/src/network/communication.c @@ -8,7 +8,7 @@ int send_safe_message(struct message *m_send, struct queue *queue) { debug("init send safe message", m_send->id_s); - MPI_Send(m_send, sizeof(struct message), MPI_BYTE, m_send->id_t, 201, MPI_COMM_WORLD); + MPI_Send(m_send, sizeof(struct message), MPI_BYTE, m_send->id_t, TAG_MSG, MPI_COMM_WORLD); struct message *m_recv = calloc(1, sizeof(struct message)); MPI_Request r_ok; @@ -30,7 +30,7 @@ int send_safe_message(struct message *m_send, struct queue *queue) { } if (m_recv->need_callback) { struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); - MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, 201, MPI_COMM_WORLD); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, TAG_MSG, MPI_COMM_WORLD); free(m_alive); } queue_push_back(queue, m_recv); @@ -57,7 +57,7 @@ struct message *receive_message(struct queue *message_queue) { if (m_recv->need_callback) { debug("send callback", m_recv->id_s + 10 * m_recv->id_t); struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); - MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, 201, MPI_COMM_WORLD); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, TAG_MSG, MPI_COMM_WORLD); //printf("%u respond to %u %u with a %u __ \n", m_alive->id_s, status.MPI_SOURCE, m_alive->id_t, m_alive->op); free(m_alive); } From 70629df2524586c25402b036849d62a7ec0f7529 Mon Sep 17 00:00:00 2001 From: Dylan AP Date: Tue, 9 Jul 2019 19:06:23 +0200 Subject: [PATCH 03/31] data_communication: recieve_data, need tests --- CMakeLists.txt | 1 + src/network/data_communication.c | 52 ++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 src/network/data_communication.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c0daa8..61372aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,7 @@ set(SRCS src/utils/debug.c src/cli/user.c src/network/communication.c + src/network/data_communication.c src/utils/queue.c) diff --git a/src/network/data_communication.c b/src/network/data_communication.c new file mode 100644 index 0000000..acf707e --- /dev/null +++ b/src/network/data_communication.c @@ -0,0 +1,52 @@ +#include "communication.h" + +#include +#include +#include +#include +#include "debug.h" + + +void *recieve_data(size_t size, struct queue *queue, unsigned source) { + + MPI_Request r_data; + void *data = calloc(1, size); + MPI_Irecv(data, size, MPI_CHAR, source, TAG_DATA, MPI_COMM_WORLD, &r_data); + + MPI_Request r_msg; + struct message *m_recv = calloc(1, sizeof(struct message)); + MPI_Irecv(m_recv, sizeof(struct message), MPI_CHAR, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_msg); + + struct timespec waiting_time; + waiting_time.tv_sec = 0; + waiting_time.tv_nsec = 100000000; + + int f_msg = 0; + int f_data = 0; + + for (int i = 0; i < NB_ITER; i++) { + // Checking messages recieved + MPI_Test(&r_msg, &f_msg, MPI_STATUS_IGNORE); + if (f_msg) { + if (m_recv->need_callback) { + struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, TAG_MSG, MPI_COMM_WORLD); + free(m_alive); + } + queue_push_back(queue, m_recv); + m_recv = calloc(1, sizeof(struct message)); + MPI_Irecv(m_recv, sizeof(struct message), MPI_CHAR, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_msg); + } + // Checking data recieved + if (f_data) { + MPI_Cancel(&r_msg); + return data; + } + nanosleep(&waiting_time, NULL); + } + MPI_Cancel(&r_msg); + MPI_Cancel(&r_data); + free(m_recv); + return 0; + +} \ No newline at end of file From 9e5a5e1a4e5d0f16b7422a5a9ce9686592646aa9 Mon Sep 17 00:00:00 2001 From: Dylan AP Date: Tue, 9 Jul 2019 23:02:07 +0200 Subject: [PATCH 04/31] data_com: fixed fun --- src/network/data_communication.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/network/data_communication.c b/src/network/data_communication.c index acf707e..348c3b8 100644 --- a/src/network/data_communication.c +++ b/src/network/data_communication.c @@ -38,6 +38,7 @@ void *recieve_data(size_t size, struct queue *queue, unsigned source) { MPI_Irecv(m_recv, sizeof(struct message), MPI_CHAR, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_msg); } // Checking data recieved + MPI_Test(&r_data, &f_data, MPI_STATUS_IGNORE); if (f_data) { MPI_Cancel(&r_msg); return data; From 952a30c972afb9d6adff45529b4b8c992dfde283 Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Tue, 9 Jul 2019 17:03:40 +0200 Subject: [PATCH 05/31] communication: fix leaks in leader election --- include/communication.h | 6 ++++++ src/network/communication.c | 10 +++++++++- src/network/leader_election.c | 35 +++++++++++++++++++++-------------- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/include/communication.h b/include/communication.h index 270c8ef..573afbd 100644 --- a/include/communication.h +++ b/include/communication.h @@ -30,4 +30,10 @@ struct message *receive_message(struct queue *message_queue); */ void *recieve_data(size_t size, struct queue *queue, unsigned source); +/* + * Send message to all nodes except user and itself + * m_send->id_t will change according to the target node + */ +void broadcast_message(struct message *m_send, unsigned id, unsigned network_size); + #endif /* !DISTRIBUTEDMALLOC_COMMUNICATION_H */ diff --git a/src/network/communication.c b/src/network/communication.c index d1f6a7c..d736583 100644 --- a/src/network/communication.c +++ b/src/network/communication.c @@ -55,7 +55,7 @@ struct message *receive_message(struct queue *message_queue) { MPI_Recv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("queue empty, receive message", m_recv->id_t); if (m_recv->need_callback) { - debug("send callback", m_recv->id_s + 10 * m_recv->id_t); + debug("send callback", m_recv->id_t); struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, TAG_MSG, MPI_COMM_WORLD); //printf("%u respond to %u %u with a %u __ \n", m_alive->id_s, status.MPI_SOURCE, m_alive->id_t, m_alive->op); @@ -63,3 +63,11 @@ struct message *receive_message(struct queue *message_queue) { } return m_recv; } + +void broadcast_message(struct message *m_send, unsigned id, unsigned network_size) { + for (unsigned i = 1; i < network_size; ++i) { + m_send->id_t = i; + if (i != id) + MPI_Send(m_send, sizeof(struct message), MPI_BYTE, i, TAG_MSG, MPI_COMM_WORLD); + } +} diff --git a/src/network/leader_election.c b/src/network/leader_election.c index dcde3c9..5b3877b 100644 --- a/src/network/leader_election.c +++ b/src/network/leader_election.c @@ -24,16 +24,17 @@ unsigned leader_election(unsigned id, unsigned network_size) { debug("TIMEOUT", id); if (next == leader) { debug("LEADER IS DEAD, STARTING AGAIN", id); - for (unsigned i = 1; i < network_size; ++i) { - struct message *m_again = generate_message(id, i, leader, 0, 0, OP_LEADER_AGAIN); - MPI_Send(m_again, sizeof(*m_again), MPI_BYTE, i, TAG_MSG, MPI_COMM_WORLD); - free(m_again); - } + struct message *m_again = generate_message(id, 0, leader, 0, 0, OP_LEADER_AGAIN); + broadcast_message(m_again, id, network_size); + free(m_again); + free(m_send); return leader_election(id, network_size); // TODO: Check if it works + fix leaks } next = next_id(next, network_size); if (next == id) { debug("i'm alone", id); + queue_free(message_queue); + free(m_send); return 0; } m_send->id_t = next; @@ -42,33 +43,39 @@ unsigned leader_election(unsigned id, unsigned network_size) { free(m_send); } //debug("safe message sent", id); - struct message *m_receive = receive_message(message_queue); - if (m_receive->op == OP_LEADER_OK) + if (m_receive->op == OP_LEADER_OK) { + queue_free(message_queue); + free(m_receive); return leader; + } - if (m_receive->op == OP_LEADER_AGAIN) + if (m_receive->op == OP_LEADER_AGAIN) { + queue_free(message_queue); + free(m_receive); return leader_election(id, network_size); // TODO: Check if it works + fix leaks + } if (m_receive->op == OP_LEADER) { new_leader = m_receive->id_o; if (m_receive->id_o == id) { // send LEADER OK - for (unsigned i = 1; i < network_size; ++i) { - struct message *m_ok = generate_message(id, i, leader, 0, 0, OP_LEADER_OK); - MPI_Send(m_ok, sizeof(*m_ok), MPI_BYTE, i, TAG_MSG, MPI_COMM_WORLD); - free(m_ok); - } + struct message *m_ok = generate_message(id, 0, leader, 0, 0, OP_LEADER_OK); + broadcast_message(m_ok, id, network_size); + free(m_ok); // send leader to user struct message *m_user = generate_message(id, 0, leader, 0, 0, OP_OK); MPI_Send(m_user, sizeof(*m_user), MPI_BYTE, 0, TAG_MSG, MPI_COMM_WORLD); free(m_user); + queue_free(message_queue); + free(m_receive); return leader; } } - } + free(m_receive); + } // while(1) } From 7294f90fb5a72ece5a032d40acb564099f513a25 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 10:44:13 +0200 Subject: [PATCH 06/31] cli: improved help with new commands, add help message if invalid command --- src/cli/cli.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/cli/cli.c b/src/cli/cli.c index 66384c5..fa7ae96 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -88,7 +88,11 @@ void execute(char **args, unsigned short leader) { " m `size` | return `address` to cmd user of the required allocation |\n" " f `address` | free address, Warning if already free |\n" " w `address` `datasize` `data` | write at the address the data of size datasize |\n" + " w `address` `file` | write all content of file at address |\n" + " w `address` `file` `datasize` | write datasize bytes from file to the address |\n" " r `address` `datasize` | read datasize bytes at address |\n" + " r `address` `file` | read all bytes of the block at address into file |\n" + " r `address` `file` `datasize` | read datasize bytes at address into file |\n" " d `address` | dump in as text all data of the block stored in address |\n" " d net | dump all allocation |\n" " d `address` `file` | dump address data in file |\n" @@ -285,6 +289,9 @@ void execute(char **args, unsigned short leader) { error_msg("w require an argument 'address' which can be casted as a positive integer"); } } + else { + error_msg("command not found, see 'h' for help"); + } } From 66ddf4a359562e4afcfef5bded4a79e106b8091d Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 12:06:23 +0200 Subject: [PATCH 07/31] global: set node size to 128 --- include/globals.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/globals.h b/include/globals.h index 55c74b5..3f60f3e 100644 --- a/include/globals.h +++ b/include/globals.h @@ -1,7 +1,7 @@ #ifndef DISTRIBUTEDMALLOC_GLOBALS_H #define DISTRIBUTEDMALLOC_GLOBALS_H -#define DEF_NODE_SIZE 8 +#define DEF_NODE_SIZE 128 #define DEF_NODE_USER 0 #define DEF_NODE_LEADER 1 From 2f037f37394ae7dbb3958e11a7fdd5d158446a22 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 12:06:39 +0200 Subject: [PATCH 08/31] message: add OP_READ_FILE --- include/message.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/message.h b/include/message.h index 1c63da0..ab2c779 100644 --- a/include/message.h +++ b/include/message.h @@ -9,6 +9,7 @@ enum operation { OP_FREE, OP_WRITE, OP_READ, + OP_READ_FILE, OP_SNAP, OP_LEADER, OP_WHOISLEADER, From 049de3137fbab19b38d3c41bd7c8a960d171b440 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 12:07:18 +0200 Subject: [PATCH 09/31] user: add read file, write file --- src/cli/cli.c | 79 ++++++++++++++++++++++++++++++++++++++++++++------ src/cli/user.c | 57 +++++++++++++++++++++++++++--------- 2 files changed, 114 insertions(+), 22 deletions(-) diff --git a/src/cli/cli.c b/src/cli/cli.c index fa7ae96..2e6e789 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -3,6 +3,7 @@ #include #include #include +#include #include "cli.h" #include "communication.h" @@ -141,8 +142,9 @@ void execute(char **args, unsigned short leader) { } } else if (0 == strcmp(args[0], "w")) { // ERRORS - if (l <= 3) { - error_msg("w require 3 arguments : 'address', 'datasize' and 'data'"); + if (l <= 2) { + error_msg( + "w require 2-3 arguments : 'address', 'datasize' and 'data'\n OR 'address' 'file' + 'datasize' optional, check h"); return; } else if (l >= 5) { error_msg("w do not support more than 3 arguments, check command h"); @@ -159,7 +161,36 @@ void execute(char **args, unsigned short leader) { send_command(OP_WRITE, d_w, leader); free(d_w); } else { - error_msg("w requires an argument 'datasize' which can be casted as a positive integer"); + // Write from File + FILE *file_write = fopen(args[2], "r"); + if (!file_write) { + error_msg("w 'address' 'file', file was not possible to open, or doesn't exist"); + return; + } + struct stat st; + /*get the size using stat()*/ + if (stat(args[2], &st) != 0) + return; + if (l == 4) { + // file max data read + if (1 == sscanf(args[3], "%zu", &datasize)) { + if ((ssize_t) datasize >= st.st_size) + datasize = st.st_size; + } else { + error_msg("w requires an argument 'datasize' which can be casted as a positive integer"); + return; + } + } else { + datasize = st.st_size; + } + // Now read datasize + char *buffer_write_file = malloc(sizeof(char) * (2 + datasize)); + size_t r_c = fread(buffer_write_file, sizeof(char), datasize, file_write); + if (r_c != datasize) + error_msg("WARNING: file bytes read different from bytes asked"); + struct data_write *d_w = generate_data_write(address, datasize, buffer_write_file); + send_command(OP_WRITE, d_w, leader); + // OLD error_msg("w requires an argument 'datasize' which can be casted as a positive integer"); } } else { error_msg("w requires an argument 'address' which can be casted as a positive integer"); @@ -167,10 +198,10 @@ void execute(char **args, unsigned short leader) { } else if (0 == strcmp(args[0], "r")) { // ERRORS if (l <= 2) { - error_msg("r requires 2 arguments : 'address' and 'datasize'"); + error_msg("r requires 2-3 arguments : 'address' and 'datasize'\n OR 'address' 'file'\n OR 'address' 'file' 'datasize"); return; - } else if (l >= 4) { - error_msg("r do not support more than 2 arguments, check command h"); + } else if (l >= 5) { + error_msg("r do not support more than 2-3 arguments, check command h"); return; } @@ -184,7 +215,38 @@ void execute(char **args, unsigned short leader) { send_command(OP_READ, d_r, leader); free(d_r); } else { - error_msg("r requires an argument 'datasize' which can be casted as a positive integer"); + // Read to File + FILE *file_read = fopen(args[2], "w"); + if (!file_read) { + error_msg("r 'address' 'file', file was not possible to open, or doesn't exist"); + return; + } + struct stat st; + if (stat(args[2], &st) != 0) + return; + if (l == 4) { + // file max data read + if (1 == sscanf(args[3], "%zu", &datasize)) { + if ((ssize_t) datasize >= st.st_size) + datasize = st.st_size; + } else { + error_msg("r requires an argument 'datasize' which can be casted as a positive integer"); + return; + } + } else { + datasize = st.st_size; + } + // Now read datasize bytes + char *buffer_read_file = NULL; + struct data_write *d_w = generate_data_write(address, datasize, buffer_read_file); + send_command(OP_READ_FILE, d_w, leader); + // /!\ buffer_read_file was allocated in send_command + debug("Write data in file (READ OP)", 0); + printf("Data Read :: %zu", d_w->size); + size_t r_c = fwrite(d_w->data, sizeof(char), d_w->size, file_read); + if (r_c != d_w->size) + error_msg("WARNING: file bytes write different from bytes asked"); + // OLD error_msg("r requires an argument 'datasize' which can be casted as a positive integer"); } } else { error_msg("r require an argument 'address' which can be casted as a positive integer"); @@ -288,8 +350,7 @@ void execute(char **args, unsigned short leader) { } else { error_msg("w require an argument 'address' which can be casted as a positive integer"); } - } - else { + } else { error_msg("command not found, see 'h' for help"); } diff --git a/src/cli/user.c b/src/cli/user.c index 29b67af..78c928c 100644 --- a/src/cli/user.c +++ b/src/cli/user.c @@ -70,21 +70,48 @@ void send_read(void *data, unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); - MPI_Isend((void *) m, sizeof(struct message), MPI_CHAR, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); - // TODO RECV Read data ! - MPI_Status st; - MPI_Request r2; - void *buff = malloc(sizeof(char) * (d_r->size + 1)); - MPI_Irecv(buff, d_r->size * sizeof(char), MPI_CHAR, leader, 0, MPI_COMM_WORLD, &r2); - while (0 != MPI_Wait(&r, &st)) { - char *c_buff = buff; - c_buff[d_r->size] = '\0'; - char *msg = "Read : "; - strcat(msg, c_buff); - debug(msg, DEF_NODE_USER); - } + struct message m2; + MPI_Status st2; + MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &st2); + debug("RECV OK with datasize read", 0); + size_t real_data_size = m2.size; + MPI_Status st3; + char *r_data = malloc(sizeof(char) * (2 + real_data_size)); + MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &st3); + debug("Read:", 0); + r_data[real_data_size + 1] = '\0'; + debug_n(r_data, 0, real_data_size); + free(m); +} + +void send_read_file(void *data, unsigned short leader) { + MPI_Request r; + // MPI_Status st; + struct data_write *d_r = data; + if (d_r->size == 0) + d_r->size = 40000; + + struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + d_r->address, d_r->size, OP_READ); + + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + + struct message *m2 = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + d_r->address, d_r->size, OP_READ); + MPI_Status st2; + MPI_Recv(m2, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &st2); + debug("RECV OK with datasize read", 0); + size_t real_data_size = m2->size; + MPI_Status st3; + d_r->data = malloc(sizeof(char) * (2 +real_data_size)); + d_r->size = real_data_size; + debug("Read Recv Data", 0); + MPI_Recv(d_r->data, real_data_size * sizeof(char), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &st3); + printf("Data Read :: %zu", d_r->size); free(m); + free(m2); } void send_dump(void *data, unsigned short leader) { @@ -134,6 +161,10 @@ void send_command(enum operation op, void *data, unsigned short leader) { debug("Send Dump All", 0); send_dump_all(leader); break; + case OP_READ_FILE: + debug("Send Read (file)", 0); + send_read_file(data, leader); + break; default: break; } From c683ba92094d5df7037c8ad2284235260b1e166f Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 12:07:57 +0200 Subject: [PATCH 10/31] leader: read correct send to user --- src/network/leader.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/network/leader.c b/src/network/leader.c index 4025518..02d1626 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -324,7 +324,14 @@ void execute_read(struct leader_resources *l_r) { free(m); } debug("READ AND ASSEMBLED", l_r->id); - debug_n(read_buff, l_r->id, d_r->size); + debug_n(read_buff, l_r->id, nb_read_size); + // Send Result Read to User + debug("Send to User OK + Data Read size", l_r->id); + struct message *mU = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, 0, nb_read_size, OP_OK); + MPI_Send(mU, sizeof(struct message), MPI_BYTE, mU->id_t, 0, MPI_COMM_WORLD); + debug("Send to User Assemebled Read Data", l_r->id); + MPI_Send(read_buff, nb_read_size, MPI_BYTE, mU->id_t, 0, MPI_COMM_WORLD); + free(mU); } void execute_write(struct leader_resources *l_r) { From ff9cd40552cccd2c18f6866889d370b1961cac35 Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Tue, 9 Jul 2019 23:43:44 +0200 Subject: [PATCH 11/31] handle-dead-leader: change cli loop --- include/cli.h | 2 +- src/cli/cli.c | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/include/cli.h b/include/cli.h index 22940dd..d8a87a9 100644 --- a/include/cli.h +++ b/include/cli.h @@ -10,7 +10,7 @@ #include "message.h" #include "utils.h" -void start_cli(); +void start_cli(unsigned size); void send_command(enum operation op, void *data, unsigned short leader); diff --git a/src/cli/cli.c b/src/cli/cli.c index 2e6e789..8c92105 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -356,19 +356,24 @@ void execute(char **args, unsigned short leader) { } -unsigned short get_leader() { +unsigned short start_leader_election(unsigned size) { + struct message *b = generate_message(0, 0, 0, 0, 0, OP_START_LEADER); + broadcast_message(b, 0, size); + free(b); struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("Got Leader !", DEF_NODE_USER); + free(m); return m->id_o; } -void start_cli() { +void start_cli(unsigned size) { int no_quit = 1; char *cmd; char **args; - unsigned short leader = get_leader(); + struct queue *q = queue_init(); + unsigned short leader = start_leader_election(size); while (no_quit) { fflush(0); @@ -384,7 +389,10 @@ void start_cli() { } // DEBUG print_args(args); - + struct message *m_verif = generate_message_a(0, leader, 0, 0, 0, OP_IS_ALIVE, 1); + if (!send_safe_message(m_verif, q)) + leader = start_leader_election(size); + free(m_verif); execute(args, leader); free(cmd); From c3c15aeba6cc2565f8e28a06a5d68e90b8f43513 Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Tue, 9 Jul 2019 23:44:55 +0200 Subject: [PATCH 12/31] handle-dead-leader: add START_LEADER and IS_ALIVE op --- include/message.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/message.h b/include/message.h index ab2c779..421de35 100644 --- a/include/message.h +++ b/include/message.h @@ -12,6 +12,7 @@ enum operation { OP_READ_FILE, OP_SNAP, OP_LEADER, + OP_START_LEADER, OP_WHOISLEADER, OP_REVIVE, OP_KILL, @@ -20,6 +21,7 @@ enum operation { OP_DUMP, OP_DUMP_ALL, OP_LEADER_OK, + OP_IS_ALIVE, OP_ALIVE, OP_LEADER_AGAIN }; From e8aa41873125a07e3c8ad798a56628c5d0e7f843 Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Tue, 9 Jul 2019 23:45:26 +0200 Subject: [PATCH 13/31] handle-dead-leader: WIP change node loop --- include/node.h | 2 +- src/main/main.c | 31 ++++++++----------------------- src/network/node.c | 23 +++++++++++------------ 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/include/node.h b/include/node.h index e09f9f5..b7850fc 100644 --- a/include/node.h +++ b/include/node.h @@ -15,6 +15,6 @@ struct node { struct node *generate_node(unsigned short id, size_t size); -void node_cycle(struct node *n); +int node_cycle(struct node *n); #endif /* !DISTRIBUTEDMALLOC_NODE_H */ diff --git a/src/main/main.c b/src/main/main.c index 9ac4384..f92503a 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -37,37 +37,22 @@ int main(int argc, char **argv) { if (rank == DEF_NODE_USER) { printf("starting %d processes\n", size); debug("Start User", DEF_NODE_USER); - start_cli(); + start_cli(size); } else { debug("Start Node", rank); - //if (rank != 1) { - - unsigned leader = leader_election(rank, size); - - printf("Node %u finished election. Leader is: %u\n", rank, leader); - - // Node Creation - struct node *n = generate_node(rank, DEF_NODE_SIZE); - // Form rank number ! - - // Start Leader ! - if (n->id == leader) { + struct node *n = generate_node(rank, DEF_NODE_SIZE); + int new_leader = 0; + do { + n->isleader = 0; + new_leader = node_cycle(n); + if (new_leader == n->id) { n->isleader = 1; leader_loop(n, DEF_NODE_USER, size - 1); } - - node_cycle(n); - - /* - while (1) { - // routine - } - */ - //} - + } while (new_leader); } MPI_Finalize(); diff --git a/src/network/node.c b/src/network/node.c index 0993952..cbae2a8 100644 --- a/src/network/node.c +++ b/src/network/node.c @@ -2,6 +2,7 @@ #include #include #include +#include #include "node.h" struct node *generate_node(unsigned short id, size_t size) { @@ -25,8 +26,7 @@ void write_on_node(struct node *n, size_t address, char *data, size_t size) { memcpy((void *) mem_op_ptr, (void *) data, size); debug("Write done of :", n->id); debug_n((char *) n->memory, n->id, n->size); - } - else { + } else { // printf("aske_addr %zu, ask_mem %zu, size_mem %zu\n\n", address, size, n->size); debug("OP WRITE FAILED", n->id); } @@ -43,21 +43,22 @@ void read_on_node(struct node *n, size_t address, char *data, size_t size) { if (address + size <= n->size) { void *mem_op_ptr = (n->memory + address); memcpy((void *) data, (void *) mem_op_ptr, size); - } - else { + } else { debug("OP READ FAILED", n->id); } } -void node_cycle(struct node *n) { +int node_cycle(struct node *n) { while (1) { // cycle of node - // OLD FIXME struct queue *q = queue_init(); - struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); - MPI_Status st; - MPI_Recv(m, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, 3, MPI_COMM_WORLD, &st); + struct queue *q = queue_init(); + struct message *m = receive_message(q); debug("Recv OP", n->id); switch (m->op) { + case OP_START_LEADER: + queue_free(q); + free(m); + return leader_election(n->id, n->size); case OP_OK: break; case OP_WRITE: { @@ -84,8 +85,6 @@ void node_cycle(struct node *n) { default: break; } - if (m->op == OP_KILL) - break; free(m); - } + } // while(1) } From 1f9100791c3cbe315859dea71568b336c5884f6b Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Wed, 10 Jul 2019 12:07:24 +0200 Subject: [PATCH 14/31] handle-dead-leader: leader answer ALIVE + fix all tags in Send and Recv --- include/communication.h | 2 +- include/node.h | 2 +- src/cli/cli.c | 3 ++- src/cli/user.c | 21 ++++++++++---------- src/main/main.c | 4 +++- src/network/leader.c | 43 +++++++++++++++++++++++++++-------------- src/network/node.c | 13 +++++++------ 7 files changed, 53 insertions(+), 35 deletions(-) diff --git a/include/communication.h b/include/communication.h index 573afbd..2073697 100644 --- a/include/communication.h +++ b/include/communication.h @@ -8,7 +8,7 @@ // Number of attempts failed before considering the node as dead. #define NB_ITER 60 -#define TAG_MSG 201 +#define TAG_MSG 3 #define TAG_DATA 4 /* diff --git a/include/node.h b/include/node.h index b7850fc..e09f9f5 100644 --- a/include/node.h +++ b/include/node.h @@ -15,6 +15,6 @@ struct node { struct node *generate_node(unsigned short id, size_t size); -int node_cycle(struct node *n); +void node_cycle(struct node *n); #endif /* !DISTRIBUTEDMALLOC_NODE_H */ diff --git a/src/cli/cli.c b/src/cli/cli.c index 8c92105..c4ca098 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -363,8 +363,9 @@ unsigned short start_leader_election(unsigned size) { struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("Got Leader !", DEF_NODE_USER); + unsigned short leader = m->id_o; free(m); - return m->id_o; + return leader; } void start_cli(unsigned size) { diff --git a/src/cli/user.c b/src/cli/user.c index 78c928c..0f82d3b 100644 --- a/src/cli/user.c +++ b/src/cli/user.c @@ -8,6 +8,7 @@ #include #include #include +#include void send_write(void *data, unsigned short leader) { MPI_Request r; @@ -16,10 +17,10 @@ void send_write(void *data, unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_w->address, d_w->size, OP_WRITE); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); debug("Send Data For Write OP", 0); - MPI_Isend(d_w->data, d_w->size, MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend(d_w->data, d_w->size, MPI_BYTE, m->id_t, TAG_DATA, MPI_COMM_WORLD, &r); free(m); } @@ -28,15 +29,15 @@ void send_malloc(void *data, unsigned short leader) { // MPI_Status st; struct data_size *d_s = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, - 0, d_s->size, OP_MALLOC); + TAG_MSG, d_s->size, OP_MALLOC); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); // TODO RECV Address ! MPI_Status st; MPI_Request r2; void *buff = generate_message(0, 0, 0, 0, 0, OP_NONE); - MPI_Irecv(buff, sizeof(struct message), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &r2); + MPI_Irecv(buff, sizeof(struct message), MPI_BYTE, leader, TAG_MSG, MPI_COMM_WORLD, &r2); /* while (0 != MPI_Wait(&r, &st)) { char *msg = "Address from malloc operation of size "; @@ -70,16 +71,16 @@ void send_read(void *data, unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); struct message m2; MPI_Status st2; - MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &st2); + MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &st2); debug("RECV OK with datasize read", 0); size_t real_data_size = m2.size; MPI_Status st3; char *r_data = malloc(sizeof(char) * (2 + real_data_size)); - MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &st3); + MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); debug("Read:", 0); r_data[real_data_size + 1] = '\0'; debug_n(r_data, 0, real_data_size); @@ -121,7 +122,7 @@ void send_dump(void *data, unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_a->address, 0, OP_DUMP); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); } @@ -132,7 +133,7 @@ void send_dump_all(unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, 0, 0, OP_DUMP_ALL); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); } diff --git a/src/main/main.c b/src/main/main.c index f92503a..37c5901 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -47,7 +47,9 @@ int main(int argc, char **argv) { int new_leader = 0; do { n->isleader = 0; - new_leader = node_cycle(n); + node_cycle(n); + new_leader = leader_election(n->id, size); + printf("NEW LEADER = %d\n", new_leader); if (new_leader == n->id) { n->isleader = 1; leader_loop(n, DEF_NODE_USER, size - 1); diff --git a/src/network/leader.c b/src/network/leader.c index 02d1626..e750adf 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -164,12 +164,18 @@ void get_command(struct leader_resources *l_r, unsigned short user) { int count = sizeof(struct message); MPI_Request r; MPI_Status st; - MPI_Irecv(&buff, count, MPI_BYTE, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &r); + MPI_Irecv(&buff, count, MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r); if (0 == MPI_Wait(&r, &st)) { struct message *m = &buff; struct command_queue *n_command = generate_command_queue(m->op, NULL); switch (m->op) { + case OP_IS_ALIVE: + debug("Leader recv OP IS ALIVE from User", l_r->id); + n_command->command = m->op; + n_command->data = NULL; + l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); + break; case OP_OK: if (m->id_s == 0) { debug("Leader recv OP OK from User", l_r->id); @@ -184,7 +190,7 @@ void get_command(struct leader_resources *l_r, unsigned short user) { struct data_write *d_w = generate_data_write(m->address, m->size, NULL); void *wbuff = malloc(sizeof(char) * (m->size + 2)); debug("Leader wait DATA from User for OP WRITE", l_r->id); - MPI_Irecv(wbuff, m->size * sizeof(char), MPI_BYTE, user, 0, MPI_COMM_WORLD, &r); + MPI_Irecv(wbuff, m->size * sizeof(char), MPI_BYTE, user, TAG_DATA, MPI_COMM_WORLD, &r); if (0 == MPI_Wait(&r, &st)) { d_w->data = wbuff; n_command->data = d_w; @@ -244,7 +250,7 @@ void execute_malloc(struct leader_resources *l_r) { debug("Out of memory", l_r->id); struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, SIZE_MAX, 0, OP_MALLOC); MPI_Request r; - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); return; } @@ -254,7 +260,7 @@ void execute_malloc(struct leader_resources *l_r) { } struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, SIZE_MAX, SIZE_MAX, OP_MALLOC); MPI_Request r; - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); return; } @@ -262,7 +268,7 @@ void execute_malloc(struct leader_resources *l_r) { printf("%zu of %zu bytes\n", v_addr, sss); struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, v_addr, sss, OP_MALLOC); MPI_Request r; - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); } @@ -314,10 +320,10 @@ void execute_read(struct leader_resources *l_r) { // 3 Send READ OP to each node (Warning to the local address of the node, not the virtual) struct message *m = generate_message(l_r->id, b->id, b->id, local_address, to_read_size, OP_READ); debug("Send OP Read", l_r->id); - MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD); + MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD); void *buff = malloc(sizeof(char) * (to_read_size + 1)); MPI_Status st; - MPI_Recv(buff, to_read_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD, &st); + MPI_Recv(buff, to_read_size, MPI_BYTE, b->id, TAG_DATA, MPI_COMM_WORLD, &st); memcpy((void *) (read_buff + (offset * sizeof(char))), buff, to_read_size); offset += to_read_size; nb_read++; @@ -385,13 +391,13 @@ void execute_write(struct leader_resources *l_r) { // 3 Send Write OP to each node (Warning to the local address of the node, not the virtual) struct message *m = generate_message(l_r->id, b->id, b->id, local_address, to_write_size, OP_WRITE); debug("Send Write OP", l_r->id); - MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD); + MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD); struct message m2; MPI_Status st; - MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD, &st); + MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD, &st); debug("Send Data", l_r->id); // debug_n(d_w->data, l_r->id, d_w->size); - MPI_Send((void *) ((char *) d_w->data + offset), to_write_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD); + MPI_Send((void *) ((char *) d_w->data + offset), to_write_size, MPI_BYTE, b->id, TAG_DATA, MPI_COMM_WORLD); offset += to_write_size; } @@ -421,10 +427,10 @@ void execute_dump(struct leader_resources *l_r) { struct block *b = c_a->parts[i]; struct message *m = generate_message(l_r->id, b->id, b->id, b->node_address, b->size, OP_READ); debug("Send Read OP", l_r->id); - MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD); + MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD); void *buff = malloc(sizeof(char) * (b->size + 1)); MPI_Status st; - MPI_Recv(buff, b->size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD, &st); + MPI_Recv(buff, b->size, MPI_BYTE, b->id, TAG_DATA, MPI_COMM_WORLD, &st); memcpy((void *) (dump + (offset * sizeof(char))), buff, b->size); offset += b->size; } @@ -439,9 +445,16 @@ void execute_dump_all(struct leader_resources *l_r) { } void execute_command(struct leader_resources *l_r) { - if (peek_user_command(l_r->leader_command_queue) != OP_NONE) { + while (l_r->leader_command_queue && peek_user_command(l_r->leader_command_queue) != OP_NONE) { switch (peek_user_command(l_r->leader_command_queue)) { + case OP_IS_ALIVE: { + debug("EXECUTE OP ALIVE", l_r->id); + struct message *m_alive = generate_message(l_r->id, 0, 0, 0, 0, OP_ALIVE); + MPI_Send(m_alive, sizeof(*m_alive), MPI_BYTE, 0, TAG_MSG, MPI_COMM_WORLD); + free(m_alive); + break; + } case OP_MALLOC: debug("EXECUTE OP MALLOC LEADER", l_r->id); execute_malloc(l_r); @@ -505,10 +518,10 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n print_allocations_table(l_r); // Get command from user get_command(l_r, terminal_id); - // debug("COMMANDS LISTEN DONE", n->id); + debug("COMMANDS LISTEN DONE", n->id); // Execute Commands execute_command(l_r); - // debug("COMMANDS EXEC DONE", n->id); + debug("COMMANDS EXEC DONE", n->id); // Break on death if (die == 1) break; diff --git a/src/network/node.c b/src/network/node.c index cbae2a8..29de347 100644 --- a/src/network/node.c +++ b/src/network/node.c @@ -48,28 +48,29 @@ void read_on_node(struct node *n, size_t address, char *data, size_t size) { } } -int node_cycle(struct node *n) { +void node_cycle(struct node *n) { + struct queue *q = queue_init(); while (1) { // cycle of node - struct queue *q = queue_init(); struct message *m = receive_message(q); debug("Recv OP", n->id); switch (m->op) { case OP_START_LEADER: queue_free(q); free(m); - return leader_election(n->id, n->size); + debug("START LEADER ELECTION", n->id); + return; case OP_OK: break; case OP_WRITE: { debug("Write OP : send OK", n->id); struct message *mW = generate_message(n->id, m->id_s, n->id, 0, 0, OP_OK); - MPI_Send(mW, sizeof(struct message), MPI_BYTE, mW->id_t, 3, MPI_COMM_WORLD); + MPI_Send(mW, sizeof(struct message), MPI_BYTE, mW->id_t, TAG_MSG, MPI_COMM_WORLD); size_t addr = m->address; size_t size = m->size; char *data = malloc(size * sizeof(char)); MPI_Status st3; - MPI_Recv(data, size, MPI_BYTE, m->id_s, 4, MPI_COMM_WORLD, &st3); + MPI_Recv(data, size, MPI_BYTE, m->id_s, TAG_DATA, MPI_COMM_WORLD, &st3); write_on_node(n, addr, data, size); free(data); } @@ -79,7 +80,7 @@ int node_cycle(struct node *n) { char *data = malloc(m->size * sizeof(char)); read_on_node(n, m->address, data, m->size); debug("Send Read: Data", n->id); - MPI_Send(data, m->size, MPI_BYTE, m->id_s, 4, MPI_COMM_WORLD); + MPI_Send(data, m->size, MPI_BYTE, m->id_s, TAG_DATA, MPI_COMM_WORLD); free(data); break; default: From 5411c7fce287b8e9a7b5b2ef5f292e269bcdc60e Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 12:07:18 +0200 Subject: [PATCH 15/31] user: add read file, write file --- src/cli/cli.c | 19 +++++-------------- src/cli/user.c | 15 ++++++++------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/src/cli/cli.c b/src/cli/cli.c index c4ca098..2e6e789 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -356,25 +356,19 @@ void execute(char **args, unsigned short leader) { } -unsigned short start_leader_election(unsigned size) { - struct message *b = generate_message(0, 0, 0, 0, 0, OP_START_LEADER); - broadcast_message(b, 0, size); - free(b); +unsigned short get_leader() { struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("Got Leader !", DEF_NODE_USER); - unsigned short leader = m->id_o; - free(m); - return leader; + return m->id_o; } -void start_cli(unsigned size) { +void start_cli() { int no_quit = 1; char *cmd; char **args; - struct queue *q = queue_init(); - unsigned short leader = start_leader_election(size); + unsigned short leader = get_leader(); while (no_quit) { fflush(0); @@ -390,10 +384,7 @@ void start_cli(unsigned size) { } // DEBUG print_args(args); - struct message *m_verif = generate_message_a(0, leader, 0, 0, 0, OP_IS_ALIVE, 1); - if (!send_safe_message(m_verif, q)) - leader = start_leader_election(size); - free(m_verif); + execute(args, leader); free(cmd); diff --git a/src/cli/user.c b/src/cli/user.c index 0f82d3b..d8e8f92 100644 --- a/src/cli/user.c +++ b/src/cli/user.c @@ -25,11 +25,12 @@ void send_write(void *data, unsigned short leader) { } void send_malloc(void *data, unsigned short leader) { + debug("SEND MAAAALOC", 0); MPI_Request r; // MPI_Status st; struct data_size *d_s = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, - TAG_MSG, d_s->size, OP_MALLOC); + 0, d_s->size, OP_MALLOC); MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); @@ -71,16 +72,16 @@ void send_read(void *data, unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); struct message m2; MPI_Status st2; - MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &st2); + MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &st2); debug("RECV OK with datasize read", 0); size_t real_data_size = m2.size; MPI_Status st3; char *r_data = malloc(sizeof(char) * (2 + real_data_size)); - MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); + MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &st3); debug("Read:", 0); r_data[real_data_size + 1] = '\0'; debug_n(r_data, 0, real_data_size); @@ -97,19 +98,19 @@ void send_read_file(void *data, unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); struct message *m2 = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); MPI_Status st2; - MPI_Recv(m2, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &st2); + MPI_Recv(m2, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &st2); debug("RECV OK with datasize read", 0); size_t real_data_size = m2->size; MPI_Status st3; d_r->data = malloc(sizeof(char) * (2 +real_data_size)); d_r->size = real_data_size; debug("Read Recv Data", 0); - MPI_Recv(d_r->data, real_data_size * sizeof(char), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &st3); + MPI_Recv(d_r->data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); printf("Data Read :: %zu", d_r->size); free(m); free(m2); From 3968704700ad6d9ec6fdfdcab7da94893d5e9455 Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Tue, 9 Jul 2019 23:43:44 +0200 Subject: [PATCH 16/31] handle-dead-leader: change cli loop --- src/cli/cli.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/cli/cli.c b/src/cli/cli.c index 2e6e789..af9369d 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include "cli.h" #include "communication.h" @@ -356,19 +357,24 @@ void execute(char **args, unsigned short leader) { } -unsigned short get_leader() { +unsigned short start_leader_election(unsigned size) { + struct message *b = generate_message(0, 0, 0, 0, 0, OP_START_LEADER); + broadcast_message(b, 0, size); + free(b); struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("Got Leader !", DEF_NODE_USER); + free(m); return m->id_o; } -void start_cli() { +void start_cli(unsigned size) { int no_quit = 1; char *cmd; char **args; - unsigned short leader = get_leader(); + struct queue *q = queue_init(); + unsigned short leader = start_leader_election(size); while (no_quit) { fflush(0); @@ -384,7 +390,10 @@ void start_cli() { } // DEBUG print_args(args); - + struct message *m_verif = generate_message_a(0, leader, 0, 0, 0, OP_IS_ALIVE, 1); + if (!send_safe_message(m_verif, q)) + leader = start_leader_election(size); + free(m_verif); execute(args, leader); free(cmd); From 847c15b5e16ba8a1211ed2f4e15b7884b68cd21e Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Wed, 10 Jul 2019 12:07:24 +0200 Subject: [PATCH 17/31] handle-dead-leader: leader answer ALIVE + fix all tags in Send and Recv --- src/cli/cli.c | 3 ++- src/cli/user.c | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cli/cli.c b/src/cli/cli.c index af9369d..4d1d56e 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -364,8 +364,9 @@ unsigned short start_leader_election(unsigned size) { struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("Got Leader !", DEF_NODE_USER); + unsigned short leader = m->id_o; free(m); - return m->id_o; + return leader; } void start_cli(unsigned size) { diff --git a/src/cli/user.c b/src/cli/user.c index d8e8f92..c237c2f 100644 --- a/src/cli/user.c +++ b/src/cli/user.c @@ -25,7 +25,6 @@ void send_write(void *data, unsigned short leader) { } void send_malloc(void *data, unsigned short leader) { - debug("SEND MAAAALOC", 0); MPI_Request r; // MPI_Status st; struct data_size *d_s = data; @@ -72,16 +71,16 @@ void send_read(void *data, unsigned short leader) { struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); struct message m2; MPI_Status st2; - MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &st2); + MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &st2); debug("RECV OK with datasize read", 0); size_t real_data_size = m2.size; MPI_Status st3; char *r_data = malloc(sizeof(char) * (2 + real_data_size)); - MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &st3); + MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); debug("Read:", 0); r_data[real_data_size + 1] = '\0'; debug_n(r_data, 0, real_data_size); From 42a41eda94eda48e88df9dcf23554a29ad606aa4 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 12:28:42 +0200 Subject: [PATCH 18/31] user: fixes, and fixes TAG --- src/cli/user.c | 56 ++++++++++++++++++-------------------------------- 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/src/cli/user.c b/src/cli/user.c index c237c2f..776f8ab 100644 --- a/src/cli/user.c +++ b/src/cli/user.c @@ -11,67 +11,53 @@ #include void send_write(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_write *d_w = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_w->address, d_w->size, OP_WRITE); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); debug("Send Data For Write OP", 0); - MPI_Isend(d_w->data, d_w->size, MPI_BYTE, m->id_t, TAG_DATA, MPI_COMM_WORLD, &r); + MPI_Send(d_w->data, d_w->size, MPI_BYTE, m->id_t, TAG_DATA, MPI_COMM_WORLD); free(m); } void send_malloc(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_size *d_s = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, 0, d_s->size, OP_MALLOC); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); // TODO RECV Address ! MPI_Status st; - MPI_Request r2; void *buff = generate_message(0, 0, 0, 0, 0, OP_NONE); - MPI_Irecv(buff, sizeof(struct message), MPI_BYTE, leader, TAG_MSG, MPI_COMM_WORLD, &r2); - /* - while (0 != MPI_Wait(&r, &st)) { - char *msg = "Address from malloc operation of size "; - char size_str[256]; - snprintf(size_str, sizeof(size_str), "%zu", d_s->size); - strcat(msg, size_str); - debug(msg, DEF_NODE_USER); - } - */ - if (0 == MPI_Wait(&r2, &st)) { - struct message *m2 = buff; - if (m2->address != SIZE_MAX) { - printf("user: request malloc of size %zu, is at address %zu on the network\n", - m2->size, - m2->address); - } - else if (m2->size == 0) { - debug("Network: Out of memory", 0); - } else { - debug("Network: Fatal error in leader", 0); - } + MPI_Recv(buff, sizeof(struct message), MPI_BYTE, leader, TAG_MSG, MPI_COMM_WORLD, &st); + + struct message *m2 = buff; + if (m2->address != SIZE_MAX) { + printf("user: request malloc of size %zu, is at address %zu on the network\n", + m2->size, + m2->address); + } else if (m2->size == 0) { + debug("Network: Out of memory", 0); + } else { + debug("Network: Fatal error in leader", 0); } + free(m); free(buff); } void send_read(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_read *d_r = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); struct message m2; MPI_Status st2; @@ -80,7 +66,7 @@ void send_read(void *data, unsigned short leader) { size_t real_data_size = m2.size; MPI_Status st3; char *r_data = malloc(sizeof(char) * (2 + real_data_size)); - MPI_Recv((void*)r_data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); + MPI_Recv((void *) r_data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); debug("Read:", 0); r_data[real_data_size + 1] = '\0'; debug_n(r_data, 0, real_data_size); @@ -106,7 +92,7 @@ void send_read_file(void *data, unsigned short leader) { debug("RECV OK with datasize read", 0); size_t real_data_size = m2->size; MPI_Status st3; - d_r->data = malloc(sizeof(char) * (2 +real_data_size)); + d_r->data = malloc(sizeof(char) * (2 + real_data_size)); d_r->size = real_data_size; debug("Read Recv Data", 0); MPI_Recv(d_r->data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); @@ -116,24 +102,22 @@ void send_read_file(void *data, unsigned short leader) { } void send_dump(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_address *d_a = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_a->address, 0, OP_DUMP); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); free(m); } void send_dump_all(unsigned short leader) { - MPI_Request r; // MPI_Status st; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, 0, 0, OP_DUMP_ALL); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); free(m); } From 14e6f92754b8f5eca7c40759c62db7d54798056a Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 12:28:58 +0200 Subject: [PATCH 19/31] leader: fixes, fixes TAG --- src/network/leader.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/network/leader.c b/src/network/leader.c index e750adf..a85a4ba 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -330,13 +330,13 @@ void execute_read(struct leader_resources *l_r) { free(m); } debug("READ AND ASSEMBLED", l_r->id); - debug_n(read_buff, l_r->id, nb_read_size); + // debug_n(read_buff, l_r->id, nb_read_size); // Send Result Read to User debug("Send to User OK + Data Read size", l_r->id); struct message *mU = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, 0, nb_read_size, OP_OK); - MPI_Send(mU, sizeof(struct message), MPI_BYTE, mU->id_t, 0, MPI_COMM_WORLD); + MPI_Send(mU, sizeof(struct message), MPI_BYTE, mU->id_t, TAG_MSG, MPI_COMM_WORLD); debug("Send to User Assemebled Read Data", l_r->id); - MPI_Send(read_buff, nb_read_size, MPI_BYTE, mU->id_t, 0, MPI_COMM_WORLD); + MPI_Send(read_buff, nb_read_size, MPI_BYTE, mU->id_t, TAG_DATA, MPI_COMM_WORLD); free(mU); } From ab6763b3bf0a61789dfe8da31e7c9e6a0b67e389 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:00:21 +0200 Subject: [PATCH 20/31] message: add OP_TABLE --- include/message.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/message.h b/include/message.h index 421de35..cb2a062 100644 --- a/include/message.h +++ b/include/message.h @@ -10,6 +10,7 @@ enum operation { OP_WRITE, OP_READ, OP_READ_FILE, + OP_TABLE, OP_SNAP, OP_LEADER, OP_START_LEADER, From d5985273396ff8b415cdd904338e912c442c2ee8 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:01:12 +0200 Subject: [PATCH 21/31] user: add free, table send commands from user --- src/cli/user.c | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/cli/user.c b/src/cli/user.c index 776f8ab..25cc24d 100644 --- a/src/cli/user.c +++ b/src/cli/user.c @@ -23,6 +23,25 @@ void send_write(void *data, unsigned short leader) { free(m); } +void send_free(void *data, unsigned short leader) { + // MPI_Status st; + struct data_address *d_a = data; + struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + d_a->address, 0, OP_FREE); + + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); + free(m); +} + +void send_table(unsigned short leader) { + // MPI_Status st; + struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + 0, 0, OP_TABLE); + + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); + free(m); +} + void send_malloc(void *data, unsigned short leader) { // MPI_Status st; struct data_size *d_s = data; @@ -130,6 +149,14 @@ void send_command(enum operation op, void *data, unsigned short leader) { debug("Send Malloc", 0); send_malloc(data, leader); break; + case OP_FREE: + debug("Send Free", 0); + send_free(data, leader); + break; + case OP_TABLE: + debug("Send Table", 0); + send_table(leader); + break; case OP_WRITE: debug("Send Write", 0); send_write(data, leader); From 0706e0cc26e1282a39a110c8bac9481095dfea35 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:01:44 +0200 Subject: [PATCH 22/31] block: add_allocation will first fill NULL entry --- src/network/block.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/network/block.c b/src/network/block.c index a2733db..003de98 100644 --- a/src/network/block.c +++ b/src/network/block.c @@ -66,6 +66,11 @@ struct allocation_register *generate_allocs(size_t size_alloc) { } void add_allocation(struct allocation_register *a_r, struct allocation *a) { + for (size_t i = 0; i < a_r->count_alloc; i++) + if (a_r->allocs[i] == NULL) { + a_r->allocs[i] = a; + return; + } if (a_r->count_alloc >= a_r->size_alloc) { a_r->allocs[a_r->count_alloc] = a; } else { From 0078e3db1fc3640096417a4078b316525a0b9437 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:02:09 +0200 Subject: [PATCH 23/31] leader: start implement free --- src/network/leader.c | 89 +++++++++++++++++++++++++++++++++----------- 1 file changed, 67 insertions(+), 22 deletions(-) diff --git a/src/network/leader.c b/src/network/leader.c index a85a4ba..fb24620 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -16,6 +16,22 @@ size_t get_message_size() { return (sizeof(unsigned short) * 3 + 2 * sizeof(size_t) + sizeof(enum operation)); } +struct address_search *search_at_address(size_t address, struct leader_resources *l_r) { + struct block_register *bs = l_r->leader_blks; + for (size_t i = 0; i < bs->nb_blocks; i++) { + struct block *b = bs->blks[i]; + if (b->virtual_address >= address && b->virtual_address + b->size < address) { + struct address_search *add_s = malloc(sizeof(struct address_search)); + add_s->size = b->size; + add_s->v_address = b->virtual_address; + add_s->n_address = b->node_address; + add_s->r_address = address; + add_s->id = b->id; + return add_s; + } + } + return NULL; +} size_t size_of_allocation(struct allocation *a) { size_t size = 0; @@ -34,8 +50,6 @@ struct allocation *give_for_v_address(struct leader_resources *l_r, size_t v_add struct allocation_register *reg = l_r->leader_reg; for (size_t i = 0; i < reg->count_alloc; i++) { for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { - // FIXME check if its ok - printf("%zu == %zu \n\n", v_address, reg->allocs[i]->parts[j]->virtual_address); if (reg->allocs[i]->parts[j]->virtual_address <= v_address && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > v_address) { *part = j; @@ -57,6 +71,28 @@ struct leader_resources *generate_leader_resources(size_t nb_nodes, size_t id) { return l_r; } +int free_memory(size_t address, struct leader_resources *l_r) { + if (!l_r->leader_reg) + return 0; + struct allocation_register *reg = l_r->leader_reg; + for (size_t i = 0; i < reg->count_alloc; i++) { + for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { + if (reg->allocs[i]->parts[j]->virtual_address <= address + && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > address) { + // Set all block of this alloc as free + for (j = 0; j < reg->allocs[i]->number_parts; j++) { + reg->allocs[i]->parts[j]->free = 0; + } + free(reg->allocs[i]->parts); + free(reg->allocs[i]); + reg->allocs[i] = NULL; + return 0; + } + } + } + return 0; +} + /** * Allocated memory for the User, using free space of nodes * | X1 | | X2 | X1 | @@ -139,23 +175,6 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) { return SIZE_MAX; } -struct address_search *search_at_address(size_t address, struct leader_resources *l_r) { - struct block_register *bs = l_r->leader_blks; - for (size_t i = 0; i < bs->nb_blocks; i++) { - struct block *b = bs->blks[i]; - if (b->virtual_address >= address && b->virtual_address + b->size < address) { - struct address_search *add_s = malloc(sizeof(struct address_search)); - add_s->size = b->size; - add_s->v_address = b->virtual_address; - add_s->n_address = b->node_address; - add_s->r_address = address; - add_s->id = b->id; - return add_s; - } - } - return NULL; -} - void get_command(struct leader_resources *l_r, unsigned short user) { // int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, @@ -207,6 +226,11 @@ void get_command(struct leader_resources *l_r, unsigned short user) { n_command->data = d_r; l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); break; + case OP_TABLE: + debug("Leader recv OP TABLE", l_r->id); + print_allocations_table(l_r); + fflush(0); + break; case OP_MALLOC: debug("Leader recv OP MALLOC from User", l_r->id); n_command->command = m->op; @@ -215,12 +239,20 @@ void get_command(struct leader_resources *l_r, unsigned short user) { n_command->data = d_s; l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); break; + case OP_FREE: + debug("Leader recv OP FREE from User", l_r->id); + n_command->command = m->op; + n_command->data = NULL; + struct data_address *d_a_free = generate_data_address(m->address); + n_command->data = d_a_free; + l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); + break; case OP_DUMP: debug("Leader recv OP DUMP from User", l_r->id); n_command->command = m->op; n_command->data = NULL; - struct data_address *d_a = generate_data_address(m->address); - n_command->data = d_a; + struct data_address *d_a_dump = generate_data_address(m->address); + n_command->data = d_a_dump; l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); break; case OP_DUMP_ALL: @@ -272,6 +304,19 @@ void execute_malloc(struct leader_resources *l_r) { free(m); } +void execute_free(struct leader_resources *l_r) { + struct data_address *d_a = peek_command(l_r->leader_command_queue); + if (!d_a) { + debug("ERROR allocation data_size for OP MALLOC execution [LEADER]", l_r->id); + return; + } + int yes = free_memory(d_a->address, l_r); + if (yes) + debug("Free operation successful", l_r->id); + else + debug("Free operation was invalid", l_r->id); +} + void execute_read(struct leader_resources *l_r) { struct data_read *d_r; d_r = peek_command(l_r->leader_command_queue); @@ -461,6 +506,7 @@ void execute_command(struct leader_resources *l_r) { break; case OP_FREE: debug("EXECUTE OP FREE LEADER", l_r->id); + execute_malloc(l_r); break; case OP_WRITE: debug("EXECUTE OP WRITE LEADER", l_r->id); @@ -515,7 +561,6 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n */ int die = -1; while (1) { - print_allocations_table(l_r); // Get command from user get_command(l_r, terminal_id); debug("COMMANDS LISTEN DONE", n->id); From d05c38f812615c4f9a33ba8f6507c9641488f789 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:06:00 +0200 Subject: [PATCH 24/31] leader: fix NULL allocs handling --- src/network/leader.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/network/leader.c b/src/network/leader.c index fb24620..9d45365 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -49,11 +49,13 @@ struct allocation *give_for_v_address(struct leader_resources *l_r, size_t v_add return NULL; struct allocation_register *reg = l_r->leader_reg; for (size_t i = 0; i < reg->count_alloc; i++) { - for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { - if (reg->allocs[i]->parts[j]->virtual_address <= v_address - && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > v_address) { - *part = j; - return reg->allocs[i]; + if (reg->allocs[i]) { + for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { + if (reg->allocs[i]->parts[j]->virtual_address <= v_address + && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > v_address) { + *part = j; + return reg->allocs[i]; + } } } } From 8b563f2b162775db9568977dac0fecab51478b6b Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Wed, 10 Jul 2019 14:11:08 +0200 Subject: [PATCH 25/31] dead-leader: fix Recv tag --- src/network/communication.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/network/communication.c b/src/network/communication.c index d736583..bb2990d 100644 --- a/src/network/communication.c +++ b/src/network/communication.c @@ -13,7 +13,7 @@ int send_safe_message(struct message *m_send, struct queue *queue) { struct message *m_recv = calloc(1, sizeof(struct message)); MPI_Request r_ok; int f_ok = 0; - MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &r_ok); + MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_ok); struct timespec waiting_time; waiting_time.tv_sec = 0; @@ -35,7 +35,7 @@ int send_safe_message(struct message *m_send, struct queue *queue) { } queue_push_back(queue, m_recv); m_recv = calloc(1, sizeof(struct message)); - MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &r_ok); + MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_ok); } nanosleep(&waiting_time, NULL); } @@ -52,7 +52,7 @@ struct message *receive_message(struct queue *message_queue) { } m_recv = calloc(1, sizeof(struct message)); // What if m_recv->op = OP_ALIVE ? - MPI_Recv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("queue empty, receive message", m_recv->id_t); if (m_recv->need_callback) { debug("send callback", m_recv->id_t); From b24dac4a4f234269f9429ae1968d5b5bc7f27a4e Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:26:03 +0200 Subject: [PATCH 26/31] li: fix w no data, add t command --- src/cli/cli.c | 30 ++++++++++++------------------ src/utils/utils.c | 24 +++++++++++++----------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/cli/cli.c b/src/cli/cli.c index 4d1d56e..366d94c 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -87,6 +87,7 @@ void execute(char **args, unsigned short leader) { if (0 == strcmp(args[0], "h")) { printf("dmalloc commands:\n" " h | display all available commands with their description |\n" + " t | show table of allocations |\n" " m `size` | return `address` to cmd user of the required allocation |\n" " f `address` | free address, Warning if already free |\n" " w `address` `datasize` `data` | write at the address the data of size datasize |\n" @@ -138,6 +139,9 @@ void execute(char **args, unsigned short leader) { size_t address = 0; if (1 == sscanf(args[1], "%zu", &address)) { printf("Execute Free of address : %zu\n", address); + struct data_address *d_a = generate_data_address(address); + send_command(OP_FREE, d_a, leader); + free(d_a); } else { error_msg("f require an argument 'address' which can be casted as a positive integer"); } @@ -157,6 +161,10 @@ void execute(char **args, unsigned short leader) { size_t datasize = 0; if (1 == sscanf(args[1], "%zu", &address)) { if (1 == sscanf(args[2], "%zu", &datasize)) { + if (args[3] == NULL) { + error_msg("w cannot write nothing"); + return; + } printf("Execute Write at %zu of %s : %zu bytes\n", address, args[3], datasize); struct data_write *d_w = generate_data_write(address, datasize, args[3]); send_command(OP_WRITE, d_w, leader); @@ -307,28 +315,14 @@ void execute(char **args, unsigned short leader) { } else { error_msg("w require an argument 'address' which can be casted as a positive integer"); } - } else if (0 == strcmp(args[0], "w")) { + } else if (0 == strcmp(args[0], "t")) { // ERRORS - if (l <= 3) { - error_msg("w require 3 arguments : 'address', 'datasize' and 'data'"); - return; - } else if (l >= 5) { - error_msg("w do not support more than 3 arguments, check command h"); + if (l != 1) { + error_msg("t require no arguments"); return; } + send_command(OP_TABLE, NULL, leader); - // Execution - size_t address = 0; - size_t datasize = 0; - if (1 == sscanf(args[1], "%zu", &address)) { - if (1 == sscanf(args[2], "%zu", &datasize)) { - printf("Execute Write at %zu of %s : %zu bytes\n", address, args[3], datasize); - } else { - error_msg("w require an argument 'datasize' which can be casted as a positive integer"); - } - } else { - error_msg("w require an argument 'address' which can be casted as a positive integer"); - } } else if (0 == strcmp(args[0], "w")) { // ERRORS if (l <= 3) { diff --git a/src/utils/utils.c b/src/utils/utils.c index 29d0b6d..c41e002 100644 --- a/src/utils/utils.c +++ b/src/utils/utils.c @@ -31,17 +31,19 @@ void print_allocations_table(struct leader_resources *l_r) { size_t total_size = 0; for (size_t i = 0; i < a_r->count_alloc; i++) { size = 0; - for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { - size += a_r->allocs[i]->parts[j]->size; - total_size += a_r->allocs[i]->parts[j]->size; - } - printf("%4zu : %9zu : %5zu : %8zu : " - , i, a_r->allocs[i]->v_address_start, a_r->allocs[i]->number_parts, size); - for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { - if (j == a_r->allocs[i]->number_parts - 1) - printf("%d\n", a_r->allocs[i]->parts[j]->id); - else - printf("%d > ", a_r->allocs[i]->parts[j]->id); + if (a_r->allocs[i]) { + for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { + size += a_r->allocs[i]->parts[j]->size; + total_size += a_r->allocs[i]->parts[j]->size; + } + printf("%4zu : %9zu : %5zu : %8zu : ", i, a_r->allocs[i]->v_address_start, a_r->allocs[i]->number_parts, + size); + for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { + if (j == a_r->allocs[i]->number_parts - 1) + printf("%d\n", a_r->allocs[i]->parts[j]->id); + else + printf("%d > ", a_r->allocs[i]->parts[j]->id); + } } } printf(" === ========= ===== ======== =======\n"); From 413991c52e8e576ed39b8a2cf78a752fec17ee1f Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:26:29 +0200 Subject: [PATCH 27/31] block: implement merge_free_block --- src/network/block.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/network/block.c b/src/network/block.c index 003de98..8a1afca 100644 --- a/src/network/block.c +++ b/src/network/block.c @@ -27,7 +27,19 @@ void add_block(struct block *blk, unsigned short id, size_t size, size_t node_ad (void) blk; } -void merge_free_block(struct block_register *blks); +void merge_free_block(struct block_register *blks) { + for (size_t i = 0; i < blks->nb_blocks; i++) { + struct block *b = blks->blks[i]; + while (b && b->next) { + if (b->free == 0 && b->next->free == 0) { + b->size = b->size + b->next->size; + b->next = b->next->next; + } else { + b = b->next; + } + } + } +} struct block_register *init_nodes_same_size(unsigned short nb_nodes, size_t size) { struct block_register *blks = generate_blocks(nb_nodes); From b068d854e05aa724b5f2c3028c5597760190fe67 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:27:02 +0200 Subject: [PATCH 28/31] leader: alloc_memory, launch merge_free before execution for optimized space --- src/network/leader.c | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/network/leader.c b/src/network/leader.c index 9d45365..ada22f2 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -106,6 +106,8 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) { if (size >= l_r->max_memory || size >= l_r->availaible_memory) return SIZE_MAX; struct block_register *blks = l_r->leader_blks; + // Merge free blocks (reform big blocks after frees) + merge_free_block(blks); if (!blks && blks->nb_blocks > 0) { debug("ERROR not malloc blockS !!!", 0); //l_r->id); return 999; @@ -508,7 +510,7 @@ void execute_command(struct leader_resources *l_r) { break; case OP_FREE: debug("EXECUTE OP FREE LEADER", l_r->id); - execute_malloc(l_r); + execute_free(l_r); break; case OP_WRITE: debug("EXECUTE OP WRITE LEADER", l_r->id); @@ -551,16 +553,6 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n debug("START LEADER LOOP", n->id); struct leader_resources *l_r = generate_leader_resources(nb_nodes, n->id); - /* - // MPI_Isend(); - int MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, - MPI_Comm comm, MPI_Request *request) - // MPI_Irecv(); - // MPI_Wait(); - // MPI_Test(); - int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, - int tag, MPI_Comm comm, MPI_Request *request); - */ int die = -1; while (1) { // Get command from user From 6df66626afb0286baca9c3fe770d8016331882c6 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:42:12 +0200 Subject: [PATCH 29/31] free: add available memory --- src/network/leader.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/network/leader.c b/src/network/leader.c index ada22f2..15376f6 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -81,6 +81,7 @@ int free_memory(size_t address, struct leader_resources *l_r) { for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { if (reg->allocs[i]->parts[j]->virtual_address <= address && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > address) { + l_r->availaible_memory += size_of_allocation(reg->allocs[i]); // Set all block of this alloc as free for (j = 0; j < reg->allocs[i]->number_parts; j++) { reg->allocs[i]->parts[j]->free = 0; From ca573921f8fa28275cb4b2d71363f73602473ea2 Mon Sep 17 00:00:00 2001 From: sidore_m Date: Wed, 10 Jul 2019 14:54:26 +0200 Subject: [PATCH 30/31] leader: fix allocation size, fix print --- include/leader.h | 2 ++ src/network/leader.c | 11 ++++++++--- src/utils/utils.c | 7 ++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/include/leader.h b/include/leader.h index 54c52a9..8c48e02 100644 --- a/include/leader.h +++ b/include/leader.h @@ -31,4 +31,6 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n struct allocation *give_for_v_address(struct leader_resources *l_r, size_t v_address, size_t *part); +size_t size_of_allocation(struct allocation *a); + #endif /* !DISTRIBUTEDMALLOC_LEADER_H */ diff --git a/src/network/leader.c b/src/network/leader.c index 15376f6..9ba716e 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -154,14 +154,19 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) { a->parts = NULL; a->v_address_start = SIZE_MAX; ssize_t m_size = size; + size_t m_t_size = 0; for (size_t i = 0; i < blks->nb_blocks; i++) { struct block *b = blks->blks[i]; while (b && b->id != l_r->id && m_size > 0) { if (b->free == 0) { b->free = 1; - m_size -= b->size; - if (m_size < 0) { - b = split_block_u(b, -1 * m_size); + if ((ssize_t) b->size >= m_size) { + m_t_size = m_size; + m_size = 0; + b = split_block_u(b, m_t_size); + } else { + m_t_size = m_size - b->size; + m_size -= b->size; } if (a->v_address_start == SIZE_MAX) a->v_address_start = b->virtual_address; diff --git a/src/utils/utils.c b/src/utils/utils.c index c41e002..f2c4e6f 100644 --- a/src/utils/utils.c +++ b/src/utils/utils.c @@ -30,12 +30,9 @@ void print_allocations_table(struct leader_resources *l_r) { size_t size = 0; size_t total_size = 0; for (size_t i = 0; i < a_r->count_alloc; i++) { - size = 0; if (a_r->allocs[i]) { - for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { - size += a_r->allocs[i]->parts[j]->size; - total_size += a_r->allocs[i]->parts[j]->size; - } + size = size_of_allocation(a_r->allocs[i]); + total_size += size; printf("%4zu : %9zu : %5zu : %8zu : ", i, a_r->allocs[i]->v_address_start, a_r->allocs[i]->number_parts, size); for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { From 4ff125e12cd714b9b26756796b9737a00f600bb7 Mon Sep 17 00:00:00 2001 From: Corentin Mounier Date: Wed, 10 Jul 2019 14:56:21 +0200 Subject: [PATCH 31/31] dev: fix protocole communication --- include/communication.h | 7 ++++--- src/cli/cli.c | 4 ++-- src/network/communication.c | 20 ++++++++++---------- src/network/leader_election.c | 8 ++++---- src/network/node.c | 2 +- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/include/communication.h b/include/communication.h index 2073697..e4dc7e6 100644 --- a/include/communication.h +++ b/include/communication.h @@ -10,18 +10,19 @@ #define TAG_MSG 3 #define TAG_DATA 4 +#define TAG_ELECTION 5 /* * Send the given message 'm' to the destination node * Return 1 if it's a success, 0 if the destination node didn't respond in time */ -int send_safe_message(struct message *m_send, struct queue *queue); +int send_safe_message(struct message *m_send, struct queue *queue, int tag); /* * Receive a message sent with 'send_safe_message' * then send a OP_OK to the source node */ -struct message *receive_message(struct queue *message_queue); +struct message *receive_message(struct queue *message_queue, int tag); /* * Recieve a specific data from a specific node. @@ -34,6 +35,6 @@ void *recieve_data(size_t size, struct queue *queue, unsigned source); * Send message to all nodes except user and itself * m_send->id_t will change according to the target node */ -void broadcast_message(struct message *m_send, unsigned id, unsigned network_size); +void broadcast_message(struct message *m_send, unsigned id, unsigned network_size, int tag); #endif /* !DISTRIBUTEDMALLOC_COMMUNICATION_H */ diff --git a/src/cli/cli.c b/src/cli/cli.c index 366d94c..770c33f 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -353,7 +353,7 @@ void execute(char **args, unsigned short leader) { unsigned short start_leader_election(unsigned size) { struct message *b = generate_message(0, 0, 0, 0, 0, OP_START_LEADER); - broadcast_message(b, 0, size); + broadcast_message(b, 0, size, TAG_MSG); free(b); struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); @@ -386,7 +386,7 @@ void start_cli(unsigned size) { // DEBUG print_args(args); struct message *m_verif = generate_message_a(0, leader, 0, 0, 0, OP_IS_ALIVE, 1); - if (!send_safe_message(m_verif, q)) + if (!send_safe_message(m_verif, q, TAG_MSG)) leader = start_leader_election(size); free(m_verif); execute(args, leader); diff --git a/src/network/communication.c b/src/network/communication.c index bb2990d..2d828b7 100644 --- a/src/network/communication.c +++ b/src/network/communication.c @@ -6,14 +6,14 @@ #include #include "debug.h" -int send_safe_message(struct message *m_send, struct queue *queue) { +int send_safe_message(struct message *m_send, struct queue *queue, int tag) { debug("init send safe message", m_send->id_s); - MPI_Send(m_send, sizeof(struct message), MPI_BYTE, m_send->id_t, TAG_MSG, MPI_COMM_WORLD); + MPI_Send(m_send, sizeof(struct message), MPI_BYTE, m_send->id_t, tag, MPI_COMM_WORLD); struct message *m_recv = calloc(1, sizeof(struct message)); MPI_Request r_ok; int f_ok = 0; - MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_ok); + MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &r_ok); struct timespec waiting_time; waiting_time.tv_sec = 0; @@ -30,12 +30,12 @@ int send_safe_message(struct message *m_send, struct queue *queue) { } if (m_recv->need_callback) { struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); - MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, TAG_MSG, MPI_COMM_WORLD); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, tag, MPI_COMM_WORLD); free(m_alive); } queue_push_back(queue, m_recv); m_recv = calloc(1, sizeof(struct message)); - MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_ok); + MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &r_ok); } nanosleep(&waiting_time, NULL); } @@ -44,7 +44,7 @@ int send_safe_message(struct message *m_send, struct queue *queue) { return 0; } -struct message *receive_message(struct queue *message_queue) { +struct message *receive_message(struct queue *message_queue, int tag) { struct message *m_recv = queue_pop(message_queue); if (m_recv) { debug("queue not empty", m_recv->id_t); @@ -52,22 +52,22 @@ struct message *receive_message(struct queue *message_queue) { } m_recv = calloc(1, sizeof(struct message)); // What if m_recv->op = OP_ALIVE ? - MPI_Recv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("queue empty, receive message", m_recv->id_t); if (m_recv->need_callback) { debug("send callback", m_recv->id_t); struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); - MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, TAG_MSG, MPI_COMM_WORLD); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, tag, MPI_COMM_WORLD); //printf("%u respond to %u %u with a %u __ \n", m_alive->id_s, status.MPI_SOURCE, m_alive->id_t, m_alive->op); free(m_alive); } return m_recv; } -void broadcast_message(struct message *m_send, unsigned id, unsigned network_size) { +void broadcast_message(struct message *m_send, unsigned id, unsigned network_size, int tag) { for (unsigned i = 1; i < network_size; ++i) { m_send->id_t = i; if (i != id) - MPI_Send(m_send, sizeof(struct message), MPI_BYTE, i, TAG_MSG, MPI_COMM_WORLD); + MPI_Send(m_send, sizeof(struct message), MPI_BYTE, i, tag, MPI_COMM_WORLD); } } diff --git a/src/network/leader_election.c b/src/network/leader_election.c index 5b3877b..4ecb028 100644 --- a/src/network/leader_election.c +++ b/src/network/leader_election.c @@ -20,12 +20,12 @@ unsigned leader_election(unsigned id, unsigned network_size) { leader = new_leader; struct message *m_send = generate_message_a(id, next, leader, 0, 0, OP_LEADER, 1); //printf("id:%u next:%u leader:%u op:%u\n", m_send->id_s, m_send->id_t, m_send->id_o, m_send->op); - while (!send_safe_message(m_send, message_queue)) { + while (!send_safe_message(m_send, message_queue, TAG_ELECTION)) { debug("TIMEOUT", id); if (next == leader) { debug("LEADER IS DEAD, STARTING AGAIN", id); struct message *m_again = generate_message(id, 0, leader, 0, 0, OP_LEADER_AGAIN); - broadcast_message(m_again, id, network_size); + broadcast_message(m_again, id, network_size, TAG_ELECTION); free(m_again); free(m_send); return leader_election(id, network_size); // TODO: Check if it works + fix leaks @@ -43,7 +43,7 @@ unsigned leader_election(unsigned id, unsigned network_size) { free(m_send); } //debug("safe message sent", id); - struct message *m_receive = receive_message(message_queue); + struct message *m_receive = receive_message(message_queue, TAG_ELECTION); if (m_receive->op == OP_LEADER_OK) { queue_free(message_queue); @@ -62,7 +62,7 @@ unsigned leader_election(unsigned id, unsigned network_size) { if (m_receive->id_o == id) { // send LEADER OK struct message *m_ok = generate_message(id, 0, leader, 0, 0, OP_LEADER_OK); - broadcast_message(m_ok, id, network_size); + broadcast_message(m_ok, id, network_size, TAG_ELECTION); free(m_ok); // send leader to user diff --git a/src/network/node.c b/src/network/node.c index 29de347..d4a59ee 100644 --- a/src/network/node.c +++ b/src/network/node.c @@ -52,7 +52,7 @@ void node_cycle(struct node *n) { struct queue *q = queue_init(); while (1) { // cycle of node - struct message *m = receive_message(q); + struct message *m = receive_message(q, TAG_MSG); debug("Recv OP", n->id); switch (m->op) { case OP_START_LEADER: