diff --git a/lib/fluent/plugin/in_systemd.rb b/lib/fluent/plugin/in_systemd.rb index f1134c9..76194bf 100644 --- a/lib/fluent/plugin/in_systemd.rb +++ b/lib/fluent/plugin/in_systemd.rb @@ -19,9 +19,6 @@ class SystemdInput < Input def configure(conf) super @pos_writer = PosWriter.new(@pos_file) - @journal = Systemd::Journal.new(path: @path) - @journal.filter(*@filters) - seek end def start @@ -37,6 +34,12 @@ def shutdown private + def init_journal + @journal = Systemd::Journal.new(path: @path) + @journal.filter(*@filters) + seek + end + def seek seek_to(@pos_writer.cursor || read_from) rescue Systemd::JournalError @@ -59,6 +62,7 @@ def read_from end def run + init_journal Thread.current.abort_on_exception = true watch do |entry| begin @@ -67,6 +71,7 @@ def run log.error("Exception emitting record: #{e}") end end + @pos_writer.sync end def formatted(entry) diff --git a/lib/fluent/plugin/systemd/pos_writer.rb b/lib/fluent/plugin/systemd/pos_writer.rb index 67f1cf8..1d2d4e3 100644 --- a/lib/fluent/plugin/systemd/pos_writer.rb +++ b/lib/fluent/plugin/systemd/pos_writer.rb @@ -27,6 +27,10 @@ def shutdown write_pos end + def sync + write_pos + end + def update(c) return unless @path @lock.synchronize { @cursor = c } diff --git a/test/plugin/systemd/test_pos_writer.rb b/test/plugin/systemd/test_pos_writer.rb index 5941dd9..5441487 100644 --- a/test/plugin/systemd/test_pos_writer.rb +++ b/test/plugin/systemd/test_pos_writer.rb @@ -32,6 +32,17 @@ def test_writing_the_cursor_when_file_does_not_exist_yet FileUtils.rm_rf dir end + def test_syncing_the_cursor_when_file_does_not_exist_yet + dir = Dir.mktmpdir("posdir") + path = "#{dir}/foo.pos" + pos_writer = Fluent::Plugin::SystemdInput::PosWriter.new(path) + pos_writer.start + pos_writer.update("this is the cursor") + pos_writer.sync + assert_equal File.read(path), "this is the cursor" + FileUtils.rm_rf dir + end + def test_file_permission_when_file_does_not_exist_yet dir = Dir.mktmpdir("posdir") path = "#{dir}/foo.pos"