Skip to content

Commit

Permalink
filterx: delegate per LogPipe filterx <> message sync to filterx
Browse files Browse the repository at this point in the history
Signed-off-by: Balazs Scheidler <balazs.scheidler@axoflow.com>
  • Loading branch information
bazsi committed May 5, 2024
1 parent 7db26bf commit 9782a80
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
22 changes: 22 additions & 0 deletions lib/filterx/filterx-eval.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,26 @@ void filterx_eval_store_weak_ref(FilterXObject *object);
void filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previous_context);
void filterx_eval_deinit_context(FilterXEvalContext *context);

static inline void
filterx_eval_sync_message(FilterXEvalContext *context, LogMessage **pmsg, const LogPathOptions *path_options)
{
if (!context)
return;

if (!filterx_scope_is_dirty(context->scope))
return;

log_msg_make_writable(pmsg, path_options);
filterx_scope_sync(context->scope, *pmsg);
}

static inline void
filterx_eval_prepare_for_fork(FilterXEvalContext *context, LogMessage **pmsg, const LogPathOptions *path_options)
{
filterx_eval_sync_message(context, pmsg, path_options);
if (context)
filterx_scope_write_protect(context->scope);
log_msg_write_protect(*pmsg);
}

#endif
13 changes: 6 additions & 7 deletions lib/logmpx.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,12 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op
log_path_options_push_junction(&local_path_options, &matched, path_options);
if (_has_multiple_arcs(self))
{
if (path_options->filterx_scope)
{
log_msg_make_writable(&msg, path_options);
filterx_scope_sync(path_options->filterx_scope, msg);
filterx_scope_write_protect(path_options->filterx_scope);
}
log_msg_write_protect(msg);
/* if we are delivering to multiple branches, we need to sync the
* filterx state with our message and also need to make everything
* write protected so that changes in those branches don't overwrite
* data we still need */

filterx_eval_prepare_for_fork(path_options->filterx_context, &msg, path_options);
}
for (fallback = 0; (fallback == 0) || (fallback == 1 && self->fallback_exists && !delivered); fallback++)
{
Expand Down
9 changes: 2 additions & 7 deletions lib/logpipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,8 @@ log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
}
}

if ((s->flags & PIF_SYNC_SCOPE) &&
path_options->filterx_scope &&
filterx_scope_is_dirty(path_options->filterx_scope))
{
log_msg_make_writable(&msg, path_options);
filterx_scope_sync(path_options->filterx_scope, msg);
}
if ((s->flags & PIF_SYNC_SCOPE))
filterx_eval_sync_message(path_options->filterx_context, &msg, path_options);

if (G_UNLIKELY(s->flags & (PIF_HARD_FLOW_CONTROL | PIF_JUNCTION_END | PIF_CONDITIONAL_MIDPOINT)))
{
Expand Down

0 comments on commit 9782a80

Please sign in to comment.