diff --git a/prov/efa/Makefile.include b/prov/efa/Makefile.include index 9bd2e7a5e17..ba2dc1ecfa2 100644 --- a/prov/efa/Makefile.include +++ b/prov/efa/Makefile.include @@ -163,6 +163,7 @@ prov_efa_test_efa_unit_test_LDFLAGS = $(cmocka_rpath) $(efa_LDFLAGS) $(cmocka_LD -Wl,--wrap=ofi_copy_from_hmem_iov \ -Wl,--wrap=efa_rdm_pke_read \ -Wl,--wrap=efa_rdm_pke_proc_matched_rtm \ + -Wl,--wrap=efa_rdm_ope_post_send \ -Wl,--wrap=efa_device_support_unsolicited_write_recv if HAVE_EFADV_CQ_EX diff --git a/prov/efa/src/rdm/efa_rdm_ep_utils.c b/prov/efa/src/rdm/efa_rdm_ep_utils.c index 9cf297acfc2..b6426afca87 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_utils.c +++ b/prov/efa/src/rdm/efa_rdm_ep_utils.c @@ -521,6 +521,7 @@ ssize_t efa_rdm_ep_trigger_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer */ txe->fi_flags = EFA_RDM_TXE_NO_COMPLETION | EFA_RDM_TXE_NO_COUNTER; txe->msg_id = -1; + txe->internal_flags |= EFA_RDM_OPE_INTERNAL; err = efa_rdm_ope_post_send(txe, EFA_RDM_EAGER_RTW_PKT); @@ -559,6 +560,7 @@ ssize_t efa_rdm_ep_post_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer *pe * reset to desired flags (remove things like FI_DELIVERY_COMPLETE, and FI_COMPLETION) */ txe->fi_flags = EFA_RDM_TXE_NO_COMPLETION | EFA_RDM_TXE_NO_COUNTER; + txe->internal_flags |= EFA_RDM_OPE_INTERNAL; pkt_entry = efa_rdm_pke_alloc(ep, ep->efa_tx_pkt_pool, EFA_RDM_PKE_FROM_EFA_TX_POOL); if (OFI_UNLIKELY(!pkt_entry)) { diff --git a/prov/efa/src/rdm/efa_rdm_ope.c b/prov/efa/src/rdm/efa_rdm_ope.c index 33203f9c832..de09e7b1983 100644 --- a/prov/efa/src/rdm/efa_rdm_ope.c +++ b/prov/efa/src/rdm/efa_rdm_ope.c @@ -634,6 +634,13 @@ void efa_rdm_rxe_handle_error(struct efa_rdm_ope *rxe, int err, int prov_errno) */ //efa_rdm_rxe_release(rxe); + if (rxe->internal_flags & EFA_RDM_OPE_INTERNAL) { + EFA_WARN(FI_LOG_CQ, + "Writing eq error for rxe from internal operations\n"); + efa_base_ep_write_eq_error(&ep->base_ep, err, prov_errno); + return; + } + efa_cntr_report_error(&ep->base_ep.util_ep, 
err_entry.flags); write_cq_err = ofi_cq_write_error(util_cq, &err_entry); if (write_cq_err) { @@ -731,6 +738,13 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno) */ //efa_rdm_txe_release(txe); + if (txe->internal_flags & EFA_RDM_OPE_INTERNAL) { + EFA_WARN(FI_LOG_CQ, + "Writing eq error for txe from internal operations\n"); + efa_base_ep_write_eq_error(&ep->base_ep, err, prov_errno); + return; + } + efa_cntr_report_error(&ep->base_ep.util_ep, txe->cq_entry.flags); write_cq_err = ofi_cq_write_error(util_cq, &err_entry); if (write_cq_err) { @@ -1681,6 +1695,7 @@ int efa_rdm_rxe_post_local_read_or_queue(struct efa_rdm_ope *rxe, } txe->local_read_pkt_entry = pkt_entry; + txe->internal_flags |= EFA_RDM_OPE_INTERNAL; err = efa_rdm_ope_post_remote_read_or_queue(txe); /* The rx pkts are held until the local read completes */ if (err) diff --git a/prov/efa/src/rdm/efa_rdm_ope.h b/prov/efa/src/rdm/efa_rdm_ope.h index 626eb6dc8d4..f070f775f98 100644 --- a/prov/efa/src/rdm/efa_rdm_ope.h +++ b/prov/efa/src/rdm/efa_rdm_ope.h @@ -276,6 +276,8 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe); */ #define EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE BIT_ULL(14) +#define EFA_RDM_OPE_INTERNAL BIT_ULL(15) + #define EFA_RDM_OPE_QUEUED_FLAGS (EFA_RDM_OPE_QUEUED_RNR | EFA_RDM_OPE_QUEUED_CTRL | EFA_RDM_OPE_QUEUED_READ | EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE) void efa_rdm_ope_try_fill_desc(struct efa_rdm_ope *ope, int mr_iov_start, uint64_t access); diff --git a/prov/efa/src/rdm/efa_rdm_pke_rta.c b/prov/efa/src/rdm/efa_rdm_pke_rta.c index 3fe95ab52f3..56fa7b3046e 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rta.c +++ b/prov/efa/src/rdm/efa_rdm_pke_rta.c @@ -98,6 +98,8 @@ struct efa_rdm_ope *efa_rdm_pke_alloc_rta_rxe(struct efa_rdm_pke *pkt_entry, int return NULL; } + rxe->internal_flags |= EFA_RDM_OPE_INTERNAL; + if (op == ofi_op_atomic) { rxe->addr = pkt_entry->addr; return rxe; @@ -301,7 +303,9 @@ int efa_rdm_pke_proc_dc_write_rta(struct efa_rdm_pke 
*pkt_entry) EFA_WARN(FI_LOG_CQ, "Posting of receipt packet failed! err=%s\n", fi_strerror(err)); - efa_rdm_rxe_handle_error(rxe, -err, FI_EFA_ERR_PKT_POST); + efa_base_ep_write_eq_error(&pkt_entry->ep->base_ep, -err, FI_EFA_ERR_PKT_POST); + efa_rdm_rxe_release(rxe); + efa_rdm_pke_release_rx(pkt_entry); return err; } @@ -415,10 +419,16 @@ int efa_rdm_pke_proc_fetch_rta(struct efa_rdm_pke *pkt_entry) } err = efa_rdm_ope_post_send_or_queue(rxe, EFA_RDM_ATOMRSP_PKT); - if (OFI_UNLIKELY(err)) - efa_rdm_rxe_handle_error(rxe, -err, FI_EFA_ERR_PKT_POST); efa_rdm_pke_release_rx(pkt_entry); + + if (OFI_UNLIKELY(err)) { + EFA_WARN(FI_LOG_CQ, "Posting of atomrsp packet failed! err=%ld\n", err); + efa_base_ep_write_eq_error(&ep->base_ep, -err, FI_EFA_ERR_PKT_POST); + efa_rdm_rxe_release(rxe); + return err; + } + return 0; } diff --git a/prov/efa/src/rdm/efa_rdm_pke_rta.h b/prov/efa/src/rdm/efa_rdm_pke_rta.h index ad4e928bc4e..ddfd93b8afd 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rta.h +++ b/prov/efa/src/rdm/efa_rdm_pke_rta.h @@ -33,4 +33,6 @@ ssize_t efa_rdm_pke_init_compare_rta(struct efa_rdm_pke *pkt_entry, int efa_rdm_pke_proc_compare_rta(struct efa_rdm_pke *pkt_entry); +struct efa_rdm_ope *efa_rdm_pke_alloc_rta_rxe(struct efa_rdm_pke *pkt_entry, int op); + #endif \ No newline at end of file diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtr.c b/prov/efa/src/rdm/efa_rdm_pke_rtr.c index 2ad5718865d..f61471c036d 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtr.c +++ b/prov/efa/src/rdm/efa_rdm_pke_rtr.c @@ -66,6 +66,40 @@ ssize_t efa_rdm_pke_init_longcts_rtr(struct efa_rdm_pke *pkt_entry, return 0; } +/** + * @brief allocate an RX entry for an incoming RTR packet + * + * The RX entry will be allocated from endpoint's OP entry + * pool + * @param[in] pkt_entry received RTR packet + * + * @return + * pointer to the newly allocated RX entry. + * NULL when OP entry pool has been exhausted. 
+ */ +struct efa_rdm_ope *efa_rdm_pke_alloc_rtr_rxe(struct efa_rdm_pke *pkt_entry) +{ + struct efa_rdm_ep *ep = pkt_entry->ep; + struct efa_rdm_ope *rxe; + struct efa_rdm_rtr_hdr *rtr_hdr; + + rxe = efa_rdm_ep_alloc_rxe(ep, pkt_entry->addr, ofi_op_read_rsp); + if (OFI_UNLIKELY(!rxe)) + return NULL; + + rxe->addr = pkt_entry->addr; + rxe->bytes_received = 0; + rxe->bytes_copied = 0; + + rtr_hdr = (struct efa_rdm_rtr_hdr *)pkt_entry->wiredata; + rxe->tx_id = rtr_hdr->recv_id; + rxe->window = rtr_hdr->recv_length; + rxe->iov_count = rtr_hdr->rma_iov_count; + rxe->internal_flags |= EFA_RDM_OPE_INTERNAL; + + return rxe; +} + /** * @brief process an incoming RTR packet * @@ -81,7 +115,7 @@ void efa_rdm_pke_handle_rtr_recv(struct efa_rdm_pke *pkt_entry) ep = pkt_entry->ep; - rxe = efa_rdm_ep_alloc_rxe(ep, pkt_entry->addr, ofi_op_read_rsp); + rxe = efa_rdm_pke_alloc_rtr_rxe(pkt_entry); if (OFI_UNLIKELY(!rxe)) { EFA_WARN(FI_LOG_CQ, "RX entries exhausted.\n"); @@ -90,14 +124,7 @@ void efa_rdm_pke_handle_rtr_recv(struct efa_rdm_pke *pkt_entry) return; } - rxe->addr = pkt_entry->addr; - rxe->bytes_received = 0; - rxe->bytes_copied = 0; - rtr_hdr = (struct efa_rdm_rtr_hdr *)pkt_entry->wiredata; - rxe->tx_id = rtr_hdr->recv_id; - rxe->window = rtr_hdr->recv_length; - rxe->iov_count = rtr_hdr->rma_iov_count; err = efa_rdm_rma_verified_copy_iov(ep, rtr_hdr->rma_iov, rtr_hdr->rma_iov_count, FI_REMOTE_READ, rxe->iov, rxe->desc); if (OFI_UNLIKELY(err)) { diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtr.h b/prov/efa/src/rdm/efa_rdm_pke_rtr.h index afe31abf9c2..d866eff9183 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtr.h +++ b/prov/efa/src/rdm/efa_rdm_pke_rtr.h @@ -21,4 +21,5 @@ ssize_t efa_rdm_pke_init_longcts_rtr(struct efa_rdm_pke *pkt_entry, void efa_rdm_pke_handle_rtr_recv(struct efa_rdm_pke *pkt_entry); +struct efa_rdm_ope *efa_rdm_pke_alloc_rtr_rxe(struct efa_rdm_pke *pkt_entry); #endif \ No newline at end of file diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtw.c 
b/prov/efa/src/rdm/efa_rdm_pke_rtw.c index 5872302136f..38f71da7d8e 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtw.c +++ b/prov/efa/src/rdm/efa_rdm_pke_rtw.c @@ -60,7 +60,6 @@ ssize_t efa_rdm_pke_init_rtw_common(struct efa_rdm_pke *pkt_entry, * pointer to the newly allocated RX entry. * NULL when OP entry pool has been exhausted. */ -static struct efa_rdm_ope *efa_rdm_pke_alloc_rtw_rxe(struct efa_rdm_pke *pkt_entry) { struct efa_rdm_ope *rxe; @@ -79,6 +78,7 @@ struct efa_rdm_ope *efa_rdm_pke_alloc_rtw_rxe(struct efa_rdm_pke *pkt_entry) rxe->addr = pkt_entry->addr; rxe->bytes_received = 0; rxe->bytes_copied = 0; + rxe->internal_flags |= EFA_RDM_OPE_INTERNAL; return rxe; } diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtw.h b/prov/efa/src/rdm/efa_rdm_pke_rtw.h index 32c8d29d664..05e3440651c 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtw.h +++ b/prov/efa/src/rdm/efa_rdm_pke_rtw.h @@ -48,4 +48,6 @@ ssize_t efa_rdm_pke_init_longread_rtw(struct efa_rdm_pke *pkt_entry, void efa_rdm_pke_handle_longread_rtw_recv(struct efa_rdm_pke *pkt_entry); +struct efa_rdm_ope *efa_rdm_pke_alloc_rtw_rxe(struct efa_rdm_pke *pkt_entry); + #endif diff --git a/prov/efa/test/efa_unit_test_common.c b/prov/efa/test/efa_unit_test_common.c index 13b38f52550..6f7d7c53242 100644 --- a/prov/efa/test/efa_unit_test_common.c +++ b/prov/efa/test/efa_unit_test_common.c @@ -376,3 +376,46 @@ void efa_unit_test_handshake_pkt_construct(struct efa_rdm_pke *pkt_entry, struct APPEND_OPT_HANDSHAKE_FIELD(host_id, EFA_RDM_HANDSHAKE_HOST_ID_HDR); APPEND_OPT_HANDSHAKE_FIELD(device_version, EFA_RDM_HANDSHAKE_DEVICE_VERSION_HDR); } + + +struct efa_rdm_ope *efa_unit_test_alloc_txe(struct efa_resource *resource, uint32_t op) +{ + fi_addr_t peer_addr = 0; + struct efa_ep_addr raw_addr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + struct efa_rdm_peer *peer; + struct fi_msg msg = {0}; + struct efa_rdm_ep *efa_rdm_ep; + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); + + /* Create and 
register a fake peer */ + assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0); + raw_addr.qpn = 0; + raw_addr.qkey = 0x1234; + + assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1); + + peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr); + + return efa_rdm_ep_alloc_txe(efa_rdm_ep, peer, &msg, op, 0, 0); +} + +struct efa_rdm_ope *efa_unit_test_alloc_rxe(struct efa_resource *resource, uint32_t op) +{ + fi_addr_t peer_addr = 0; + struct efa_ep_addr raw_addr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + struct efa_rdm_ep *efa_rdm_ep; + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); + + /* Create and register a fake peer */ + assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0); + raw_addr.qpn = 0; + raw_addr.qkey = 0x1234; + + assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1); + + return efa_rdm_ep_alloc_rxe(efa_rdm_ep, peer_addr, op); +} diff --git a/prov/efa/test/efa_unit_test_ep.c b/prov/efa/test/efa_unit_test_ep.c index 9f68753c5bf..65e9de043e8 100644 --- a/prov/efa/test/efa_unit_test_ep.c +++ b/prov/efa/test/efa_unit_test_ep.c @@ -661,6 +661,67 @@ void test_efa_rdm_ep_read_queue_before_handshake(struct efa_resource **state) test_efa_rdm_ep_rma_queue_before_handshake(state, ofi_op_read_req); } +/** + * @brief Test the efa_rdm_ep_trigger_handshake function + * with different peer setup and check the txe flags + * + * @param state efa_resource + */ +void test_efa_rdm_ep_trigger_handshake(struct efa_resource **state) +{ + struct efa_rdm_ope *txe; + struct efa_rdm_ep *efa_rdm_ep; + struct efa_resource *resource = *state; + struct efa_rdm_peer *peer; + struct efa_ep_addr raw_addr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + fi_addr_t peer_addr = 0; + + g_efa_unit_test_mocks.efa_rdm_ope_post_send = &efa_mock_efa_rdm_ope_post_send_return_mock; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, 
EFA_FABRIC_NAME); + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); + + will_return_always(efa_mock_efa_rdm_ope_post_send_return_mock, FI_SUCCESS); + + /* Create and register a fake peer */ + assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0); + raw_addr.qpn = 0; + raw_addr.qkey = 0x1234; + + assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1); + + peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr); + assert_non_null(peer); + + /* No txe should have been allocated yet */ + assert_true(dlist_empty(&efa_rdm_ep->txe_list)); + + /* + * When the peer already has the HANDSHAKE_RECEIVED and REQ_SENT flags set, the function should be a no-op + * and no txe is allocated + */ + peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED | EFA_RDM_PEER_REQ_SENT; + assert_int_equal(efa_rdm_ep_trigger_handshake(efa_rdm_ep, peer), FI_SUCCESS); + assert_true(dlist_empty(&efa_rdm_ep->txe_list)); + + /* + * Reset the peer flags to 0, now we should expect a txe allocated + */ + peer->flags = 0; + assert_int_equal(efa_rdm_ep_trigger_handshake(efa_rdm_ep, peer), FI_SUCCESS); + assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1); + + txe = container_of(efa_rdm_ep->txe_list.next, struct efa_rdm_ope, ep_entry); + + assert_true(txe->fi_flags & EFA_RDM_TXE_NO_COMPLETION); + assert_true(txe->fi_flags & EFA_RDM_TXE_NO_COUNTER); + assert_true(txe->internal_flags & EFA_RDM_OPE_INTERNAL); + + efa_rdm_txe_release(txe); +} + /** * @brief When local support unsolicited write, but the peer doesn't, fi_writedata * (use rdma-write with imm) should fail as FI_EINVAL diff --git a/prov/efa/test/efa_unit_test_mocks.c b/prov/efa/test/efa_unit_test_mocks.c index 215792ded82..de257fc91ab 100644 --- a/prov/efa/test/efa_unit_test_mocks.c +++ b/prov/efa/test/efa_unit_test_mocks.c @@ -213,6 +213,11 @@ int efa_mock_efa_rdm_pke_read_return_mock(struct efa_rdm_ope *ope) return mock(); } +ssize_t efa_mock_efa_rdm_ope_post_send_return_mock(struct 
efa_rdm_ope *ope, int pkt_type) +{ + return mock(); +} + ssize_t efa_mock_efa_rdm_pke_proc_matched_rtm_no_op(struct efa_rdm_pke *pkt_entry) { return FI_SUCCESS; @@ -261,6 +266,7 @@ struct efa_unit_test_mocks g_efa_unit_test_mocks = { .ofi_copy_from_hmem_iov = __real_ofi_copy_from_hmem_iov, .efa_rdm_pke_read = __real_efa_rdm_pke_read, .efa_rdm_pke_proc_matched_rtm = __real_efa_rdm_pke_proc_matched_rtm, + .efa_rdm_ope_post_send = __real_efa_rdm_ope_post_send, .efa_device_support_unsolicited_write_recv = __real_efa_device_support_unsolicited_write_recv, .ibv_is_fork_initialized = __real_ibv_is_fork_initialized, #if HAVE_EFADV_QUERY_MR @@ -401,6 +407,11 @@ int __wrap_efa_rdm_pke_proc_matched_rtm(struct efa_rdm_pke *pkt_entry) return g_efa_unit_test_mocks.efa_rdm_pke_proc_matched_rtm(pkt_entry); } +ssize_t __wrap_efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type) +{ + return g_efa_unit_test_mocks.efa_rdm_ope_post_send(ope, pkt_type); +} + bool __wrap_efa_device_support_unsolicited_write_recv(void) { return g_efa_unit_test_mocks.efa_device_support_unsolicited_write_recv(); diff --git a/prov/efa/test/efa_unit_test_mocks.h b/prov/efa/test/efa_unit_test_mocks.h index b773850433a..c74df8646e9 100644 --- a/prov/efa/test/efa_unit_test_mocks.h +++ b/prov/efa/test/efa_unit_test_mocks.h @@ -99,6 +99,10 @@ ssize_t __real_efa_rdm_pke_proc_matched_rtm(struct efa_rdm_pke *pkt_entry); ssize_t efa_mock_efa_rdm_pke_proc_matched_rtm_no_op(struct efa_rdm_pke *pkt_entry); +ssize_t __real_efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type); + +ssize_t efa_mock_efa_rdm_ope_post_send_return_mock(struct efa_rdm_ope *ope, int pkt_type); + bool efa_mock_efa_device_support_unsolicited_write_recv(void); int efa_mock_ibv_post_recv(struct ibv_qp *qp, struct ibv_recv_wr *wr, @@ -144,6 +148,8 @@ struct efa_unit_test_mocks ssize_t (*efa_rdm_pke_proc_matched_rtm)(struct efa_rdm_pke *pkt_entry); + ssize_t (*efa_rdm_ope_post_send)(struct efa_rdm_ope *ope, int pkt_type); + bool 
(*efa_device_support_unsolicited_write_recv)(void); enum ibv_fork_status (*ibv_is_fork_initialized)(void); diff --git a/prov/efa/test/efa_unit_test_ope.c b/prov/efa/test/efa_unit_test_ope.c index cdfb465a188..14abad34ee6 100644 --- a/prov/efa/test/efa_unit_test_ope.c +++ b/prov/efa/test/efa_unit_test_ope.c @@ -3,6 +3,8 @@ #include "efa_unit_tests.h" +typedef void (*efa_rdm_ope_handle_error_func_t)(struct efa_rdm_ope *ope, int err, int prov_errno); + void test_efa_rdm_ope_prepare_to_post_send_impl(struct efa_resource *resource, enum fi_hmem_iface iface, size_t total_len, @@ -296,7 +298,7 @@ void test_efa_rdm_rxe_post_local_read_or_queue_cleanup_txe(struct efa_resource * { struct efa_rdm_ep *efa_rdm_ep; struct efa_resource *resource = *state; - struct efa_rdm_pke *pkt_entry; + struct efa_rdm_pke *pkt_entry, *tx_pkt_entry; struct efa_rdm_ope *rxe; struct efa_mr cuda_mr = {0}; int expected_err = -FI_ENOMR; @@ -305,6 +307,7 @@ void test_efa_rdm_rxe_post_local_read_or_queue_cleanup_txe(struct efa_resource * .iov_base = buf, .iov_len = sizeof buf }; + struct efa_rdm_ope *txe; /** * TODO: Ideally we should mock efa_rdm_ope_post_remote_read_or_queue here, @@ -334,10 +337,124 @@ void test_efa_rdm_rxe_post_local_read_or_queue_cleanup_txe(struct efa_resource * pkt_entry->ope = rxe; assert_true(dlist_empty(&efa_rdm_ep->txe_list)); + + /* Test error path */ will_return(efa_mock_efa_rdm_pke_read_return_mock, expected_err); assert_int_equal(efa_rdm_rxe_post_local_read_or_queue(rxe, 0, pkt_entry, pkt_entry->payload, 16), expected_err); /* Make sure txe is cleaned for a failed read */ assert_true(dlist_empty(&efa_rdm_ep->txe_list)); + /* Now test happy path */ + will_return(efa_mock_efa_rdm_pke_read_return_mock, 0); + assert_int_equal(efa_rdm_rxe_post_local_read_or_queue(rxe, 0, pkt_entry, pkt_entry->payload, 16), 0); + + /* Now we should have a txe allocated */ + assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1); + txe = 
container_of(efa_rdm_ep->txe_list.next, struct efa_rdm_ope, ep_entry); + assert_true(txe->internal_flags & EFA_RDM_OPE_INTERNAL); + + /* We also have a tx pkt allocated inside efa_rdm_ope_read + * and we need to clean it */ + tx_pkt_entry = ofi_bufpool_get_ibuf(efa_rdm_ep->efa_tx_pkt_pool, 0); + efa_rdm_pke_release(tx_pkt_entry); + efa_rdm_pke_release_rx(pkt_entry); + efa_rdm_txe_release(txe); +} + +static +void test_efa_rdm_ope_handle_error_impl( + struct efa_resource *resource, + efa_rdm_ope_handle_error_func_t efa_rdm_ope_handle_error, + struct efa_rdm_ope *ope, bool expect_cq_error) +{ + struct fi_cq_data_entry cq_entry; + struct fi_cq_err_entry cq_err_entry = {0}; + struct fi_eq_err_entry eq_err_entry; + + efa_rdm_ope_handle_error(ope, FI_ENOTCONN, + EFA_IO_COMP_STATUS_LOCAL_ERROR_UNREACH_REMOTE); + + if (expect_cq_error) { + assert_int_equal(fi_cq_read(resource->cq, &cq_entry, 1), + -FI_EAVAIL); + assert_int_equal(fi_cq_readerr(resource->cq, &cq_err_entry, 0), + 1); + assert_int_equal(cq_err_entry.err, FI_ENOTCONN); + assert_int_equal(cq_err_entry.prov_errno, + EFA_IO_COMP_STATUS_LOCAL_ERROR_UNREACH_REMOTE); + } else { + /* We should expect an empty cq and an eq error */ + assert_int_equal(fi_cq_read(resource->cq, &cq_entry, 1), + -FI_EAGAIN); + assert_int_equal(fi_eq_readerr(resource->eq, &eq_err_entry, 0), + sizeof(eq_err_entry)); + assert_int_equal(eq_err_entry.err, FI_ENOTCONN); + assert_int_equal(eq_err_entry.prov_errno, + EFA_IO_COMP_STATUS_LOCAL_ERROR_UNREACH_REMOTE); + } +} + +void test_efa_rdm_txe_handle_error_write_cq(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ope *txe; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + txe = efa_unit_test_alloc_txe(resource, ofi_op_write); + assert_non_null(txe); + + test_efa_rdm_ope_handle_error_impl(resource, efa_rdm_txe_handle_error, txe, true); + + efa_rdm_txe_release(txe); +} + +void test_efa_rdm_txe_handle_error_not_write_cq(struct 
efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ope *txe; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + txe = efa_unit_test_alloc_txe(resource, ofi_op_write); + assert_non_null(txe); + + txe->internal_flags |= EFA_RDM_OPE_INTERNAL; + + test_efa_rdm_ope_handle_error_impl(resource, efa_rdm_txe_handle_error, txe, false); + + efa_rdm_txe_release(txe); +} + +void test_efa_rdm_rxe_handle_error_write_cq(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ope *rxe; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + rxe = efa_unit_test_alloc_rxe(resource, ofi_op_tagged); + assert_non_null(rxe); + + test_efa_rdm_ope_handle_error_impl(resource, efa_rdm_rxe_handle_error, rxe, true); + + efa_rdm_rxe_release(rxe); +} + +void test_efa_rdm_rxe_handle_error_not_write_cq(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ope *rxe; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + rxe = efa_unit_test_alloc_rxe(resource, ofi_op_tagged); + assert_non_null(rxe); + + rxe->internal_flags |= EFA_RDM_OPE_INTERNAL; + + test_efa_rdm_ope_handle_error_impl(resource, efa_rdm_rxe_handle_error, rxe, false); + + efa_rdm_rxe_release(rxe); } diff --git a/prov/efa/test/efa_unit_test_pke.c b/prov/efa/test/efa_unit_test_pke.c index 61aa5700d61..b2f0d185f34 100644 --- a/prov/efa/test/efa_unit_test_pke.c +++ b/prov/efa/test/efa_unit_test_pke.c @@ -1,5 +1,10 @@ #include "efa_unit_tests.h" #include "rdm/efa_rdm_pke_rtm.h" +#include "rdm/efa_rdm_pke_rta.h" +#include "rdm/efa_rdm_pke_rtr.h" +#include "rdm/efa_rdm_pke_rtw.h" +#include "rdm/efa_rdm_pke_utils.h" + /** * @brief When handling a long cts rtm as read nack fallback, @@ -108,3 +113,124 @@ void test_efa_rdm_pke_release_rx_list(struct efa_resource **state) ofi_bufpool_destroy(efa_rdm_ep->efa_rx_pkt_pool); efa_rdm_ep->efa_rx_pkt_pool = NULL; } + +void 
test_efa_rdm_pke_alloc_rta_rxe(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ep *efa_rdm_ep; + struct efa_rdm_pke *pke; + struct efa_rdm_ope *rxe; + struct efa_ep_addr raw_addr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + fi_addr_t peer_addr = 0; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); + + /* Fake a rx pkt entry */ + pke = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, EFA_RDM_PKE_FROM_EFA_RX_POOL); + assert_non_null(pke); + efa_rdm_ep->efa_rx_pkts_posted = efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); + + + /* Create and register a fake peer */ + assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0); + raw_addr.qpn = 0; + raw_addr.qkey = 0x1234; + assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1); + + pke->addr = peer_addr; + + rxe = efa_rdm_pke_alloc_rta_rxe(pke, ofi_op_atomic); + assert_true(rxe->internal_flags & EFA_RDM_OPE_INTERNAL); + + efa_rdm_rxe_release(rxe); + efa_rdm_pke_release_rx(pke); +} + +void test_efa_rdm_pke_alloc_rtw_rxe(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ep *efa_rdm_ep; + struct efa_rdm_pke *pke; + struct efa_rdm_ope *rxe; + struct efa_ep_addr raw_addr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + fi_addr_t peer_addr = 0; + struct efa_rdm_base_hdr *base_hdr; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); + + /* Fake a rx pkt entry */ + pke = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, EFA_RDM_PKE_FROM_EFA_RX_POOL); + assert_non_null(pke); + efa_rdm_ep->efa_rx_pkts_posted = efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); + + + /* Create and register a fake peer */ + assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, 
&raw_addr_len), 0); + raw_addr.qpn = 0; + raw_addr.qkey = 0x1234; + assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1); + + base_hdr = efa_rdm_pke_get_base_hdr(pke); + /* Clean the flags to avoid having garbage value */ + base_hdr->flags = 0; + pke->addr = peer_addr; + + rxe = efa_rdm_pke_alloc_rtw_rxe(pke); + + assert_true(rxe->internal_flags & EFA_RDM_OPE_INTERNAL); + assert_int_equal(rxe->bytes_received, 0); + assert_int_equal(rxe->bytes_copied, 0); + + efa_rdm_rxe_release(rxe); + efa_rdm_pke_release_rx(pke); +} + +void test_efa_rdm_pke_alloc_rtr_rxe(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ep *efa_rdm_ep; + struct efa_rdm_pke *pke; + struct efa_rdm_ope *rxe; + struct efa_ep_addr raw_addr = {0}; + size_t raw_addr_len = sizeof(raw_addr); + fi_addr_t peer_addr = 0; + struct efa_rdm_rtr_hdr *rtr_hdr; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); + + /* Fake a rx pkt entry */ + pke = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, EFA_RDM_PKE_FROM_EFA_RX_POOL); + assert_non_null(pke); + efa_rdm_ep->efa_rx_pkts_posted = efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); + + + /* Create and register a fake peer */ + assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0); + raw_addr.qpn = 0; + raw_addr.qkey = 0x1234; + assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1); + + rtr_hdr = (struct efa_rdm_rtr_hdr *)pke->wiredata; + /* Set the RTR header fields read by the allocator to avoid garbage values */ + rtr_hdr->recv_id = 0; + rtr_hdr->recv_length = 0; + rtr_hdr->rma_iov_count = 0; + pke->addr = peer_addr; + + rxe = efa_rdm_pke_alloc_rtr_rxe(pke); + + assert_true(rxe->internal_flags & EFA_RDM_OPE_INTERNAL); + assert_int_equal(rxe->bytes_received, 0); + assert_int_equal(rxe->bytes_copied, 0); + + efa_rdm_rxe_release(rxe); + efa_rdm_pke_release_rx(pke); +} \ No newline at end of file diff --git 
a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c index e4b5550f9ee..ae4d95478fc 100644 --- a/prov/efa/test/efa_unit_tests.c +++ b/prov/efa/test/efa_unit_tests.c @@ -71,6 +71,7 @@ static int efa_unit_test_mocks_teardown(void **state) .ofi_copy_from_hmem_iov = __real_ofi_copy_from_hmem_iov, .efa_rdm_pke_read = __real_efa_rdm_pke_read, .efa_rdm_pke_proc_matched_rtm = __real_efa_rdm_pke_proc_matched_rtm, + .efa_rdm_ope_post_send = __real_efa_rdm_ope_post_send, .efa_device_support_unsolicited_write_recv = __real_efa_device_support_unsolicited_write_recv, .ibv_is_fork_initialized = __real_ibv_is_fork_initialized, }; @@ -116,6 +117,7 @@ int main(void) cmocka_unit_test_setup_teardown(test_efa_rdm_ep_dc_send_queue_limit_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_ep_read_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_ep_write_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_ep_trigger_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_read_copy_pkt_pool_128_alignment, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_ep_send_with_shm_no_copy, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_ep_rma_without_caps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), @@ -205,6 +207,10 @@ int main(void) cmocka_unit_test_setup_teardown(test_efa_rdm_ope_post_write_0_byte, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_post_local_read_or_queue_cleanup_txe, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_txe_handle_error_write_cq, 
efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_txe_handle_error_not_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_handle_error_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_handle_error_not_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_msg_send_to_local_peer_with_null_desc, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_fork_support_request_initialize_when_ibv_fork_support_is_needed, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_fork_support_request_initialize_when_ibv_fork_support_is_unneeded, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), @@ -222,6 +228,9 @@ int main(void) cmocka_unit_test_setup_teardown(test_efa_rdm_peer_select_readbase_rtm_do_runt, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_pke_get_available_copy_methods_align128, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_pke_release_rx_list, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_pke_alloc_rta_rxe, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_pke_alloc_rtw_rxe, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_pke_alloc_rtr_rxe, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_domain_open_ops_wrong_name, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_domain_open_ops_mr_query, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), 
cmocka_unit_test_setup_teardown(test_efa_rdm_cq_ibv_cq_poll_list_same_tx_rx_cq_single_ep, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h index 0cb2d3cb27d..a934d77115b 100644 --- a/prov/efa/test/efa_unit_tests.h +++ b/prov/efa/test/efa_unit_tests.h @@ -98,6 +98,10 @@ void efa_unit_test_eager_msgrtm_pkt_construct(struct efa_rdm_pke *pkt_entry, str void efa_unit_test_handshake_pkt_construct(struct efa_rdm_pke *pkt_entry, struct efa_unit_test_handshake_pkt_attr *attr); +struct efa_rdm_ope *efa_unit_test_alloc_txe(struct efa_resource *resource, uint32_t op); + +struct efa_rdm_ope *efa_unit_test_alloc_rxe(struct efa_resource *resource, uint32_t op); + /* test cases */ /* begin efa_unit_test_av.c */ @@ -125,6 +129,7 @@ void test_efa_rdm_ep_dc_send_queue_before_handshake(); void test_efa_rdm_ep_dc_send_queue_limit_before_handshake(); void test_efa_rdm_ep_write_queue_before_handshake(); void test_efa_rdm_ep_read_queue_before_handshake(); +void test_efa_rdm_ep_trigger_handshake(); void test_efa_rdm_read_copy_pkt_pool_128_alignment(); void test_efa_rdm_ep_send_with_shm_no_copy(); void test_efa_rdm_ep_rma_without_caps(); @@ -216,6 +221,10 @@ void test_efa_rdm_ope_prepare_to_post_send_cuda_memory(); void test_efa_rdm_ope_prepare_to_post_send_cuda_memory_align128(); void test_efa_rdm_ope_post_write_0_byte(); void test_efa_rdm_rxe_post_local_read_or_queue_cleanup_txe(); +void test_efa_rdm_txe_handle_error_write_cq(); +void test_efa_rdm_txe_handle_error_not_write_cq(); +void test_efa_rdm_rxe_handle_error_write_cq(); +void test_efa_rdm_rxe_handle_error_not_write_cq(); void test_efa_rdm_msg_send_to_local_peer_with_null_desc(); void test_efa_fork_support_request_initialize_when_ibv_fork_support_is_needed(); void test_efa_fork_support_request_initialize_when_ibv_fork_support_is_unneeded(); @@ -250,6 +259,9 @@ void test_efa_rdm_peer_keep_pke_in_overflow_list(); void 
test_efa_rdm_peer_append_overflow_pke_to_recvwin(); void test_efa_rdm_pke_handle_longcts_rtm_send_completion(); void test_efa_rdm_pke_release_rx_list(); +void test_efa_rdm_pke_alloc_rta_rxe(); +void test_efa_rdm_pke_alloc_rtw_rxe(); +void test_efa_rdm_pke_alloc_rtr_rxe(); void test_efa_msg_fi_recv(); void test_efa_msg_fi_recvv(); void test_efa_msg_fi_recvmsg();