diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c0daa8..61372aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,7 @@ set(SRCS src/utils/debug.c src/cli/user.c src/network/communication.c + src/network/data_communication.c src/utils/queue.c) diff --git a/include/cli.h b/include/cli.h index 22940dd..d8a87a9 100644 --- a/include/cli.h +++ b/include/cli.h @@ -10,7 +10,7 @@ #include "message.h" #include "utils.h" -void start_cli(); +void start_cli(unsigned size); void send_command(enum operation op, void *data, unsigned short leader); diff --git a/include/communication.h b/include/communication.h index e50aa6d..e4dc7e6 100644 --- a/include/communication.h +++ b/include/communication.h @@ -8,16 +8,33 @@ // Number of attempts failed before considering the node as dead. #define NB_ITER 60 +#define TAG_MSG 3 +#define TAG_DATA 4 +#define TAG_ELECTION 5 + /* * Send the given message 'm' to the destination node * Return 1 if it's a success, 0 if the destination node didn't respond in time */ -int send_safe_message(struct message *m_send, struct queue *queue); +int send_safe_message(struct message *m_send, struct queue *queue, int tag); /* * Receive a message sent with 'send_safe_message' * then send a OP_OK to the source node */ -struct message *receive_message(struct queue *message_queue); +struct message *receive_message(struct queue *message_queue, int tag); + +/* + * Recieve a specific data from a specific node. + * All others messages recieved are put in the queue. If needed, a IS_ALIVE is send. + * Return the wanted data, which was calloc. + */ +void *recieve_data(size_t size, struct queue *queue, unsigned source); + +/* + * Send message to all nodes except user and itself + * m_send->id_t will change according to the target node + */ +void broadcast_message(struct message *m_send, unsigned id, unsigned network_size, int tag); #endif /* !DISTRIBUTEDMALLOC_COMMUNICATION_H */ diff --git a/include/globals.h b/include/globals.h index 55c74b5..3f60f3e 100644 --- a/include/globals.h +++ b/include/globals.h @@ -1,7 +1,7 @@ #ifndef DISTRIBUTEDMALLOC_GLOBALS_H #define DISTRIBUTEDMALLOC_GLOBALS_H -#define DEF_NODE_SIZE 8 +#define DEF_NODE_SIZE 128 #define DEF_NODE_USER 0 #define DEF_NODE_LEADER 1 diff --git a/include/leader.h b/include/leader.h index 54c52a9..8c48e02 100644 --- a/include/leader.h +++ b/include/leader.h @@ -31,4 +31,6 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n struct allocation *give_for_v_address(struct leader_resources *l_r, size_t v_address, size_t *part); +size_t size_of_allocation(struct allocation *a); + #endif /* !DISTRIBUTEDMALLOC_LEADER_H */ diff --git a/include/message.h b/include/message.h index 1c63da0..cb2a062 100644 --- a/include/message.h +++ b/include/message.h @@ -9,8 +9,11 @@ enum operation { OP_FREE, OP_WRITE, OP_READ, + OP_READ_FILE, + OP_TABLE, OP_SNAP, OP_LEADER, + OP_START_LEADER, OP_WHOISLEADER, OP_REVIVE, OP_KILL, @@ -19,6 +22,7 @@ enum operation { OP_DUMP, OP_DUMP_ALL, OP_LEADER_OK, + OP_IS_ALIVE, OP_ALIVE, OP_LEADER_AGAIN }; diff --git a/src/cli/cli.c b/src/cli/cli.c index c06ad53..770c33f 100644 --- a/src/cli/cli.c +++ b/src/cli/cli.c @@ -3,7 +3,10 @@ #include #include #include +#include +#include #include "cli.h" +#include "communication.h" char *read_cmd() { ssize_t buffer_size = 256; @@ -84,10 +87,15 @@ void execute(char **args, unsigned short leader) { if (0 == strcmp(args[0], "h")) { printf("dmalloc commands:\n" " h | display all available commands with their description |\n" + " t | show table of allocations |\n" " m `size` | return `address` to cmd user of the required allocation |\n" " f `address` | free address, Warning if already free |\n" " w `address` `datasize` `data` | write at the address the data of size datasize |\n" + " w `address` `file` | write all content of file at address |\n" + " w `address` `file` `datasize` | write datasize bytes from file to the address |\n" " r `address` `datasize` | read datasize bytes at address |\n" + " r `address` `file` | read all bytes of the block at address into file |\n" + " r `address` `file` `datasize` | read datasize bytes at address into file |\n" " d `address` | dump in as text all data of the block stored in address |\n" " d net | dump all allocation |\n" " d `address` `file` | dump address data in file |\n" @@ -131,13 +139,17 @@ void execute(char **args, unsigned short leader) { size_t address = 0; if (1 == sscanf(args[1], "%zu", &address)) { printf("Execute Free of address : %zu\n", address); + struct data_address *d_a = generate_data_address(address); + send_command(OP_FREE, d_a, leader); + free(d_a); } else { error_msg("f require an argument 'address' which can be casted as a positive integer"); } } else if (0 == strcmp(args[0], "w")) { // ERRORS - if (l <= 3) { - error_msg("w require 3 arguments : 'address', 'datasize' and 'data'"); + if (l <= 2) { + error_msg( + "w require 2-3 arguments : 'address', 'datasize' and 'data'\n OR 'address' 'file' + 'datasize' optional, check h"); return; } else if (l >= 5) { error_msg("w do not support more than 3 arguments, check command h"); @@ -149,12 +161,45 @@ void execute(char **args, unsigned short leader) { size_t datasize = 0; if (1 == sscanf(args[1], "%zu", &address)) { if (1 == sscanf(args[2], "%zu", &datasize)) { + if (args[3] == NULL) { + error_msg("w cannot write nothing"); + return; + } printf("Execute Write at %zu of %s : %zu bytes\n", address, args[3], datasize); struct data_write *d_w = generate_data_write(address, datasize, args[3]); send_command(OP_WRITE, d_w, leader); free(d_w); } else { - error_msg("w requires an argument 'datasize' which can be casted as a positive integer"); + // Write from File + FILE *file_write = fopen(args[2], "r"); + if (!file_write) { + error_msg("w 'address' 'file', file was not possible to open, or doesn't exist"); + return; + } + struct stat st; + /*get the size using stat()*/ + if (stat(args[2], &st) != 0) + return; + if (l == 4) { + // file max data read + if (1 == sscanf(args[3], "%zu", &datasize)) { + if ((ssize_t) datasize >= st.st_size) + datasize = st.st_size; + } else { + error_msg("w requires an argument 'datasize' which can be casted as a positive integer"); + return; + } + } else { + datasize = st.st_size; + } + // Now read datasize + char *buffer_write_file = malloc(sizeof(char) * (2 + datasize)); + size_t r_c = fread(buffer_write_file, sizeof(char), datasize, file_write); + if (r_c != datasize) + error_msg("WARNING: file bytes read different from bytes asked"); + struct data_write *d_w = generate_data_write(address, datasize, buffer_write_file); + send_command(OP_WRITE, d_w, leader); + // OLD error_msg("w requires an argument 'datasize' which can be casted as a positive integer"); } } else { error_msg("w requires an argument 'address' which can be casted as a positive integer"); @@ -162,10 +207,10 @@ void execute(char **args, unsigned short leader) { } else if (0 == strcmp(args[0], "r")) { // ERRORS if (l <= 2) { - error_msg("r requires 2 arguments : 'address' and 'datasize'"); + error_msg("r requires 2-3 arguments : 'address' and 'datasize'\n OR 'address' 'file'\n OR 'address' 'file' 'datasize"); return; - } else if (l >= 4) { - error_msg("r do not support more than 2 arguments, check command h"); + } else if (l >= 5) { + error_msg("r do not support more than 2-3 arguments, check command h"); return; } @@ -179,7 +224,38 @@ void execute(char **args, unsigned short leader) { send_command(OP_READ, d_r, leader); free(d_r); } else { - error_msg("r requires an argument 'datasize' which can be casted as a positive integer"); + // Read to File + FILE *file_read = fopen(args[2], "w"); + if (!file_read) { + error_msg("r 'address' 'file', file was not possible to open, or doesn't exist"); + return; + } + struct stat st; + if (stat(args[2], &st) != 0) + return; + if (l == 4) { + // file max data read + if (1 == sscanf(args[3], "%zu", &datasize)) { + if ((ssize_t) datasize >= st.st_size) + datasize = st.st_size; + } else { + error_msg("r requires an argument 'datasize' which can be casted as a positive integer"); + return; + } + } else { + datasize = st.st_size; + } + // Now read datasize bytes + char *buffer_read_file = NULL; + struct data_write *d_w = generate_data_write(address, datasize, buffer_read_file); + send_command(OP_READ_FILE, d_w, leader); + // /!\ buffer_read_file was allocated in send_command + debug("Write data in file (READ OP)", 0); + printf("Data Read :: %zu", d_w->size); + size_t r_c = fwrite(d_w->data, sizeof(char), d_w->size, file_read); + if (r_c != d_w->size) + error_msg("WARNING: file bytes write different from bytes asked"); + // OLD error_msg("r requires an argument 'datasize' which can be casted as a positive integer"); } } else { error_msg("r require an argument 'address' which can be casted as a positive integer"); @@ -239,28 +315,14 @@ void execute(char **args, unsigned short leader) { } else { error_msg("w require an argument 'address' which can be casted as a positive integer"); } - } else if (0 == strcmp(args[0], "w")) { + } else if (0 == strcmp(args[0], "t")) { // ERRORS - if (l <= 3) { - error_msg("w require 3 arguments : 'address', 'datasize' and 'data'"); - return; - } else if (l >= 5) { - error_msg("w do not support more than 3 arguments, check command h"); + if (l != 1) { + error_msg("t require no arguments"); return; } + send_command(OP_TABLE, NULL, leader); - // Execution - size_t address = 0; - size_t datasize = 0; - if (1 == sscanf(args[1], "%zu", &address)) { - if (1 == sscanf(args[2], "%zu", &datasize)) { - printf("Execute Write at %zu of %s : %zu bytes\n", address, args[3], datasize); - } else { - error_msg("w require an argument 'datasize' which can be casted as a positive integer"); - } - } else { - error_msg("w require an argument 'address' which can be casted as a positive integer"); - } } else if (0 == strcmp(args[0], "w")) { // ERRORS if (l <= 3) { @@ -283,23 +345,31 @@ void execute(char **args, unsigned short leader) { } else { error_msg("w require an argument 'address' which can be casted as a positive integer"); } + } else { + error_msg("command not found, see 'h' for help"); } } -unsigned short get_leader() { +unsigned short start_leader_election(unsigned size) { + struct message *b = generate_message(0, 0, 0, 0, 0, OP_START_LEADER); + broadcast_message(b, 0, size, TAG_MSG); + free(b); struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); - MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, 201, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(m, sizeof(*m), MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("Got Leader !", DEF_NODE_USER); - return m->id_o; + unsigned short leader = m->id_o; + free(m); + return leader; } -void start_cli() { +void start_cli(unsigned size) { int no_quit = 1; char *cmd; char **args; - unsigned short leader = get_leader(); + struct queue *q = queue_init(); + unsigned short leader = start_leader_election(size); while (no_quit) { fflush(0); @@ -315,7 +385,10 @@ void start_cli() { } // DEBUG print_args(args); - + struct message *m_verif = generate_message_a(0, leader, 0, 0, 0, OP_IS_ALIVE, 1); + if (!send_safe_message(m_verif, q, TAG_MSG)) + leader = start_leader_election(size); + free(m_verif); execute(args, leader); free(cmd); diff --git a/src/cli/user.c b/src/cli/user.c index 29b67af..25cc24d 100644 --- a/src/cli/user.c +++ b/src/cli/user.c @@ -8,104 +8,135 @@ #include #include #include +#include void send_write(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_write *d_w = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_w->address, d_w->size, OP_WRITE); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); debug("Send Data For Write OP", 0); - MPI_Isend(d_w->data, d_w->size, MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Send(d_w->data, d_w->size, MPI_BYTE, m->id_t, TAG_DATA, MPI_COMM_WORLD); + free(m); +} + +void send_free(void *data, unsigned short leader) { + // MPI_Status st; + struct data_address *d_a = data; + struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + d_a->address, 0, OP_FREE); + + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); + free(m); +} + +void send_table(unsigned short leader) { + // MPI_Status st; + struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + 0, 0, OP_TABLE); + + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); free(m); } void send_malloc(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_size *d_s = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, 0, d_s->size, OP_MALLOC); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); // TODO RECV Address ! MPI_Status st; - MPI_Request r2; void *buff = generate_message(0, 0, 0, 0, 0, OP_NONE); - MPI_Irecv(buff, sizeof(struct message), MPI_BYTE, leader, 0, MPI_COMM_WORLD, &r2); - /* - while (0 != MPI_Wait(&r, &st)) { - char *msg = "Address from malloc operation of size "; - char size_str[256]; - snprintf(size_str, sizeof(size_str), "%zu", d_s->size); - strcat(msg, size_str); - debug(msg, DEF_NODE_USER); - } - */ - if (0 == MPI_Wait(&r2, &st)) { - struct message *m2 = buff; - if (m2->address != SIZE_MAX) { - printf("user: request malloc of size %zu, is at address %zu on the network\n", - m2->size, - m2->address); - } - else if (m2->size == 0) { - debug("Network: Out of memory", 0); - } else { - debug("Network: Fatal error in leader", 0); - } + MPI_Recv(buff, sizeof(struct message), MPI_BYTE, leader, TAG_MSG, MPI_COMM_WORLD, &st); + + struct message *m2 = buff; + if (m2->address != SIZE_MAX) { + printf("user: request malloc of size %zu, is at address %zu on the network\n", + m2->size, + m2->address); + } else if (m2->size == 0) { + debug("Network: Out of memory", 0); + } else { + debug("Network: Fatal error in leader", 0); } + free(m); free(buff); } void send_read(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_read *d_r = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_r->address, d_r->size, OP_READ); - MPI_Isend((void *) m, sizeof(struct message), MPI_CHAR, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); + + struct message m2; + MPI_Status st2; + MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &st2); + debug("RECV OK with datasize read", 0); + size_t real_data_size = m2.size; + MPI_Status st3; + char *r_data = malloc(sizeof(char) * (2 + real_data_size)); + MPI_Recv((void *) r_data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); + debug("Read:", 0); + r_data[real_data_size + 1] = '\0'; + debug_n(r_data, 0, real_data_size); + free(m); +} - // TODO RECV Read data ! - MPI_Status st; - MPI_Request r2; - void *buff = malloc(sizeof(char) * (d_r->size + 1)); - MPI_Irecv(buff, d_r->size * sizeof(char), MPI_CHAR, leader, 0, MPI_COMM_WORLD, &r2); - while (0 != MPI_Wait(&r, &st)) { - char *c_buff = buff; - c_buff[d_r->size] = '\0'; - char *msg = "Read : "; - strcat(msg, c_buff); - debug(msg, DEF_NODE_USER); - } +void send_read_file(void *data, unsigned short leader) { + MPI_Request r; + // MPI_Status st; + struct data_write *d_r = data; + if (d_r->size == 0) + d_r->size = 40000; + + struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + d_r->address, d_r->size, OP_READ); + + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); + + struct message *m2 = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, + d_r->address, d_r->size, OP_READ); + MPI_Status st2; + MPI_Recv(m2, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &st2); + debug("RECV OK with datasize read", 0); + size_t real_data_size = m2->size; + MPI_Status st3; + d_r->data = malloc(sizeof(char) * (2 + real_data_size)); + d_r->size = real_data_size; + debug("Read Recv Data", 0); + MPI_Recv(d_r->data, real_data_size * sizeof(char), MPI_BYTE, leader, TAG_DATA, MPI_COMM_WORLD, &st3); + printf("Data Read :: %zu", d_r->size); free(m); + free(m2); } void send_dump(void *data, unsigned short leader) { - MPI_Request r; // MPI_Status st; struct data_address *d_a = data; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, d_a->address, 0, OP_DUMP); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); free(m); } void send_dump_all(unsigned short leader) { - MPI_Request r; // MPI_Status st; struct message *m = generate_message(DEF_NODE_USER, leader, DEF_NODE_LEADER, 0, 0, OP_DUMP_ALL); - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Send((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD); free(m); } @@ -118,6 +149,14 @@ void send_command(enum operation op, void *data, unsigned short leader) { debug("Send Malloc", 0); send_malloc(data, leader); break; + case OP_FREE: + debug("Send Free", 0); + send_free(data, leader); + break; + case OP_TABLE: + debug("Send Table", 0); + send_table(leader); + break; case OP_WRITE: debug("Send Write", 0); send_write(data, leader); @@ -134,6 +173,10 @@ void send_command(enum operation op, void *data, unsigned short leader) { debug("Send Dump All", 0); send_dump_all(leader); break; + case OP_READ_FILE: + debug("Send Read (file)", 0); + send_read_file(data, leader); + break; default: break; } diff --git a/src/main/main.c b/src/main/main.c index 9ac4384..37c5901 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -37,37 +37,24 @@ int main(int argc, char **argv) { if (rank == DEF_NODE_USER) { printf("starting %d processes\n", size); debug("Start User", DEF_NODE_USER); - start_cli(); + start_cli(size); } else { debug("Start Node", rank); - //if (rank != 1) { - - unsigned leader = leader_election(rank, size); - - printf("Node %u finished election. Leader is: %u\n", rank, leader); - - // Node Creation - struct node *n = generate_node(rank, DEF_NODE_SIZE); - // Form rank number ! - - // Start Leader ! - if (n->id == leader) { + struct node *n = generate_node(rank, DEF_NODE_SIZE); + int new_leader = 0; + do { + n->isleader = 0; + node_cycle(n); + new_leader = leader_election(n->id, size); + printf("NEW LEADER = %d\n", new_leader); + if (new_leader == n->id) { n->isleader = 1; leader_loop(n, DEF_NODE_USER, size - 1); } - - node_cycle(n); - - /* - while (1) { - // routine - } - */ - //} - + } while (new_leader); } MPI_Finalize(); diff --git a/src/network/block.c b/src/network/block.c index a2733db..8a1afca 100644 --- a/src/network/block.c +++ b/src/network/block.c @@ -27,7 +27,19 @@ void add_block(struct block *blk, unsigned short id, size_t size, size_t node_ad (void) blk; } -void merge_free_block(struct block_register *blks); +void merge_free_block(struct block_register *blks) { + for (size_t i = 0; i < blks->nb_blocks; i++) { + struct block *b = blks->blks[i]; + while (b && b->next) { + if (b->free == 0 && b->next->free == 0) { + b->size = b->size + b->next->size; + b->next = b->next->next; + } else { + b = b->next; + } + } + } +} struct block_register *init_nodes_same_size(unsigned short nb_nodes, size_t size) { struct block_register *blks = generate_blocks(nb_nodes); @@ -66,6 +78,11 @@ struct allocation_register *generate_allocs(size_t size_alloc) { } void add_allocation(struct allocation_register *a_r, struct allocation *a) { + for (size_t i = 0; i < a_r->count_alloc; i++) + if (a_r->allocs[i] == NULL) { + a_r->allocs[i] = a; + return; + } if (a_r->count_alloc >= a_r->size_alloc) { a_r->allocs[a_r->count_alloc] = a; } else { diff --git a/src/network/communication.c b/src/network/communication.c index c2abc75..2d828b7 100644 --- a/src/network/communication.c +++ b/src/network/communication.c @@ -6,14 +6,14 @@ #include #include "debug.h" -int send_safe_message(struct message *m_send, struct queue *queue) { +int send_safe_message(struct message *m_send, struct queue *queue, int tag) { debug("init send safe message", m_send->id_s); - MPI_Send(m_send, sizeof(struct message), MPI_BYTE, m_send->id_t, 201, MPI_COMM_WORLD); + MPI_Send(m_send, sizeof(struct message), MPI_BYTE, m_send->id_t, tag, MPI_COMM_WORLD); struct message *m_recv = calloc(1, sizeof(struct message)); MPI_Request r_ok; int f_ok = 0; - MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &r_ok); + MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &r_ok); struct timespec waiting_time; waiting_time.tv_sec = 0; @@ -30,12 +30,12 @@ int send_safe_message(struct message *m_send, struct queue *queue) { } if (m_recv->need_callback) { struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); - MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, 201, MPI_COMM_WORLD); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, tag, MPI_COMM_WORLD); free(m_alive); } queue_push_back(queue, m_recv); m_recv = calloc(1, sizeof(struct message)); - MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &r_ok); + MPI_Irecv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &r_ok); } nanosleep(&waiting_time, NULL); } @@ -44,7 +44,7 @@ int send_safe_message(struct message *m_send, struct queue *queue) { return 0; } -struct message *receive_message(struct queue *message_queue) { +struct message *receive_message(struct queue *message_queue, int tag) { struct message *m_recv = queue_pop(message_queue); if (m_recv) { debug("queue not empty", m_recv->id_t); @@ -52,14 +52,22 @@ struct message *receive_message(struct queue *message_queue) { } m_recv = calloc(1, sizeof(struct message)); // What if m_recv->op = OP_ALIVE ? - MPI_Recv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(m_recv, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); debug("queue empty, receive message", m_recv->id_t); if (m_recv->need_callback) { - debug("send callback", m_recv->id_s + 10 * m_recv->id_t); + debug("send callback", m_recv->id_t); struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); - MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, 201, MPI_COMM_WORLD); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, tag, MPI_COMM_WORLD); //printf("%u respond to %u %u with a %u __ \n", m_alive->id_s, status.MPI_SOURCE, m_alive->id_t, m_alive->op); free(m_alive); } return m_recv; } + +void broadcast_message(struct message *m_send, unsigned id, unsigned network_size, int tag) { + for (unsigned i = 1; i < network_size; ++i) { + m_send->id_t = i; + if (i != id) + MPI_Send(m_send, sizeof(struct message), MPI_BYTE, i, tag, MPI_COMM_WORLD); + } +} diff --git a/src/network/data_communication.c b/src/network/data_communication.c new file mode 100644 index 0000000..348c3b8 --- /dev/null +++ b/src/network/data_communication.c @@ -0,0 +1,53 @@ +#include "communication.h" + +#include +#include +#include +#include +#include "debug.h" + + +void *recieve_data(size_t size, struct queue *queue, unsigned source) { + + MPI_Request r_data; + void *data = calloc(1, size); + MPI_Irecv(data, size, MPI_CHAR, source, TAG_DATA, MPI_COMM_WORLD, &r_data); + + MPI_Request r_msg; + struct message *m_recv = calloc(1, sizeof(struct message)); + MPI_Irecv(m_recv, sizeof(struct message), MPI_CHAR, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_msg); + + struct timespec waiting_time; + waiting_time.tv_sec = 0; + waiting_time.tv_nsec = 100000000; + + int f_msg = 0; + int f_data = 0; + + for (int i = 0; i < NB_ITER; i++) { + // Checking messages recieved + MPI_Test(&r_msg, &f_msg, MPI_STATUS_IGNORE); + if (f_msg) { + if (m_recv->need_callback) { + struct message *m_alive = generate_message_a(m_recv->id_t, m_recv->id_s, 0, 0, 0, OP_ALIVE, 0); + MPI_Send(m_alive, sizeof(struct message), MPI_BYTE, m_recv->id_s, TAG_MSG, MPI_COMM_WORLD); + free(m_alive); + } + queue_push_back(queue, m_recv); + m_recv = calloc(1, sizeof(struct message)); + MPI_Irecv(m_recv, sizeof(struct message), MPI_CHAR, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r_msg); + } + // Checking data recieved + MPI_Test(&r_data, &f_data, MPI_STATUS_IGNORE); + if (f_data) { + MPI_Cancel(&r_msg); + return data; + } + nanosleep(&waiting_time, NULL); + } + MPI_Cancel(&r_msg); + MPI_Cancel(&r_data); + free(m_recv); + return 0; + +} \ No newline at end of file diff --git a/src/network/leader.c b/src/network/leader.c index 4025518..9ba716e 100644 --- a/src/network/leader.c +++ b/src/network/leader.c @@ -16,6 +16,22 @@ size_t get_message_size() { return (sizeof(unsigned short) * 3 + 2 * sizeof(size_t) + sizeof(enum operation)); } +struct address_search *search_at_address(size_t address, struct leader_resources *l_r) { + struct block_register *bs = l_r->leader_blks; + for (size_t i = 0; i < bs->nb_blocks; i++) { + struct block *b = bs->blks[i]; + if (b->virtual_address >= address && b->virtual_address + b->size < address) { + struct address_search *add_s = malloc(sizeof(struct address_search)); + add_s->size = b->size; + add_s->v_address = b->virtual_address; + add_s->n_address = b->node_address; + add_s->r_address = address; + add_s->id = b->id; + return add_s; + } + } + return NULL; +} size_t size_of_allocation(struct allocation *a) { size_t size = 0; @@ -33,13 +49,13 @@ struct allocation *give_for_v_address(struct leader_resources *l_r, size_t v_add return NULL; struct allocation_register *reg = l_r->leader_reg; for (size_t i = 0; i < reg->count_alloc; i++) { - for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { - // FIXME check if its ok - printf("%zu == %zu \n\n", v_address, reg->allocs[i]->parts[j]->virtual_address); - if (reg->allocs[i]->parts[j]->virtual_address <= v_address - && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > v_address) { - *part = j; - return reg->allocs[i]; + if (reg->allocs[i]) { + for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { + if (reg->allocs[i]->parts[j]->virtual_address <= v_address + && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > v_address) { + *part = j; + return reg->allocs[i]; + } } } } @@ -57,6 +73,29 @@ struct leader_resources *generate_leader_resources(size_t nb_nodes, size_t id) { return l_r; } +int free_memory(size_t address, struct leader_resources *l_r) { + if (!l_r->leader_reg) + return 0; + struct allocation_register *reg = l_r->leader_reg; + for (size_t i = 0; i < reg->count_alloc; i++) { + for (size_t j = 0; j < reg->allocs[i]->number_parts; j++) { + if (reg->allocs[i]->parts[j]->virtual_address <= address + && reg->allocs[i]->parts[j]->virtual_address + reg->allocs[i]->parts[j]->size > address) { + l_r->availaible_memory += size_of_allocation(reg->allocs[i]); + // Set all block of this alloc as free + for (j = 0; j < reg->allocs[i]->number_parts; j++) { + reg->allocs[i]->parts[j]->free = 0; + } + free(reg->allocs[i]->parts); + free(reg->allocs[i]); + reg->allocs[i] = NULL; + return 0; + } + } + } + return 0; +} + /** * Allocated memory for the User, using free space of nodes * | X1 | | X2 | X1 | @@ -68,6 +107,8 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) { if (size >= l_r->max_memory || size >= l_r->availaible_memory) return SIZE_MAX; struct block_register *blks = l_r->leader_blks; + // Merge free blocks (reform big blocks after frees) + merge_free_block(blks); if (!blks && blks->nb_blocks > 0) { debug("ERROR not malloc blockS !!!", 0); //l_r->id); return 999; @@ -113,14 +154,19 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) { a->parts = NULL; a->v_address_start = SIZE_MAX; ssize_t m_size = size; + size_t m_t_size = 0; for (size_t i = 0; i < blks->nb_blocks; i++) { struct block *b = blks->blks[i]; while (b && b->id != l_r->id && m_size > 0) { if (b->free == 0) { b->free = 1; - m_size -= b->size; - if (m_size < 0) { - b = split_block_u(b, -1 * m_size); + if ((ssize_t) b->size >= m_size) { + m_t_size = m_size; + m_size = 0; + b = split_block_u(b, m_t_size); + } else { + m_t_size = m_size - b->size; + m_size -= b->size; } if (a->v_address_start == SIZE_MAX) a->v_address_start = b->virtual_address; @@ -139,23 +185,6 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) { return SIZE_MAX; } -struct address_search *search_at_address(size_t address, struct leader_resources *l_r) { - struct block_register *bs = l_r->leader_blks; - for (size_t i = 0; i < bs->nb_blocks; i++) { - struct block *b = bs->blks[i]; - if (b->virtual_address >= address && b->virtual_address + b->size < address) { - struct address_search *add_s = malloc(sizeof(struct address_search)); - add_s->size = b->size; - add_s->v_address = b->virtual_address; - add_s->n_address = b->node_address; - add_s->r_address = address; - add_s->id = b->id; - return add_s; - } - } - return NULL; -} - void get_command(struct leader_resources *l_r, unsigned short user) { // int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, @@ -164,12 +193,18 @@ void get_command(struct leader_resources *l_r, unsigned short user) { int count = sizeof(struct message); MPI_Request r; MPI_Status st; - MPI_Irecv(&buff, count, MPI_BYTE, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &r); + MPI_Irecv(&buff, count, MPI_BYTE, MPI_ANY_SOURCE, TAG_MSG, MPI_COMM_WORLD, &r); if (0 == MPI_Wait(&r, &st)) { struct message *m = &buff; struct command_queue *n_command = generate_command_queue(m->op, NULL); switch (m->op) { + case OP_IS_ALIVE: + debug("Leader recv OP IS ALIVE from User", l_r->id); + n_command->command = m->op; + n_command->data = NULL; + l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); + break; case OP_OK: if (m->id_s == 0) { debug("Leader recv OP OK from User", l_r->id); @@ -184,7 +219,7 @@ void get_command(struct leader_resources *l_r, unsigned short user) { struct data_write *d_w = generate_data_write(m->address, m->size, NULL); void *wbuff = malloc(sizeof(char) * (m->size + 2)); debug("Leader wait DATA from User for OP WRITE", l_r->id); - MPI_Irecv(wbuff, m->size * sizeof(char), MPI_BYTE, user, 0, MPI_COMM_WORLD, &r); + MPI_Irecv(wbuff, m->size * sizeof(char), MPI_BYTE, user, TAG_DATA, MPI_COMM_WORLD, &r); if (0 == MPI_Wait(&r, &st)) { d_w->data = wbuff; n_command->data = d_w; @@ -201,6 +236,11 @@ void get_command(struct leader_resources *l_r, unsigned short user) { n_command->data = d_r; l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); break; + case OP_TABLE: + debug("Leader recv OP TABLE", l_r->id); + print_allocations_table(l_r); + fflush(0); + break; case OP_MALLOC: debug("Leader recv OP MALLOC from User", l_r->id); n_command->command = m->op; @@ -209,12 +249,20 @@ void get_command(struct leader_resources *l_r, unsigned short user) { n_command->data = d_s; l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); break; + case OP_FREE: + debug("Leader recv OP FREE from User", l_r->id); + n_command->command = m->op; + n_command->data = NULL; + struct data_address *d_a_free = generate_data_address(m->address); + n_command->data = d_a_free; + l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); + break; case OP_DUMP: debug("Leader recv OP DUMP from User", l_r->id); n_command->command = m->op; n_command->data = NULL; - struct data_address *d_a = generate_data_address(m->address); - n_command->data = d_a; + struct data_address *d_a_dump = generate_data_address(m->address); + n_command->data = d_a_dump; l_r->leader_command_queue = push_command(l_r->leader_command_queue, n_command); break; case OP_DUMP_ALL: @@ -244,7 +292,7 @@ void execute_malloc(struct leader_resources *l_r) { debug("Out of memory", l_r->id); struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, SIZE_MAX, 0, OP_MALLOC); MPI_Request r; - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); return; } @@ -254,7 +302,7 @@ void execute_malloc(struct leader_resources *l_r) { } struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, SIZE_MAX, SIZE_MAX, OP_MALLOC); MPI_Request r; - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); return; } @@ -262,10 +310,23 @@ void execute_malloc(struct leader_resources *l_r) { printf("%zu of %zu bytes\n", v_addr, sss); struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, v_addr, sss, OP_MALLOC); MPI_Request r; - MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r); + MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, TAG_MSG, MPI_COMM_WORLD, &r); free(m); } +void execute_free(struct leader_resources *l_r) { + struct data_address *d_a = peek_command(l_r->leader_command_queue); + if (!d_a) { + debug("ERROR allocation data_size for OP MALLOC execution [LEADER]", l_r->id); + return; + } + int yes = free_memory(d_a->address, l_r); + if (yes) + debug("Free operation successful", l_r->id); + else + debug("Free operation was invalid", l_r->id); +} + void execute_read(struct leader_resources *l_r) { struct data_read *d_r; d_r = peek_command(l_r->leader_command_queue); @@ -314,17 +375,24 @@ void execute_read(struct leader_resources *l_r) { // 3 Send READ OP to each node (Warning to the local address of the node, not the virtual) struct message *m = generate_message(l_r->id, b->id, b->id, local_address, to_read_size, OP_READ); debug("Send OP Read", l_r->id); - MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD); + MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD); void *buff = malloc(sizeof(char) * (to_read_size + 1)); MPI_Status st; - MPI_Recv(buff, to_read_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD, &st); + MPI_Recv(buff, to_read_size, MPI_BYTE, b->id, TAG_DATA, MPI_COMM_WORLD, &st); memcpy((void *) (read_buff + (offset * sizeof(char))), buff, to_read_size); offset += to_read_size; nb_read++; free(m); } debug("READ AND ASSEMBLED", l_r->id); - debug_n(read_buff, l_r->id, d_r->size); + // debug_n(read_buff, l_r->id, nb_read_size); + // Send Result Read to User + debug("Send to User OK + Data Read size", l_r->id); + struct message *mU = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, 0, nb_read_size, OP_OK); + MPI_Send(mU, sizeof(struct message), MPI_BYTE, mU->id_t, TAG_MSG, MPI_COMM_WORLD); + debug("Send to User Assemebled Read Data", l_r->id); + MPI_Send(read_buff, nb_read_size, MPI_BYTE, mU->id_t, TAG_DATA, MPI_COMM_WORLD); + free(mU); } void execute_write(struct leader_resources *l_r) { @@ -378,13 +446,13 @@ void execute_write(struct leader_resources *l_r) { // 3 Send Write OP to each node (Warning to the local address of the node, not the virtual) struct message *m = generate_message(l_r->id, b->id, b->id, local_address, to_write_size, OP_WRITE); debug("Send Write OP", l_r->id); - MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD); + MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD); struct message m2; MPI_Status st; - MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD, &st); + MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD, &st); debug("Send Data", l_r->id); // debug_n(d_w->data, l_r->id, d_w->size); - MPI_Send((void *) ((char *) d_w->data + offset), to_write_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD); + MPI_Send((void *) ((char *) d_w->data + offset), to_write_size, MPI_BYTE, b->id, TAG_DATA, MPI_COMM_WORLD); offset += to_write_size; } @@ -414,10 +482,10 @@ void execute_dump(struct leader_resources *l_r) { struct block *b = c_a->parts[i]; struct message *m = generate_message(l_r->id, b->id, b->id, b->node_address, b->size, OP_READ); debug("Send Read OP", l_r->id); - MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD); + MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, TAG_MSG, MPI_COMM_WORLD); void *buff = malloc(sizeof(char) * (b->size + 1)); MPI_Status st; - MPI_Recv(buff, b->size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD, &st); + MPI_Recv(buff, b->size, MPI_BYTE, b->id, TAG_DATA, MPI_COMM_WORLD, &st); memcpy((void *) (dump + (offset * sizeof(char))), buff, b->size); offset += b->size; } @@ -432,15 +500,23 @@ void execute_dump_all(struct leader_resources *l_r) { } void execute_command(struct leader_resources *l_r) { - if (peek_user_command(l_r->leader_command_queue) != OP_NONE) { + while (l_r->leader_command_queue && peek_user_command(l_r->leader_command_queue) != OP_NONE) { switch (peek_user_command(l_r->leader_command_queue)) { + case OP_IS_ALIVE: { + debug("EXECUTE OP ALIVE", l_r->id); + struct message *m_alive = generate_message(l_r->id, 0, 0, 0, 0, OP_ALIVE); + MPI_Send(m_alive, sizeof(*m_alive), MPI_BYTE, 0, TAG_MSG, MPI_COMM_WORLD); + free(m_alive); + break; + } case OP_MALLOC: debug("EXECUTE OP MALLOC LEADER", l_r->id); execute_malloc(l_r); break; case OP_FREE: debug("EXECUTE OP FREE LEADER", l_r->id); + execute_free(l_r); break; case OP_WRITE: debug("EXECUTE OP WRITE LEADER", l_r->id); @@ -483,25 +559,14 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n debug("START LEADER LOOP", n->id); struct leader_resources *l_r = generate_leader_resources(nb_nodes, n->id); - /* - // MPI_Isend(); - int MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, - MPI_Comm comm, MPI_Request *request) - // MPI_Irecv(); - // MPI_Wait(); - // MPI_Test(); - int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, - int tag, MPI_Comm comm, MPI_Request *request); - */ int die = -1; while (1) { - print_allocations_table(l_r); // Get command from user get_command(l_r, terminal_id); - // debug("COMMANDS LISTEN DONE", n->id); + debug("COMMANDS LISTEN DONE", n->id); // Execute Commands execute_command(l_r); - // debug("COMMANDS EXEC DONE", n->id); + debug("COMMANDS EXEC DONE", n->id); // Break on death if (die == 1) break; diff --git a/src/network/leader_election.c b/src/network/leader_election.c index d10b63e..4ecb028 100644 --- a/src/network/leader_election.c +++ b/src/network/leader_election.c @@ -20,20 +20,21 @@ unsigned leader_election(unsigned id, unsigned network_size) { leader = new_leader; struct message *m_send = generate_message_a(id, next, leader, 0, 0, OP_LEADER, 1); //printf("id:%u next:%u leader:%u op:%u\n", m_send->id_s, m_send->id_t, m_send->id_o, m_send->op); - while (!send_safe_message(m_send, message_queue)) { + while (!send_safe_message(m_send, message_queue, TAG_ELECTION)) { debug("TIMEOUT", id); if (next == leader) { debug("LEADER IS DEAD, STARTING AGAIN", id); - for (unsigned i = 1; i < network_size; ++i) { - struct message *m_again = generate_message(id, i, leader, 0, 0, OP_LEADER_AGAIN); - MPI_Send(m_again, sizeof(*m_again), MPI_BYTE, i, 201, MPI_COMM_WORLD); - free(m_again); - } + struct message *m_again = generate_message(id, 0, leader, 0, 0, OP_LEADER_AGAIN); + broadcast_message(m_again, id, network_size, TAG_ELECTION); + free(m_again); + free(m_send); return leader_election(id, network_size); // TODO: Check if it works + fix leaks } next = next_id(next, network_size); if (next == id) { debug("i'm alone", id); + queue_free(message_queue); + free(m_send); return 0; } m_send->id_t = next; @@ -42,33 +43,39 @@ unsigned leader_election(unsigned id, unsigned network_size) { free(m_send); } //debug("safe message sent", id); + struct message *m_receive = receive_message(message_queue, TAG_ELECTION); - struct message *m_receive = receive_message(message_queue); - - if (m_receive->op == OP_LEADER_OK) + if (m_receive->op == OP_LEADER_OK) { + queue_free(message_queue); + free(m_receive); return leader; + } - if (m_receive->op == OP_LEADER_AGAIN) + if (m_receive->op == OP_LEADER_AGAIN) { + queue_free(message_queue); + free(m_receive); return leader_election(id, network_size); // TODO: Check if it works + fix leaks + } if (m_receive->op == OP_LEADER) { new_leader = m_receive->id_o; if (m_receive->id_o == id) { // send LEADER OK - for (unsigned i = 1; i < network_size; ++i) { - struct message *m_ok = generate_message(id, i, leader, 0, 0, OP_LEADER_OK); - MPI_Send(m_ok, sizeof(*m_ok), MPI_BYTE, i, 201, MPI_COMM_WORLD); - free(m_ok); - } + struct message *m_ok = generate_message(id, 0, leader, 0, 0, OP_LEADER_OK); + broadcast_message(m_ok, id, network_size, TAG_ELECTION); + free(m_ok); // send leader to user struct message *m_user = generate_message(id, 0, leader, 0, 0, OP_OK); - MPI_Send(m_user, sizeof(*m_user), MPI_BYTE, 0, 201, MPI_COMM_WORLD); + MPI_Send(m_user, sizeof(*m_user), MPI_BYTE, 0, TAG_MSG, MPI_COMM_WORLD); free(m_user); + queue_free(message_queue); + free(m_receive); return leader; } } - } + free(m_receive); + } // while(1) } diff --git a/src/network/node.c b/src/network/node.c index 0993952..d4a59ee 100644 --- a/src/network/node.c +++ b/src/network/node.c @@ -2,6 +2,7 @@ #include #include #include +#include #include "node.h" struct node *generate_node(unsigned short id, size_t size) { @@ -25,8 +26,7 @@ void write_on_node(struct node *n, size_t address, char *data, size_t size) { memcpy((void *) mem_op_ptr, (void *) data, size); debug("Write done of :", n->id); debug_n((char *) n->memory, n->id, n->size); - } - else { + } else { // printf("aske_addr %zu, ask_mem %zu, size_mem %zu\n\n", address, size, n->size); debug("OP WRITE FAILED", n->id); } @@ -43,32 +43,34 @@ void read_on_node(struct node *n, size_t address, char *data, size_t size) { if (address + size <= n->size) { void *mem_op_ptr = (n->memory + address); memcpy((void *) data, (void *) mem_op_ptr, size); - } - else { + } else { debug("OP READ FAILED", n->id); } } void node_cycle(struct node *n) { + struct queue *q = queue_init(); while (1) { // cycle of node - // OLD FIXME struct queue *q = queue_init(); - struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE); - MPI_Status st; - MPI_Recv(m, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, 3, MPI_COMM_WORLD, &st); + struct message *m = receive_message(q, TAG_MSG); debug("Recv OP", n->id); switch (m->op) { + case OP_START_LEADER: + queue_free(q); + free(m); + debug("START LEADER ELECTION", n->id); + return; case OP_OK: break; case OP_WRITE: { debug("Write OP : send OK", n->id); struct message *mW = generate_message(n->id, m->id_s, n->id, 0, 0, OP_OK); - MPI_Send(mW, sizeof(struct message), MPI_BYTE, mW->id_t, 3, MPI_COMM_WORLD); + MPI_Send(mW, sizeof(struct message), MPI_BYTE, mW->id_t, TAG_MSG, MPI_COMM_WORLD); size_t addr = m->address; size_t size = m->size; char *data = malloc(size * sizeof(char)); MPI_Status st3; - MPI_Recv(data, size, MPI_BYTE, m->id_s, 4, MPI_COMM_WORLD, &st3); + MPI_Recv(data, size, MPI_BYTE, m->id_s, TAG_DATA, MPI_COMM_WORLD, &st3); write_on_node(n, addr, data, size); free(data); } @@ -78,14 +80,12 @@ void node_cycle(struct node *n) { char *data = malloc(m->size * sizeof(char)); read_on_node(n, m->address, data, m->size); debug("Send Read: Data", n->id); - MPI_Send(data, m->size, MPI_BYTE, m->id_s, 4, MPI_COMM_WORLD); + MPI_Send(data, m->size, MPI_BYTE, m->id_s, TAG_DATA, MPI_COMM_WORLD); free(data); break; default: break; } - if (m->op == OP_KILL) - break; free(m); - } + } // while(1) } diff --git a/src/utils/utils.c b/src/utils/utils.c index 29d0b6d..f2c4e6f 100644 --- a/src/utils/utils.c +++ b/src/utils/utils.c @@ -30,18 +30,17 @@ void print_allocations_table(struct leader_resources *l_r) { size_t size = 0; size_t total_size = 0; for (size_t i = 0; i < a_r->count_alloc; i++) { - size = 0; - for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { - size += a_r->allocs[i]->parts[j]->size; - total_size += a_r->allocs[i]->parts[j]->size; - } - printf("%4zu : %9zu : %5zu : %8zu : " - , i, a_r->allocs[i]->v_address_start, a_r->allocs[i]->number_parts, size); - for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { - if (j == a_r->allocs[i]->number_parts - 1) - printf("%d\n", a_r->allocs[i]->parts[j]->id); - else - printf("%d > ", a_r->allocs[i]->parts[j]->id); + if (a_r->allocs[i]) { + size = size_of_allocation(a_r->allocs[i]); + total_size += size; + printf("%4zu : %9zu : %5zu : %8zu : ", i, a_r->allocs[i]->v_address_start, a_r->allocs[i]->number_parts, + size); + for (size_t j = 0; j < a_r->allocs[i]->number_parts; j++) { + if (j == a_r->allocs[i]->number_parts - 1) + printf("%d\n", a_r->allocs[i]->parts[j]->id); + else + printf("%d > ", a_r->allocs[i]->parts[j]->id); + } } } printf(" === ========= ===== ======== =======\n");