Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROTON-2790: finer grained session flow control #434

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion c/docs/buffering.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ gets a @ref PN_LINK_FLOW event.

The AMQP protocol allows peers to exchange session limits so they can predict
their buffering requirements for incoming data (
`pn_session_set_incoming_capacity()` and
`pn_session_set_incoming_incoming_window_and_lwm()` and
`pn_session_set_outgoing_window()`). Proton will not exceed those limits when
sending to or receiving from the peer. However proton does *not* limit the
amount of data buffered in local memory at the request of the application. It
Expand Down
82 changes: 82 additions & 0 deletions c/include/proton/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ PN_EXTERN void pn_session_open(pn_session_t *session);
PN_EXTERN void pn_session_close(pn_session_t *session);

/**
* **Deprecated** - Use ::pn_session_incoming_window().
*
* Get the incoming capacity of the session measured in bytes.
*
* The incoming capacity of a session determines how much incoming
Expand All @@ -205,6 +207,8 @@ PN_EXTERN void pn_session_close(pn_session_t *session);
PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);

/**
* **Deprecated** - Use ::pn_session_set_incoming_window_and_lwm().
*
* Set the incoming capacity for a session object.
*
* The incoming capacity of a session determines how much incoming message
Expand All @@ -223,6 +227,84 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
*/
PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t capacity);

/**
* Get the maximum incoming window window for a session object.
*
* The maximum incoming window can be set by ::pn_session_set_incoming_window_and_lwm.
*
* @param[in] session the session object
* @return the maximum size of the incoming window or 0 if not set.
**/
PN_EXTERN pn_frame_count_t pn_session_incoming_window(pn_session_t *session);

/**
* Get the low water mark for the session incoming window.
*
* The low water mark governs how frequently the session updates the remote
* peer with changes to the incoming window.
*
* A value of zero indicates that Proton will choose a default strategy for
* updating the peer.
*
* The low water mark can be set by ::pn_session_set_incoming_window_and_lwm.
*
* @param[in] session the session object
* @return the low water mark of incoming window.
**/
PN_EXTERN pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *session);

/**
* Set the maximum incoming window and low water mark for a session object.
*
* The session incoming window is a count of the number of AMQP transfer frames
* that can be accepted and buffered locally by the session object until
* processed by the application (i.e. consumed by ::pn_link_recv or dropped by
* ::pn_link_advance). The maximum bytes buffered by the session will never
* exceed (max_incoming_window * max_frame_size). The incoming window frame count
* decreases 1-1 with incoming AMQP transfer frames. Whenever the application
* processes the buffered incoming bytes, the incoming window increases to the
* largest frame count that can be used by the peer without causing the local
* buffered bytes to exceed the maximum stated above.
*
* The session will defer updating the peer with a changed incoming window until
* it drops below the low water mark (lwm). Too many updates can delay
* other traffic on the connection without providing improved performance on the
* session. Too few can leave a remote sender frequently unable to send due
* to a closed window. The best balance is application specific. Note that the
* session incoming window is always updated along with the link credit on any
* of its child links, so the frequency of link credit updates is also a
* consideration when choosing a low water mark.
*
* The low water mark must be less than or equal to the incoming window. If
* set to zero, Proton will choose a default strategy for updating the
* incoming window.
*
* This call is only valid before the call to ::pn_session_open on the session.
* Subsequently, the settings are fixed for the life of the session and only
* have effect if a max frame size is also set on the session's connection.
*
* @param[in] session the session object
* @param[in] window the maximum incoming window buffered by the session
* @param[in] lwm the low water mark (or 0 for default window updating)
*
* @return 0 on success, PN_ARG_ERR if window is zero or lwm is greater than
* window, or PN_STATE_ERR if the session is already open.
*/
PN_EXTERN int pn_session_set_incoming_window_and_lwm(pn_session_t *session, pn_frame_count_t window, pn_frame_count_t lwm);

/**
* Get the remote view of the incoming window for the session.
*
* This evaluates to the most recent incoming window value communicated by the
* peer minus any subsequent transfer frames for the session that have been
* sent. It does not include transfer frames that may be created in future
* for locally buffered content tracked by @ref pn_session_outgoing_bytes.
*
* @param[in] session the session object
* @return the remote incoming window
*/
PN_EXTERN pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *session);

/**
* Get the outgoing window for a session object.
*
Expand Down
8 changes: 7 additions & 1 deletion c/include/proton/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,16 @@ PN_EXTERN uint32_t pn_transport_get_max_frame(pn_transport_t *transport);
/**
* Set the maximum frame size of a transport.
*
* The negotiated frame size cannot change over the life of the transport. After
* the transport has started sending AMQP frames to the peer, this function call
* has no effect. Typically, the maximum frame size is set when the transport is
* created.
*
* @param[in] transport a transport object
* @param[in] size the maximum frame size for the transport object
*
* @internal XXX Deprecate when moved to connection
* @internal XXX Deprecate when moved to connection, note size can change on
* reconnect with new transport, consider status return on new API.
*/
PN_EXTERN void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size);

Expand Down
7 changes: 7 additions & 0 deletions c/include/proton/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ extern "C" {
*/
typedef uint32_t pn_sequence_t;

/**
* A count or limit of AMQP transfer frames.
*
* @ingroup api_types
*/
typedef uint32_t pn_frame_count_t;

/**
* A span of time in milliseconds.
*
Expand Down
6 changes: 6 additions & 0 deletions c/src/core/engine-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ struct pn_session_t {
pn_sequence_t incoming_deliveries;
pn_sequence_t outgoing_deliveries;
pn_sequence_t outgoing_window;
pn_frame_count_t incoming_window_lwm;
pn_frame_count_t max_incoming_window;
bool check_flow;
bool need_flow;
bool lwm_default;
};

struct pn_terminus_t {
Expand Down Expand Up @@ -395,6 +400,7 @@ void pn_ep_incref(pn_endpoint_t *endpoint);
void pn_ep_decref(pn_endpoint_t *endpoint);

ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n);
void pni_session_update_incoming_lwm(pn_session_t *ssn);

#if __cplusplus
}
Expand Down
86 changes: 80 additions & 6 deletions c/src/core/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,10 @@ pn_session_t *pn_session(pn_connection_t *conn)
ssn->outgoing_deliveries = 0;
ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
ssn->local_handle_max = PN_IMPL_HANDLE_MAX;
ssn->incoming_window_lwm = 1;
ssn->check_flow = false;
ssn->need_flow = false;
ssn->lwm_default = true;

// begin transport state
memset(&ssn->state, 0, sizeof(ssn->state));
Expand Down Expand Up @@ -1095,11 +1099,74 @@ size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
return ssn->incoming_capacity;
}

// Update required when (re)set by user or when session started (proxy: BEGIN frame). No
// session flow control actually means flow control with huge window, so set lwm to 1. There is
// low probability of a stall. Any link credit flow frame will update session credit too.
void pni_session_update_incoming_lwm(pn_session_t *ssn) {
if (ssn->incoming_capacity) {
// Old API.
if (!ssn->connection->transport)
return; // Defer until called again from BEGIN frame setup with max frame known.
if (ssn->connection->transport->local_max_frame) {
ssn->incoming_window_lwm = (ssn->incoming_capacity / ssn->connection->transport->local_max_frame) / 2;
if (!ssn->incoming_window_lwm)
ssn->incoming_window_lwm = 1; // Zero may hang.
} else {
ssn->incoming_window_lwm = 1;
}
} else if (ssn->max_incoming_window) {
// New API.
// Only need to deal with default. Called whensending BEGIN frame.
if (ssn->connection->transport && ssn->connection->transport->local_max_frame && ssn->lwm_default) {
ssn->incoming_window_lwm = (ssn->max_incoming_window + 1) / 2;
}
} else {
ssn->incoming_window_lwm = 1;
}
assert(ssn->incoming_window_lwm != 0); // 0 allows session flow to hang
}

void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
{
assert(ssn);
// XXX: should this trigger a flow?
ssn->incoming_capacity = capacity;
ssn->max_incoming_window = 0;
ssn->incoming_window_lwm = 1;
ssn->lwm_default = true;
if (ssn->connection->transport) {
ssn->check_flow = true;
ssn->need_flow = true;
pn_modified(ssn->connection, &ssn->endpoint, false);
}
pni_session_update_incoming_lwm(ssn);
// If capacity invalid, failure occurs when transport calculates value of incoming window.
}

int pn_session_set_incoming_window_and_lwm(pn_session_t *ssn, pn_frame_count_t window, pn_frame_count_t lwm) {
assert(ssn);
if (!window || (lwm && lwm > window))
return PN_ARG_ERR;
// Settings fixed after session open for simplicity. AMPQ actually allows dynamic change with risk
// of overflow if window reduced while transfers in flight.
if (ssn->endpoint.state & PN_LOCAL_ACTIVE)
return PN_STATE_ERR;
ssn->incoming_capacity = 0;
ssn->max_incoming_window = window;
ssn->lwm_default = (lwm == 0);
ssn->incoming_window_lwm = lwm;
return 0;
}

pn_frame_count_t pn_session_incoming_window(pn_session_t *ssn) {
return ssn->max_incoming_window;
}

pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *ssn) {
return (!ssn->max_incoming_window || ssn->lwm_default) ? 0 : ssn->incoming_window_lwm;
}

pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *ssn) {
return ssn->state.remote_incoming_window;
}

size_t pn_session_get_outgoing_window(pn_session_t *ssn)
Expand Down Expand Up @@ -1873,11 +1940,16 @@ static void pni_advance_receiver(pn_link_t *link)
link->session->incoming_deliveries--;

pn_delivery_t *current = link->current;
link->session->incoming_bytes -= pn_buffer_size(current->bytes);
size_t drop_count = pn_buffer_size(current->bytes);
pn_buffer_clear(current->bytes);

if (!link->session->state.incoming_window) {
pni_add_tpwork(current);
if (drop_count) {
pn_session_t *ssn = link->session;
ssn->incoming_bytes -= drop_count;
if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
ssn->check_flow = true;
pni_add_tpwork(current);
}
}

link->current = link->current->unsettled_next;
Expand Down Expand Up @@ -2025,8 +2097,10 @@ ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
pn_buffer_trim(delivery->bytes, size, 0);
if (size) {
receiver->session->incoming_bytes -= size;
if (!receiver->session->state.incoming_window) {
pn_session_t *ssn = receiver->session;
ssn->incoming_bytes -= size;
if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
ssn->check_flow = true;
pni_add_tpwork(delivery);
}
return size;
Expand Down
Loading
Loading