prov/efa: Create 1:1 relationship between libfabric CQs and IBV CQs
Today, with the EFA provider each endpoint gets its own IBV completion queue,
even if all endpoints are bound to the same libfabric CQ.
This causes applications to poll many more hardware completion queues than
expected, at significantly reduced performance.
This patch fixes the issue by moving IBV CQ creation to fi_cq_open and
binding the CQ to the endpoint explicitly via fi_ep_bind.

As a consequence of this change, QP creation moves to fi_ep_enable, because
the IBV CQ must be bound to the QP at QP creation time, which is only
guaranteed once the endpoint is enabled.

Signed-off-by: Shi Jin <sjina@amazon.com>
shijin-aws committed Feb 14, 2024
1 parent 2f60522 commit cd99a5c
Showing 10 changed files with 155 additions and 125 deletions.
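For context before the diffs: a minimal sketch of the application-facing flow this commit optimizes, assuming standard libfabric calls (fabric/domain/AV setup and error-path cleanup are omitted for brevity; an RDM endpoint also needs an AV bound before enable). With this change, the single fi_cq_open() below maps to exactly one IBV CQ, no matter how many endpoints bind it.

#include <rdma/fabric.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>

/* Sketch only: open one libfabric CQ and share it across several endpoints. */
static int open_eps_sharing_one_cq(struct fid_domain *domain, struct fi_info *info,
                                   struct fid_ep *eps[], int nep, struct fid_cq **cq_out)
{
	struct fi_cq_attr cq_attr = { .format = FI_CQ_FORMAT_DATA };
	int i, ret;

	ret = fi_cq_open(domain, &cq_attr, cq_out, NULL);  /* one IBV CQ underneath */
	if (ret)
		return ret;

	for (i = 0; i < nep; i++) {
		ret = fi_endpoint(domain, info, &eps[i], NULL);
		if (ret)
			return ret;
		/* Bind the shared CQ for both directions; FI_TRANSMIT and
		 * FI_RECV may also be bound to two different CQs. */
		ret = fi_ep_bind(eps[i], &(*cq_out)->fid, FI_TRANSMIT | FI_RECV);
		if (ret)
			return ret;
		ret = fi_enable(eps[i]);  /* with this change, EFA creates the QP here */
		if (ret)
			return ret;
	}
	return 0;
}

Per the efa_rdm_ep_fiops.c hunks below, when only one direction's CQ is bound the provider falls back to the other CQ at QP creation, and enabling an endpoint with no CQ bound at all now fails with -FI_ENOCQ.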
3 changes: 3 additions & 0 deletions fabtests/pytest/efa/test_cq.py
@@ -1,5 +1,8 @@
import pytest

# This test must be run in serial mode because it opens the maximum number
# of CQs that the EFA device can support.
@pytest.mark.serial
@pytest.mark.unit
def test_cq(cmdline_args):
from common import UnitTest
2 changes: 1 addition & 1 deletion prov/efa/src/efa_base_ep.c
@@ -34,7 +34,7 @@ int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av)
return 0;
}

static int efa_base_ep_destruct_qp(struct efa_base_ep *base_ep)
int efa_base_ep_destruct_qp(struct efa_base_ep *base_ep)
{
struct efa_domain *domain;
struct efa_qp *qp = base_ep->qp;
2 changes: 2 additions & 0 deletions prov/efa/src/efa_base_ep.h
@@ -101,6 +101,8 @@ int efa_base_ep_getname(fid_t fid, void *addr, size_t *addrlen);
int efa_base_ep_create_qp(struct efa_base_ep *base_ep,
struct ibv_qp_init_attr_ex *init_attr_ex);

int efa_base_ep_destruct_qp(struct efa_base_ep *base_ep);

bool efa_base_ep_support_op_in_order_aligned_128_bytes(struct efa_base_ep *base_ep,
enum ibv_wr_opcode op);

5 changes: 5 additions & 0 deletions prov/efa/src/efa_cq.h
@@ -32,6 +32,11 @@

#include "efa.h"

enum ibv_cq_ex_type {
IBV_CQ,
EFADV_CQ
};

/**
* @brief Create ibv_cq_ex by calling ibv_create_cq_ex
*
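The doc comment above describes efa_cq_ibv_cq_ex_open(), which after this commit runs at fi_cq_open() time rather than at endpoint creation. For reference, a minimal sketch of the rdma-core call it wraps; the attribute values are illustrative, not the provider's, and the EFADV_CQ variant of the enum corresponds to creating the CQ through efadv_create_cq() instead.

#include <infiniband/verbs.h>

/* Illustrative only: create an extended IBV CQ that reports extra
 * completion fields; it is then polled with the ibv_start_poll()/
 * ibv_next_poll()/ibv_end_poll() iterator API. */
static struct ibv_cq_ex *open_ibv_cq_ex(struct ibv_context *ctx, int depth)
{
	struct ibv_cq_init_attr_ex attr_ex = {
		.cqe = depth,                        /* capacity in entries */
		.wc_flags = IBV_WC_EX_WITH_BYTE_LEN  /* extra fields to report */
			  | IBV_WC_EX_WITH_SRC_QP,
	};

	return ibv_create_cq_ex(ctx, &attr_ex);
}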
17 changes: 17 additions & 0 deletions prov/efa/src/rdm/efa_rdm_cq.c
@@ -32,6 +32,16 @@ int efa_rdm_cq_close(struct fid *fid)

cq = container_of(fid, struct efa_rdm_cq, util_cq.cq_fid.fid);

if (cq->ibv_cq_ex) {
ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex));
if (ret) {
EFA_WARN(FI_LOG_CQ, "Unable to close ibv cq: %s\n",
fi_strerror(-ret));
return ret;
}
cq->ibv_cq_ex = NULL;
}

if (cq->shm_cq) {
ret = fi_close(&cq->shm_cq->fid);
if (ret) {
@@ -159,6 +169,13 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
}
}

ret = efa_cq_ibv_cq_ex_open(attr, efa_domain->device->ibv_ctx, &cq->ibv_cq_ex, &cq->ibv_cq_ex_type);
if (ret) {
EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %d\n", ret);
ret = -FI_EINVAL;
goto free;
}

return 0;
free:
free(cq);
3 changes: 3 additions & 0 deletions prov/efa/src/rdm/efa_rdm_cq.h
@@ -34,11 +34,14 @@
#ifndef EFA_RDM_CQ_H
#define EFA_RDM_CQ_H

#include "efa_cq.h"
#include <ofi_util.h>

struct efa_rdm_cq {
struct util_cq util_cq;
struct fid_cq *shm_cq;
struct ibv_cq_ex *ibv_cq_ex;
enum ibv_cq_ex_type ibv_cq_ex_type;
};

/*
11 changes: 3 additions & 8 deletions prov/efa/src/rdm/efa_rdm_ep.h
@@ -12,11 +12,6 @@

#define EFA_RDM_ERROR_MSG_BUFFER_LENGTH 1024

enum ibv_cq_ex_type {
IBV_CQ,
EFADV_CQ
};

/** @brief Information of a queued copy.
*
* This struct is used when receiving buffer is on device.
@@ -50,9 +45,9 @@ struct efa_rdm_ep {
/* per-version extra feature/request flag */
uint64_t extra_info[EFA_RDM_MAX_NUM_EXINFO];

struct ibv_cq_ex *ibv_cq_ex;
struct efa_rdm_cq *tx_cq;

enum ibv_cq_ex_type ibv_cq_ex_type;
struct efa_rdm_cq *rx_cq;

/* shm provider fid */
struct fid_ep *shm_ep;
@@ -280,7 +275,7 @@ ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep,

size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface iface);

int efa_rdm_ep_get_prov_errno(struct efa_rdm_ep *ep);
int efa_rdm_ep_get_prov_errno(struct efa_rdm_ep *ep, struct ibv_cq_ex *ibv_cq_ex);

static inline
struct efa_domain *efa_rdm_ep_domain(struct efa_rdm_ep *ep)
168 changes: 84 additions & 84 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
@@ -32,7 +32,6 @@
*/

#include "efa.h"
#include "efa_cq.h"
#include "efa_av.h"
#include "efa_rdm_ep.h"
#include "efa_rdm_cq.h"
@@ -58,13 +57,39 @@ int efa_rdm_ep_create_base_ep_ibv_qp(struct efa_rdm_ep *ep)
{
struct ibv_qp_init_attr_ex attr_ex = { 0 };

attr_ex.cap.max_send_wr = ep->base_ep.domain->device->rdm_info->tx_attr->size;
attr_ex.cap.max_send_sge = ep->base_ep.domain->device->rdm_info->tx_attr->iov_limit;
attr_ex.send_cq = ibv_cq_ex_to_cq(ep->ibv_cq_ex);
if (!ep->tx_cq && !ep->rx_cq) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send or receive completion queue\n");
return -FI_ENOCQ;
}

attr_ex.cap.max_recv_wr = ep->base_ep.domain->device->rdm_info->rx_attr->size;
attr_ex.cap.max_recv_sge = ep->base_ep.domain->device->rdm_info->rx_attr->iov_limit;
attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->ibv_cq_ex);
if (!ep->tx_cq && ofi_needs_tx(ep->base_ep.info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n");
return -FI_ENOCQ;
}

if (!ep->rx_cq && ofi_needs_rx(ep->base_ep.info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a receive completion queue when it has receive capabilities enabled. (FI_RECV)\n");
return -FI_ENOCQ;
}

if (ep->tx_cq) {
attr_ex.cap.max_send_wr = ep->base_ep.domain->device->rdm_info->tx_attr->size;
attr_ex.cap.max_send_sge = ep->base_ep.domain->device->rdm_info->tx_attr->iov_limit;
attr_ex.send_cq = ibv_cq_ex_to_cq(ep->tx_cq->ibv_cq_ex);
} else {
attr_ex.send_cq = ibv_cq_ex_to_cq(ep->rx_cq->ibv_cq_ex);
}

if (ep->rx_cq) {
attr_ex.cap.max_recv_wr = ep->base_ep.domain->device->rdm_info->rx_attr->size;
attr_ex.cap.max_recv_sge = ep->base_ep.domain->device->rdm_info->rx_attr->iov_limit;
attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->rx_cq->ibv_cq_ex);
} else {
attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->tx_cq->ibv_cq_ex);
}

attr_ex.cap.max_inline_data = ep->base_ep.domain->device->efa_attr.inline_buf_size;

@@ -501,19 +526,14 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info,
efa_rdm_ep->efa_rx_pkts_held = 0;
efa_rdm_ep->efa_outstanding_tx_ops = 0;

assert(!efa_rdm_ep->ibv_cq_ex);

ret = efa_cq_ibv_cq_ex_open(&cq_attr, efa_domain->device->ibv_ctx,
&efa_rdm_ep->ibv_cq_ex, &efa_rdm_ep->ibv_cq_ex_type);

if (ret) {
EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %s\n", strerror(errno));
goto err_close_shm_ep;
}

ret = efa_rdm_ep_create_buffer_pools(efa_rdm_ep);
if (ret)
goto err_close_core_cq;
goto err_close_shm_ep;

efa_rdm_ep_init_linked_lists(efa_rdm_ep);

@@ -539,15 +559,11 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info,
efa_rdm_ep->sendrecv_in_order_aligned_128_bytes = false;
efa_rdm_ep->write_in_order_aligned_128_bytes = false;

ret = efa_rdm_ep_create_base_ep_ibv_qp(efa_rdm_ep);
if (ret)
goto err_close_core_cq;

efa_rdm_ep->pke_vec = calloc(sizeof(struct efa_rdm_pke *), EFA_RDM_EP_MAX_WR_PER_IBV_POST_RECV);
if (!efa_rdm_ep->pke_vec) {
EFA_WARN(FI_LOG_EP_CTRL, "cannot alloc memory for efa_rdm_ep->pke_vec!\n");
ret = -FI_ENOMEM;
goto err_close_core_cq;
goto err_close_shm_ep;
}

*ep = &efa_rdm_ep->base_ep.util_ep.ep_fid;
@@ -560,11 +576,6 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info,
(*ep)->cm = &efa_rdm_ep_cm_ops;
return 0;

err_close_core_cq:
retv = -ibv_destroy_cq(ibv_cq_ex_to_cq(efa_rdm_ep->ibv_cq_ex));
if (retv)
EFA_WARN(FI_LOG_CQ, "Unable to close cq: %s\n",
fi_strerror(-retv));
err_close_shm_ep:
if (efa_rdm_ep->shm_ep) {
retv = fi_close(&efa_rdm_ep->shm_ep->fid);
@@ -625,6 +636,12 @@ static int efa_rdm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags)
if (ret)
return ret;

if (flags & FI_TRANSMIT)
efa_rdm_ep->tx_cq = cq;

if (flags & FI_RECV)
efa_rdm_ep->rx_cq = cq;

if (cq->shm_cq) {
/* Bind ep with shm provider's cq */
ret = fi_ep_bind(efa_rdm_ep->shm_ep, &cq->shm_cq->fid, flags);
@@ -829,12 +846,6 @@ static int efa_rdm_ep_close(struct fid *fid)
retv = ret;
}

ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(efa_rdm_ep->ibv_cq_ex));
if (ret) {
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close ibv_cq_ex\n");
retv = ret;
}

if (efa_rdm_ep->shm_ep) {
ret = fi_close(&efa_rdm_ep->shm_ep->fid);
if (ret) {
@@ -1012,6 +1023,35 @@ void efa_rdm_ep_update_shm(struct efa_rdm_ep *ep)
efa_rdm_ep_close_shm_resources(ep);
}

/**
* @brief If the user requested the in-order aligned 128 bytes capability,
* check whether the QP supports it.
* @param ep efa_rdm_ep
* @return int 0 on success, -FI_EOPNOTSUPP when the QP doesn't support the request.
*/
static
int efa_rdm_ep_check_qp_support_in_order_aligned_128_bytes(struct efa_rdm_ep *ep)
{
if (ep->write_in_order_aligned_128_bytes &&
!efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_WRITE)) {
EFA_WARN(FI_LOG_EP_CTRL,
"FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES is set to true but the QP doesn't support it\n");
return -FI_EOPNOTSUPP;
}

/*
* RDMA read is used to copy data from host bounce buffer to the
* application buffer on device
*/
if (ep->sendrecv_in_order_aligned_128_bytes &&
!efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_READ)) {
EFA_WARN(FI_LOG_EP_CTRL,
"FI_OPT_EFA_SENDRECV_IN_ORDER_ALIGNED_128_BYTES is set to true but the QP doesn't support it\n");
return -FI_EOPNOTSUPP;
}

return 0;
}

/**
* @brief implement the fi_enable() API for EFA RDM endpoint
@@ -1033,9 +1073,6 @@ static int efa_rdm_ep_ctrl(struct fid *fid, int command, void *arg)
switch (command) {
case FI_ENABLE:
ep = container_of(fid, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);
ret = efa_base_ep_enable(&ep->base_ep);
if (ret)
return ret;

/*
* efa uses util SRX no matter shm is enabled, so we need to initialize
@@ -1087,6 +1124,19 @@ static int efa_rdm_ep_ctrl(struct fid *fid, int command, void *arg)
if (ret)
goto out;
}

ret = efa_rdm_ep_create_base_ep_ibv_qp(ep);
if (ret)
goto out;

ret = efa_rdm_ep_check_qp_support_in_order_aligned_128_bytes(ep);
if (ret) {
efa_base_ep_destruct_qp(&ep->base_ep);
goto out;
}

/* efa_base_ep_enable destroys qp in the error path */
ret = efa_base_ep_enable(&ep->base_ep);
out:
ofi_genlock_unlock(srx_ctx->lock);
break;
@@ -1276,52 +1326,6 @@ static int efa_rdm_ep_set_use_device_rdma(struct efa_rdm_ep *ep, bool use_device
return 0;
}

/**
* @brief set sendrecv_in_order_aligned_128_bytes flag in efa_rdm_ep
* called by efa_rdm_ep_setopt
* @param[in,out] ep endpoint
* @param[in] sendrecv_in_order_aligned_128_bytes whether to enable in_order send/recv
* for each 128 bytes aligned buffer
* @return 0 on success, -FI_EOPNOTSUPP if the option cannot be supported
* @related efa_rdm_ep
*/
static
int efa_rdm_ep_set_sendrecv_in_order_aligned_128_bytes(struct efa_rdm_ep *ep,
bool sendrecv_in_order_aligned_128_bytes)
{
/*
* RDMA read is used to copy data from host bounce buffer to the
* application buffer on device
*/
if (sendrecv_in_order_aligned_128_bytes &&
!efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_READ))
return -FI_EOPNOTSUPP;

ep->sendrecv_in_order_aligned_128_bytes = sendrecv_in_order_aligned_128_bytes;
return 0;
}

/**
* @brief set write_in_order_aligned_128_bytes flag in efa_rdm_ep
* called by efa_rdm_ep_set_opt
* @param[in,out] ep endpoint
* @param[in] write_in_order_aligned_128_bytes whether to enable RDMA in order write
* for each 128 bytes aligned buffer.
* @return 0 on success, -FI_EOPNOTSUPP if the option cannot be supported.
* @related efa_rdm_ep
*/
static
int efa_rdm_ep_set_write_in_order_aligned_128_bytes(struct efa_rdm_ep *ep,
bool write_in_order_aligned_128_bytes)
{
if (write_in_order_aligned_128_bytes &&
!efa_base_ep_support_op_in_order_aligned_128_bytes(&ep->base_ep, IBV_WR_RDMA_WRITE))
return -FI_EOPNOTSUPP;

ep->write_in_order_aligned_128_bytes = write_in_order_aligned_128_bytes;
return 0;
}

/**
* @brief implement the fi_setopt() API for EFA RDM endpoint
* @param[in] fid fid to endpoint
Expand Down Expand Up @@ -1414,16 +1418,12 @@ static int efa_rdm_ep_setopt(fid_t fid, int level, int optname,
case FI_OPT_EFA_SENDRECV_IN_ORDER_ALIGNED_128_BYTES:
if (optlen != sizeof(bool))
return -FI_EINVAL;
ret = efa_rdm_ep_set_sendrecv_in_order_aligned_128_bytes(efa_rdm_ep, *(bool *)optval);
if (ret)
return ret;
efa_rdm_ep->sendrecv_in_order_aligned_128_bytes = *(bool *)optval;
break;
case FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES:
if (optlen != sizeof(bool))
return -FI_EINVAL;
ret = efa_rdm_ep_set_write_in_order_aligned_128_bytes(efa_rdm_ep, *(bool *)optval);
if (ret)
return ret;
efa_rdm_ep->write_in_order_aligned_128_bytes = *(bool *)optval;
break;
default:
EFA_WARN(FI_LOG_EP_CTRL,
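The FI_ENABLE hunk above is where the deferred QP creation lands. The ordering constraint comes from the verbs API itself: ibv_create_qp_ex() consumes the send and receive CQs through struct ibv_qp_init_attr_ex, so both CQs must exist before the QP can. A minimal sketch of that constraint against rdma-core follows; the QP type is simplified to UD for illustration, whereas EFA itself uses a driver-specific QP type, and the capability values are placeholders.

#include <infiniband/verbs.h>

/* Sketch only: the CQs are mandatory inputs at QP creation time, which is
 * why the provider cannot create the QP before fi_ep_bind() has attached
 * the CQs; hence QP creation moves to enable time. */
static struct ibv_qp *create_qp_with_cqs(struct ibv_context *ctx, struct ibv_pd *pd,
                                         struct ibv_cq_ex *tx_cq, struct ibv_cq_ex *rx_cq)
{
	struct ibv_qp_init_attr_ex attr_ex = {
		.send_cq = ibv_cq_ex_to_cq(tx_cq),
		.recv_cq = ibv_cq_ex_to_cq(rx_cq),
		.qp_type = IBV_QPT_UD,
		.pd = pd,
		.comp_mask = IBV_QP_INIT_ATTR_PD,
		.cap = { .max_send_wr = 1, .max_recv_wr = 1,
			 .max_send_sge = 1, .max_recv_sge = 1 },
	};

	return ibv_create_qp_ex(ctx, &attr_ex);
}

This also explains why the in-order aligned 128 bytes checks moved out of fi_setopt() in the hunks above: they query the QP, which no longer exists until the endpoint is enabled, so fi_setopt() now only records the flags and the validation runs after QP creation in FI_ENABLE.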
