diff --git a/lib/logproto/logproto-framed-server.c b/lib/logproto/logproto-framed-server.c index a8aac0d6075..a8f6d0109bb 100644 --- a/lib/logproto/logproto-framed-server.c +++ b/lib/logproto/logproto-framed-server.c @@ -59,6 +59,10 @@ typedef struct _LogProtoFramedServer guint32 frame_len; gboolean half_message_in_buffer; guint32 fetch_counter; + + /* auxiliary data (e.g. GSockAddr, other transport related meta + * data) associated with the already buffered data */ + LogTransportAuxData buffer_aux; } LogProtoFramedServer; static LogProtoPrepareAction @@ -90,7 +94,7 @@ log_proto_framed_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *tim */ static gboolean log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read, - LogTransportAuxData *aux, LogProtoStatus *status) + LogProtoStatus *status) { gint rc; *status = LPS_SUCCESS; @@ -104,8 +108,9 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea if (self->fetch_counter++ >= MAX_FETCH_COUNT) return FALSE; + log_transport_aux_data_reinit(&self->buffer_aux); rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end, - aux); + &self->buffer_aux); if (rc < 0) { @@ -114,7 +119,6 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea msg_error("Error reading RFC6587 style framed data", evt_tag_int("fd", self->super.transport->fd), evt_tag_error("error")); - log_transport_aux_data_reinit(aux); *status = LPS_ERROR; } else @@ -129,7 +133,6 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea { msg_trace("EOF occurred while reading", evt_tag_int(EVT_TAG_FD, self->super.transport->fd)); - log_transport_aux_data_reinit(aux); *status = LPS_EOF; return FALSE; } @@ -190,8 +193,7 @@ _is_trimmed_part_completely_fetched(LogProtoFramedServer *self) /* Returns TRUE if successfully finished consuming the data. Otherwise it is not finished, but * there is nothing left to read (or there was a read error) and expects to be called again. */ static gboolean -_consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read, - LogTransportAuxData *aux, LogProtoStatus *status) +_consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status) { /* Since trimming requires a full (buffer sized) message, the consuming * always starts at the beginning of the buffer, with a new read. */ @@ -200,7 +202,7 @@ _consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read, while (1) { - if (!log_proto_framed_server_fetch_data(self, may_read, aux, status)) + if (!log_proto_framed_server_fetch_data(self, may_read, status)) return FALSE; if (_is_trimmed_part_completely_fetched(self)) @@ -225,9 +227,9 @@ _ensure_buffer(LogProtoFramedServer *self) } static LogProtoFramedServerStateControl -_on_frame_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status) +_on_frame_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status) { - if (!log_proto_framed_server_fetch_data(self, may_read, aux, status)) + if (!log_proto_framed_server_fetch_data(self, may_read, status)) return LPFSSCTRL_RETURN_WITH_STATUS; self->state = LPFSS_FRAME_EXTRACT; @@ -235,14 +237,13 @@ _on_frame_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxDa } static LogProtoFramedServerStateControl -_on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProtoStatus *status) +_on_frame_extract(LogProtoFramedServer *self, LogProtoStatus *status) { gboolean need_more_data = FALSE; if (!log_proto_framed_server_extract_frame_length(self, &need_more_data)) { /* invalid frame header */ - log_transport_aux_data_reinit(aux); *status = LPS_ERROR; return LPFSSCTRL_RETURN_WITH_STATUS; } @@ -270,7 +271,6 @@ _on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProto msg_error("Incoming frame larger than log_msg_size()", evt_tag_int("log_msg_size", self->super.options->max_msg_size), evt_tag_int("frame_length", self->frame_len)); - log_transport_aux_data_reinit(aux); *status = LPS_ERROR; return LPFSSCTRL_RETURN_WITH_STATUS; } @@ -281,9 +281,9 @@ _on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProto } static LogProtoFramedServerStateControl -_on_trim_message_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status) +_on_trim_message_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status) { - if (!log_proto_framed_server_fetch_data(self, may_read, aux, status)) + if (!log_proto_framed_server_fetch_data(self, may_read, status)) return LPFSSCTRL_RETURN_WITH_STATUS; self->state = LPFSS_TRIM_MESSAGE; @@ -312,9 +312,9 @@ _on_trim_message(LogProtoFramedServer *self, const guchar **msg, gsize *msg_len, } static LogProtoFramedServerStateControl -_on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status) +_on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status) { - if (_consume_trimmed_part(self, may_read, aux, status)) + if (_consume_trimmed_part(self, may_read, status)) { self->state = LPFSS_FRAME_EXTRACT; /* If there is data in the buffer, try to process it immediately. @@ -330,9 +330,9 @@ _on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogTransport } static LogProtoFramedServerStateControl -_on_message_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status) +_on_message_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status) { - if (!log_proto_framed_server_fetch_data(self, may_read, aux, status)) + if (!log_proto_framed_server_fetch_data(self, may_read, status)) return LPFSSCTRL_RETURN_WITH_STATUS; self->state = LPFSS_MESSAGE_EXTRACT; @@ -364,27 +364,27 @@ _on_message_extract(LogProtoFramedServer *self, const guchar **msg, gsize *msg_l static LogProtoFramedServerStateControl _step_state_machine(LogProtoFramedServer *self, const guchar **msg, gsize *msg_len, gboolean *may_read, - LogTransportAuxData *aux, LogProtoStatus *status) + LogProtoStatus *status) { switch (self->state) { case LPFSS_FRAME_READ: - return _on_frame_read(self, may_read, aux, status); + return _on_frame_read(self, may_read, status); case LPFSS_FRAME_EXTRACT: - return _on_frame_extract(self, aux, status); + return _on_frame_extract(self, status); case LPFSS_TRIM_MESSAGE_READ: - return _on_trim_message_read(self, may_read, aux, status); + return _on_trim_message_read(self, may_read, status); case LPFSS_TRIM_MESSAGE: return _on_trim_message(self, msg, msg_len, status); case LPFSS_CONSUME_TRIMMED: - return _on_consume_trimmed(self, may_read, aux, status); + return _on_consume_trimmed(self, may_read, status); case LPFSS_MESSAGE_READ: - return _on_message_read(self, may_read, aux, status); + return _on_message_read(self, may_read, status); case LPFSS_MESSAGE_EXTRACT: return _on_message_extract(self, msg, msg_len, status); @@ -404,8 +404,11 @@ log_proto_framed_server_fetch(LogProtoServer *s, const guchar **msg, gsize *msg_ _ensure_buffer(self); self->fetch_counter = 0; - while (_step_state_machine(self, msg, msg_len, may_read, aux, &status) != LPFSSCTRL_RETURN_WITH_STATUS) ; + while (_step_state_machine(self, msg, msg_len, may_read, &status) != LPFSSCTRL_RETURN_WITH_STATUS) + ; + if (status == LPS_SUCCESS && aux) + log_transport_aux_data_copy(aux, &self->buffer_aux); return status; } @@ -415,6 +418,7 @@ log_proto_framed_server_free(LogProtoServer *s) LogProtoFramedServer *self = (LogProtoFramedServer *) s; g_free(self->buffer); + log_transport_aux_data_destroy(&self->buffer_aux); log_proto_server_free_method(s); }