diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index c08890ac7..797a0fcdd 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -121,6 +121,8 @@ void usage(int argc, const char* argv[]) { lf_print(" The ID of the federation that this RTI will control.\n"); lf_print(" -n, --number_of_federates "); lf_print(" The number of federates in the federation that this RTI will control.\n"); + lf_print(" -nt, --number_of_transient_federates "); + lf_print(" The number of transient federates in the federation that this RTI will control.\n"); lf_print(" -p, --port "); lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, DEFAULT_PORT); @@ -233,6 +235,21 @@ int process_args(int argc, const char* argv[]) { } rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); + } else if (strcmp(argv[i], "-nt") == 0 || strcmp(argv[i], "--number_of_transient_federates") == 0) { + if (argc < i + 2) { + lf_print_error("--number_of_transient_federates needs a valid positive argument."); + usage(argc, argv); + return 0; + } + i++; + long num_transient_federates = strtol(argv[i], NULL, 10); + if (num_transient_federates < 0 || num_transient_federates >= INT32_MAX) { // Negative, or too large for the int32_t field (covers strtol's LONG_MIN/LONG_MAX overflow results). + lf_print_error("--number_of_transient_federates needs a valid positive or null integer argument."); + usage(argc, argv); + return 0; + } + rti.number_of_transient_federates = (int32_t)num_transient_federates; // Range-checked above, so the cast is lossless. + lf_print("RTI: Number of transient federates: %d", rti.number_of_transient_federates); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { if (argc < i + 2) { lf_print_error("--port needs a short unsigned integer argument ( > 0 and < %d).", UINT16_MAX); @@ -275,6 +292,16 @@ int process_args(int argc, const char* argv[]) { 
return 0; } } + if (rti.base.number_of_scheduling_nodes == 0) { + lf_print_error("--number_of_federates needs a valid positive integer argument."); + usage(argc, argv); + return 0; + } + if (rti.number_of_transient_federates > rti.base.number_of_scheduling_nodes) { + lf_print_error("--number_of_transient_federates cannot be higher than the number of federates."); + usage(argc, argv); + return 0; + } return 1; } int main(int argc, const char* argv[]) { @@ -314,8 +341,8 @@ int main(int argc, const char* argv[]) { lf_print("Tracing the RTI execution in %s file.", rti_trace_file_name); } - lf_print("Starting RTI for %d federates in federation ID %s.", rti.base.number_of_scheduling_nodes, - rti.federation_id); + lf_print("Starting RTI for a total of %d federates, with %d being transient, in federation ID %s", + rti.base.number_of_scheduling_nodes, rti.number_of_transient_federates, rti.federation_id); assert(rti.base.number_of_scheduling_nodes < UINT16_MAX); // Allocate memory for the federates diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index f99edb543..db063bb53 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -42,6 +42,7 @@ void invalidate_min_delays() { node->flags = 0; // All flags cleared because they get set lazily. } free(rti_common->min_delays); + rti_common->min_delays = NULL; } } @@ -101,6 +102,8 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) { if (lf_tag_compare(rti_common->min_delays[i * n + e->id], FOREVER_TAG) != 0) { // Node i is upstream of e with min delay rti_common->min_delays[i * n + e->id] scheduling_node_t* upstream = rti_common->scheduling_nodes[i]; + if (upstream->state == NOT_CONNECTED) + continue; // If we haven't heard from the upstream node, then assume it can send an event at the start time. 
if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) { tag_t start_tag = {.time = start_time, .microstep = 0}; @@ -163,6 +166,9 @@ tag_t eimt_strict(scheduling_node_t* e) { tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { tag_advance_grant_t result = {.tag = NEVER_TAG, .is_provisional = false}; + // Check how many upstream federates are connected + int num_connected_upstream = 0; + // Find the earliest LTC of upstream scheduling_nodes (M). tag_t min_upstream_completed = FOREVER_TAG; @@ -172,6 +178,7 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { // Ignore this enclave/federate if it is not connected. if (upstream->state == NOT_CONNECTED) continue; + num_connected_upstream++; // Adjust by the "after" delay. // Note that "no delay" is encoded as NEVER, @@ -184,8 +191,15 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) { } LF_PRINT_LOG("RTI: Minimum upstream LTC for federate/enclave %d is " PRINTF_TAG "(adjusted by after delay).", e->id, min_upstream_completed.time - start_time, min_upstream_completed.microstep); - if (lf_tag_compare(min_upstream_completed, e->last_granted) > 0 && - lf_tag_compare(min_upstream_completed, e->next_event) >= 0 // The enclave has to advance its tag + + if (num_connected_upstream == 0) { + // When none of the upstream federates is connected (case of transients), + if (lf_tag_compare(e->next_event, FOREVER_TAG) != 0) { + result.tag = e->next_event; + return result; + } + } else if (lf_tag_compare(min_upstream_completed, e->last_granted) > 0 && + lf_tag_compare(min_upstream_completed, e->next_event) >= 0 // The enclave has to advance its tag ) { result.tag = min_upstream_completed; return result; diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 7f3fd6f15..cda3ce9e8 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -29,6 +29,7 @@ #include "rti_remote.h" #include "net_util.h" #include +#include 
"clock.h" // For lf_clock_cond_timedwait() // Global variables defined in tag.c: extern instant_t start_time; @@ -38,6 +39,15 @@ extern instant_t start_time; */ static rti_remote_t* rti_remote; +// Referance to the federate instance to support hot swap +static federate_info_t* hot_swap_federate; + +// Indicates if a hot swap process is in progress +static bool hot_swap_in_progress = false; + +// Indicates that the old federate has stopped. +static bool hot_swap_old_resigned = false; + bool _lf_federate_reports_error = false; // A convenient macro for getting the `federate_info_t *` at index `_idx` @@ -45,24 +55,248 @@ bool _lf_federate_reports_error = false; #define GET_FED_INFO(_idx) (federate_info_t*)rti_remote->base.scheduling_nodes[_idx] lf_mutex_t rti_mutex; -lf_cond_t received_start_times; -lf_cond_t sent_start_time; +static lf_cond_t received_start_times; +static lf_cond_t sent_start_time; +static lf_cond_t updated_delayed_grants; extern int lf_critical_section_enter(environment_t* env) { return lf_mutex_lock(&rti_mutex); } extern int lf_critical_section_exit(environment_t* env) { return lf_mutex_unlock(&rti_mutex); } -void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { - if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 || - lf_tag_compare(tag, e->last_provisionally_granted) < 0) { +// Utility functions to simplify the call of pqueue_tag routines. +// These functions mainly do the casting. +// FIXME: Should we remove the queue parameter from the functions? + +/** + * @brief Creates a priority queue of delayed grants that is sorted by tags. + * + * @param nbr_delayed_grants The size. + * @return The dynamically allocated queue or NULL. + */ +static pqueue_delayed_grants_t* pqueue_delayed_grants_init(uint16_t nbr_delayed_grants) { + return (pqueue_delayed_grants_t*)pqueue_tag_init((size_t)nbr_delayed_grants); +} + +/** + * @brief Return the size of the queue. + * + * @param q The queue. + * @return The size. 
+ */ +static size_t pqueue_delayed_grants_size(pqueue_delayed_grants_t* q) { return pqueue_tag_size((pqueue_tag_t*)q); } + +/** + * @brief Insert a delayed grant element into the queue. + * + * @param q The queue. + * @param d The delayed grant element to insert. + * @return 0 on success + */ +static int pqueue_delayed_grants_insert(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* d) { + return pqueue_tag_insert((pqueue_tag_t*)q, (void*)d); +} + +/** + * @brief Pop the least-tag element from the queue. + * + * @param q The queue. + * @return NULL on error, otherwise the entry + */ +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_pop(pqueue_delayed_grants_t* q) { + return (pqueue_delayed_grant_element_t*)pqueue_tag_pop((pqueue_tag_t*)q); +} + +/** + * @brief Return highest-ranking element without removing it. + * + * @param q The queue. + * @return NULL if the queue is empty, otherwise the delayed grant element. + */ +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_peek(pqueue_delayed_grants_t* q) { + return (pqueue_delayed_grant_element_t*)pqueue_tag_peek((pqueue_tag_t*)q); +} + +/** + * @brief Free all memory used by the queue including elements that are marked dynamic. + * + * @param q The queue. + */ +static void pqueue_delayed_grants_free(pqueue_delayed_grants_t* q) { pqueue_tag_free((pqueue_tag_t*)q); } + +/** + * @brief Remove an item from the delayed grants queue. + * + * @param q The queue. + * @param e The entry to remove. + */ +static void pqueue_delayed_grants_remove(pqueue_delayed_grants_t* q, pqueue_delayed_grant_element_t* e) { + pqueue_tag_remove((pqueue_tag_t*)q, (void*)e); +} + +/** + * @brief Return the first item with the specified tag or NULL if there is none. + * @param q The queue. + * @param t The tag. + * @return An entry with the specified tag or NULL if there isn't one. 
+ */ +pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_with_tag(pqueue_delayed_grants_t* q, tag_t t) { + return (pqueue_delayed_grant_element_t*)pqueue_tag_find_with_tag((pqueue_tag_t*)q, t); +} + +// Function that does not exist in pqueue_tag.c +/** + * @brief Return the first item with the specified federate id or NULL if there is none. + * + * The entries are scanned linearly starting at index 1, and a size of 1 is treated as an empty queue. + * @param q The queue. + * @param fed_id The federate id. + * @return An entry with the specified federate id or NULL if there isn't one. + */ +static pqueue_delayed_grant_element_t* pqueue_delayed_grants_find_by_fed_id(pqueue_delayed_grants_t* q, + uint16_t fed_id) { + pqueue_delayed_grant_element_t* dge; + // Access the element storage through the base queue type. + pqueue_t* _q = (pqueue_t*)q; + if (!_q || _q->size == 1) + return NULL; + for (size_t i = 1; i < _q->size; i++) { + dge = (pqueue_delayed_grant_element_t*)_q->d[i]; + if (dge) { + if (dge->fed_id == fed_id) { + return dge; + } + } + } + return NULL; +} + +/** + * @brief Insert the delayed grant into the delayed_grants queue and notify. + * + * This function assumes the caller holds the rti_mutex. + * @param fed The federate. + * @param tag The tag to grant. + * @param is_provisional State whether the grant is provisional. + */ +static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provisional) { + // Check whether there is already a pending grant. 
+ pqueue_delayed_grant_element_t* dge = + pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, fed->enclave.id); + if (dge == NULL) { + // No pending grant for this federate: allocate a new element. Assign to the outer `dge` instead of + // declaring a second variable that would shadow it. + dge = (pqueue_delayed_grant_element_t*)malloc(sizeof(pqueue_delayed_grant_element_t)); + if (dge == NULL) { + lf_print_error("RTI: Out of memory. Cannot record a delayed grant for federate %d.", fed->enclave.id); + return; + } + dge->base.is_dynamic = 1; + dge->base.tag = tag; + dge->fed_id = fed->enclave.id; + dge->is_provisional = is_provisional; + pqueue_delayed_grants_insert(rti_remote->delayed_grants, dge); + LF_PRINT_LOG("RTI: Inserting a delayed grant of " PRINTF_TAG " for federate %d.", dge->base.tag.time - start_time, + dge->base.tag.microstep, dge->fed_id); + lf_cond_signal(&updated_delayed_grants); + } else { + // Note that there should never be more than one pending grant for a federate. + int compare = lf_tag_compare(dge->base.tag, tag); + if (compare > 0) { + // Update the pre-existing grant. The tag is the sort key of the queue, so take the element out + // before changing it and re-insert it afterwards to keep the queue ordered. + // NOTE(review): this assumes that removing an element does not free it -- confirm. + pqueue_delayed_grants_remove(rti_remote->delayed_grants, dge); + dge->base.tag = tag; + dge->is_provisional = is_provisional; + pqueue_delayed_grants_insert(rti_remote->delayed_grants, dge); + LF_PRINT_LOG("RTI: Updating a delayed grant of " PRINTF_TAG " for federate %d.", tag.time - start_time, + tag.microstep, dge->fed_id); + lf_cond_signal(&updated_delayed_grants); + } else if (compare == 0) { + if (dge->is_provisional != is_provisional) { + // Update the grant to keep the most recent is_provisional status. + dge->is_provisional = is_provisional; + LF_PRINT_LOG("RTI: Changing status of a delayed grant of " PRINTF_TAG " for federate %d to provisional: %d.", + dge->base.tag.time - start_time, dge->base.tag.microstep, dge->fed_id, is_provisional); + } + } + } +} + +/** + * Find the number of non connected upstream transients + * @param fed The federate + * @return the number of non connected upstream transients + */ +static int get_num_absent_upstream_transients(federate_info_t* fed) { + int num_absent_upstream_transients = 0; + for (int j = 0; j < fed->enclave.num_immediate_upstreams; j++) { + federate_info_t* upstream = GET_FED_INFO(fed->enclave.immediate_upstreams[j]); + // Count this upstream only if it is a transient federate that is not connected. 
+ if ((upstream->enclave.state == NOT_CONNECTED) && (upstream->is_transient)) { + num_absent_upstream_transients++; + } + } + return num_absent_upstream_transients; +} + +/** + * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the RTI, + * telling it that the specified `connected` federate is also now connected. + * + * This function assumes that the mutex lock is already held. + * @param destination The destination federate. + * @param connected The connected federate. + */ +static void send_upstream_connected_locked(federate_info_t* destination, federate_info_t* connected) { + if (destination->enclave.state == NOT_CONNECTED) { + LF_PRINT_LOG("RTI did not send upstream connected message to federate %d, because it is not connected.", + destination->enclave.id); return; } - // Need to make sure that the destination federate's thread has already - // sent the starting MSG_TYPE_TIMESTAMP message. - while (e->state == PENDING) { - // Need to wait here. - lf_cond_wait(&sent_start_time); + unsigned char buffer[MSG_TYPE_UPSTREAM_CONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_UPSTREAM_CONNECTED; + encode_uint16(connected->enclave.id, &buffer[1]); + if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH, buffer)) { + lf_print_warning("RTI: Failed to send upstream connected message to federate %d.", destination->enclave.id); + } +} + +/** + * @brief Send MSG_TYPE_UPSTREAM_DISCONNECTED to the specified federate. + * + * This function assumes that the mutex lock is already held. + * @param destination The destination federate. + * @param disconnected The disconnected federate. 
+ */ +static void send_upstream_disconnected_locked(federate_info_t* destination, federate_info_t* disconnected) { + unsigned char buffer[MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH]; + buffer[0] = MSG_TYPE_UPSTREAM_DISCONNECTED; + encode_uint16(disconnected->enclave.id, &buffer[1]); + if (write_to_socket_close_on_error(&destination->socket, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH, buffer)) { + // The write targets `destination`, so that is the federate to name in the warning. + lf_print_warning("RTI: Failed to send upstream disconnected message to federate %d.", destination->enclave.id); } +} + +/** + * @brief Mark a federate as disconnected and, if this is a transient, inform downstream federates. + * + * The state is set to NOT_CONNECTED before any lock is taken; rti_mutex is then acquired by this function + * itself, but only when the federate is transient. + * + * NOTE(review): this function is also called after a failed write in the grant-sending functions, whose + * callers appear to hold rti_mutex already (they wait on sent_start_time). Confirm that the mutex is + * recursive or that no caller holds it, otherwise this would self-deadlock. + * @param fed The disconnected federate. + */ +static void notify_federate_disconnected(federate_info_t* fed) { + fed->enclave.state = NOT_CONNECTED; + // Notify downstream federates. Need to hold the mutex lock to do this. + if (fed->is_transient) { + LF_MUTEX_LOCK(&rti_mutex); + for (int j = 0; j < fed->enclave.num_immediate_downstreams; j++) { + federate_info_t* downstream = GET_FED_INFO(fed->enclave.immediate_downstreams[j]); + // Ignore this enclave if it is no longer connected. + if (downstream->enclave.state != NOT_CONNECTED) { + // Notify the downstream enclave. + send_upstream_disconnected_locked(downstream, fed); + } + } + LF_MUTEX_UNLOCK(&rti_mutex); + } +} + +/** + * Notify a tag advance grant (TAG) message to the specified federate immediately. + * + * This function will keep a record of this TAG in the enclave's last_granted + * field. + * + * @param e The enclave. + * @param tag The tag to grant. + */ +static void notify_tag_advance_grant_immediate(scheduling_node_t* e, tag_t tag) { size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); unsigned char buffer[message_length]; buffer[0] = MSG_TYPE_TAG_ADVANCE_GRANT; @@ -77,7 +311,8 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // to fail. Consider a failure here a soft failure and update the federate's status. 
if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - e->state = NOT_CONNECTED; + // Mark a federate as disconnected and inform if necessary + notify_federate_disconnected(GET_FED_INFO(e->id)); } else { e->last_granted = tag; LF_PRINT_LOG("RTI sent to federate %d the tag advance grant (TAG) " PRINTF_TAG ".", e->id, tag.time - start_time, @@ -85,7 +320,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { } } -void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { +void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { + // Unlike the PTAG variant, a TAG equal to the last PTAG is not filtered out (only strictly smaller tags are), + // matching the condition this function had before the immediate/delayed split. if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 || - lf_tag_compare(tag, e->last_provisionally_granted) <= 0) { + lf_tag_compare(tag, e->last_provisionally_granted) < 0) { return; @@ -96,6 +331,32 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // Need to wait here. lf_cond_wait(&sent_start_time); } + + // Check if sending the tag advance grant needs to be delayed or not + // Delay is needed when a federate has, at least one, absent upstream transient + federate_info_t* fed = GET_FED_INFO(e->id); + if (!fed->has_upstream_transient_federates) { + notify_tag_advance_grant_immediate(e, tag); + } else { + if (get_num_absent_upstream_transients(fed) > 0) { + notify_grant_delayed(fed, tag, false); + } else { + notify_tag_advance_grant_immediate(e, tag); + } + } +} + +/** + * Notify a provisional tag advance grant (PTAG) message to the specified federate + * immediately. + * + * This function will keep a record of this PTAG in the enclave's last_provisionally_granted + * field. + * + * @param e The scheduling node. + * @param tag The tag to grant. 
+ */ +void notify_provisional_tag_advance_grant_immediate(scheduling_node_t* e, tag_t tag) { size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); unsigned char buffer[message_length]; buffer[0] = MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT; @@ -110,7 +371,8 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // to fail. Consider a failure here a soft failure and update the federate's status. if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - e->state = NOT_CONNECTED; + // Mark a federate as disconnected and inform if necessary + notify_federate_disconnected(GET_FED_INFO(e->id)); } else { e->last_provisionally_granted = tag; LF_PRINT_LOG("RTI sent to federate %d the Provisional Tag Advance Grant (PTAG) " PRINTF_TAG ".", e->id, @@ -146,6 +408,32 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { } } +void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { + if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 || + lf_tag_compare(tag, e->last_provisionally_granted) <= 0) { + return; + } + // Need to make sure that the destination federate's thread has already + // sent the starting MSG_TYPE_TIMESTAMP message. + while (e->state == PENDING) { + // Need to wait here. 
+ lf_cond_wait(&sent_start_time); + } + + // Check if sending the tag advance grant needs to be delayed or not + // Delay is needed when a federate has, at least one, absent upstream transient + federate_info_t* fed = GET_FED_INFO(e->id); + if (!fed->has_upstream_transient_federates) { + notify_provisional_tag_advance_grant_immediate(e, tag); + } else { + if (get_num_absent_upstream_transients(fed) > 0) { + notify_grant_delayed(fed, tag, true); + } else { + notify_provisional_tag_advance_grant_immediate(e, tag); + } + } +} + void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) { if (e->state == NOT_CONNECTED) { return; @@ -289,20 +577,22 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff // issue a TAG before this message has been forwarded. LF_MUTEX_LOCK(&rti_mutex); - // If the destination federate is no longer connected, issue a warning, - // remove the message from the socket and return. + // If the destination federate is no longer connected, or it is a transient that has not started executing yet + // (the delayed intended tag is less than the effective start tag of the destination), issue a warning, remove the + // message from the socket, and return. federate_info_t* fed = GET_FED_INFO(federate_id); - if (fed->enclave.state == NOT_CONNECTED) { - lf_print_warning("RTI: Destination federate %d is no longer connected. 
Dropping message.", federate_id); - LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", " - "completed " PRINTF_TAG ", " - "last_granted " PRINTF_TAG ", " - "last_provisionally_granted " PRINTF_TAG ".", - fed->enclave.next_event.time - start_time, fed->enclave.next_event.microstep, - fed->enclave.completed.time - start_time, fed->enclave.completed.microstep, - fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep, - fed->enclave.last_provisionally_granted.time - start_time, - fed->enclave.last_provisionally_granted.microstep); + interval_t delay = NEVER; + for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) { + if (fed->enclave.immediate_upstreams[i] == sending_federate->enclave.id) { + delay = fed->enclave.immediate_upstream_delays[i]; + break; + } + } + if (fed->enclave.state == NOT_CONNECTED || + lf_tag_compare(lf_delay_tag(intended_tag, delay), fed->effective_start_tag) < 0) { + lf_print_warning("RTI: Destination federate %d is not connected at logical time (" PRINTF_TAG + "). Dropping message.", + federate_id, intended_tag.time - start_time, intended_tag.microstep); // If the message was larger than the buffer, we must empty out the remainder also. size_t total_bytes_read = bytes_read; while (total_bytes_read < total_bytes_to_read) { @@ -467,18 +757,21 @@ static void broadcast_stop_time_to_federates_locked() { } /** - * Mark a federate requesting stop. If the number of federates handling stop reaches the - * NUM_OF_FEDERATES, broadcast MSG_TYPE_STOP_GRANTED to every federate. + * Mark a federate requesting stop. If the number of federates handling stop reaches + * the number of persistent federates, broadcast MSG_TYPE_STOP_GRANTED to every federate. * This function assumes the _RTI.mutex is already locked. * @param fed The federate that has requested a stop. * @return 1 if stop time has been sent to all federates and 0 otherwise. 
*/ static int mark_federate_requesting_stop(federate_info_t* fed) { if (!fed->requested_stop) { - rti_remote->base.num_scheduling_nodes_handling_stop++; + // Increment the number of federates handling stop only if it is persistent + if (!fed->is_transient) + rti_remote->base.num_scheduling_nodes_handling_stop++; fed->requested_stop = true; } - if (rti_remote->base.num_scheduling_nodes_handling_stop == rti_remote->base.number_of_scheduling_nodes) { + if (rti_remote->base.num_scheduling_nodes_handling_stop == + (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { // We now have information about the stop time of all // federates. broadcast_stop_time_to_federates_locked(); @@ -680,6 +973,69 @@ void handle_address_ad(uint16_t federate_id) { } } +/** + * @brief Send the global federation start time and the federate-specific starting tag to the specified federate. + * + * For persistent federates and transient federates that happen to join during federation startup, the + * `federation_start_time` will match the time in the `federate_start_tag`, and the microstep will be 0. + * For a transient federate that joins later, the time in the `federate_start_tag` will be greater than the + * `federation_start_time`. + * + * + * Before sending the start time and tag, this function notifies my_fed of all upstream transient federates that are + * connected. After sending the start time and tag, and if my_fed is transient, notify federates downstream of its + * connection, ensuring proper handling of zero-delay cycles. + * + * This function assumes that the mutex lock is already held. + * + * @param my_fed the federate to send the start time to. 
+ * @param federation_start_time the federation start_time + * @param federate_start_tag the federate effective start tag + */ +static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { + // Notify my_fed of any upstream transient federates that are connected. + // This has to occur before sending the start tag so that my_fed does not begin executing thinking that these + // upstream federates are not connected. + for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) { + federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]); + if (fed->is_transient && fed->enclave.state == GRANTED) { + send_upstream_connected_locked(my_fed, fed); + } + } + + // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START + // message. + // If it is a persistent federate, only the start_time is sent. If, however, it is a transient + // federate, the effective_start_tag is also sent. + size_t buffer_size = (my_fed->is_transient) ? MSG_TYPE_TIMESTAMP_TAG_LENGTH : MSG_TYPE_TIMESTAMP_LENGTH; + unsigned char start_time_buffer[buffer_size]; + start_time_buffer[0] = MSG_TYPE_TIMESTAMP; + encode_int64(swap_bytes_if_big_endian_int64(federation_start_time), &start_time_buffer[1]); + if (my_fed->is_transient) { + encode_tag(&(start_time_buffer[1 + sizeof(instant_t)]), federate_start_tag); + } + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_TIMESTAMP, my_fed->enclave.id, &federate_start_tag); + } + if (write_to_socket(my_fed->socket, buffer_size, start_time_buffer)) { + lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); + } else { + // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP_START + // message has been sent. That MSG_TYPE_TIMESTAMP_START message grants time advance to + // the federate to the federate_start_tag.time. 
+ my_fed->enclave.state = GRANTED; + lf_cond_broadcast(&sent_start_time); + // Log the value that was actually encoded into the message rather than the global start_time. + LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", federation_start_time, my_fed->enclave.id); + + // If this is a transient federate, notify its downstream federates that it is now connected. + if (my_fed->is_transient) { + for (int i = 0; i < my_fed->enclave.num_immediate_downstreams; i++) { + send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.immediate_downstreams[i]), my_fed); + } + } + } +} + void handle_timestamp(federate_info_t* my_fed) { unsigned char buffer[sizeof(int64_t)]; // Read bytes from the socket. We need 8 bytes. @@ -694,49 +1050,150 @@ void handle_timestamp(federate_info_t* my_fed) { LF_PRINT_DEBUG("RTI received timestamp message with time: " PRINTF_TIME ".", timestamp); LF_MUTEX_LOCK(&rti_mutex); - rti_remote->num_feds_proposed_start++; - if (timestamp > rti_remote->max_start_time) { - rti_remote->max_start_time = timestamp; - } - if (rti_remote->num_feds_proposed_start == rti_remote->base.number_of_scheduling_nodes) { - // All federates have proposed a start time. - lf_cond_broadcast(&received_start_times); + + // Processing the TIMESTAMP depends on whether it is the startup phase. + if (rti_remote->phase == startup_phase) { + // Not all persistent federates have proposed a start time. + if (timestamp > rti_remote->max_start_time) { + rti_remote->max_start_time = timestamp; + } + // Note that if a transient federate's thread gets here during the startup phase, + // then it will be assigned the same global tag as its effective start tag and its + // timestamp will affect that start tag. + if (!my_fed->is_transient) { + rti_remote->num_feds_proposed_start++; + } + if (rti_remote->num_feds_proposed_start == + (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { + // This federate is the last persistent federate to propose a start time. 
+ lf_cond_broadcast(&received_start_times); + rti_remote->phase = execution_phase; + } else { + // Wait until all persistent federates have proposed a start time. + while (rti_remote->num_feds_proposed_start < + (rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates)) { + lf_cond_wait(&received_start_times); + } + } + // Add an offset to the maximum tag to get everyone starting together. + start_time = rti_remote->max_start_time + DELAY_START; + // Set the start_time in the RTI trace + if (rti_remote->base.tracing_enabled) { + lf_tracing_set_start_time(start_time); + } + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; + + // Notify the federate of its start tag. + // This has to be done while still holding the mutex. + send_start_tag_locked(my_fed, start_time, my_fed->effective_start_tag); + + LF_MUTEX_UNLOCK(&rti_mutex); + } else if (rti_remote->phase == shutdown_phase || !my_fed->is_transient) { + LF_MUTEX_UNLOCK(&rti_mutex); + + // Send reject message if the federation is in shutdown phase or if + // it is in the execution phase but the federate is persistent. + send_reject(&my_fed->socket, JOINING_TOO_LATE); + return; } else { - // Some federates have not yet proposed a start time. - // wait for a notification. - while (rti_remote->num_feds_proposed_start < rti_remote->base.number_of_scheduling_nodes) { - // FIXME: Should have a timeout here? - lf_cond_wait(&received_start_times); + // The federate is transient and we are in the execution phase. + // At this point, we already hold the mutex. + + //// Algorithm for computing the effective_start_time of a joining transient + // The effective_start_time will be the max among all the following tags: + // 1. At tag: (joining time, 0 microstep) + // 2. (start_time, 0 microstep) + // 3. The latest completed logical tag + 1 microstep + // 4. The latest granted (P)TAG + 1 microstep, of every downstream federate + // 5. 
The maximun tag of messages from the upstream federates + 1 microstep + + // Condition 1. + my_fed->effective_start_tag = (tag_t){.time = timestamp, .microstep = 0u}; + + // Condition 2. + if (timestamp < start_time) { + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; } - } - LF_MUTEX_UNLOCK(&rti_mutex); + // Condition 3. + if (lf_tag_compare(my_fed->enclave.completed, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = my_fed->enclave.completed; + my_fed->effective_start_tag.microstep++; + } - // Send back to the federate the maximum time plus an offset on a TIMESTAMP - // message. - unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_LENGTH]; - start_time_buffer[0] = MSG_TYPE_TIMESTAMP; - // Add an offset to this start time to get everyone starting together. - start_time = rti_remote->max_start_time + DELAY_START; - lf_tracing_set_start_time(start_time); - encode_int64(swap_bytes_if_big_endian_int64(start_time), &start_time_buffer[1]); + // Condition 4. Iterate over the downstream federates + for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) { + federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]); - if (rti_remote->base.tracing_enabled) { - tag_t tag = {.time = start_time, .microstep = 0}; - tracepoint_rti_to_federate(send_TIMESTAMP, my_fed->enclave.id, &tag); - } - if (write_to_socket(my_fed->socket, MSG_TYPE_TIMESTAMP_LENGTH, start_time_buffer)) { - lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); - } + // Get the max over the TAG of the downstreams + if (lf_tag_compare(downstream->enclave.last_granted, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = downstream->enclave.last_granted; + my_fed->effective_start_tag.microstep++; + } - LF_MUTEX_LOCK(&rti_mutex); - // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP - // message has been sent. 
That MSG_TYPE_TIMESTAMP message grants time advance to - // the federate to the start time. - my_fed->enclave.state = GRANTED; - lf_cond_broadcast(&sent_start_time); - LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); - LF_MUTEX_UNLOCK(&rti_mutex); + // Get the max over the PTAG of the downstreams + if (lf_tag_compare(downstream->enclave.last_provisionally_granted, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = downstream->enclave.last_provisionally_granted; + my_fed->effective_start_tag.microstep++; + } + } + + // Condition 5. + // This one is a bit subtle. Any messages from upstream federates that the RTI has + // not yet seen will be sent to this joining federate after the effective_start_tag + // because the effective_start_tag is sent while still holding the mutex. + + // Iterate over the messages from the upstream federates + for (int j = 0; j < my_fed->enclave.num_immediate_upstreams; j++) { + federate_info_t* upstream = GET_FED_INFO(my_fed->enclave.immediate_upstreams[j]); + + size_t queue_size = pqueue_tag_size(upstream->in_transit_message_tags); + if (queue_size != 0) { + tag_t max_tag = pqueue_tag_max_tag(upstream->in_transit_message_tags); + + if (lf_tag_compare(max_tag, my_fed->effective_start_tag) >= 0) { + my_fed->effective_start_tag = max_tag; + my_fed->effective_start_tag.microstep++; + } + } + } + + // For every downstream that has a pending grant that is higher than the + // effective_start_time of the federate, cancel it. + // FIXME: Should this be higher-than or equal to? + // FIXME: Also, won't the grant simply be lost? + // If the joining federate doesn't send anything, the downstream federate won't issue another NET. + for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) { + federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]); + + // Ignore this federate if it has resigned. 
+ if (downstream->enclave.state == NOT_CONNECTED) { + continue; + } + + // Check the pending grants, if any, and keep it only if it is + // sooner than the effective start tag. + pqueue_delayed_grant_element_t* dge = + pqueue_delayed_grants_find_by_fed_id(rti_remote->delayed_grants, downstream->enclave.id); + if (dge != NULL && lf_tag_compare(dge->base.tag, my_fed->effective_start_tag) > 0) { + pqueue_delayed_grants_remove(rti_remote->delayed_grants, dge); + } + } + + // Once the effective start tag is set, send it to the joining transient, + // together with the start time of the federation. + + // Have to send the start tag while still holding the mutex to ensure that no message + // from an upstream federate is forwarded before the start tag. + send_start_tag_locked(my_fed, start_time, my_fed->effective_start_tag); + + // Whenever a transient joins, invalidate all federates, so that all min_delays_upstream + // get re-computed. + // FIXME: Maybe optimize it to only invalidate those affected by the transient + invalidate_min_delays(); + + LF_MUTEX_UNLOCK(&rti_mutex); + } } void send_physical_clock(unsigned char message_type, federate_info_t* fed, socket_type_t socket_type) { @@ -885,18 +1342,19 @@ void* clock_synchronization_thread(void* noargs) { */ static void handle_federate_failed(federate_info_t* my_fed) { // Nothing more to do. Close the socket and exit. - LF_MUTEX_LOCK(&rti_mutex); - if (rti_remote->base.tracing_enabled) { tracepoint_rti_from_federate(receive_FAILED, my_fed->enclave.id, NULL); } + // First, mark a federate as disconnected and inform if necessary + notify_federate_disconnected(my_fed); + + LF_MUTEX_LOCK(&rti_mutex); + // Set the flag telling the RTI to exit with an error code when it exits. _lf_federate_reports_error = true; lf_print_error("RTI: Federate %d reports an error and has exited.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; - // Indicate that there will no further events from this federate.
my_fed->enclave.next_event = FOREVER_TAG; @@ -933,15 +1391,15 @@ static void handle_federate_failed(federate_info_t* my_fed) { */ static void handle_federate_resign(federate_info_t* my_fed) { // Nothing more to do. Close the socket and exit. - LF_MUTEX_LOCK(&rti_mutex); - if (rti_remote->base.tracing_enabled) { tracepoint_rti_from_federate(receive_RESIGN, my_fed->enclave.id, NULL); } + // First, mark a federate as disconnected and inform if necessary + notify_federate_disconnected(my_fed); lf_print("RTI: Federate %d has resigned.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; + LF_MUTEX_LOCK(&rti_mutex); // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; @@ -952,14 +1410,14 @@ static void handle_federate_resign(federate_info_t* my_fed) { // forthcoming, which should result in the other end getting a zero-length reception. shutdown(my_fed->socket, SHUT_WR); - // Wait for the federate to send an EOF or a socket error to occur. - // Discard any incoming bytes. Normally, this read should return 0 because - // the federate is resigning and should itself invoke shutdown. + // // Wait for the federate to send an EOF or a socket error to occur. + // // Discard any incoming bytes. Normally, this read should return 0 because + // // the federate is resigning and should itself invoke shutdown. unsigned char buffer[10]; while (read(my_fed->socket, buffer, 10) > 0) ; - // We can now safely close the socket. + // // We can now safely close the socket. close(my_fed->socket); // from unistd.h // Check downstream federates to see whether they should now be granted a TAG. @@ -988,7 +1446,7 @@ void* federate_info_thread_TCP(void* fed) { if (read_failed) { // Socket is closed lf_print_error("RTI: Socket to federate %d is closed. 
Exiting the thread.", my_fed->enclave.id); - my_fed->enclave.state = NOT_CONNECTED; + notify_federate_disconnected(my_fed); my_fed->socket = -1; // FIXME: We need better error handling here, but do not stop execution here. break; @@ -1009,7 +1467,7 @@ void* federate_info_thread_TCP(void* fed) { break; case MSG_TYPE_RESIGN: handle_federate_resign(my_fed); - return NULL; + break; case MSG_TYPE_NEXT_EVENT_TAG: handle_next_event_tag(my_fed); break; @@ -1044,15 +1502,31 @@ void* federate_info_thread_TCP(void* fed) { // Prevent multiple threads from closing the same socket at the same time. LF_MUTEX_LOCK(&rti_mutex); close(my_fed->socket); // from unistd.h + // Manual clean, in case of a transient federate + if (my_fed->is_transient) { + // FIXME: Aren't there transit messages anymore??? + // free_in_transit_message_q(my_fed->in_transit_message_tags); + lf_print("RTI: Transient Federate %d thread exited. and socket_id is: %d ", my_fed->enclave.id, my_fed->socket); + + // Update the number of connected transient federates + rti_remote->number_of_connected_transient_federates--; + + // Reset the status of the leaving federate + reset_transient_federate(my_fed); + } + // Signal the hot swap mechanism, if needed + if (hot_swap_in_progress && hot_swap_federate->enclave.id == my_fed->enclave.id) { + hot_swap_old_resigned = true; + } LF_MUTEX_UNLOCK(&rti_mutex); return NULL; } -void send_reject(int* socket_id, unsigned char error_code) { +void send_reject(int* socket_id, rejection_code_t error_code) { LF_PRINT_DEBUG("RTI sending MSG_TYPE_REJECT."); unsigned char response[2]; response[0] = MSG_TYPE_REJECT; - response[1] = error_code; + response[1] = (unsigned char)error_code; LF_MUTEX_LOCK(&rti_mutex); // NOTE: Ignore errors on this response. if (write_to_socket(*socket_id, 2, response)) { @@ -1071,12 +1545,11 @@ void send_reject(int* socket_id, unsigned char error_code) { * matches this federation, send an MSG_TYPE_ACK and otherwise send * a MSG_TYPE_REJECT message. 
* @param socket_id Pointer to the socket on which to listen. - * @param client_fd The socket address. * @return The federate ID for success or -1 for failure. */ static int32_t receive_and_check_fed_id_message(int* socket_id) { - // Buffer for message ID, federate ID, and federation ID length. - size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID, length of fedration ID. + // Buffer for message ID, federate ID, and federation ID length; the transient flag, if any, is read separately. + size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID and length of federation ID. unsigned char buffer[length]; // Read bytes from the socket. We need 4 bytes. @@ -1086,9 +1559,10 @@ static int32_t receive_and_check_fed_id_message(int* socket_id) { } uint16_t fed_id = rti_remote->base.number_of_scheduling_nodes; // Initialize to an invalid value. + bool is_transient = false; // First byte received is the message type. - if (buffer[0] != MSG_TYPE_FED_IDS) { + if (buffer[0] != MSG_TYPE_FED_IDS && buffer[0] != MSG_TYPE_TRANSIENT_FED_IDS) { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); } @@ -1112,10 +1586,21 @@ static int32_t receive_and_check_fed_id_message(int* socket_id) { } else { // Received federate ID. fed_id = extract_uint16(buffer + 1); - LF_PRINT_DEBUG("RTI received federate ID: %d.", fed_id); - - // Read the federation ID. First read the length, which is one byte. + // Read the federation ID length, which is one byte. size_t federation_id_length = (size_t)buffer[sizeof(uint16_t) + 1]; + if (buffer[0] == MSG_TYPE_TRANSIENT_FED_IDS) { + unsigned char buf; + read_from_socket_close_on_error(socket_id, 1, &buf); + is_transient = (buf == 1) ? true : false; + } + + if (is_transient) { + LF_PRINT_LOG("RTI received federate ID: %d, which is transient.", fed_id); + } else { + LF_PRINT_LOG("RTI received federate ID: %d, which is persistent.", fed_id); + } + + // Read the federation ID.
char federation_id_received[federation_id_length + 1]; // One extra for null terminator. // Next read the actual federation ID. if (read_from_socket_close_on_error(socket_id, federation_id_length, (unsigned char*)federation_id_received)) { @@ -1151,18 +1636,58 @@ static int32_t receive_and_check_fed_id_message(int* socket_id) { send_reject(socket_id, FEDERATE_ID_OUT_OF_RANGE); return -1; } else { + // Find out if it is a new connection or a hot swap. + // Reject if: + // - duplicate of a connected persistent federate + // - or hot_swap is already in progress (Only 1 hot swap at a time!), for that + // particular federate + // - or it is a hot swap, but it is not the execution phase yet if ((rti_remote->base.scheduling_nodes[fed_id])->state != NOT_CONNECTED) { - lf_print_error("RTI received duplicate federate ID: %d.", fed_id); - if (rti_remote->base.tracing_enabled) { - tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); + if (!is_transient) { + lf_print_error("RTI received duplicate federate ID: %d.", fed_id); + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); + } + send_reject(socket_id, FEDERATE_ID_IN_USE); + return -1; + } else if (hot_swap_in_progress || rti_remote->phase != execution_phase) { + lf_print_warning("RTI rejects the connection of transient federate %d, \ + because a hot swap is already in progress for federate %d. 
\n\ + Only one hot swap operation is allowed at a time.", + fed_id, hot_swap_federate->enclave.id); + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_REJECT, fed_id, NULL); + } + send_reject(socket_id, FEDERATE_ID_IN_USE); + return -1; } - send_reject(socket_id, FEDERATE_ID_IN_USE); - return -1; } } } } - federate_info_t* fed = GET_FED_INFO(fed_id); + + federate_info_t* fed_twin = GET_FED_INFO(fed_id); + federate_info_t* fed; + // If the federate is already connected (making the request a duplicate), and that + // the federate is transient, and it is the execution phase, then mark that a hot + // swap is in progreass and initialize the hot_swap_federate. + // Otherwise, proceed with a normal transinet connection + if (fed_twin->enclave.state != NOT_CONNECTED && is_transient && fed_twin->is_transient && + rti_remote->phase == execution_phase && !hot_swap_in_progress) { + // Allocate memory for the new federate and initilize it + hot_swap_federate = (federate_info_t*)malloc(sizeof(federate_info_t)); + initialize_federate(hot_swap_federate, fed_id); + + // Set that hot swap is in progress + hot_swap_in_progress = true; + // free(fed); // Free the old memory to prevent memory leak + fed = hot_swap_federate; + lf_print("RTI: Hot Swap starting for federate %d.", fed_id); + } else { + fed = fed_twin; + fed->is_transient = is_transient; + } + // The MSG_TYPE_FED_IDS message has the right federation ID. // Get the peer address from the connected socket_id. Then assign it as the federate's socket server. @@ -1211,6 +1736,11 @@ static int32_t receive_and_check_fed_id_message(int* socket_id) { /** * Listen for a MSG_TYPE_NEIGHBOR_STRUCTURE message, and upon receiving it, fill * out the relevant information in the federate's struct. + * + * In case of a hot swap, check that no changes were made to the connections, compared + * to the first instance that joigned. This means that the first instance to join + * __is__ the reference. 
+ * * @return 1 on success and 0 on failure. */ static int receive_connection_information(int* socket_id, uint16_t fed_id) { @@ -1227,7 +1757,19 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) { send_reject(socket_id, UNEXPECTED_MESSAGE); return 0; } else { + // In case of a transient federate that is joining again, or a hot swap, then + // check that the connection information did not change. federate_info_t* fed = GET_FED_INFO(fed_id); + federate_info_t* temp_fed = NULL; + if (lf_tag_compare(fed->effective_start_tag, NEVER_TAG) != 0) { + if (hot_swap_in_progress) { + fed = hot_swap_federate; + } else { + temp_fed = (federate_info_t*)calloc(1, sizeof(federate_info_t)); + initialize_federate(temp_fed, fed_id); + fed = temp_fed; + } + } // Read the number of upstream and downstream connections fed->enclave.num_immediate_upstreams = extract_int32(&(connection_info_header[1])); fed->enclave.num_immediate_downstreams = extract_int32(&(connection_info_header[1 + sizeof(int32_t)])); @@ -1277,6 +1819,46 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) { free(connections_info_body); } + + // NOTE: In this design, changes in the connections are not allowed. This means that the first + // instance to join __is__ the reference. If this policy is to be changed, then it is in + // the following lines will be updated accordingly. 
+ if (hot_swap_in_progress || temp_fed != NULL) { + if (temp_fed == NULL) { + temp_fed = hot_swap_federate; + } + // Now, compare the previous and the new neighborhood structure (NOTE(review): fed may alias temp_fed here, confirm) + // Start with the number of upstreams and downstreams + bool reject = false; + if ((fed->enclave.num_immediate_upstreams != temp_fed->enclave.num_immediate_upstreams) || + (fed->enclave.num_immediate_downstreams != temp_fed->enclave.num_immediate_downstreams)) { + reject = true; + } else { + // Then check all upstreams and their delays + for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) { + if ((fed->enclave.immediate_upstreams[i] != temp_fed->enclave.immediate_upstreams[i]) || + (fed->enclave.immediate_upstream_delays[i] != temp_fed->enclave.immediate_upstream_delays[i])) { + reject = true; + break; + } + } + if (!reject) { + // Finally, check all downstream federates + for (int i = 0; i < fed->enclave.num_immediate_downstreams; i++) { + if (fed->enclave.immediate_downstreams[i] != temp_fed->enclave.immediate_downstreams[i]) { + reject = true; + break; + } + } + } + } + if (reject) { + if (temp_fed != hot_swap_federate) { + free(temp_fed); + } + return 0; + } + } } LF_PRINT_DEBUG("RTI received neighbor structure from federate %d.", fed_id); return 1; @@ -1309,7 +1891,12 @@ static int receive_udp_message_and_set_up_clock_sync(int* socket_id, uint16_t fe send_reject(socket_id, UNEXPECTED_MESSAGE); return 0; } else { - federate_info_t* fed = GET_FED_INFO(fed_id); + federate_info_t* fed; + if (hot_swap_in_progress) { + fed = hot_swap_federate; + } else { + fed = GET_FED_INFO(fed_id); + } if (rti_remote->clock_sync_global_status >= clock_sync_init) { // If no initial clock sync, no need perform initial clock sync.
uint16_t federate_UDP_port_number = extract_uint16(&(response[1])); @@ -1436,8 +2023,9 @@ static bool authenticate_federate(int* socket) { } #endif -void lf_connect_to_federates(int socket_descriptor) { - for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { +// FIXME: The socket descriptor here (parameter) is not used. Should be removed? +void lf_connect_to_persistent_federates(int socket_descriptor) { + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes - rti_remote->number_of_transient_federates; i++) { int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1); // Wait for the first message from the federate when RTI -a option is on. #ifdef __RTI_AUTH__ @@ -1466,13 +2054,21 @@ void lf_connect_to_federates(int socket_descriptor) { // synchronization messages. federate_info_t* fed = GET_FED_INFO(fed_id); lf_thread_create(&(fed->thread_id), federate_info_thread_TCP, fed); + + // If the federate is transient, then do not count it. + if (fed->is_transient) { + rti_remote->number_of_connected_transient_federates++; + assert(rti_remote->number_of_connected_transient_federates <= rti_remote->number_of_transient_federates); + i--; + lf_print("RTI: Transient federate %d joined.", fed->enclave.id); + } } else { // Received message was rejected. Try again. i--; } } // All federates have connected. 
- LF_PRINT_DEBUG("All federates have connected to RTI."); + LF_PRINT_DEBUG("All persistent federates have connected to RTI."); if (rti_remote->clock_sync_global_status >= clock_sync_on) { // Create the thread that performs periodic PTP clock synchronization sessions @@ -1492,6 +2088,171 @@ void lf_connect_to_federates(int socket_descriptor) { } } +/** + * @brief Send a request for immediate stop to the federate + * + * @param fed: the federate to stop + */ +void send_stop(federate_info_t* fed) { + // Send a stop request to this federate only + unsigned char outgoing_buffer[MSG_TYPE_STOP_LENGTH]; + outgoing_buffer[0] = MSG_TYPE_STOP; + lf_print("RTI sent MSG_TYPE_STOP to federate %d.", fed->enclave.id); + + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(send_STOP, fed->enclave.id, NULL); + } + write_to_socket_fail_on_error(&(fed->socket), MSG_TYPE_STOP_LENGTH, outgoing_buffer, NULL, + "RTI failed to send MSG_TYPE_STOP message to federate %d.", fed->enclave.id); + + LF_PRINT_LOG("RTI sent MSG_TYPE_STOP to federate %d.", fed->enclave.id); +} + +void* lf_connect_to_transient_federates_thread(void* nothing) { + // This loop will continue to accept connections of transient federates, as soon as there is room, or enable hot swap + while (!rti_remote->all_persistent_federates_exited) { + // Continue waiting for incoming connection requests from transients to join, or for hot swap. + // Wait for an incoming connection request. + int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1); + +// Wait for the first message from the federate when RTI -a option is on. +#ifdef __RTI_AUTH__ + if (rti_remote->authentication_enabled) { + if (!authenticate_federate(&socket_id)) { + lf_print_warning("RTI failed to authenticate the incoming federate."); + // Close the socket.
+ shutdown(socket_id, SHUT_RDWR); + close(socket_id); + socket_id = -1; + continue; + } + } +#endif + + // The first message from the federate should contain its ID and the federation ID. + // The function also detects if a hot swap request is initiated. + int32_t fed_id = receive_and_check_fed_id_message(&socket_id); + + if (fed_id >= 0 && receive_connection_information(&socket_id, (uint16_t)fed_id) && + receive_udp_message_and_set_up_clock_sync(&socket_id, (uint16_t)fed_id)) { + LF_MUTEX_LOCK(&rti_mutex); + if (hot_swap_in_progress) { + lf_print("RTI: Hot swap confirmed for federate %d.", fed_id); + + // Then send STOP + federate_info_t* fed_old = GET_FED_INFO(fed_id); + hot_swap_federate->enclave.completed = fed_old->enclave.completed; + + LF_PRINT_LOG("RTI: Send MSG_TYPE_STOP to old federate %d.", fed_id); + send_stop(fed_old); + LF_MUTEX_UNLOCK(&rti_mutex); + + // Wait for the old federate to send MSG_TYPE_RESIGN + LF_PRINT_LOG("RTI: Waiting for old federate %d to send resign.", fed_id); + // FIXME: This is a busy wait! Need instead a lf_cond_wait on a condition variable. + while (!hot_swap_old_resigned) { + } + + // The latest LTC is the tag at which the old federate resigned. This is useful + // for computing the effective_start_time of the new joining federate. + hot_swap_federate->enclave.completed = fed_old->enclave.completed; + + // Create a thread to communicate with the federate. + // This has to be done after clock synchronization is finished + // or that thread may end up attempting to handle incoming clock + // synchronization messages. + lf_thread_create(&(hot_swap_federate->thread_id), federate_info_thread_TCP, hot_swap_federate); + + // Redirect the federate in rti_remote + rti_remote->base.scheduling_nodes[fed_id] = (scheduling_node_t*)hot_swap_federate; + + // Free the old federate memory and reset the Hot wap indicators + // FIXME: Is this enough to free the memory allocated to the federate? 
+ free(fed_old); + lf_mutex_lock(&rti_mutex); + hot_swap_in_progress = false; + lf_mutex_unlock(&rti_mutex); + + lf_print("RTI: Hot swap succeeded for federate %d.", fed_id); + } else { + lf_mutex_unlock(&rti_mutex); + + // Create a thread to communicate with the federate. + // This has to be done after clock synchronization is finished + // or that thread may end up attempting to handle incoming clock + // synchronization messages. + federate_info_t* fed = GET_FED_INFO(fed_id); + lf_thread_create(&(fed->thread_id), federate_info_thread_TCP, fed); + lf_print("RTI: Transient federate %d joined.", fed_id); + } + rti_remote->number_of_connected_transient_federates++; + } else { + // If a hot swap was initiated, but the connection information and/or clock + // synchronization fail, then reset hot_swap_in_progress, and free the memory + // allocated for hot_swap_federate + if (hot_swap_in_progress) { + lf_print("RTI: Hot swap canceled for federate %d.", fed_id); + lf_mutex_lock(&rti_mutex); + hot_swap_in_progress = false; + lf_mutex_unlock(&rti_mutex); + + // FIXME: Is this enough to free the memory of a federate_info_t data structure? + free(hot_swap_federate); + } + } + } + return NULL; +} + +/** + * @brief Thread that manages the delayed grants using a priority queue. + * + * This thread is responsible for managing the priority queue of delayed grants to be issued. + * It waits until the current time matches the highest priority tag time in the queue. + * If reached, it notifies the grant immediately. If, however, the current time has not yet + * reached the highest priority tag and the queue has been updated (either by inserting or + * canceling an entry), the thread stops waiting and restarts the process again. + */ +static void* lf_delayed_grants_thread(void* nothing) { + initialize_lf_thread_id(); + // Hold the mutex when not waiting.
+ LF_MUTEX_LOCK(&rti_mutex); + while (!rti_remote->all_federates_exited) { + if (pqueue_delayed_grants_size(rti_remote->delayed_grants) > 0) { + // Do not pop, but rather peek. + pqueue_delayed_grant_element_t* next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); + instant_t next_time = next->base.tag.time; + // Wait for expiration, or a signal to stop or terminate. + int ret = lf_clock_cond_timedwait(&updated_delayed_grants, next_time); + if (ret == LF_TIMEOUT) { + // Time reached to send the grant. + // However, the grant may have been canceled while we were waiting. + pqueue_delayed_grant_element_t* new_next = pqueue_delayed_grants_peek(rti_remote->delayed_grants); + if (next == new_next) { + pqueue_delayed_grants_pop(rti_remote->delayed_grants); + federate_info_t* fed = GET_FED_INFO(next->fed_id); + if (next->is_provisional) { + notify_provisional_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + } else { + notify_tag_advance_grant_immediate(&(fed->enclave), next->base.tag); + } + free(next); + } + } else if (ret != 0) { + // An error occurred. + lf_print_error_and_exit("lf_delayed_grants_thread: lf_clock_cond_timedwait failed with code %d.", ret); + } + } else if (pqueue_delayed_grants_size(rti_remote->delayed_grants) == 0) { + // Wait for something to appear on the queue. + lf_cond_wait(&updated_delayed_grants); + } + } + // Free any delayed grants that are still on the queue. 
+ pqueue_delayed_grants_free(rti_remote->delayed_grants); + LF_MUTEX_UNLOCK(&rti_mutex); + return NULL; +} + void* respond_to_erroneous_connections(void* nothing) { initialize_lf_thread_id(); while (true) { @@ -1530,6 +2291,32 @@ void initialize_federate(federate_info_t* fed, uint16_t id) { strncpy(fed->server_hostname, "localhost", INET_ADDRSTRLEN); fed->server_ip_addr.s_addr = 0; fed->server_port = -1; + fed->has_upstream_transient_federates = false; + fed->is_transient = true; + fed->effective_start_tag = NEVER_TAG; +} + +void reset_transient_federate(federate_info_t* fed) { + // Reset all the timing information from the previous run + fed->enclave.completed = NEVER_TAG; + fed->enclave.last_granted = NEVER_TAG; + fed->enclave.last_provisionally_granted = NEVER_TAG; + fed->enclave.next_event = NEVER_TAG; + // Reset of the federate-related attributes + fed->socket = -1; // No socket. + fed->clock_synchronization_enabled = true; + // FIXME: The following two lines can be improved? + pqueue_tag_free(fed->in_transit_message_tags); + fed->in_transit_message_tags = pqueue_tag_init(10); + strncpy(fed->server_hostname, "localhost", INET_ADDRSTRLEN); + fed->server_ip_addr.s_addr = 0; + fed->server_port = -1; + fed->requested_stop = false; + fed->effective_start_tag = NEVER_TAG; + // Whenever a transient resigns or leaves, invalidate all federates, so that all min_delays_upstream + // get re-computed. + // FIXME: Maybe optimize it to only invalidate those affected by the transient + invalidate_min_delays(); } int32_t start_rti_server(uint16_t port) { @@ -1550,28 +2337,108 @@ int32_t start_rti_server(uint16_t port) { return rti_remote->socket_descriptor_TCP; } +/** + * Iterate over the federates and set 'has_upstream_transient_federates'. + * Once done, check that no transient federate has an upstream transient federate,
+ * and compute the number of persistent federates that do have upstream transients, + * which is the maximum number of delayed grants that can be pending at the same time. + * This is useful for initializing the queue of delayed grants. + * + * @return -1, if there is more than one level of transiency, else, the number of + * persistent federates that have an upstream transient + */ +static int set_has_upstream_transient_federates_parameter_and_check() { + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + for (int j = 0; j < fed->enclave.num_immediate_upstreams; j++) { + federate_info_t* upstream_fed = GET_FED_INFO(fed->enclave.immediate_upstreams[j]); + if (upstream_fed->is_transient) { + fed->has_upstream_transient_federates = true; + break; + } + } + } + + // Now check that no transient has an upstream transient + // FIXME: Do we really need this? Or should it be the job of the validator? + uint16_t max_number_of_delayed_grants = 0; + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + if (fed->is_transient && fed->has_upstream_transient_federates) { + return -1; + } + if (!fed->is_transient && fed->has_upstream_transient_federates) { + max_number_of_delayed_grants++; + } + } + return max_number_of_delayed_grants; +} + void wait_for_federates(int socket_descriptor) { - // Wait for connections from federates and create a thread for each. - lf_connect_to_federates(socket_descriptor); + // Wait for connections from persistent federates and create a thread for each.
+ lf_connect_to_persistent_federates(socket_descriptor); + + // Set has_upstream_transient_federates parameter in all federates and check + // that there is no more than one level of transiency + if (rti_remote->number_of_transient_federates > 0) { + int max_number_of_pending_grants = set_has_upstream_transient_federates_parameter_and_check(); + if (max_number_of_pending_grants == -1) { + lf_print_error_and_exit("RTI: Transient federates cannot have transient upstreams!"); + } + rti_remote->delayed_grants = pqueue_delayed_grants_init(max_number_of_pending_grants); + } - // All federates have connected. - lf_print("RTI: All expected federates have connected. Starting execution."); + // All persistent federates have connected. + lf_print("RTI: All expected persistent federates have connected. Starting execution."); + if (rti_remote->number_of_transient_federates > 0) { + lf_print("RTI: Transient Federates can join and leave the federation at anytime."); + } - // The socket server will not continue to accept connections after all the federates - // have joined. + // The socket server will only continue to accept connections from transient + // federates. // In case some other federation's federates are trying to join the wrong // federation, need to respond. Start a separate thread to do that. lf_thread_t responder_thread; - lf_thread_create(&responder_thread, respond_to_erroneous_connections, NULL); + lf_thread_t transient_thread; + lf_thread_t delayed_grants_thread; + + // If the federation does not include transient federates, then respond to + // erroneous connections. Otherwise, continue to accept transients joining and + // respond to duplicate joining requests.
+ if (rti_remote->number_of_transient_federates == 0) { + lf_thread_create(&responder_thread, respond_to_erroneous_connections, NULL); + } else if (rti_remote->number_of_transient_federates > 0) { + lf_thread_create(&transient_thread, lf_connect_to_transient_federates_thread, NULL); + lf_thread_create(&delayed_grants_thread, lf_delayed_grants_thread, NULL); + } - // Wait for federate threads to exit. + // Wait for persistent federate threads to exit. void* thread_exit_status; for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { federate_info_t* fed = GET_FED_INFO(i); - lf_print("RTI: Waiting for thread handling federate %d.", fed->enclave.id); - lf_thread_join(fed->thread_id, &thread_exit_status); - pqueue_tag_free(fed->in_transit_message_tags); - lf_print("RTI: Federate %d thread exited.", fed->enclave.id); + if (!fed->is_transient) { + lf_print("RTI: Waiting for thread handling federate %d.", fed->enclave.id); + lf_thread_join(fed->thread_id, &thread_exit_status); + pqueue_tag_free(fed->in_transit_message_tags); + lf_print("RTI: Persistent federate %d thread exited.", fed->enclave.id); + } + } + + rti_remote->all_persistent_federates_exited = true; + rti_remote->phase = shutdown_phase; + lf_print("RTI: All persistent threads exited."); + + // Wait for transient federate threads to exit, if any. 
+ if (rti_remote->number_of_transient_federates > 0) { + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + federate_info_t* fed = GET_FED_INFO(i); + if (fed->is_transient) { + lf_print("RTI: Waiting for thread handling federate %d.", fed->enclave.id); + lf_thread_join(fed->thread_id, &thread_exit_status); + pqueue_tag_free(fed->in_transit_message_tags); + lf_print("RTI: Transient federate %d thread exited.", fed->enclave.id); + } + } } rti_remote->all_federates_exited = true; @@ -1604,6 +2471,7 @@ void initialize_RTI(rti_remote_t* rti) { LF_MUTEX_INIT(&rti_mutex); LF_COND_INIT(&received_start_times, &rti_mutex); LF_COND_INIT(&sent_start_time, &rti_mutex); + LF_COND_INIT(&updated_delayed_grants, &rti_mutex); initialize_rti_common(&rti_remote->base); rti_remote->base.mutex = &rti_mutex; @@ -1625,6 +2493,8 @@ void initialize_RTI(rti_remote_t* rti) { rti_remote->base.tracing_enabled = false; rti_remote->base.dnet_disabled = false; rti_remote->stop_in_progress = false; + rti_remote->number_of_transient_federates = 0; + rti_remote->phase = startup_phase; } // The RTI includes clock.c, which requires the following functions that are defined @@ -1636,6 +2506,7 @@ void clock_sync_subtract_offset(instant_t* t) { (void)t; } void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { invalidate_min_delays(); for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) { + // FIXME: Gives error freeing memory not allocated!!!! scheduling_node_t* node = scheduling_nodes[i]; if (node->immediate_upstreams != NULL) { free(node->immediate_upstreams); diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index 99e439588..1df521aa5 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -67,6 +67,11 @@ typedef struct federate_info_t { // RTI has not been informed of the port number. 
struct in_addr server_ip_addr; // Information about the IP address of the socket // server of the federate. + bool has_upstream_transient_federates; // Indicates whether the federate has upstream + // transient federates + bool is_transient; // Indicates whether the federate is transient or persistent. + tag_t effective_start_tag; // Records the start time of the federate, which is + // mainly useful for transient federates } federate_info_t; /** @@ -74,18 +79,32 @@ typedef struct federate_info_t { */ typedef enum clock_sync_stat { clock_sync_off, clock_sync_init, clock_sync_on } clock_sync_stat; +/** + * The federation life cycle phases. + */ +typedef enum federation_life_cycle_phase { + startup_phase, // Not all persistent federates have joined. + execution_phase, // All persistent federates have joined. + shutdown_phase // Federation is shutting down. +} federation_life_cycle_phase; + +/** + * @brief The type for an element in a delayed grants priority queue that is sorted by tag. + */ +typedef struct pqueue_delayed_grant_element_t { + pqueue_tag_element_t base; + uint16_t fed_id; // Id of the federate with delayed grant of tag (in base) + bool is_provisional; // Boolean recording whether the delayed grant is provisional +} pqueue_delayed_grant_element_t; + +/** + * @brief Type of a delayed grants queue sorted by tags. + */ +typedef pqueue_tag_t pqueue_delayed_grants_t; + /** * Structure that an RTI instance uses to keep track of its own and its * corresponding federates' state. - * It is a special case of `rti_common_t` (declared in enclave.h). Inheritence - * is mimicked by having the first attributes to be the same as of rti_common_t, - * except that scheduling_nodes attribute here is of type `federate_info_t**`, while it - * is of type `scheduling_node_t**` in `rti_common_t`. - * // **************** IMPORTANT!!! ******************** - * // ** If you make any change to this struct, ** - * // ** you MUST also change rti_common_t in ** - * // ** (enclave.h)! 
The change must exactly match. ** - * // ************************************************** */ typedef struct rti_remote_t { rti_common_t base; @@ -105,6 +124,15 @@ typedef struct rti_remote_t { */ volatile bool all_federates_exited; + /** + * Boolean indicating that all persistent federates have exited. + * This gets set to true exactly once, after the main thread has joined + * the threads of all persistent federates. + * It is marked volatile because the write is not guarded by a mutex. + * The main thread makes this true. + */ + volatile bool all_persistent_federates_exited; + /** * The ID of the federation that this RTI will supervise. * This should be overridden with a command-line -i option to ensure @@ -158,6 +186,27 @@ typedef struct rti_remote_t { * Boolean indicating that a stop request is already in progress. */ bool stop_in_progress; + + /** + * Number of transient federates + */ + int32_t number_of_transient_federates; + + /** + * Number of connected transient federates + */ + int32_t number_of_connected_transient_federates; + + /** + * Indicates the life cycle phase of the federation. + */ + federation_life_cycle_phase phase; + + /** + * Queue of the pending grants, in case transient federates are absent and + * issuing grants to their downstreams needs to be delayed. + */ + pqueue_delayed_grants_t* delayed_grants; } rti_remote_t; /** @@ -277,7 +326,7 @@ void handle_address_query(uint16_t fed_id); * field of the _RTI.federates[federate_id] array of structs. * * The server_hostname and server_ip_addr fields are assigned - * in lf_connect_to_federates() upon accepting the socket + * in lf_connect_to_persistent_federates() upon accepting the socket * from the remote federate. * * This function assumes the caller does not hold the mutex. @@ -345,15 +394,15 @@ void* federate_info_thread_TCP(void* fed); * @param socket_id Pointer to the socket ID. * @param error_code An error code. 
*/ -void send_reject(int* socket_id, unsigned char error_code); +void send_reject(int* socket_id, rejection_code_t error_code); /** - * Wait for one incoming connection request from each federate, - * and upon receiving it, create a thread to communicate with - * that federate. Return when all federates have connected. - * @param socket_descriptor The socket on which to accept connections. + * Thread to wait for incoming connection request from transient federates. + * Upon receiving the connection request, check if a hot swap should start or + * simply create a thread to communicate with that federate. + * Stops if all persistent federates exited. */ -void lf_connect_to_federates(int socket_descriptor); +void* lf_connect_to_transient_federates_thread(void* nothing); /** * Thread to respond to new connections, which could be federates of other @@ -368,6 +417,12 @@ void* respond_to_erroneous_connections(void* nothing); */ void initialize_federate(federate_info_t* fed, uint16_t id); +/** + * Reset the federate. The federate has to be transient. + * @param fed A pointer to the federate + */ +void reset_transient_federate(federate_info_t* fed); + /** * Start the socket server for the runtime infrastructure (RTI) and * return the socket descriptor. 
diff --git a/core/federated/federate.c b/core/federated/federate.c index f7f52e37a..721370344 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -44,6 +44,7 @@ // Global variables defined in tag.c: extern instant_t start_time; +extern tag_t effective_start_tag; // Global variable defined in reactor_common.c: extern bool _lf_termination_executed; @@ -92,7 +93,8 @@ federate_instance_t _fed = {.socket_TCP_RTI = -1, .last_sent_LTC = {.time = NEVER, .microstep = 0u}, .last_sent_NET = {.time = NEVER, .microstep = 0u}, .last_skipped_NET = {.time = NEVER, .microstep = 0u}, - .min_delay_from_physical_action_to_federate_output = NEVER}; + .min_delay_from_physical_action_to_federate_output = NEVER, + .is_transient = false}; federation_metadata_t federation_metadata = { .federation_id = "Unidentified Federation", .rti_host = NULL, .rti_port = -1, .rti_user = NULL}; @@ -170,6 +172,8 @@ extern interval_t _lf_action_delay_table[]; extern size_t _lf_action_table_size; extern lf_action_base_t* _lf_zero_delay_cycle_action_table[]; extern size_t _lf_zero_delay_cycle_action_table_size; +extern uint16_t _lf_zero_delay_cycle_upstream_ids[]; +extern bool _lf_zero_delay_cycle_upstream_disconnected[]; extern reaction_t* network_input_reactions[]; extern size_t num_network_input_reactions; extern reaction_t* port_absent_reaction[]; @@ -195,7 +199,7 @@ static lf_action_base_t* action_for_port(int port_id) { /** * Update the last known status tag of all network input ports - * to the value of `tag`, unless that the provided `tag` is less + * to the value of `tag`, unless the provided `tag` is less * than the last_known_status_tag of the port. This is called when * a TAG signal is received from the RTI in centralized coordination. * If any update occurs, then this broadcasts on `lf_port_status_changed`. 
@@ -261,7 +265,7 @@ static void update_last_known_status_on_input_ports(tag_t tag, environment_t* en * * @param env The top-level environment, whose mutex is assumed to be held. * @param tag The tag on which the latest status of the specified network input port is known. - * @param portID The port ID. + * @param port_id The port ID. */ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag, int port_id) { if (lf_tag_compare(tag, env->current_tag) < 0) @@ -323,13 +327,41 @@ static void mark_inputs_known_absent(int fed_id) { } /** - * Set the status of network port with id portID. + * @brief Update the last known status tag of a network input action. + * + * This function is similar to update_last_known_status_on_input_port, but + * it is called when a PTAG is granted and an upstream transient federate is not + * connected. It updates the last known status tag of the network input action + * so that it will not wait for a message or absent message from the upstream federate. + * + * This function assumes the caller holds the mutex on the top-level environment, + * and, if the tag actually increases, it broadcasts on `lf_port_status_changed`. * - * @param portID The network port ID + * @param env The top-level environment, whose mutex is assumed to be held. + * @param action The action associated with the network input port. + * @param tag The tag of the PTAG. 
+ */ +static void update_last_known_status_on_action(environment_t* env, lf_action_base_t* action, tag_t tag) { + if (lf_tag_compare(tag, env->current_tag) < 0) + tag = env->current_tag; + trigger_t* input_port_trigger = action->trigger; + if (lf_tag_compare(tag, input_port_trigger->last_known_status_tag) > 0) { + LF_PRINT_LOG("Updating the last known status tag of port for upstream absent transient federate from " PRINTF_TAG + " to " PRINTF_TAG ".", + input_port_trigger->last_known_status_tag.time - lf_time_start(), + input_port_trigger->last_known_status_tag.microstep, tag.time - lf_time_start(), tag.microstep); + input_port_trigger->last_known_status_tag = tag; + } +} + +/** + * Set the status of network port with id port_id. + * + * @param port_id The network port ID * @param status The network port status (port_status_t) */ -static void set_network_port_status(int portID, port_status_t status) { - lf_action_base_t* network_input_port_action = action_for_port(portID); +static void set_network_port_status(int port_id, port_status_t status) { + lf_action_base_t* network_input_port_action = action_for_port(port_id); network_input_port_action->trigger->status = status; } @@ -721,7 +753,7 @@ static int handle_port_absent_message(int* socket, int fed_id) { tracepoint_federate_from_federate(receive_PORT_ABS, _lf_my_fed_id, fed_id, &intended_tag); } LF_PRINT_LOG("Handling port absent for tag " PRINTF_TAG " for port %hu of fed %d.", - intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, fed_id); + intended_tag.time - lf_time_start(), intended_tag.microstep, port_id, _lf_my_fed_id); // Environment is always the one corresponding to the top-level scheduling enclave. environment_t* env; @@ -939,6 +971,44 @@ static int perform_hmac_authentication() { } #endif +/** + * @brief Handle message from the RTI that an upstream federate has connected. 
+ * + */ +static void handle_upstream_connected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read upstream connected message from RTI."); + uint16_t connected = extract_uint16(buffer); + LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected); + // Mark the upstream as connected. + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == connected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = false; + } + } +} + +/** + * @brief Handle message from the RTI that an upstream federate has disconnected. + * + */ +static void handle_upstream_disconnected_message(void) { + size_t bytes_to_read = sizeof(uint16_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL, + "Failed to read upstream disconnected message from RTI."); + uint16_t disconnected = extract_uint16(buffer); + LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected); + // Mark the upstream as disconnected. + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) { + _lf_zero_delay_cycle_upstream_disconnected[i] = true; + } + } +} + /** * Send the specified timestamp to the RTI and wait for a response. * The specified timestamp should be current physical time of the @@ -952,30 +1022,48 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { // Send the timestamp marker first. send_time(MSG_TYPE_TIMESTAMP, my_physical_time); - // Read bytes from the socket. We need 9 bytes. + // Read bytes from the socket. We need either 9 bytes or 21, depending on the federate type // Buffer for message ID plus timestamp. 
- size_t buffer_length = 1 + sizeof(instant_t); + size_t buffer_length = (_fed.is_transient) ? MSG_TYPE_TIMESTAMP_TAG_LENGTH : MSG_TYPE_TIMESTAMP_LENGTH; unsigned char buffer[buffer_length]; - read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, buffer_length, buffer, NULL, - "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); - LF_PRINT_DEBUG("Read 9 bytes."); - - // First byte received is the message ID. - if (buffer[0] != MSG_TYPE_TIMESTAMP) { - if (buffer[0] == MSG_TYPE_FAILED) { - lf_print_error_and_exit("RTI has failed."); + while (true) { + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, 1, buffer, NULL, + "Failed to read MSG_TYPE_TIMESTAMP_START message from RTI."); + // First byte received is the message ID. + if (buffer[0] != MSG_TYPE_TIMESTAMP) { + if (buffer[0] == MSG_TYPE_FAILED) { + lf_print_error_and_exit("RTI has failed."); + } else if (buffer[0] == MSG_TYPE_UPSTREAM_CONNECTED) { + // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP_START to arrive + handle_upstream_connected_message(); + continue; + } else if (buffer[0] == MSG_TYPE_UPSTREAM_DISCONNECTED) { + // We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP_START to arrive + handle_upstream_disconnected_message(); + continue; + } else { + lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from the RTI. Got %u (see net_common.h).", + buffer[0]); + } + } else { + read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, buffer_length - 1, buffer + 1, NULL, + "Failed to read MSG_TYPE_TIMESTAMP_START message from RTI."); + break; } - lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. 
Got %u (see net_common.h).", - buffer[0]); } instant_t timestamp = extract_int64(&(buffer[1])); + if (_fed.is_transient) { + effective_start_tag = extract_tag(&(buffer[9])); + } else { + effective_start_tag = (tag_t){.time = timestamp, .microstep = 0u}; + } - tag_t tag = {.time = timestamp, .microstep = 0}; - // Trace the event when tracing is enabled - tracepoint_federate_from_rti(receive_TIMESTAMP, _lf_my_fed_id, &tag); - lf_print("Starting timestamp is: " PRINTF_TIME ".", timestamp); + // Trace the event when tracing is enabled. + // Note that we report in the trace the effective_start_tag. + // This is rather a choice. To be changed, if needed, of course. + tracepoint_federate_from_rti(receive_TIMESTAMP, _lf_my_fed_id, &effective_start_tag); LF_PRINT_LOG("Current physical time is: " PRINTF_TIME ".", lf_time_physical()); return timestamp; @@ -991,7 +1079,7 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) { * a notification of this update, which may unblock whichever worker * thread is trying to advance time. * - * @note This function is very similar to handle_provisinal_tag_advance_grant() except that + * @note This function is very similar to handle_provisional_tag_advance_grant() except that * it sets last_TAG_was_provisional to false. */ static void handle_tag_advance_grant(void) { @@ -1233,7 +1321,8 @@ static void* update_ports_from_staa_offsets(void* args) { * * @note This function is similar to handle_tag_advance_grant() except that * it sets last_TAG_was_provisional to true and also it does not update the - * last known tag for input ports. + * last known tag for input ports unless there is an upstream federate that is + * disconnected. */ static void handle_provisional_tag_advance_grant() { // Environment is always the one corresponding to the top-level scheduling enclave. 
@@ -1270,6 +1359,12 @@ static void handle_provisional_tag_advance_grant() { env->current_tag.time - start_time, env->current_tag.microstep, _fed.last_TAG.time - start_time, _fed.last_TAG.microstep); + for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) { + if (_lf_zero_delay_cycle_upstream_disconnected[i]) { + update_last_known_status_on_action(env, _lf_zero_delay_cycle_action_table[i], PTAG); + } + } + // Even if we don't modify the event queue, we need to broadcast a change // because we do not need to continue to wait for a TAG. lf_cond_broadcast(&env->event_q_changed); @@ -1370,6 +1465,20 @@ static void handle_stop_granted_message() { } } +/** + * @brief Handle a MSG_TYPE_STOP message from the RTI. + * + * This function simply calls lf_stop(). + */ +void handle_stop() { + // Trace the event when tracing is enabled + tracepoint_federate_from_rti(receive_STOP, _lf_my_fed_id, NULL); + + lf_print("Received from RTI a MSG_TYPE_STOP at physical time " PRINTF_TIME ".", lf_time_physical()); + + lf_stop(); +} + /** * Handle a MSG_TYPE_STOP_REQUEST message from the RTI. */ @@ -1586,6 +1695,9 @@ static void* listen_to_rti_TCP(void* args) { case MSG_TYPE_STOP_GRANTED: handle_stop_granted_message(); break; + case MSG_TYPE_STOP: + handle_stop(); + break; case MSG_TYPE_PORT_ABSENT: if (handle_port_absent_message(&_fed.socket_TCP_RTI, -1)) { // Failures to complete the read of absent messages from the RTI are fatal. 
@@ -1598,6 +1710,12 @@ static void* listen_to_rti_TCP(void* args) { case MSG_TYPE_FAILED: handle_rti_failed_message(); break; + case MSG_TYPE_UPSTREAM_CONNECTED: + handle_upstream_connected_message(); + break; + case MSG_TYPE_UPSTREAM_DISCONNECTED: + handle_upstream_disconnected_message(); + break; case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", _lf_my_fed_id); @@ -1819,15 +1937,16 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { break; } // Connect was successful. - size_t buffer_length = 1 + sizeof(uint16_t) + 1; + size_t buffer_length = 1 + sizeof(uint16_t) + 1 + 1; unsigned char buffer[buffer_length]; buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; if (_lf_my_fed_id == UINT16_MAX) { lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); } encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(buffer[1])); + buffer[1 + sizeof(uint16_t)] = _fed.is_transient ? 1 : 0; unsigned char federation_id_length = (unsigned char)strnlen(federation_metadata.federation_id, 255); - buffer[sizeof(uint16_t) + 1] = federation_id_length; + buffer[sizeof(uint16_t) + 2] = federation_id_length; // Trace the event when tracing is enabled tracepoint_federate_to_federate(send_FED_ID, _lf_my_fed_id, remote_federate_id, NULL); @@ -1881,7 +2000,7 @@ void lf_connect_to_rti(const char* hostname, int port) { while (!CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT) && !_lf_termination_executed) { // Have connected to an RTI, but not sure it's the right RTI. - // Send a MSG_TYPE_FED_IDS message and wait for a reply. + // Send a MSG_TYPE_FED_IDS or MSG_TYPE_TRANSIENT_FED_IDS message and wait for a reply. // Notify the RTI of the ID of this federate and its federation. #ifdef FEDERATED_AUTHENTICATED @@ -1898,9 +2017,14 @@ void lf_connect_to_rti(const char* hostname, int port) { LF_PRINT_LOG("Connected to an RTI. 
Sending federation ID for authentication."); #endif + unsigned char buffer[5]; // Send the message type first. - unsigned char buffer[4]; - buffer[0] = MSG_TYPE_FED_IDS; + if (_fed.is_transient) { + buffer[0] = MSG_TYPE_TRANSIENT_FED_IDS; + } else { + buffer[0] = MSG_TYPE_FED_IDS; + } + // Next send the federate ID. if (_lf_my_fed_id == UINT16_MAX) { lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); @@ -1914,8 +2038,14 @@ void lf_connect_to_rti(const char* hostname, int port) { // Trace the event when tracing is enabled tracepoint_federate_to_rti(send_FED_ID, _lf_my_fed_id, NULL); + size_t size = 1 + sizeof(uint16_t) + 1; + if (_fed.is_transient) { + // Next send the federate type (persistent or transient) + buffer[2 + sizeof(uint16_t)] = _fed.is_transient ? 1 : 0; + size++; + } // No need for a mutex here because no other threads are writing to this socket. - if (write_to_socket(_fed.socket_TCP_RTI, 2 + sizeof(uint16_t), buffer)) { + if (write_to_socket(_fed.socket_TCP_RTI, size, buffer)) { continue; // Try again, possibly on a new port. } @@ -1954,8 +2084,12 @@ void lf_connect_to_rti(const char* hostname, int port) { } else if (response == MSG_TYPE_RESIGN) { lf_print_warning("RTI resigned. Will try again"); continue; + } else if (response == MSG_TYPE_UPSTREAM_CONNECTED) { + handle_upstream_connected_message(); + } else if (response == MSG_TYPE_UPSTREAM_DISCONNECTED) { + handle_upstream_disconnected_message(); } else { - lf_print_warning("RTI gave unexpect response %u. Will try again", response); + lf_print_warning("RTI on port %d gave unexpected response %u. 
Will try again", port, response); continue; } } @@ -2037,7 +2171,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { } LF_PRINT_LOG("Accepted new connection from remote federate."); - size_t header_length = 1 + sizeof(uint16_t) + 1; + size_t header_length = 1 + sizeof(uint16_t) + 1 + 1; unsigned char buffer[header_length]; int read_failed = read_from_socket(socket_id, header_length, (unsigned char*)&buffer); if (read_failed || buffer[0] != MSG_TYPE_P2P_SENDING_FED_ID) { @@ -2078,7 +2212,12 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Extract the ID of the sending federate. uint16_t remote_fed_id = extract_uint16((unsigned char*)&(buffer[1])); - LF_PRINT_DEBUG("Received sending federate ID %d.", remote_fed_id); + bool remote_fed_is_transient = buffer[1 + sizeof(uint16_t)]; + if (remote_fed_is_transient) { + LF_PRINT_DEBUG("Received sending federate ID %d, which is transient.", remote_fed_id); + } else { + LF_PRINT_DEBUG("Received sending federate ID %d, which is persistent.", remote_fed_id); + } // Trace the event when tracing is enabled tracepoint_federate_to_federate(receive_FED_ID, _lf_my_fed_id, remote_fed_id, NULL); @@ -2593,6 +2732,10 @@ void lf_synchronize_with_other_federates(void) { // Reset the start time to the coordinated start time for all federates. // Note that this does not grant execution to this federate. start_time = get_start_time_from_rti(lf_time_physical()); + + lf_print("Starting timestamp is: " PRINTF_TIME " and effective start tag is: " PRINTF_TAG ".", lf_time_start(), + effective_start_tag.time - lf_time_start(), effective_start_tag.microstep); + lf_tracing_set_start_time(start_time); // Start a thread to listen for incoming TCP messages from the RTI. 
@@ -2644,13 +2787,20 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) { _lf_action_delay_table[i])) <= 0)) { continue; } +#else + // For centralized coordination, if there is an upstream transient federate that is not + // connected, then we don't want to block on its action. + if (_lf_zero_delay_cycle_upstream_disconnected[i]) { + // Mark the action known up to and including the current tag. It is absent. + update_last_known_status_on_action(env, input_port_action, env->current_tag); + } #endif // FEDERATED_DECENTRALIZED - // If the current tag is greater than the last known status tag of the input port, - // and the input port is not physical, then block on that port by ensuring - // the MLAA is no greater than the level of that port. - // For centralized coordination, this is applied only to input ports coming from - // federates that are in a ZDC. For decentralized coordination, this is applied - // to all input ports. + // If the current tag is greater than the last known status tag of the input port, + // and the input port is not physical, then block on that port by ensuring + // the MLAA is no greater than the level of that port. + // For centralized coordination, this is applied only to input ports coming from + // federates that are in a ZDC. For decentralized coordination, this is applied + // to all input ports. 
if (lf_tag_compare(env->current_tag, input_port_action->trigger->last_known_status_tag) > 0 && !input_port_action->trigger->is_physical) { max_level_allowed_to_advance = @@ -2662,6 +2812,32 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) { return (prev_max_level_allowed_to_advance != max_level_allowed_to_advance); } +void lf_stop() { + environment_t* env; + int num_env = _lf_get_environments(&env); + + for (int i = 0; i < num_env; i++) { + LF_MUTEX_LOCK(&env[i].mutex); + + tag_t new_stop_tag; + new_stop_tag.time = env[i].current_tag.time; + new_stop_tag.microstep = env[i].current_tag.microstep + 1; + + lf_set_stop_tag(&env[i], new_stop_tag); + + LF_PRINT_LOG("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time, + env[i].stop_tag.microstep); + + if (env[i].barrier.requestors) + _lf_decrement_tag_barrier_locked(&env[i]); + lf_cond_broadcast(&env[i].event_q_changed); + LF_MUTEX_UNLOCK(&env[i].mutex); + } + LF_PRINT_LOG("Federate is stopping."); +} + +const char* lf_get_federation_id() { return federation_metadata.federation_id; } + #ifdef FEDERATED_DECENTRALIZED instant_t lf_wait_until_time(tag_t tag) { instant_t result = tag.time; // Default. diff --git a/core/tag.c b/core/tag.c index 10e3f282f..d534af209 100644 --- a/core/tag.c +++ b/core/tag.c @@ -32,6 +32,12 @@ typedef enum _lf_time_type { LF_LOGICAL, LF_PHYSICAL, LF_ELAPSED_LOGICAL, LF_ELA // Global variables declared in tag.h: instant_t start_time = NEVER; +/** + * Only useful for transient federates. It records the effective start tag, to + * be used at startup. Elapsed logical time calculations will use start_time. 
+ */ +tag_t effective_start_tag = {.time = 0LL, .microstep = 0}; + //////////////// Functions declared in tag.h tag_t lf_tag(void* env) { @@ -173,6 +179,8 @@ instant_t lf_time_physical_elapsed(void) { return lf_time_physical() - start_tim instant_t lf_time_start(void) { return start_time; } +tag_t lf_tag_start_effective(void) { return effective_start_tag; } + size_t lf_readable_time(char* buffer, instant_t time) { if (time <= (instant_t)0) { snprintf(buffer, 2, "0"); diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 493bd5a3e..540b7344e 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -34,6 +34,7 @@ // Global variables defined in tag.c and shared across environments: extern instant_t start_time; +extern tag_t effective_start_tag; /** * The maximum amount of time a worker thread should stall @@ -569,12 +570,12 @@ void _lf_initialize_start_tag(environment_t* env) { // statuses to unknown lf_reset_status_fields_on_input_port_triggers(); - // Get a start_time from the RTI + // Get a start_time and effective_start_tag from the RTI lf_synchronize_with_other_federates(); // Resets start_time in federated execution according to the RTI. } // The start time will likely have changed. Adjust the current tag and stop tag. - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + env->current_tag = effective_start_tag; if (duration >= 0LL) { // A duration has been specified. Recalculate the stop time. env->stop_tag = ((tag_t){.time = start_time + duration, .microstep = 0}); @@ -592,25 +593,25 @@ void _lf_initialize_start_tag(environment_t* env) { // To use uniform code below, we define it here as a local variable. instant_t lf_fed_STA_offset = 0; #endif - LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", start_time); - - // Wait until the start time. 
This is required for federates because the startup procedure - // in lf_synchronize_with_other_federates() can decide on a new start_time that is - // larger than the current physical time. - // This wait_until() is deliberately called after most precursor operations - // for tag (0,0) are performed (e.g., injecting startup reactions, etc.). - // This has two benefits: First, the startup overheads will reduce - // the required waiting time. Second, this call releases the mutex lock and allows - // other threads (specifically, federate threads that handle incoming p2p messages - // from other federates) to hold the lock and possibly raise a tag barrier. - while (!wait_until(start_time, &env->event_q_changed)) { + LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", effective_start_tag.time); + + // Wait until the effective start time. This is required for federates because the startup procedure + // in lf_synchronize_with_other_federates() can decide on a new start_time, or the effective start time if it is a + // transient federate, that is larger than the current physical time. + // This wait_until() is deliberately called after most precursor operations for tag (0,0), or effective_start_tag, + // are performed (e.g., injecting startup reactions, etc.). This has two benefits: First, the startup overheads will + // reduce the required waiting time. Second, this call releases the mutex lock and allows other threads (specifically, + // federate threads that handle incoming p2p messages from other federates) to hold the lock and possibly raise a tag + // barrier. + while (!wait_until(effective_start_tag.time, &env->event_q_changed)) { }; - LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + lf_fed_STA_offset); + LF_PRINT_DEBUG("Done waiting for effective start time + STA offset " PRINTF_TIME ".", + effective_start_tag.time + lf_fed_STA_offset); LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". 
This should be close to the STA offset.", - lf_time_physical() - start_time); + lf_time_physical() - effective_start_tag.time); - // Restore the current tag to match the start time. - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + // Restore the current tag to match the effective start time. + env->current_tag = (tag_t){.time = effective_start_tag.time, .microstep = effective_start_tag.microstep}; // If the stop_tag is (0,0), also insert the shutdown // reactions. This can only happen if the timeout time @@ -627,7 +628,7 @@ void _lf_initialize_start_tag(environment_t* env) { // from exceeding the timestamp of the message. It will remove that barrier // once the complete message has been read. Here, we wait for that barrier // to be removed, if appropriate before proceeding to executing tag (0,0). - _lf_wait_on_tag_barrier(env, (tag_t){.time = start_time, .microstep = 0}); + _lf_wait_on_tag_barrier(env, effective_start_tag); lf_spawn_staa_thread(); #else // NOT FEDERATED_DECENTRALIZED @@ -1018,6 +1019,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { // Initialize the clock through the platform API. No reading of physical time before this. 
_lf_initialize_clock(); start_time = lf_time_physical(); + effective_start_tag = (tag_t){.time = start_time, .microstep = 0}; #ifndef FEDERATED lf_tracing_set_start_time(start_time); #endif diff --git a/core/utils/pqueue_tag.c b/core/utils/pqueue_tag.c index c1abe35ba..57e52ef5c 100644 --- a/core/utils/pqueue_tag.c +++ b/core/utils/pqueue_tag.c @@ -159,3 +159,14 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t) { } void pqueue_tag_dump(pqueue_tag_t* q) { pqueue_dump((pqueue_t*)q, pqueue_tag_print_element); } + +tag_t pqueue_tag_max_tag(pqueue_tag_t* q) { + tag_t result = NEVER_TAG; + for (size_t i = 1; i < q->size; i++) { + pqueue_tag_element_t* element = (pqueue_tag_element_t*)(q->d[i]); + if (lf_tag_compare(element->tag, result) > 0) { + result = element->tag; + } + } + return result; +} diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index e7697f259..cec2803ce 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -191,6 +191,12 @@ typedef struct federate_instance_t { */ instant_t min_delay_from_physical_action_to_federate_output; + /** + * Indicator of whether this federate is transient. + * The default value of false may be overridden in _lf_initialize_trigger_objects. + */ + bool is_transient; + #ifdef FEDERATED_DECENTRALIZED /** * Thread responsible for setting ports to absent by an STAA offset if they @@ -486,6 +492,11 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int */ void lf_set_federation_id(const char* fid); +/** + * @brief Return the federation id. + */ +const char* lf_get_federation_id(); + #ifdef FEDERATED_DECENTRALIZED /** * @brief Spawn a thread to iterate through STAA structs. 
diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 36a5dcac4..d6b214221 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -44,10 +44,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * When it has successfully opened a TCP connection, the first message it sends * to the RTI is a MSG_TYPE_FED_IDS message, which contains the ID of this federate * within the federation, contained in the global variable _lf_my_fed_id - * in the federate code - * (which is initialized by the code generator) and the unique ID of - * the federation, a GUID that is created at run time by the generated script - * that launches the federation. + * in the federate code (which is initialized by the code generator), + * the type of this federate (persistent (0) or transient (1)), + * and the unique ID of the federation, a GUID that is created at run time by the + * generated script that launches the federation. * If you launch the federates and the RTI manually, rather than using the script, * then the federation ID is a string that is optionally given to the federate * on the command line when it is launched. The federate will connect @@ -237,17 +237,15 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_UDP_PORT 254 -/** Byte identifying a message from a federate to an RTI containing - * the federation ID and the federate ID. The message contains, in - * this order: +/** Byte identifying a message from a (persistent) federate to an RTI containing + * the federate ID and the federation ID. The message contains, in this order: * * One byte equal to MSG_TYPE_FED_IDS. * * Two bytes (ushort) giving the federate ID. * * One byte (uchar) giving the length N of the federation ID. * * N bytes containing the federation ID. - * Each federate needs to have a unique ID between 0 and - * NUMBER_OF_FEDERATES-1. 
- * Each federate, when starting up, should send this message - * to the RTI. This is its first message to the RTI. + * Each federate needs to have a unique ID between 0 and NUMBER_OF_FEDERATES-1. + * Each federate, when starting up, should send either this message, or MSG_TYPE_TRANSIENT_FED_IDS + * to the RTI, as its first message to the RTI. * The RTI will respond with either MSG_TYPE_REJECT, MSG_TYPE_ACK, or MSG_TYPE_UDP_PORT. * If the federate is a C target LF program, the generated federate * code does this by calling lf_synchronize_with_other_federates(), @@ -255,6 +253,23 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_FED_IDS 1 +/** Byte identifying a message from a transient federate to an RTI containing + * the federate ID and the federation ID. The message contains, in this order: + * * One byte equal to MSG_TYPE_TRANSIENT_FED_IDS. + * * Two bytes (ushort) giving the federate ID. + * * One byte (uchar) giving the length N of the federation ID. + * * One byte giving the type of the federate (1 if transient, 0 if persistent) + * * N bytes containing the federation ID. + * Each federate needs to have a unique ID between 0 and NUMBER_OF_FEDERATES-1. + * Each federate, when starting up, should send either this message, or MSG_TYPE_FED_IDS + * to the RTI, as its first message to the RTI. + * The RTI will respond with either MSG_TYPE_REJECT, MSG_TYPE_ACK, or MSG_TYPE_UDP_PORT. + * If the federate is a C target LF program, the generated federate + * code does this by calling lf_synchronize_with_other_federates(), + * passing to it its federate ID. + */ +#define MSG_TYPE_TRANSIENT_FED_IDS 103 + /////////// Messages used for authenticated federation. /////////////// /** * Byte identifying a message from a federate to an RTI containing @@ -307,11 +322,13 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /** * Byte identifying a timestamp message, which is 64 bits long. 
* Each federate sends its starting physical time as a message of this - * type, and the RTI broadcasts to all the federates the starting logical + * type, and the RTI broadcasts to all persistent federates the starting * time as a message of this type. - s*/ + * In case of a joining federate, the RTI will also send the effective start tag. + */ #define MSG_TYPE_TIMESTAMP 2 -#define MSG_TYPE_TIMESTAMP_LENGTH (1 + sizeof(int64_t)) +#define MSG_TYPE_TIMESTAMP_LENGTH (1 + sizeof(instant_t)) +#define MSG_TYPE_TIMESTAMP_TAG_LENGTH (1 + sizeof(instant_t) + sizeof(tag_t)) /** Byte identifying a message to forward to another federate. * The next two bytes will be the ID of the destination port. @@ -630,32 +647,48 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG 26 ///////////////////////////////////////////// -//// Rejection codes +//// Transient federate support /** - * These codes are sent in a MSG_TYPE_REJECT message. - * They are limited to one byte (uchar). + * A message that informs a downstream federate that a federate upstream of it + * is connected. The next 2 bytes are the federate ID of the upstream federate. */ +#define MSG_TYPE_UPSTREAM_CONNECTED 27 +#define MSG_TYPE_UPSTREAM_CONNECTED_LENGTH (1 + sizeof(uint16_t)) -/** Federation ID does not match. */ -#define FEDERATION_ID_DOES_NOT_MATCH 1 - -/** Federate with the specified ID has already joined. */ -#define FEDERATE_ID_IN_USE 2 - -/** Federate ID out of range. */ -#define FEDERATE_ID_OUT_OF_RANGE 3 - -/** Incoming message is not expected. */ -#define UNEXPECTED_MESSAGE 4 +/** + * A message that informs a downstream federate that a federate upstream of it + * is no longer connected. The next 2 bytes are the federate ID of the upstream federate. + */ +#define MSG_TYPE_UPSTREAM_DISCONNECTED 28 +#define MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t)) -/** Connected to the wrong server.
*/ -#define WRONG_SERVER 5 +/** + * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, + * the federate will call lf_stop(), which will make it resign at its current_tag + * plus 1 microstep. + * The next 8 bytes will be the time at which the federates will stop. + * The next 4 bytes will be the microstep at which the federates will stop. + */ +#define MSG_TYPE_STOP 29 +#define MSG_TYPE_STOP_LENGTH 1 -/** HMAC authentication failed. */ -#define HMAC_DOES_NOT_MATCH 6 +///////////////////////////////////////////// +//// Rejection codes -/** RTI not executed using -a or --auth option. */ -#define RTI_NOT_EXECUTED_WITH_AUTH 7 +/** + * These codes are sent in a MSG_TYPE_REJECT message. + * They are limited to one byte (uchar). + */ +typedef enum { + FEDERATION_ID_DOES_NOT_MATCH = 1, + FEDERATE_ID_IN_USE = 2, + FEDERATE_ID_OUT_OF_RANGE = 3, + UNEXPECTED_MESSAGE = 4, + WRONG_SERVER = 5, + HMAC_DOES_NOT_MATCH = 6, + RTI_NOT_EXECUTED_WITH_AUTH = 7, + JOINING_TOO_LATE = 8 +} rejection_code_t; #endif /* NET_COMMON_H */ diff --git a/include/core/utils/pqueue_tag.h b/include/core/utils/pqueue_tag.h index e06e074be..907428cac 100644 --- a/include/core/utils/pqueue_tag.h +++ b/include/core/utils/pqueue_tag.h @@ -216,4 +216,11 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t); */ void pqueue_tag_dump(pqueue_tag_t* q); +/** + * @brief Return the maximum tag in the queue or NEVER_TAG if the queue is empty. + * + * @param q The queue. + */ +tag_t pqueue_tag_max_tag(pqueue_tag_t* q); + #endif // PQUEUE_TAG_H diff --git a/include/core/utils/util.h b/include/core/utils/util.h index 77b7b767d..34ce8a301 100644 --- a/include/core/utils/util.h +++ b/include/core/utils/util.h @@ -194,4 +194,14 @@ void lf_vprint_error_and_exit(const char* format, va_list args) ATTRIBUTE_FORMAT */ #define LF_CRITICAL_SECTION_EXIT(env) LF_ASSERT(!lf_critical_section_exit(env), "Could not exit critical section") +/** + * @brief Stop the execution of a federate.
+ * Every enclave within the federate will stop one microstep later than its + * current tag. Unlike lf_request_stop(), this process does not require any + * involvement from the RTI, nor does it necessitate any consensus. + * + * This function is particularly useful for testing transient federates. + */ +void lf_stop(); + #endif /* UTIL_H */ diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 8b25206ff..030a049b9 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master \ No newline at end of file +transient-fed \ No newline at end of file diff --git a/tag/api/tag.h b/tag/api/tag.h index f6243b552..63ffcf389 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -218,6 +218,14 @@ instant_t lf_time_physical_elapsed(void); */ instant_t lf_time_start(void); +/** + * Return the tag at which the execution effectively started. + * Most of the time, this will default to {.time = start_time, .microstep = 0}. + * When the reactor is a transient federate, however, the value will be different. + * @return A tag. + */ +tag_t lf_tag_start_effective(void); + /** * For user-friendly reporting of time values, the buffer length required.
* This is calculated as follows, based on 64-bit time in nanoseconds: diff --git a/test/general/utils/pqueue_test.c b/test/general/utils/pqueue_test.c index 665c4e13f..18b3009a8 100644 --- a/test/general/utils/pqueue_test.c +++ b/test/general/utils/pqueue_test.c @@ -23,6 +23,8 @@ static void insert_on_queue(pqueue_tag_t* q) { assert(!pqueue_tag_insert_tag(q, t2)); assert(!pqueue_tag_insert_tag(q, t3)); + assert(lf_tag_compare(pqueue_tag_max_tag(q), t1) == 0); + assert(!pqueue_tag_insert_if_no_match(q, t4)); assert(pqueue_tag_insert_if_no_match(q, t1)); assert(pqueue_tag_insert_if_no_match(q, t4)); diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index b7cc7be0c..af1b91ec9 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -74,6 +74,8 @@ typedef enum { receive_ADR_QR, receive_DNET, receive_UNIDENTIFIED, + send_STOP, + receive_STOP, NUM_EVENT_TYPES } trace_event_t; @@ -139,6 +141,8 @@ static const char* trace_event_names[] = { "Receiving ADR_QR", "Receiving DNET", "Receiving UNIDENTIFIED", + "Sending STOP", + "Receiving STOP", }; static inline void _suppress_unused_variable_warning_for_static_variable() { (void)trace_event_names; } diff --git a/trace/impl/src/trace_impl.c b/trace/impl/src/trace_impl.c index f819507c4..4e880407c 100644 --- a/trace/impl/src/trace_impl.c +++ b/trace/impl/src/trace_impl.c @@ -260,10 +260,37 @@ void lf_tracing_global_init(char* process_name, char* process_names, int fedid, } process_id = fedid; char filename[100]; + + // When tracing transient federates, a new trace file is created for each execution. For this, the function + // checks for file existence. If the file exists, the function appends a number to the file name and checks + // again.
+ int iter = 0; + bool file_exists = false; + bool new_file = false; if (strcmp(process_name, "rti") == 0) { sprintf(filename, "%s.lft", process_name); } else { - sprintf(filename, "%s_%d.lft", process_name, process_id); + FILE* file; + do { + if (iter == 0) { + sprintf(filename, "%s_%d.lft", process_name, process_id); + } else { + sprintf(filename, "%s_%d_%d.lft", process_name, process_id, iter); + } + file = fopen(filename, "r"); + if (file) { + file_exists = true; + new_file = true; + fclose(file); + iter++; + } else { + file_exists = false; + } + } while (file_exists); + } + if (new_file) { + lf_print_warning("No overwriting! The default file name already exists. A new trace file named %s is created.", + filename); } trace_new(filename); start_trace(&trace, max_num_local_threads); diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index a680d27c4..8107d45bf 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -29,6 +29,7 @@ .DNET { stroke: purple; fill: purple} \ .TIMESTAMP { stroke: grey; fill: grey } \ .FED_ID {stroke: #80DD99; fill: #80DD99 } \ + .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ .ADV {stroke-linecap="round" ; stroke: "red" ; fill: "red"} \ text { \ font-size: smaller; \ @@ -86,7 +87,9 @@ "Receiving ADR_QR": "ADR_QR", "Receiving DNET": "DNET", "Receiving UNIDENTIFIED": "UNIDENTIFIED", - "Scheduler advancing time ends": "AdvLT" + "Scheduler advancing time ends": "AdvLT", + "Sending STOP": "STOP", + "Receiving STOP": "STOP" } prune_event_name.setdefault(" ", "UNIDENTIFIED") @@ -113,7 +116,7 @@ # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. 
# Set of tagged events (messages) -non_tagged_messages = {'FED_ID', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_AD', 'MSG', 'P2P_MSG'} +non_tagged_messages = {'FED_ID', 'ACK', 'RESIGN', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} ################################################################################ @@ -212,7 +215,6 @@ def svg_string_draw_label(x1, y1, x2, y2, label) : else: rotation = 0 str_line = '\t'+label+'\n' - #print('rot = '+str(rotation)+' x1='+str(x1)+' y1='+str(y1)+' x2='+str(x2)+' y2='+str(y2)) return str_line @@ -507,11 +509,17 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files, start_time, end if (not fed_df.empty): # Get the federate id number fed_id = fed_df.iloc[-1]['self_id'] - # Add to the list of sequence diagram actors and add the name - actors.append(fed_id) - actors_names[fed_id] = Path(fed_trace).stem - # Derive the x coordinate of the actor - x_coor[fed_id] = (padding * 2) + (spacing * (len(actors) - 1)) + + ### Check that the federate id has not been entered yet. + ### This is particularly useful for transient actors, when + ### they leave and join several times + if (actors.count(fed_id) == 0): + # Add to the list of sequence diagram actors and add the name + actors.append(fed_id) + actors_names[fed_id] = Path(fed_trace).stem + # Derive the x coordinate of the actor + x_coor[fed_id] = (padding * 2) + (spacing * (len(actors)-1)) + fed_df['x1'] = x_coor[fed_id] trace_df = pd.concat([trace_df, fed_df]) fed_df = fed_df[0:0] @@ -675,7 +683,7 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files, start_time, end # FIXME: Using microseconds is hardwired here.
physical_time = f'{int(row["physical_time"]/1000):,}' - if (row['event'] in {'FED_ID', 'ACK', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_AD', 'MSG', 'P2P_MSG'}): + if (row['event'] in {'FED_ID', 'ACK', 'FAILED', 'REJECT', 'ADR_QR', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'}): label = row['event'] else: label = row['event'] + '(' + f'{int(row["logical_time"]):,}' + ', ' + str(row['microstep']) + ')'