diff --git a/ferry.go b/ferry.go index 1a327713..c933443e 100644 --- a/ferry.go +++ b/ferry.go @@ -880,7 +880,6 @@ func (f *Ferry) WaitUntilBinlogStreamerCatchesUp() { // You will know that the BinlogStreamer finished when .Run() returns. func (f *Ferry) FlushBinlogAndStopStreaming() { if f.WaitUntilReplicaIsCaughtUpToMaster != nil { - f.logger.Info("flush binlog and stop streaming: wait until replica is caught up to master") isReplica, err := CheckDbIsAReplica(f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB) if err != nil { f.ErrorHandler.Fatal("wait_replica", err) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 4e3a9f15..a79e7132 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -6,7 +6,6 @@ require "tmpdir" require "webrick" require "cgi" -require "securerandom" module GhostferryHelper GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") @@ -50,12 +49,10 @@ module Status AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" end - attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines, :tag + attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines - def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393) - @log_capturer = log_capturer - @logger = log_capturer.logger - @tag = SecureRandom.hex[0..3] + def initialize(main_path, config: {}, logger:, message_timeout: 30, port: 39393) + @logger = logger @main_path = main_path @config = config @@ -98,7 +95,6 @@ def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: # The main method to call to run a Ghostferry subprocess. def run(resuming_state = nil) - @logger.info("[#{@tag}] ghostferry#run(state:#{(!resuming_state.nil?).inspect})") resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) compile_binary @@ -117,26 +113,21 @@ def run(resuming_state = nil) # When using this method, you need to ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_interrupt(resuming_state = nil) - @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue ExitError - @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got Ghostferry::ExitError") dumped_state = @stdout.join("") JSON.parse(dumped_state) else - @logger.error("[#{@tag}] ghostferry#run_expecting_interrupt: something's wrong") raise "Ghostferry did not get interrupted" end # Same as above - ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_failure(resuming_state = nil) - @logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue ExitError - @logger.info("[#{@tag}] ghostferry#run_expecting_failure: got Ghostferry::ExitError") else - raise "[#{@tag}] Ghostferry did not fail" + raise "Ghostferry did not fail" end def run_with_logs(resuming_state = nil) @@ -150,14 +141,14 @@ def run_with_logs(resuming_state = nil) def compile_binary return if File.exist?(@compiled_binary_path) - @logger.debug("[#{@tag}] compiling test binary to #{@compiled_binary_path}") + @logger.debug("compiling test binary to #{@compiled_binary_path}") rc = system( "go", "build", "-o", @compiled_binary_path, @main_path ) - raise "[#{@tag}] could not compile ghostferry" unless rc + raise "could not compile ghostferry" unless rc end def start_server @@ -182,27 +173,17 @@ def start_server query = CGI::parse(req.body) - statuses = Array(query["status"]) + status = query["status"] + data = query["data"] - if statuses.empty? + unless status @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") resp.status = 400 @server.shutdown - elsif statuses.size > 1 - @logger.warn("[#{@tag}] Got multiple statuses at once: #{statuses.inspect}") - puts "Got multiple statuses at once: #{statuses.inspect}" end @last_message_time = now - - data = query["data"] - - @logger.info("[#{@tag}] server: got / with #{statuses.inspect}") - statuses.each do |status| - next if @status_handlers[status].nil? - - @status_handlers[status].each { |f| f.call(*data) } - end + @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? rescue StandardError => e # errors are not reported from WEBrick but the server should fail early # as this indicates there is likely a programming error. @@ -213,7 +194,6 @@ def start_server @server.mount_proc "/callbacks/progress" do |req, resp| begin - @logger.info("[#{@tag}] server: got /callbacks/progress") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -228,7 +208,6 @@ def start_server @server.mount_proc "/callbacks/state" do |req, resp| begin - @logger.info("[#{@tag}] server: got /callbacks/state") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -241,15 +220,14 @@ def start_server end @server.mount_proc "/callbacks/error" do |req, resp| - @logger.info("[#{@tag}] server: got /callbacks/error") @error = JSON.parse(JSON.parse(req.body)["Payload"]) @callback_handlers["error"].each { |f| f.call(@error) } unless @callback_handlers["error"].nil? end @server_thread = Thread.new do - @logger.debug("[#{@tag}] starting server thread") + @logger.debug("starting server thread") @server.start - @logger.debug("[#{@tag}] server thread stopped") + @logger.debug("server thread stopped") end end @@ -289,7 +267,7 @@ def start_ghostferry(resuming_state = nil) environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia] end - @logger.debug("[#{@tag}] starting ghostferry test binary #{@compiled_binary_path}") + @logger.debug("starting ghostferry test binary #{@compiled_binary_path}") Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| stdin.puts(resuming_state) unless resuming_state.nil? stdin.close @@ -311,7 +289,7 @@ def start_ghostferry(resuming_state = nil) if reader == stdout @stdout << line - @logger.debug("[#{@tag}] stdout: #{line}") + @logger.debug("stdout: #{line}") elsif reader == stderr @stderr << line if json_log_line?(line) @@ -324,11 +302,8 @@ def start_ghostferry(resuming_state = nil) if logline["level"] == "error" @error_lines << logline end - - @logger.debug("[#{@tag}] stderr: #{line}") unless tag.end_with?("_binlog_streamer") - else - @logger.debug("[#{@tag}] stderr: #{line}") end + @logger.debug("stderr: #{line}") end end end @@ -337,9 +312,9 @@ def start_ghostferry(resuming_state = nil) @pid = 0 end - @logger.debug("[#{@tag}] ghostferry test binary exitted: #{@exit_status}") + @logger.debug("ghostferry test binary exitted: #{@exit_status}") if @exit_status.exitstatus != 0 - raise ExitError, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}" + raise ExitError, "ghostferry test binary returned non-zero status: #{@exit_status}" end end end @@ -352,15 +327,14 @@ def start_server_watchdog while @subprocess_thread.alive? do if (now - @last_message_time) > @message_timeout @server.shutdown - @log_capturer.print_output - raise TimeoutError, "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s" + raise TimeoutError, "ghostferry did not report to the integration test server for the last #{@message_timeout}s" end sleep 1 end @server.shutdown - @logger.debug("[#{@tag}] server watchdog thread stopped") + @logger.debug("server watchdog thread stopped") end @server_watchdog_thread.abort_on_exception = true diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index b6baccc1..0da17264 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -11,7 +11,6 @@ def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_in # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do - info("test[09]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -33,7 +32,6 @@ def test_interrupt_and_resume_without_last_known_schema_cache # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do - info("test[31]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -450,26 +448,21 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts end def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_source - @debug_me = true - info("test[452] start\n\n") ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) datawriter = new_source_datawriter start_datawriter_with_ghostferry(datawriter, ghostferry) - info("test[461] ghostferry#run_expecting_interrupt, no state\n\n") dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) - info("test[466] ghostferry#run_expecting_interrupt, with state\n\n") ghostferry.run_expecting_interrupt(dumped_state) ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) stop_datawriter_during_cutover(datawriter, ghostferry) - info("test[472] ghostferry#run_with_logs, with state\n\n") ghostferry.run_with_logs(dumped_state) assert_test_table_is_identical diff --git a/test/test_helper.rb b/test/test_helper.rb index 6ca72ba1..77c22653 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -49,8 +49,7 @@ class GhostferryTestCase < Minitest::Test def new_ghostferry(filepath, config: {}) # Transform path to something ruby understands path = File.join(GO_CODE_PATH, filepath, "main.go") - g = Ghostferry.new(path, config: config, log_capturer: @log_capturer) - info("[#{g.tag}] new_ghostferry: create") + g = Ghostferry.new(path, config: config, logger: @log_capturer.logger) @ghostferry_instances << g g end @@ -58,16 +57,12 @@ def new_ghostferry(filepath, config: {}) def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0) g = new_ghostferry(filepath, config: config) - info("[#{g.tag}] new_ghostferry_wiarc: register status hook") batches_written = 0 g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do batches_written += 1 if batches_written >= after_batches_written - info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> true") g.send_signal("TERM") - else - info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> false") end end @@ -89,10 +84,6 @@ def setup_signal_watcher Signal.trap("TERM") { self.on_term } end - def info(msg) - @log_capturer.logger.info(msg) - end - ############## # Test Hooks # ############## @@ -115,7 +106,6 @@ def before_setup # Same thing with DataWriter as above @datawriter_instances = [] - @debug_me = nil end def after_teardown @@ -127,7 +117,7 @@ def after_teardown datawriter.stop_and_join end - @log_capturer.print_output if self.failure || @debug_me + @log_capturer.print_output if self.failure @log_capturer.reset super end