diff --git a/lib/fluent/plugin/in_systemd.rb b/lib/fluent/plugin/in_systemd.rb index 1ca51c1..924cb91 100644 --- a/lib/fluent/plugin/in_systemd.rb +++ b/lib/fluent/plugin/in_systemd.rb @@ -85,7 +85,12 @@ def watch while @running init_journal if @journal.wait(0) == :invalidate while @journal.move_next && @running - yield @journal.current_entry + begin + yield @journal.current_entry + rescue Systemd::JournalError => e + log.warn("Error Parsing Journal: #{e.class}: #{e.message}") + next + end @pos_writer.update(@journal.cursor) end # prevent a loop of death diff --git a/test/fixture/corrupt/test.badmsg.journal b/test/fixture/corrupt/test.badmsg.journal new file mode 100644 index 0000000..e2702df Binary files /dev/null and b/test/fixture/corrupt/test.badmsg.journal differ diff --git a/test/plugin/test_in_systemd.rb b/test/plugin/test_in_systemd.rb index c31a9e9..dd0e5b8 100644 --- a/test/plugin/test_in_systemd.rb +++ b/test/plugin/test_in_systemd.rb @@ -12,6 +12,12 @@ def setup path test/fixture ) + @badmsg_config = %( + tag test + path test/fixture/corrupt + read_from_head true + ) + @strip_config = base_config + %( strip_underscores true ) @@ -37,7 +43,7 @@ def setup ) end - attr_reader :journal, :base_config, :pos_path, :pos_config, :head_config, :filter_config, :strip_config, :tail_config + attr_reader :journal, :base_config, :pos_path, :pos_config, :head_config, :filter_config, :strip_config, :tail_config, :badmsg_config def create_driver(config) Fluent::Test::InputTestDriver.new(Fluent::SystemdInput).configure(config) @@ -180,4 +186,10 @@ def test_reading_from_the_journal_tail_explicit_setting d.run end + def test_continue_on_bad_message + d = create_driver(badmsg_config) + d.run + assert_equal 460, d.events.size + end + end