From 1798a0e9e97f2bb52028debcb4dd88a4945e5297 Mon Sep 17 00:00:00 2001 From: Shi Jin Date: Tue, 13 Feb 2024 05:04:21 +0000 Subject: [PATCH] prov/efa: Move ibv cq open to fi_cq_open Signed-off-by: Shi Jin --- prov/efa/src/efa_cq.h | 5 + prov/efa/src/rdm/efa_rdm_cq.c | 17 +++ prov/efa/src/rdm/efa_rdm_cq.h | 3 + prov/efa/src/rdm/efa_rdm_ep.h | 11 +- prov/efa/src/rdm/efa_rdm_ep_fiops.c | 161 ++++++++++++------------- prov/efa/src/rdm/efa_rdm_ep_progress.c | 62 +++++----- prov/efa/src/rdm/efa_rdm_ep_utils.c | 7 +- 7 files changed, 145 insertions(+), 121 deletions(-) diff --git a/prov/efa/src/efa_cq.h b/prov/efa/src/efa_cq.h index f70f0074219..af33bb2eace 100644 --- a/prov/efa/src/efa_cq.h +++ b/prov/efa/src/efa_cq.h @@ -32,6 +32,11 @@ #include "efa.h" +enum ibv_cq_ex_type { + IBV_CQ, + EFADV_CQ +}; + /** * @brief Create ibv_cq_ex by calling ibv_create_cq_ex * diff --git a/prov/efa/src/rdm/efa_rdm_cq.c b/prov/efa/src/rdm/efa_rdm_cq.c index 2c6440739a8..4d7f7c17cd0 100644 --- a/prov/efa/src/rdm/efa_rdm_cq.c +++ b/prov/efa/src/rdm/efa_rdm_cq.c @@ -32,6 +32,16 @@ int efa_rdm_cq_close(struct fid *fid) cq = container_of(fid, struct efa_rdm_cq, util_cq.cq_fid.fid); + if (cq->ibv_cq_ex) { + ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex)); + if (ret) { + EFA_WARN(FI_LOG_CQ, "Unable to close ibv cq: %s\n", + fi_strerror(-ret)); + return ret; + } + cq->ibv_cq_ex = NULL; + } + if (cq->shm_cq) { ret = fi_close(&cq->shm_cq->fid); if (ret) { @@ -159,6 +169,13 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, } } + ret = efa_cq_ibv_cq_ex_open(attr, efa_domain->device->ibv_ctx, &cq->ibv_cq_ex, &cq->ibv_cq_ex_type); + if (ret) { + EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %d\n", ret); + ret = -FI_EINVAL; + goto free; + } + return 0; free: free(cq); diff --git a/prov/efa/src/rdm/efa_rdm_cq.h b/prov/efa/src/rdm/efa_rdm_cq.h index 247a7956b91..e963e53bd83 100644 --- a/prov/efa/src/rdm/efa_rdm_cq.h +++ b/prov/efa/src/rdm/efa_rdm_cq.h @@ -34,11 +34,14 @@ #ifndef EFA_RDM_CQ_H #define EFA_RDM_CQ_H +#include "efa_cq.h" #include struct efa_rdm_cq { struct util_cq util_cq; struct fid_cq *shm_cq; + struct ibv_cq_ex *ibv_cq_ex; + enum ibv_cq_ex_type ibv_cq_ex_type; }; /* diff --git a/prov/efa/src/rdm/efa_rdm_ep.h b/prov/efa/src/rdm/efa_rdm_ep.h index 5a13b22d517..3c9ca4034aa 100644 --- a/prov/efa/src/rdm/efa_rdm_ep.h +++ b/prov/efa/src/rdm/efa_rdm_ep.h @@ -12,11 +12,6 @@ #define EFA_RDM_ERROR_MSG_BUFFER_LENGTH 1024 -enum ibv_cq_ex_type { - IBV_CQ, - EFADV_CQ -}; - /** @brief Information of a queued copy. * * This struct is used when receiving buffer is on device. 
@@ -50,9 +45,9 @@ struct efa_rdm_ep { /* per-version extra feature/request flag */ uint64_t extra_info[EFA_RDM_MAX_NUM_EXINFO]; - struct ibv_cq_ex *ibv_cq_ex; + struct efa_rdm_cq *tx_cq; - enum ibv_cq_ex_type ibv_cq_ex_type; + struct efa_rdm_cq *rx_cq; /* shm provider fid */ struct fid_ep *shm_ep; @@ -280,7 +275,7 @@ ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep, size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface iface); -int efa_rdm_ep_get_prov_errno(struct efa_rdm_ep *ep); +int efa_rdm_ep_get_prov_errno(struct efa_rdm_ep *ep, struct ibv_cq_ex *ibv_cq_ex); static inline struct efa_domain *efa_rdm_ep_domain(struct efa_rdm_ep *ep) diff --git a/prov/efa/src/rdm/efa_rdm_ep_fiops.c b/prov/efa/src/rdm/efa_rdm_ep_fiops.c index f8f7ca01624..d24c4b81bc4 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_fiops.c +++ b/prov/efa/src/rdm/efa_rdm_ep_fiops.c @@ -32,7 +32,6 @@ */ #include "efa.h" -#include "efa_cq.h" #include "efa_av.h" #include "efa_rdm_ep.h" #include "efa_rdm_cq.h" @@ -58,13 +57,39 @@ int efa_rdm_ep_create_base_ep_ibv_qp(struct efa_rdm_ep *ep) { struct ibv_qp_init_attr_ex attr_ex = { 0 }; - attr_ex.cap.max_send_wr = ep->base_ep.domain->device->rdm_info->tx_attr->size; - attr_ex.cap.max_send_sge = ep->base_ep.domain->device->rdm_info->tx_attr->iov_limit; - attr_ex.send_cq = ibv_cq_ex_to_cq(ep->ibv_cq_ex); + if (!ep->tx_cq && !ep->rx_cq) { + EFA_WARN(FI_LOG_EP_CTRL, + "Endpoint is not bound to a send or receive completion queue\n"); + return -FI_ENOCQ; + } + + if (!ep->tx_cq && ofi_needs_tx(ep->base_ep.info->caps)) { + EFA_WARN(FI_LOG_EP_CTRL, + "Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n"); + return -FI_ENOCQ; + } + + if (!ep->rx_cq && ofi_needs_rx(ep->base_ep.info->caps)) { + EFA_WARN(FI_LOG_EP_CTRL, + "Endpoint is not bound to a receive completion queue when it has receive capabilities enabled. 
(FI_RECV)\n"); + return -FI_ENOCQ; + } + + if (ep->tx_cq) { + attr_ex.cap.max_send_wr = ep->base_ep.domain->device->rdm_info->tx_attr->size; + attr_ex.cap.max_send_sge = ep->base_ep.domain->device->rdm_info->tx_attr->iov_limit; + attr_ex.send_cq = ibv_cq_ex_to_cq(ep->tx_cq->ibv_cq_ex); + } else { + attr_ex.send_cq = ibv_cq_ex_to_cq(ep->rx_cq->ibv_cq_ex); + } - attr_ex.cap.max_recv_wr = ep->base_ep.domain->device->rdm_info->rx_attr->size; - attr_ex.cap.max_recv_sge = ep->base_ep.domain->device->rdm_info->rx_attr->iov_limit; - attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->ibv_cq_ex); + if (ep->rx_cq) { + attr_ex.cap.max_recv_wr = ep->base_ep.domain->device->rdm_info->rx_attr->size; + attr_ex.cap.max_recv_sge = ep->base_ep.domain->device->rdm_info->rx_attr->iov_limit; + attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->rx_cq->ibv_cq_ex); + } else { + attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->tx_cq->ibv_cq_ex); + } attr_ex.cap.max_inline_data = ep->base_ep.domain->device->efa_attr.inline_buf_size; @@ -501,11 +526,6 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info, efa_rdm_ep->efa_rx_pkts_held = 0; efa_rdm_ep->efa_outstanding_tx_ops = 0; - assert(!efa_rdm_ep->ibv_cq_ex); - - ret = efa_cq_ibv_cq_ex_open(&cq_attr, efa_domain->device->ibv_ctx, - &efa_rdm_ep->ibv_cq_ex, &efa_rdm_ep->ibv_cq_ex_type); - if (ret) { EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %s\n", strerror(errno)); goto err_close_shm_ep; @@ -513,7 +533,7 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info, ret = efa_rdm_ep_create_buffer_pools(efa_rdm_ep); if (ret) - goto err_close_core_cq; + goto err_close_shm_ep; efa_rdm_ep_init_linked_lists(efa_rdm_ep); @@ -539,15 +559,11 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info, efa_rdm_ep->sendrecv_in_order_aligned_128_bytes = false; efa_rdm_ep->write_in_order_aligned_128_bytes = false; - ret = efa_rdm_ep_create_base_ep_ibv_qp(efa_rdm_ep); - if (ret) - goto err_close_core_cq; - efa_rdm_ep->pke_vec = calloc(sizeof(struct efa_rdm_pke *), EFA_RDM_EP_MAX_WR_PER_IBV_POST_RECV); if (!efa_rdm_ep->pke_vec) { EFA_WARN(FI_LOG_EP_CTRL, "cannot alloc memory for efa_rdm_ep->pke_vec!\n"); ret = -FI_ENOMEM; - goto err_close_core_cq; + goto err_close_shm_ep; } *ep = &efa_rdm_ep->base_ep.util_ep.ep_fid; @@ -560,11 +576,6 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info, (*ep)->cm = &efa_rdm_ep_cm_ops; return 0; -err_close_core_cq: - retv = -ibv_destroy_cq(ibv_cq_ex_to_cq(efa_rdm_ep->ibv_cq_ex)); - if (retv) - EFA_WARN(FI_LOG_CQ, "Unable to close cq: %s\n", - fi_strerror(-retv)); err_close_shm_ep: if (efa_rdm_ep->shm_ep) { retv = fi_close(&efa_rdm_ep->shm_ep->fid); @@ -625,6 +636,12 @@ static int efa_rdm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) if (ret) return ret; + if (flags & FI_TRANSMIT) + efa_rdm_ep->tx_cq = cq; + + if (flags & FI_RECV) + efa_rdm_ep->rx_cq = cq; + if (cq->shm_cq) { /* Bind ep with shm provider's cq */ ret = fi_ep_bind(efa_rdm_ep->shm_ep, &cq->shm_cq->fid, flags); @@ -829,12 +846,6 @@ static int efa_rdm_ep_close(struct fid *fid) retv = ret; } - ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(efa_rdm_ep->ibv_cq_ex)); - if (ret) { - EFA_WARN(FI_LOG_EP_CTRL, "Unable to close ibv_cq_ex\n"); - retv = ret; - } - if (efa_rdm_ep->shm_ep) { ret = fi_close(&efa_rdm_ep->shm_ep->fid); if (ret) { @@ -1012,6 +1023,35 @@ void efa_rdm_ep_update_shm(struct efa_rdm_ep *ep) efa_rdm_ep_close_shm_resources(ep); } +/** + * @brief If user requests in-order aligned 128 bytes capability, + * check if the qp supports it. 
+ * @param ep efa_rdm_ep + * @return int 0 on success, -FI_EOPNOTSUPP when qp doesn't support the request. + */ +static +int efa_rdm_ep_check_qp_support_in_order_aligned_128_bytes(struct efa_rdm_ep *ep) +{ + if (ep->write_in_order_aligned_128_bytes && + !efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_WRITE)) { + EFA_WARN(FI_LOG_EP_CTRL, + "FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES is set to true but the QP doesn't support it\n"); + return -FI_EOPNOTSUPP; + } + + /* + * RDMA read is used to copy data from host bounce buffer to the + * application buffer on device + */ + if (ep->sendrecv_in_order_aligned_128_bytes && + !efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_READ)) { + EFA_WARN(FI_LOG_EP_CTRL, + "FI_OPT_EFA_SENDRECV_IN_ORDER_ALIGNED_128_BYTES is set to true but the QP doesn't support it\n"); + return -FI_EOPNOTSUPP; + } + + return 0; +} /** * @brief implement the fi_enable() API for EFA RDM endpoint @@ -1033,6 +1073,15 @@ static int efa_rdm_ep_ctrl(struct fid *fid, int command, void *arg) switch (command) { case FI_ENABLE: ep = container_of(fid, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid); + + ret = efa_rdm_ep_create_base_ep_ibv_qp(ep); + if (ret) + return ret; + + ret = efa_rdm_ep_check_qp_support_in_order_aligned_128_bytes(ep); + if (ret) + return ret; + ret = efa_base_ep_enable(&ep->base_ep); if (ret) return ret; @@ -1276,52 +1325,6 @@ static int efa_rdm_ep_set_use_device_rdma(struct efa_rdm_ep *ep, bool use_device return 0; } -/** - * @brief set sendrecv_in_order_aligned_128_bytes flag in efa_rdm_ep - * called by efa_rdm_ep_setopt - * @param[in,out] ep endpoint - * @param[in] sendrecv_in_order_aligned_128_bytes whether to enable in_order send/recv - * for each 128 bytes aligned buffer - * @return 0 on success, -FI_EOPNOTSUPP if the option cannot be supported - * @related efa_rdm_ep - */ -static -int efa_rdm_ep_set_sendrecv_in_order_aligned_128_bytes(struct efa_rdm_ep *ep, - bool sendrecv_in_order_aligned_128_bytes) -{ - /* - * RDMA read is used to copy data from host bounce buffer to the - * application buffer on device - */ - if (sendrecv_in_order_aligned_128_bytes && - !efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_READ)) - return -FI_EOPNOTSUPP; - - ep->sendrecv_in_order_aligned_128_bytes = sendrecv_in_order_aligned_128_bytes; - return 0; -} - -/** - * @brief set write_in_order_aligned_128_bytes flag in efa_rdm_ep - * called by efa_rdm_ep_set_opt - * @param[in,out] ep endpoint - * @param[in] write_in_order_aligned_128_bytes whether to enable RDMA in order write - * for each 128 bytes aligned buffer. - * @return 0 on success, -FI_EOPNOTSUPP if the option cannot be supported. 
- * @related efa_rdm_ep - */ -static -int efa_rdm_ep_set_write_in_order_aligned_128_bytes(struct efa_rdm_ep *ep, - bool write_in_order_aligned_128_bytes) -{ - if (write_in_order_aligned_128_bytes && - !efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_WRITE)) - return -FI_EOPNOTSUPP; - - ep->write_in_order_aligned_128_bytes = write_in_order_aligned_128_bytes; - return 0; -} - /** * @brief implement the fi_setopt() API for EFA RDM endpoint * @param[in] fid fid to endpoint @@ -1414,16 +1417,12 @@ static int efa_rdm_ep_setopt(fid_t fid, int level, int optname, case FI_OPT_EFA_SENDRECV_IN_ORDER_ALIGNED_128_BYTES: if (optlen != sizeof(bool)) return -FI_EINVAL; - ret = efa_rdm_ep_set_sendrecv_in_order_aligned_128_bytes(efa_rdm_ep, *(bool *)optval); - if (ret) - return ret; + efa_rdm_ep->sendrecv_in_order_aligned_128_bytes = *(bool *)optval; break; case FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES: if (optlen != sizeof(bool)) return -FI_EINVAL; - ret = efa_rdm_ep_set_write_in_order_aligned_128_bytes(efa_rdm_ep, *(bool *)optval); - if (ret) - return ret; + efa_rdm_ep->write_in_order_aligned_128_bytes = *(bool *)optval; break; default: EFA_WARN(FI_LOG_EP_CTRL, diff --git a/prov/efa/src/rdm/efa_rdm_ep_progress.c b/prov/efa/src/rdm/efa_rdm_ep_progress.c index c7c2cf24b30..73616c1a923 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_progress.c +++ b/prov/efa/src/rdm/efa_rdm_ep_progress.c @@ -5,6 +5,7 @@ #include "efa_av.h" #include "efa_rdm_ep.h" +#include "efa_rdm_cq.h" #include "efa_rdm_tracepoint.h" #include "efa_cntr.h" #include "efa_rdm_pke_cmd.h" @@ -266,24 +267,26 @@ void efa_rdm_ep_check_peer_backoff_timer(struct efa_rdm_ep *ep) * @param[in] len Payload length * @param[in] flags flags (such as FI_REMOTE_CQ_DATA) */ +static void efa_rdm_ep_proc_ibv_recv_rdma_with_imm_completion(struct efa_rdm_ep *ep, - uint32_t imm_data, - uint32_t len, uint64_t flags, - struct efa_rdm_pke *pkt_entry) + struct efa_rdm_pke *pkt_entry, + struct ibv_cq_ex *ibv_cq_ex) { struct util_cq *target_cq; int ret; fi_addr_t src_addr; struct efa_av *efa_av; + uint32_t imm_data = ibv_wc_read_imm_data(ibv_cq_ex); + uint32_t len = ibv_wc_read_byte_len(ibv_cq_ex); target_cq = ep->base_ep.util_ep.rx_cq; efa_av = ep->base_ep.av; if (ep->base_ep.util_ep.caps & FI_SOURCE) { src_addr = efa_av_reverse_lookup_rdm(efa_av, - ibv_wc_read_slid(ep->ibv_cq_ex), - ibv_wc_read_src_qp(ep->ibv_cq_ex), + ibv_wc_read_slid(ibv_cq_ex), + ibv_wc_read_src_qp(ibv_cq_ex), NULL); ret = ofi_cq_write_src(target_cq, NULL, flags, len, NULL, imm_data, 0, src_addr); } else { @@ -314,7 +317,8 @@ void efa_rdm_ep_proc_ibv_recv_rdma_with_imm_completion(struct efa_rdm_ep *ep, */ static inline fi_addr_t efa_rdm_ep_determine_peer_address_from_efadv(struct efa_rdm_ep *ep, - struct ibv_cq_ex *ibv_cqx) + struct ibv_cq_ex *ibv_cqx, + enum ibv_cq_ex_type ibv_cq_ex_type) { struct efa_rdm_pke *pkt_entry; struct efa_ep_addr efa_ep_addr = {0}; @@ -322,7 +326,7 @@ fi_addr_t efa_rdm_ep_determine_peer_address_from_efadv(struct efa_rdm_ep *ep, union ibv_gid gid = {0}; uint32_t *connid = NULL; - if (ep->ibv_cq_ex_type != EFADV_CQ) { + if (ibv_cq_ex_type != EFADV_CQ) { /* EFA DV CQ is not supported. This could be due to old EFA kernel module versions. */ return FI_ADDR_NOTAVAIL; } @@ -371,7 +375,7 @@ fi_addr_t efa_rdm_ep_determine_peer_address_from_efadv(struct efa_rdm_ep *ep, * @param ibv_cqx Pointer to CQ * @returns Peer address, or FI_ADDR_NOTAVAIL if unsuccessful. 
*/ -static inline fi_addr_t efa_rdm_ep_determine_addr_from_ibv_cq(struct efa_rdm_ep *ep, struct ibv_cq_ex *ibv_cqx) +static inline fi_addr_t efa_rdm_ep_determine_addr_from_ibv_cq(struct efa_rdm_ep *ep, struct ibv_cq_ex *ibv_cqx, enum ibv_cq_ex_type ibv_cq_ex_type) { struct efa_rdm_pke *pkt_entry; fi_addr_t addr = FI_ADDR_NOTAVAIL; @@ -381,7 +385,7 @@ static inline fi_addr_t efa_rdm_ep_determine_addr_from_ibv_cq(struct efa_rdm_ep addr = efa_rdm_pke_determine_addr(pkt_entry); if (addr == FI_ADDR_NOTAVAIL) { - addr = efa_rdm_ep_determine_peer_address_from_efadv(ep, ibv_cqx); + addr = efa_rdm_ep_determine_peer_address_from_efadv(ep, ibv_cqx, ibv_cq_ex_type); } return addr; @@ -398,7 +402,7 @@ static inline fi_addr_t efa_rdm_ep_determine_addr_from_ibv_cq(struct efa_rdm_ep * @returns Peer address, or FI_ADDR_NOTAVAIL if unsuccessful. */ static inline -fi_addr_t efa_rdm_ep_determine_addr_from_ibv_cq(struct efa_rdm_ep *ep, struct ibv_cq_ex *ibv_cqx) +fi_addr_t efa_rdm_ep_determine_addr_from_ibv_cq(struct efa_rdm_ep *ep, struct ibv_cq_ex *ibv_cqx, enum ibv_cq_ex_type ibv_cq_ex_type) { struct efa_rdm_pke *pkt_entry; @@ -414,7 +418,7 @@ fi_addr_t efa_rdm_ep_determine_addr_from_ibv_cq(struct efa_rdm_ep *ep, struct ib * @param[in] ep RDM endpoint * @param[in] cqe_to_process Max number of cq entry to poll and process. Must be positive. */ -static inline void efa_rdm_ep_poll_ibv_cq(struct efa_rdm_ep *ep, size_t cqe_to_process) +static inline void efa_rdm_ep_poll_ibv_cq(struct efa_rdm_ep *ep, size_t cqe_to_process, struct efa_rdm_cq *efa_rdm_cq) { bool should_end_poll = false; /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll. @@ -433,15 +437,15 @@ static inline void efa_rdm_ep_poll_ibv_cq(struct efa_rdm_ep *ep, size_t cqe_to_p efa_av = ep->base_ep.av; /* Call ibv_start_poll only once */ - err = ibv_start_poll(ep->ibv_cq_ex, &poll_cq_attr); + err = ibv_start_poll(efa_rdm_cq->ibv_cq_ex, &poll_cq_attr); should_end_poll = !err; while (!err) { - pkt_entry = (void *)(uintptr_t)ep->ibv_cq_ex->wr_id; - efa_rdm_tracepoint(poll_cq, (size_t) ep->ibv_cq_ex->wr_id); - opcode = ibv_wc_read_opcode(ep->ibv_cq_ex); - if (ep->ibv_cq_ex->status) { - prov_errno = efa_rdm_ep_get_prov_errno(ep); + pkt_entry = (void *)(uintptr_t)efa_rdm_cq->ibv_cq_ex->wr_id; + efa_rdm_tracepoint(poll_cq, (size_t) efa_rdm_cq->ibv_cq_ex->wr_id); + opcode = ibv_wc_read_opcode(efa_rdm_cq->ibv_cq_ex); + if (efa_rdm_cq->ibv_cq_ex->status) { + prov_errno = efa_rdm_ep_get_prov_errno(ep, efa_rdm_cq->ibv_cq_ex); switch (opcode) { case IBV_WC_SEND: /* fall through */ case IBV_WC_RDMA_WRITE: /* fall through */ @@ -466,14 +470,14 @@ static inline void efa_rdm_ep_poll_ibv_cq(struct efa_rdm_ep *ep, size_t cqe_to_p efa_rdm_pke_handle_send_completion(pkt_entry); break; case IBV_WC_RECV: - pkt_entry->addr = efa_av_reverse_lookup_rdm(efa_av, ibv_wc_read_slid(ep->ibv_cq_ex), - ibv_wc_read_src_qp(ep->ibv_cq_ex), pkt_entry); + pkt_entry->addr = efa_av_reverse_lookup_rdm(efa_av, ibv_wc_read_slid(efa_rdm_cq->ibv_cq_ex), + ibv_wc_read_src_qp(efa_rdm_cq->ibv_cq_ex), pkt_entry); if (pkt_entry->addr == FI_ADDR_NOTAVAIL) { - pkt_entry->addr = efa_rdm_ep_determine_addr_from_ibv_cq(ep, ep->ibv_cq_ex); + pkt_entry->addr = efa_rdm_ep_determine_addr_from_ibv_cq(ep, efa_rdm_cq->ibv_cq_ex, efa_rdm_cq->ibv_cq_ex_type); } - pkt_entry->pkt_size = ibv_wc_read_byte_len(ep->ibv_cq_ex); + pkt_entry->pkt_size = ibv_wc_read_byte_len(efa_rdm_cq->ibv_cq_ex); assert(pkt_entry->pkt_size > 0); efa_rdm_pke_handle_recv_completion(pkt_entry); #if ENABLE_DEBUG @@ -486,10 +490,8 @@ 
static inline void efa_rdm_ep_poll_ibv_cq(struct efa_rdm_ep *ep, size_t cqe_to_p break; case IBV_WC_RECV_RDMA_WITH_IMM: efa_rdm_ep_proc_ibv_recv_rdma_with_imm_completion(ep, - ibv_wc_read_imm_data(ep->ibv_cq_ex), - ibv_wc_read_byte_len(ep->ibv_cq_ex), FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE, - pkt_entry ); + pkt_entry, efa_rdm_cq->ibv_cq_ex); break; default: EFA_WARN(FI_LOG_EP_CTRL, @@ -506,17 +508,17 @@ static inline void efa_rdm_ep_poll_ibv_cq(struct efa_rdm_ep *ep, size_t cqe_to_p * ibv_next_poll MUST be call after the current WC is fully processed, * which prevents later calls on ibv_cq_ex from reading the wrong WC. */ - err = ibv_next_poll(ep->ibv_cq_ex); + err = ibv_next_poll(efa_rdm_cq->ibv_cq_ex); } if (err && err != ENOENT) { err = err > 0 ? err : -err; - prov_errno = efa_rdm_ep_get_prov_errno(ep); + prov_errno = efa_rdm_ep_get_prov_errno(ep, efa_rdm_cq->ibv_cq_ex); efa_base_ep_write_eq_error(&ep->base_ep, err, prov_errno); } if (should_end_poll) - ibv_end_poll(ep->ibv_cq_ex); + ibv_end_poll(efa_rdm_cq->ibv_cq_ex); } @@ -589,7 +591,11 @@ void efa_rdm_ep_progress_internal(struct efa_rdm_ep *ep) /* Poll the EFA completion queue. Restrict poll size * to avoid CQE flooding and thereby blocking user thread. */ - efa_rdm_ep_poll_ibv_cq(ep, efa_env.efa_cq_read_size); + if (ep->tx_cq) + efa_rdm_ep_poll_ibv_cq(ep, efa_env.efa_cq_read_size, ep->tx_cq); + + if (ep->rx_cq && (ep->rx_cq != ep->tx_cq)) + efa_rdm_ep_poll_ibv_cq(ep, efa_env.efa_cq_read_size, ep->rx_cq); efa_rdm_ep_progress_post_internal_rx_pkts(ep); diff --git a/prov/efa/src/rdm/efa_rdm_ep_utils.c b/prov/efa/src/rdm/efa_rdm_ep_utils.c index bd77646a797..da988e17c9c 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_utils.c +++ b/prov/efa/src/rdm/efa_rdm_ep_utils.c @@ -9,7 +9,6 @@ #include #include "efa.h" #include "efa_av.h" -#include "efa_cq.h" #include "efa_rdm_msg.h" #include "efa_rdm_rma.h" #include "efa_rdm_atomic.h" @@ -698,9 +697,9 @@ size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface * RDMA Core error codes (#EFA_IO_COMP_STATUSES) for the sake of more accurate * error reporting */ -int efa_rdm_ep_get_prov_errno(struct efa_rdm_ep *ep) { - uint32_t vendor_err = ibv_wc_read_vendor_err(ep->ibv_cq_ex); - struct efa_rdm_pke *pkt_entry = (void *) (uintptr_t) ep->ibv_cq_ex->wr_id; +int efa_rdm_ep_get_prov_errno(struct efa_rdm_ep *ep, struct ibv_cq_ex *ibv_cq_ex) { + uint32_t vendor_err = ibv_wc_read_vendor_err(ibv_cq_ex); + struct efa_rdm_pke *pkt_entry = (void *) (uintptr_t) ibv_cq_ex->wr_id; struct efa_rdm_peer *peer; if (OFI_LIKELY(pkt_entry && pkt_entry->addr))
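
Note: with this change, the provider-level verbs resources line up with the standard libfabric setup calls as follows: the ibv_cq_ex is created when the application calls fi_cq_open() (now inside efa_rdm_cq_open()), the CQ is recorded as the endpoint's tx_cq/rx_cq at fi_ep_bind() time, and the ibv QP is created during fi_enable(), which is also where the in-order-aligned-128-bytes options are validated against the QP. The sketch below is illustrative only and is not part of the patch; the helper name setup_ep and the omission of fi_getinfo() and error handling are assumptions, but the call sequence uses the standard libfabric APIs.

#include <rdma/fabric.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>

/* Illustrative only: the object setup order an EFA RDM application follows.
 * Assumes "info" was returned by fi_getinfo() for the efa provider with
 * FI_EP_RDM; return-code checks are omitted for brevity. */
static int setup_ep(struct fi_info *info, struct fid_ep **ep_out)
{
	struct fid_fabric *fabric;
	struct fid_domain *domain;
	struct fid_av *av;
	struct fid_cq *cq;
	struct fid_ep *ep;
	struct fi_cq_attr cq_attr = { .format = FI_CQ_FORMAT_DATA };
	struct fi_av_attr av_attr = { .type = FI_AV_TABLE };

	fi_fabric(info->fabric_attr, &fabric, NULL);
	fi_domain(fabric, info, &domain, NULL);

	/* After this patch, the ibv_cq_ex is created here, in fi_cq_open(). */
	fi_cq_open(domain, &cq_attr, &cq, NULL);

	fi_av_open(domain, &av_attr, &av, NULL);
	fi_endpoint(domain, info, &ep, NULL);

	/* FI_TRANSMIT / FI_RECV now select the endpoint's tx_cq / rx_cq. */
	fi_ep_bind(ep, &cq->fid, FI_TRANSMIT | FI_RECV);
	fi_ep_bind(ep, &av->fid, 0);

	/* The ibv QP is created here, during FI_ENABLE, once the CQ
	 * bindings are known. */
	*ep_out = ep;
	return fi_enable(ep);
}

Because FI_TRANSMIT and FI_RECV may be bound to different CQs, the progress engine now polls ep->tx_cq and ep->rx_cq separately, skipping the second poll when both bindings refer to the same CQ.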