Skip to content

Commit

Permalink
prov/efa: Customized changes for changing tx/rx sizes
Browse files Browse the repository at this point in the history
Signed-off-by: Shi Jin <sjina@amazon.com>
  • Loading branch information
shijin-aws committed Jan 25, 2024
1 parent a2efd14 commit 5158cb1
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 22 deletions.
2 changes: 2 additions & 0 deletions fabtests/benchmarks/rdm_pingpong.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ int main(int argc, char **argv)
hints->domain_attr->mr_mode = opts.mr_mode;
hints->domain_attr->threading = FI_THREAD_DOMAIN;
hints->tx_attr->tclass = FI_TC_LOW_LATENCY;
hints->tx_attr->size = 64;
hints->rx_attr->size = 64;
hints->addr_format = opts.address_format;

ret = run();
Expand Down
11 changes: 9 additions & 2 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ struct efa_rdm_ep {
/* rx/tx queue size of core provider */
size_t efa_max_outstanding_rx_ops;
size_t efa_max_outstanding_tx_ops;
size_t efa_max_outstanding_tx_ops_curr;
size_t efa_outstanding_send_ops;
size_t efa_outstanding_write_ops;
size_t efa_outstanding_read_ops;
size_t efa_max_outstanding_send_ops_curr;
size_t efa_max_outstanding_write_ops_curr;
size_t efa_max_outstanding_read_ops_curr;
size_t efa_rnr_queued_pkt_cnt;
size_t max_data_payload_size;

Expand Down Expand Up @@ -260,9 +267,9 @@ struct efa_rdm_ope *efa_rdm_ep_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
struct efa_rdm_ope *efa_rdm_ep_alloc_rxe(struct efa_rdm_ep *ep,
fi_addr_t addr, uint32_t op);

void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry);
void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry, int op);

void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry);
void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry, int op);

static inline size_t efa_rdm_ep_get_rx_pool_size(struct efa_rdm_ep *ep)
{
Expand Down
17 changes: 15 additions & 2 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ int efa_rdm_ep_create_base_ep_ibv_qp(struct efa_rdm_ep *ep)
{
struct ibv_qp_init_attr_ex attr_ex = { 0 };

attr_ex.cap.max_send_wr = ep->base_ep.domain->device->rdm_info->tx_attr->size;
attr_ex.cap.max_send_wr = ep->user_info->tx_attr->size; // ep->base_ep.domain->device->rdm_info->tx_attr->size;
attr_ex.cap.max_send_sge = ep->base_ep.domain->device->rdm_info->tx_attr->iov_limit;
attr_ex.send_cq = ibv_cq_ex_to_cq(ep->ibv_cq_ex);

attr_ex.cap.max_recv_wr = ep->base_ep.domain->device->rdm_info->rx_attr->size;
attr_ex.cap.max_recv_wr = ep->user_info->rx_attr->size; // ep->base_ep.domain->device->rdm_info->rx_attr->size;
attr_ex.cap.max_recv_sge = ep->base_ep.domain->device->rdm_info->rx_attr->iov_limit;
attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->ibv_cq_ex);

//printf("efa_rdm_ep_create_base_ep_ibv_qp: tx_size: %u, rx_size: %u, iov_limit: %u\n", attr_ex.cap.max_send_wr, attr_ex.cap.max_recv_wr, attr_ex.cap.max_send_sge);
attr_ex.cap.max_inline_data = ep->base_ep.domain->device->efa_attr.inline_buf_size;

attr_ex.qp_type = IBV_QPT_DRIVER;
Expand Down Expand Up @@ -498,6 +499,13 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info,
efa_rdm_ep->efa_rx_pkts_posted = 0;
efa_rdm_ep->efa_rx_pkts_to_post = 0;
efa_rdm_ep->efa_outstanding_tx_ops = 0;
efa_rdm_ep->efa_max_outstanding_tx_ops_curr = 0;
efa_rdm_ep->efa_max_outstanding_send_ops_curr = 0;
efa_rdm_ep->efa_max_outstanding_read_ops_curr = 0;
efa_rdm_ep->efa_max_outstanding_write_ops_curr = 0;
efa_rdm_ep->efa_outstanding_send_ops = 0;
efa_rdm_ep->efa_outstanding_read_ops = 0;
efa_rdm_ep->efa_outstanding_write_ops = 0;

assert(!efa_rdm_ep->ibv_cq_ex);

Expand Down Expand Up @@ -851,6 +859,11 @@ static int efa_rdm_ep_close(struct fid *fid)
}
efa_rdm_ep_destroy_buffer_pools(efa_rdm_ep);

printf("ep: %p max tx: %lu, send: %lu, read: %lu, write: %lu\n", (void *)efa_rdm_ep,
efa_rdm_ep->efa_max_outstanding_tx_ops_curr,
efa_rdm_ep->efa_max_outstanding_send_ops_curr,
efa_rdm_ep->efa_max_outstanding_read_ops_curr,
efa_rdm_ep->efa_max_outstanding_write_ops_curr);
if (efa_rdm_ep->pke_vec)
free(efa_rdm_ep->pke_vec);
free(efa_rdm_ep);
Expand Down
6 changes: 5 additions & 1 deletion prov/efa/src/rdm/efa_rdm_ep_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,13 @@ static inline void efa_rdm_ep_poll_ibv_cq(struct efa_rdm_ep *ep, size_t cqe_to_p
prov_errno = ibv_wc_read_vendor_err(ep->ibv_cq_ex);
switch (opcode) {
case IBV_WC_SEND: /* fall through */
efa_rdm_pke_handle_tx_error(pkt_entry, FI_EIO, prov_errno, FI_SEND);
break;
case IBV_WC_RDMA_WRITE: /* fall through */
efa_rdm_pke_handle_tx_error(pkt_entry, FI_EIO, prov_errno, FI_WRITE);
break;
case IBV_WC_RDMA_READ:
efa_rdm_pke_handle_tx_error(pkt_entry, FI_EIO, prov_errno);
efa_rdm_pke_handle_tx_error(pkt_entry, FI_EIO, prov_errno, FI_READ);
break;
case IBV_WC_RECV: /* fall through */
case IBV_WC_RECV_RDMA_WITH_IMM:
Expand Down
44 changes: 42 additions & 2 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ struct efa_rdm_ope *efa_rdm_ep_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
* @param[in] pkt_entry TX pkt_entry, which contains
* the info of the TX op.
*/
void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry)
void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry, int op)
{
struct efa_rdm_peer *peer;
struct efa_rdm_ope *ope;
Expand All @@ -355,6 +355,33 @@ void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke

assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_EFA_TX_POOL);
ep->efa_outstanding_tx_ops++;

switch (op) {
case FI_SEND:
ep->efa_outstanding_send_ops++;
break;
case FI_READ:
ep->efa_outstanding_read_ops++;
break;
case FI_WRITE:
ep->efa_outstanding_write_ops++;
break;
default:
break;
}

if (ep->efa_outstanding_tx_ops > ep->efa_max_outstanding_tx_ops_curr)
ep->efa_max_outstanding_tx_ops_curr = ep->efa_outstanding_tx_ops;

if (ep->efa_outstanding_send_ops > ep->efa_max_outstanding_send_ops_curr)
ep->efa_max_outstanding_send_ops_curr = ep->efa_outstanding_send_ops;

if (ep->efa_outstanding_write_ops > ep->efa_max_outstanding_write_ops_curr)
ep->efa_max_outstanding_write_ops_curr = ep->efa_outstanding_write_ops;

if (ep->efa_outstanding_read_ops > ep->efa_max_outstanding_read_ops_curr)
ep->efa_max_outstanding_read_ops_curr = ep->efa_outstanding_read_ops;

if (peer)
peer->efa_outstanding_tx_ops++;

Expand Down Expand Up @@ -398,7 +425,7 @@ void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke
* @param[in] pkt_entry TX pkt_entry, which contains
* the info of the TX op
*/
void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry)
void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry, int op)
{
struct efa_rdm_ope *ope = NULL;
struct efa_rdm_peer *peer;
Expand All @@ -419,6 +446,19 @@ void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke

assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_EFA_TX_POOL);
ep->efa_outstanding_tx_ops--;
switch (op) {
case FI_SEND:
ep->efa_outstanding_send_ops--;
break;
case FI_READ:
ep->efa_outstanding_read_ops--;
break;
case FI_WRITE:
ep->efa_outstanding_write_ops--;
break;
default:
break;
}
if (peer)
peer->efa_outstanding_tx_ops--;

Expand Down
11 changes: 6 additions & 5 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -1716,7 +1716,9 @@ ssize_t efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type)
segment_offset = efa_rdm_pkt_type_contains_data(pkt_type) ? ope->bytes_sent : -1;
for (i = 0; i < pkt_entry_cnt; ++i) {
pkt_entry_vec[i] = efa_rdm_pke_alloc(ep, ep->efa_tx_pkt_pool, EFA_RDM_PKE_FROM_EFA_TX_POOL);
assert(pkt_entry_vec[i]);
if (!pkt_entry_vec[i])
return -FI_EAGAIN;
//assert(pkt_entry_vec[i]);

err = efa_rdm_pke_fill_data(pkt_entry_vec[i],
pkt_type,
Expand All @@ -1736,18 +1738,17 @@ ssize_t efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type)
}

err = efa_rdm_pke_sendv(pkt_entry_vec, pkt_entry_cnt);
if (err) {
for (i = 0; i < pkt_entry_cnt; ++i)
efa_rdm_pke_release_tx(pkt_entry_vec[i]);
if (err)
goto handle_err;
}

ope->peer->flags |= EFA_RDM_PEER_REQ_SENT;
for (i = 0; i < pkt_entry_cnt; ++i)
efa_rdm_pke_handle_sent(pkt_entry_vec[i]);
return 0;

handle_err:
for (i = 0; i < pkt_entry_cnt; ++i)
efa_rdm_pke_release_tx(pkt_entry_vec[i]);
return efa_rdm_ope_post_send_fallback(ope, pkt_type, err);
}

Expand Down
9 changes: 5 additions & 4 deletions prov/efa/src/rdm/efa_rdm_pke.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,15 +457,16 @@ ssize_t efa_rdm_pke_sendv(struct efa_rdm_pke **pkt_entry_vec,
#if HAVE_LTTNG
efa_tracepoint_wr_id_post_send((void *)pkt_entry);
#endif
efa_rdm_ep_record_tx_op_submitted(ep, pkt_entry_vec[pkt_idx], FI_SEND);
}

ret = ibv_wr_complete(qp->ibv_qp_ex);
if (OFI_UNLIKELY(ret)) {
return ret;
}

for (pkt_idx = 0; pkt_idx < pkt_entry_cnt; ++pkt_idx)
efa_rdm_ep_record_tx_op_submitted(ep, pkt_entry_vec[pkt_idx]);
//for (pkt_idx = 0; pkt_idx < pkt_entry_cnt; ++pkt_idx)
// efa_rdm_ep_record_tx_op_submitted(ep, pkt_entry_vec[pkt_idx], FI_SEND);
return 0;
}

Expand Down Expand Up @@ -527,7 +528,7 @@ int efa_rdm_pke_read(struct efa_rdm_pke *pkt_entry,
if (OFI_UNLIKELY(err))
return err;

efa_rdm_ep_record_tx_op_submitted(ep, pkt_entry);
efa_rdm_ep_record_tx_op_submitted(ep, pkt_entry, FI_READ);
return 0;
}

Expand Down Expand Up @@ -615,7 +616,7 @@ int efa_rdm_pke_write(struct efa_rdm_pke *pkt_entry)
if (OFI_UNLIKELY(err))
return err;

efa_rdm_ep_record_tx_op_submitted(ep, pkt_entry);
efa_rdm_ep_record_tx_op_submitted(ep, pkt_entry, FI_WRITE);
return 0;
}

Expand Down
8 changes: 4 additions & 4 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ void efa_rdm_pke_handle_data_copied(struct efa_rdm_pke *pkt_entry)
* @param[in] err libfabric error code
* @param[in] prov_errno provider specific error code
*/
void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int err, int prov_errno)
void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int err, int prov_errno, int op)
{
struct efa_rdm_peer *peer;
struct efa_rdm_ope *txe;
Expand All @@ -420,7 +420,7 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int err, int pro
efa_strerror(prov_errno, NULL), prov_errno);

ep = pkt_entry->ep;
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, op);

peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
if (!peer) {
Expand Down Expand Up @@ -569,7 +569,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
if (pkt_entry->addr == FI_ADDR_NOTAVAIL &&
!(pkt_entry->flags & EFA_RDM_PKE_LOCAL_READ)) {
EFA_WARN(FI_LOG_CQ, "ignoring send completion of a packet to a removed peer.\n");
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, FI_SEND);
efa_rdm_pke_release_tx(pkt_entry);
return;
}
Expand Down Expand Up @@ -674,7 +674,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
return;
}

efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, FI_SEND);
efa_rdm_pke_release_tx(pkt_entry);
}

Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_pke_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fi_addr_t efa_rdm_pke_determine_addr(struct efa_rdm_pke *pkt_entry);
void efa_rdm_pke_handle_data_copied(struct efa_rdm_pke *pkt_entry);

void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry,
int err, int prov_errno);
int err, int prov_errno, int op);

void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry);

Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_pke_nonreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ void efa_rdm_pke_handle_rma_completion(struct efa_rdm_pke *context_pkt_entry)
assert(0 && "invalid EFA_RDM_RMA_CONTEXT_PKT rma_context_type\n");
}

efa_rdm_ep_record_tx_op_completed(context_pkt_entry->ep, context_pkt_entry);
efa_rdm_ep_record_tx_op_completed(context_pkt_entry->ep, context_pkt_entry, (rma_context_pkt->context_type == EFA_RDM_RDMA_WRITE_CONTEXT) ? FI_WRITE : FI_READ);
efa_rdm_pke_release_tx(context_pkt_entry);
}

Expand Down

0 comments on commit 5158cb1

Please sign in to comment.