Skip to content

Commit

Permalink
logproto: save aux for buffered data
Browse files Browse the repository at this point in the history
Since we might be buffering data, the aux information is only available
at the first read. So do this similarly to LogProtoBufferedServer, which
saves "buffer_aux" to a member variable and then copy that in case
we return anything from the buffer.

Signed-off-by: Balazs Scheidler <balazs.scheidler@axoflow.com>
  • Loading branch information
bazsi committed Apr 21, 2024
1 parent 6620f14 commit f22b2e4
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions lib/logproto/logproto-framed-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ typedef struct _LogProtoFramedServer
guint32 frame_len;
gboolean half_message_in_buffer;
guint32 fetch_counter;

/* auxiliary data (e.g. GSockAddr, other transport related meta
* data) associated with the already buffered data */
LogTransportAuxData buffer_aux;
} LogProtoFramedServer;

static LogProtoPrepareAction
Expand Down Expand Up @@ -90,7 +94,7 @@ log_proto_framed_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *tim
*/
static gboolean
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read,
LogTransportAuxData *aux, LogProtoStatus *status)
LogProtoStatus *status)
{
gint rc;
*status = LPS_SUCCESS;
Expand All @@ -104,8 +108,9 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
if (self->fetch_counter++ >= MAX_FETCH_COUNT)
return FALSE;

log_transport_aux_data_reinit(&self->buffer_aux);
rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end,
aux);
&self->buffer_aux);

if (rc < 0)
{
Expand All @@ -114,7 +119,6 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
msg_error("Error reading RFC6587 style framed data",
evt_tag_int("fd", self->super.transport->fd),
evt_tag_error("error"));
log_transport_aux_data_reinit(aux);
*status = LPS_ERROR;
}
else
Expand All @@ -129,7 +133,6 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
{
msg_trace("EOF occurred while reading",
evt_tag_int(EVT_TAG_FD, self->super.transport->fd));
log_transport_aux_data_reinit(aux);
*status = LPS_EOF;
return FALSE;
}
Expand Down Expand Up @@ -190,8 +193,7 @@ _is_trimmed_part_completely_fetched(LogProtoFramedServer *self)
/* Returns TRUE if successfully finished consuming the data. Otherwise it is not finished, but
* there is nothing left to read (or there was a read error) and expects to be called again. */
static gboolean
_consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read,
LogTransportAuxData *aux, LogProtoStatus *status)
_consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
{
/* Since trimming requires a full (buffer sized) message, the consuming
* always starts at the beginning of the buffer, with a new read. */
Expand All @@ -200,7 +202,7 @@ _consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read,

while (1)
{
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
if (!log_proto_framed_server_fetch_data(self, may_read, status))
return FALSE;

if (_is_trimmed_part_completely_fetched(self))
Expand All @@ -225,24 +227,23 @@ _ensure_buffer(LogProtoFramedServer *self)
}

static LogProtoFramedServerStateControl
_on_frame_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
_on_frame_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
{
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
if (!log_proto_framed_server_fetch_data(self, may_read, status))
return LPFSSCTRL_RETURN_WITH_STATUS;

self->state = LPFSS_FRAME_EXTRACT;
return LPFSSCTRL_NEXT_STATE;
}

static LogProtoFramedServerStateControl
_on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProtoStatus *status)
_on_frame_extract(LogProtoFramedServer *self, LogProtoStatus *status)
{
gboolean need_more_data = FALSE;

if (!log_proto_framed_server_extract_frame_length(self, &need_more_data))
{
/* invalid frame header */
log_transport_aux_data_reinit(aux);
*status = LPS_ERROR;
return LPFSSCTRL_RETURN_WITH_STATUS;
}
Expand Down Expand Up @@ -270,7 +271,6 @@ _on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProto
msg_error("Incoming frame larger than log_msg_size()",
evt_tag_int("log_msg_size", self->super.options->max_msg_size),
evt_tag_int("frame_length", self->frame_len));
log_transport_aux_data_reinit(aux);
*status = LPS_ERROR;
return LPFSSCTRL_RETURN_WITH_STATUS;
}
Expand All @@ -281,9 +281,9 @@ _on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProto
}

static LogProtoFramedServerStateControl
_on_trim_message_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
_on_trim_message_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
{
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
if (!log_proto_framed_server_fetch_data(self, may_read, status))
return LPFSSCTRL_RETURN_WITH_STATUS;
self->state = LPFSS_TRIM_MESSAGE;

Expand Down Expand Up @@ -312,9 +312,9 @@ _on_trim_message(LogProtoFramedServer *self, const guchar **msg, gsize *msg_len,
}

static LogProtoFramedServerStateControl
_on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
_on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
{
if (_consume_trimmed_part(self, may_read, aux, status))
if (_consume_trimmed_part(self, may_read, status))
{
self->state = LPFSS_FRAME_EXTRACT;
/* If there is data in the buffer, try to process it immediately.
Expand All @@ -330,9 +330,9 @@ _on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogTransport
}

static LogProtoFramedServerStateControl
_on_message_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
_on_message_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
{
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
if (!log_proto_framed_server_fetch_data(self, may_read, status))
return LPFSSCTRL_RETURN_WITH_STATUS;

self->state = LPFSS_MESSAGE_EXTRACT;
Expand Down Expand Up @@ -364,27 +364,27 @@ _on_message_extract(LogProtoFramedServer *self, const guchar **msg, gsize *msg_l

static LogProtoFramedServerStateControl
_step_state_machine(LogProtoFramedServer *self, const guchar **msg, gsize *msg_len, gboolean *may_read,
LogTransportAuxData *aux, LogProtoStatus *status)
LogProtoStatus *status)
{
switch (self->state)
{
case LPFSS_FRAME_READ:
return _on_frame_read(self, may_read, aux, status);
return _on_frame_read(self, may_read, status);

case LPFSS_FRAME_EXTRACT:
return _on_frame_extract(self, aux, status);
return _on_frame_extract(self, status);

case LPFSS_TRIM_MESSAGE_READ:
return _on_trim_message_read(self, may_read, aux, status);
return _on_trim_message_read(self, may_read, status);

case LPFSS_TRIM_MESSAGE:
return _on_trim_message(self, msg, msg_len, status);

case LPFSS_CONSUME_TRIMMED:
return _on_consume_trimmed(self, may_read, aux, status);
return _on_consume_trimmed(self, may_read, status);

case LPFSS_MESSAGE_READ:
return _on_message_read(self, may_read, aux, status);
return _on_message_read(self, may_read, status);

case LPFSS_MESSAGE_EXTRACT:
return _on_message_extract(self, msg, msg_len, status);
Expand All @@ -404,8 +404,11 @@ log_proto_framed_server_fetch(LogProtoServer *s, const guchar **msg, gsize *msg_
_ensure_buffer(self);

self->fetch_counter = 0;
while (_step_state_machine(self, msg, msg_len, may_read, aux, &status) != LPFSSCTRL_RETURN_WITH_STATUS) ;
while (_step_state_machine(self, msg, msg_len, may_read, &status) != LPFSSCTRL_RETURN_WITH_STATUS)
;

if (status == LPS_SUCCESS && aux)
log_transport_aux_data_copy(aux, &self->buffer_aux);
return status;
}

Expand All @@ -415,6 +418,7 @@ log_proto_framed_server_free(LogProtoServer *s)
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
g_free(self->buffer);

log_transport_aux_data_destroy(&self->buffer_aux);
log_proto_server_free_method(s);
}

Expand Down

0 comments on commit f22b2e4

Please sign in to comment.