Skip to content

Commit

Permalink
prov/efa: Clean up rxe map during rxe release
Browse files Browse the repository at this point in the history
An rxe can be inserted into the rxe map when the receiver
gets multi-req packets (medium, runting). When the
receiving ep is closed early, before the receive
completes, the stale entry left in the rxe map
causes an assertion error when the rxe_map
buffer pool is destroyed during ep close. This
patch fixes the issue by recording the rxe_map
insertion status in the rxe, and removing the rxe
from the map during the rxe release.

Signed-off-by: Shi Jin <sjina@amazon.com>
  • Loading branch information
shijin-aws committed Feb 19, 2025
1 parent 75c559e commit 09f1124
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 22 deletions.
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ struct efa_rdm_ope *efa_rdm_ep_alloc_rxe(struct efa_rdm_ep *ep, fi_addr_t addr,
rxe->op = op;
rxe->peer_rxe = NULL;
rxe->unexp_pkt = NULL;
rxe->rxe_map = NULL;
rxe->atomrsp_data = NULL;
rxe->bytes_read_total_len = 0;

Expand Down
4 changes: 2 additions & 2 deletions prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_msgrtm(struct efa_rdm_ep *ep,

pkt_type = efa_rdm_pke_get_base_hdr(*pkt_entry_ptr)->type;
if (efa_rdm_pkt_type_is_mulreq(pkt_type))
efa_rdm_rxe_map_insert(&ep->rxe_map, *pkt_entry_ptr, rxe);
efa_rdm_rxe_map_insert(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(*pkt_entry_ptr), (*pkt_entry_ptr)->addr, rxe);

return rxe;
}
Expand Down Expand Up @@ -874,7 +874,7 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_tagrtm(struct efa_rdm_ep *ep,

pkt_type = efa_rdm_pke_get_base_hdr(*pkt_entry_ptr)->type;
if (efa_rdm_pkt_type_is_mulreq(pkt_type))
efa_rdm_rxe_map_insert(&ep->rxe_map, *pkt_entry_ptr, rxe);
efa_rdm_rxe_map_insert(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(*pkt_entry_ptr), (*pkt_entry_ptr)->addr, rxe);

return rxe;
}
Expand Down
9 changes: 3 additions & 6 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe)

dlist_remove(&rxe->ep_entry);

if (rxe->rxe_map)
efa_rdm_rxe_map_remove(rxe->rxe_map, rxe->msg_id, rxe->addr, rxe);

for (i = 0; i < rxe->iov_count; i++) {
if (rxe->mr[i]) {
err = fi_close((struct fid *)rxe->mr[i]);
Expand Down Expand Up @@ -1096,12 +1099,6 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope)
efa_rdm_rxe_report_completion(rxe);
}

if (ope->internal_flags & EFA_RDM_OPE_READ_NACK) {
assert(ope->type == EFA_RDM_RXE);
/* Apply to both DC and non-DC */
efa_rdm_rxe_map_remove(&ope->ep->rxe_map, ope->msg_id, ope->peer->efa_fiaddr, ope);
}

/* As can be seen, this function does not release rxe when
* efa_rdm_ope_post_send_or_queue() was successful.
*
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ struct efa_rdm_ope {
struct efa_rdm_pke *unexp_pkt;
char *atomrsp_data;
enum efa_rdm_cuda_copy_method cuda_copy_method;
/* the rxe_map that the rxe is ever inserted */
struct efa_rdm_rxe_map *rxe_map;
/* end of RX related variables */
/* the following variables are for TX operation only */
uint64_t bytes_acked;
Expand Down
6 changes: 3 additions & 3 deletions prov/efa/src/rdm/efa_rdm_pke_rtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ ssize_t efa_rdm_pke_proc_msgrtm(struct efa_rdm_pke *pkt_entry)

rtm_hdr = (struct efa_rdm_rtm_base_hdr *)pkt_entry->wiredata;
if (rtm_hdr->flags & EFA_RDM_REQ_READ_NACK) {
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, pkt_entry);
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr);
rxe->internal_flags |= EFA_RDM_OPE_READ_NACK;
} else {
rxe = efa_rdm_msg_alloc_rxe_for_msgrtm(ep, &pkt_entry);
Expand Down Expand Up @@ -329,7 +329,7 @@ ssize_t efa_rdm_pke_proc_tagrtm(struct efa_rdm_pke *pkt_entry)

rtm_hdr = (struct efa_rdm_rtm_base_hdr *) pkt_entry->wiredata;
if (rtm_hdr->flags & EFA_RDM_REQ_READ_NACK) {
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, pkt_entry);
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr);
rxe->internal_flags |= EFA_RDM_OPE_READ_NACK;
} else {
rxe = efa_rdm_msg_alloc_rxe_for_tagrtm(ep, &pkt_entry);
Expand Down Expand Up @@ -446,7 +446,7 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry)
struct efa_rdm_ope *rxe;
struct efa_rdm_pke *unexp_pkt_entry;

rxe = efa_rdm_rxe_map_lookup(&pkt_entry->ep->rxe_map, pkt_entry);
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr);
if (rxe) {
if (rxe->state == EFA_RDM_RXE_MATCHED) {
pkt_entry->ope = rxe;
Expand Down
3 changes: 2 additions & 1 deletion prov/efa/src/rdm/efa_rdm_pke_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "efa_rdm_pke.h"
#include "efa_rdm_protocol.h"
#include "efa_rdm_pkt_type.h"
#include "efa_rdm_pke_rtm.h"
#include "efa_mr.h"

/**
Expand Down Expand Up @@ -160,7 +161,7 @@ efa_rdm_pke_post_remote_read_or_nack(struct efa_rdm_ep *ep,
}

if (efa_rdm_pkt_type_is_rtm(pkt_type)) {
efa_rdm_rxe_map_insert(&ep->rxe_map, pkt_entry, rxe);
efa_rdm_rxe_map_insert(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr, rxe);
}

return efa_rdm_ope_post_send_or_queue(rxe, EFA_RDM_READ_NACK_PKT);
Expand Down
19 changes: 11 additions & 8 deletions prov/efa/src/rdm/efa_rdm_rxe_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
* pointer to an RX entry. If such RX entry does not exist, return NULL
*/
struct efa_rdm_ope *efa_rdm_rxe_map_lookup(struct efa_rdm_rxe_map *rxe_map,
struct efa_rdm_pke *pkt_entry)
uint64_t msg_id, fi_addr_t addr)
{
struct efa_rdm_rxe_map_entry *entry = NULL;
struct efa_rdm_rxe_map_key key;

memset(&key, 0, sizeof(key));
key.msg_id = efa_rdm_pke_get_rtm_msg_id(pkt_entry);
key.addr = pkt_entry->addr;
key.msg_id = msg_id;
key.addr = addr;
HASH_FIND(hh, rxe_map->head, &key, sizeof(struct efa_rdm_rxe_map_key), entry);
return entry ? entry->rxe : NULL;
}
Expand All @@ -39,22 +39,22 @@ struct efa_rdm_ope *efa_rdm_rxe_map_lookup(struct efa_rdm_rxe_map *rxe_map,
* @param[in] rxe RX entry
*/
void efa_rdm_rxe_map_insert(struct efa_rdm_rxe_map *rxe_map,
struct efa_rdm_pke *pkt_entry,
uint64_t msg_id, fi_addr_t addr,
struct efa_rdm_ope *rxe)
{
struct efa_rdm_rxe_map_entry *entry;

entry = ofi_buf_alloc(pkt_entry->ep->map_entry_pool);
entry = ofi_buf_alloc(rxe->ep->map_entry_pool);
if (OFI_UNLIKELY(!entry)) {
EFA_WARN(FI_LOG_CQ,
"Map entries for medium size message exhausted.\n");
efa_base_ep_write_eq_error(&pkt_entry->ep->base_ep, FI_ENOBUFS, FI_EFA_ERR_RXE_POOL_EXHAUSTED);
efa_base_ep_write_eq_error(&rxe->ep->base_ep, FI_ENOBUFS, FI_EFA_ERR_RXE_POOL_EXHAUSTED);
return;
}

memset(&entry->key, 0, sizeof(entry->key));
entry->key.msg_id = efa_rdm_pke_get_rtm_msg_id(pkt_entry);
entry->key.addr = pkt_entry->addr;
entry->key.msg_id = msg_id;
entry->key.addr = addr;

#if ENABLE_DEBUG
{
Expand All @@ -67,6 +67,7 @@ void efa_rdm_rxe_map_insert(struct efa_rdm_rxe_map *rxe_map,

entry->rxe = rxe;
HASH_ADD(hh, rxe_map->head, key, sizeof(struct efa_rdm_rxe_map_key), entry);
rxe->rxe_map = rxe_map;
}

/**
Expand Down Expand Up @@ -95,4 +96,6 @@ void efa_rdm_rxe_map_remove(struct efa_rdm_rxe_map *rxe_map, uint64_t msg_id,
assert(entry && entry->rxe == rxe);
HASH_DEL(rxe_map->head, entry);
ofi_buf_free(entry);
/* Now the rxe is removed from the map, reset it to NULL */
rxe->rxe_map = NULL;
}
4 changes: 2 additions & 2 deletions prov/efa/src/rdm/efa_rdm_rxe_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ void efa_rdm_rxe_map_construct(struct efa_rdm_rxe_map *rxe_map)
struct efa_rdm_pke;

struct efa_rdm_ope *efa_rdm_rxe_map_lookup(struct efa_rdm_rxe_map *rxe_map,
struct efa_rdm_pke *pkt_entry);
uint64_t msg_id, fi_addr_t addr);

void efa_rdm_rxe_map_insert(struct efa_rdm_rxe_map *rxe_map,
struct efa_rdm_pke *pkt_entry,
uint64_t msg_id, fi_addr_t addr,
struct efa_rdm_ope *rxe);

void efa_rdm_rxe_map_remove(struct efa_rdm_rxe_map *rxe_map, uint64_t msg_id,
Expand Down
32 changes: 32 additions & 0 deletions prov/efa/test/efa_unit_test_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,3 +477,35 @@ void test_efa_rdm_rxe_handle_error_not_write_cq(struct efa_resource **state)

efa_rdm_rxe_release(rxe);
}

/**
 * @brief Check that an rxe records the rxe_map it was inserted into and
 * that releasing the rxe leaves no entry behind in the map entry pool.
 *
 * The test inserts a freshly allocated rxe into the endpoint's rxe_map,
 * verifies the back-pointer and the lookup result, releases the rxe, and
 * then destroys the map_entry_pool.
 *
 * @param[in] state	cmocka test state holding the efa_resource
 */
void test_efa_rdm_rxe_map(struct efa_resource **state)
{
	struct efa_resource *resource = *state;
	struct efa_rdm_ope *rxe;
	struct efa_rdm_ep *efa_rdm_ep;

	efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME);

	rxe = efa_unit_test_alloc_rxe(resource, ofi_op_tagged);
	/* Validate the allocation before touching any field of the rxe */
	assert_non_null(rxe);
	rxe->msg_id = 1;

	/* rxe has not been inserted into any rxe_map yet */
	assert_null(rxe->rxe_map);

	efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);

	/* Insertion must record the owning map in the rxe and make it findable */
	efa_rdm_rxe_map_insert(&efa_rdm_ep->rxe_map, rxe->msg_id, rxe->addr, rxe);
	assert_true(rxe->rxe_map == &efa_rdm_ep->rxe_map);
	assert_true(rxe == efa_rdm_rxe_map_lookup(rxe->rxe_map, rxe->msg_id, rxe->addr));

	efa_rdm_rxe_release(rxe);

	/*
	 * The map_entry_pool should now be empty, so it can be destroyed.
	 * Otherwise there will be an assertion error because its use
	 * count is non-zero.
	 */
	ofi_bufpool_destroy(efa_rdm_ep->map_entry_pool);
	/* NOTE(review): presumably cleared so teardown does not destroy the pool again -- confirm */
	efa_rdm_ep->map_entry_pool = NULL;
}
1 change: 1 addition & 0 deletions prov/efa/test/efa_unit_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ int main(void)
cmocka_unit_test_setup_teardown(test_efa_rdm_txe_handle_error_not_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_handle_error_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_handle_error_not_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_map, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_msg_send_to_local_peer_with_null_desc, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_fork_support_request_initialize_when_ibv_fork_support_is_needed, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_fork_support_request_initialize_when_ibv_fork_support_is_unneeded, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
Expand Down
1 change: 1 addition & 0 deletions prov/efa/test/efa_unit_tests.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ void test_efa_rdm_txe_handle_error_write_cq();
void test_efa_rdm_txe_handle_error_not_write_cq();
void test_efa_rdm_rxe_handle_error_write_cq();
void test_efa_rdm_rxe_handle_error_not_write_cq();
void test_efa_rdm_rxe_map();
void test_efa_rdm_msg_send_to_local_peer_with_null_desc();
void test_efa_fork_support_request_initialize_when_ibv_fork_support_is_needed();
void test_efa_fork_support_request_initialize_when_ibv_fork_support_is_unneeded();
Expand Down

0 comments on commit 09f1124

Please sign in to comment.