Skip to content

Commit

Permalink
Merge pull request #323 from Shopify/detect-ddl-add-handlers-temp
Browse files Browse the repository at this point in the history
Detect ddl add handlers
  • Loading branch information
shivnagarajan authored Feb 24, 2022
2 parents abcda3a + ba70310 commit 05fb06b
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 97 deletions.
224 changes: 139 additions & 85 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
sqlorig "database/sql"
"errors"
"fmt"
"time"

Expand All @@ -16,6 +17,14 @@ import (

// caughtUpThreshold bounds how far behind "now" the last streamed binlog
// event may be for the streamer to count as caught up — presumably compared
// against event timestamps elsewhere in this file; confirm at the usage site.
const caughtUpThreshold = 10 * time.Second

// BinlogEventState is passed into event handlers to keep track of the state
// of the binlog event stream across successive replication events.
type BinlogEventState struct {
	// evPosition is the binlog position of the event currently being handled.
	evPosition mysql.Position
	// isEventPositionResumable reports whether the stream may safely be
	// resumed from evPosition after an interruption (set at transaction
	// boundaries: XIDEvent / GTIDEvent).
	isEventPositionResumable bool
	// isEventPositionValid reports whether evPosition can be trusted; a
	// FormatDescriptionEvent sent on connect carries LogPos == 0 and must
	// not advance the streamed position.
	isEventPositionValid bool
	// nextFilename is the binlog filename the stream reads next; updated on
	// every RotateEvent.
	nextFilename string
}

type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
Expand Down Expand Up @@ -48,6 +57,11 @@ type BinlogStreamer struct {

logger *logrus.Entry
eventListeners []func([]DMLEvent) error
// eventhandlers can be attached to binlog Replication Events
// for any event that does not have a specific handler attached, a default eventHandler
// is provided (defaultEventHandler). Event handlers are provided the replication binLogEvent
// and a state object that carries information about the state of the binlog event stream.
eventHandlers map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)
}

func (s *BinlogStreamer) ensureLogger() {
Expand Down Expand Up @@ -133,6 +147,93 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPositio
return s.lastStreamedBinlogPosition, err
}

// defaultEventHandler processes replication binlog events for which no
// dedicated handler has been registered via AddBinlogEventHandler. It updates
// the stream state carried in es and returns the (possibly updated) pending
// query annotation, along with any error encountered handling the event.
func (s *BinlogStreamer) defaultEventHandler(ev *replication.BinlogEvent, query []byte, es *BinlogEventState) ([]byte, error) {
	var err error
	switch e := ev.Event.(type) {
	case *replication.RotateEvent:
		// This event is used to keep the "current binlog filename" of the binlog streamer in sync.
		es.nextFilename = string(e.NextLogName)

		isFakeRotateEvent := ev.Header.LogPos == 0 && ev.Header.Timestamp == 0
		if isFakeRotateEvent {
			// Sometimes the RotateEvent is fake and not a real rotation. we want to ignore the log position in the header for those events
			// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L278-L287
			// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L904-L907

			// However, we can always advance our lastStreamedBinlogPosition according to its data fields
			es.evPosition = mysql.Position{
				Name: string(e.NextLogName),
				Pos:  uint32(e.Position),
			}
		}

		s.logger.WithFields(logrus.Fields{
			"new_position":  es.evPosition.Pos,
			"new_filename":  es.evPosition.Name,
			"last_position": s.lastStreamedBinlogPosition.Pos,
			"last_filename": s.lastStreamedBinlogPosition.Name,
		}).Info("binlog file rotated")
	case *replication.FormatDescriptionEvent:
		// This event is sent:
		// 1) when our replication client connects to mysql
		// 2) at the beginning of each binlog file
		//
		// For (1), if we are starting the binlog from a position that's greater
		// than BIN_LOG_HEADER_SIZE (currently, 4th byte), this event's position
		// is explicitly set to 0 and should not be considered valid according to
		// the mysql source. See:
		// https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026
		es.isEventPositionValid = ev.Header.LogPos != 0
	case *replication.RowsQueryEvent:
		// A RowsQueryEvent will always precede the corresponding RowsEvent
		// if binlog_rows_query_log_events is enabled, and is used to get
		// the full query that was executed on the master (with annotations)
		// that is otherwise not possible to reconstruct.
		//
		// The type switch already bound e to *replication.RowsQueryEvent, so
		// there is no need to re-assert ev.Event here.
		query = e.Query
	case *replication.RowsEvent:
		err = s.handleRowsEvent(ev, query)
		if err != nil {
			s.logger.WithError(err).Error("failed to handle rows event")
			s.ErrorHandler.Fatal("binlog_streamer", err)
		}
	case *replication.XIDEvent, *replication.GTIDEvent:
		// With regards to DMLs, we see (at least) the following sequence
		// of events in the binlog stream:
		//
		// - GTIDEvent  <- START of transaction
		// - QueryEvent
		// - RowsQueryEvent
		// - TableMapEvent
		// - RowsEvent
		// - RowsEvent
		// - XIDEvent   <- END of transaction
		//
		// *NOTE*
		//
		// First, RowsQueryEvent is only available with `binlog_rows_query_log_events`
		// set to "ON".
		//
		// Second, there will be at least one (but potentially more) RowsEvents
		// depending on the number of rows updated in the transaction.
		//
		// Lastly, GTIDEvents will only be available if they are enabled.
		//
		// As a result, the following case will set the last resumable position for
		// interruption to EITHER the start (if using GTIDs) or the end of the
		// last transaction
		es.isEventPositionResumable = true

		// Here we also reset the query event as we are either at the beginning
		// or the end of the current/next transaction. As such, the query will be
		// reset following the next RowsQueryEvent before the corresponding RowsEvent(s)
		query = nil
	}
	return query, err
}

func (s *BinlogStreamer) Run() {
s.ensureLogger()

Expand All @@ -145,13 +246,13 @@ func (s *BinlogStreamer) Run() {
}()

var query []byte
es := BinlogEventState{}

currentFilename := s.lastStreamedBinlogPosition.Name
nextFilename := s.lastStreamedBinlogPosition.Name

es.nextFilename = s.lastStreamedBinlogPosition.Name
s.logger.Info("starting binlog streamer")
for !s.stopRequested || (s.stopRequested && s.lastStreamedBinlogPosition.Compare(s.stopAtBinlogPosition) < 0) {
currentFilename = nextFilename
currentFilename = es.nextFilename
var ev *replication.BinlogEvent
var timedOut bool
var err error
Expand All @@ -174,109 +275,62 @@ func (s *BinlogStreamer) Run() {
continue
}

evPosition := mysql.Position{
es.evPosition = mysql.Position{
Name: currentFilename,
Pos: ev.Header.LogPos,
}

s.logger.WithFields(logrus.Fields{
"position": evPosition.Pos,
"file": evPosition.Name,
"position": es.evPosition.Pos,
"file": es.evPosition.Name,
"type": fmt.Sprintf("%T", ev.Event),
"lastStreamedBinlogPosition": s.lastStreamedBinlogPosition,
}).Debug("reached position")

isEventPositionResumable := false
isEventPositionValid := true

switch e := ev.Event.(type) {
case *replication.RotateEvent:
// This event is used to keep the "current binlog filename" of the binlog streamer in sync.
nextFilename = string(e.NextLogName)

isFakeRotateEvent := ev.Header.LogPos == 0 && ev.Header.Timestamp == 0
if isFakeRotateEvent {
// Sometimes the RotateEvent is fake and not a real rotation. we want to ignore the log position in the header for those events
// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L278-L287
// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L904-L907

// However, we can always advance our lastStreamedBinlogPosition according to its data fields
evPosition = mysql.Position{
Name: string(e.NextLogName),
Pos: uint32(e.Position),
}
}
es.isEventPositionResumable = false
es.isEventPositionValid = true

s.logger.WithFields(logrus.Fields{
"new_position": evPosition.Pos,
"new_filename": evPosition.Name,
"last_position": s.lastStreamedBinlogPosition.Pos,
"last_filename": s.lastStreamedBinlogPosition.Name,
}).Info("binlog file rotated")
case *replication.FormatDescriptionEvent:
// This event is sent:
// 1) when our replication client connects to mysql
// 2) at the beginning of each binlog file
//
// For (1), if we are starting the binlog from a position that's greater
// than BIN_LOG_HEADER_SIZE (currently, 4th byte), this event's position
// is explicitly set to 0 and should not be considered valid according to
// the mysql source. See:
// https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026
isEventPositionValid = ev.Header.LogPos != 0
case *replication.RowsQueryEvent:
// A RowsQueryEvent will always precede the corresponding RowsEvent
// if binlog_rows_query_log_events is enabled, and is used to get
// the full query that was executed on the master (with annotations)
// that is otherwise not possible to reconstruct
query = ev.Event.(*replication.RowsQueryEvent).Query
case *replication.RowsEvent:
err = s.handleRowsEvent(ev, query)
// if there is a handler associated with this eventType, call it
eventTypeString := ev.Header.EventType.String()
if handler, ok := s.eventHandlers[eventTypeString]; ok {
query, err = handler(ev, query, &es)
if err != nil {
s.logger.WithError(err).Error("failed to handle rows event")
s.logger.WithError(err).Error("failed to handle event")
s.ErrorHandler.Fatal("binlog_streamer", err)
}
case *replication.XIDEvent, *replication.GTIDEvent:
// With regards to DMLs, we see (at least) the following sequence
// of events in the binlog stream:
//
// - GTIDEvent <- START of transaction
// - QueryEvent
// - RowsQueryEvent
// - TableMapEvent
// - RowsEvent
// - RowsEvent
// - XIDEvent <- END of transaction
//
// *NOTE*
//
// First, RowsQueryEvent is only available with `binlog_rows_query_log_events`
// set to "ON".
//
// Second, there will be at least one (but potentially more) RowsEvents
// depending on the number of rows updated in the transaction.
//
// Lastly, GTIDEvents will only be available if they are enabled.
//
// As a result, the following case will set the last resumable position for
// interruption to EITHER the start (if using GTIDs) or the end of the
// last transaction
isEventPositionResumable = true

// Here we also reset the query event as we are either at the beginning
// or the end of the current/next transaction. As such, the query will be
// reset following the next RowsQueryEvent before the corresponding RowsEvent(s)
query = nil
} else {
// call the default event handler for everything else
query, err = s.defaultEventHandler(ev, query, &es)
}

if isEventPositionValid {
if es.isEventPositionValid {
evType := fmt.Sprintf("%T", ev.Event)
evTimestamp := ev.Header.Timestamp
s.updateLastStreamedPosAndTime(evTimestamp, evPosition, evType, isEventPositionResumable)
s.updateLastStreamedPosAndTime(evTimestamp, es.evPosition, evType, es.isEventPositionResumable)
}
}
}

// AddBinlogEventHandler attaches a custom event handler to a replication
// binlog event type. Only the event types defined in
// https://github.com/go-mysql-org/go-mysql/blob/master/replication/const.go
// are supported. Handlers receive the replication BinlogEvent, the pending
// query annotation (from any preceding RowsQueryEvent), and a state object
// that carries the current state of the binlog event stream, and return the
// (possibly updated) query annotation. Registering a handler for an event
// type replaces any handler previously registered for that type. An error is
// returned when evType is not a recognized event type.
func (s *BinlogStreamer) AddBinlogEventHandler(evType replication.EventType, eh func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)) error {
	// go-mysql renders any unrecognized event type as "UnknownEvent"; reject
	// those rather than registering a handler that could never fire.
	eventTypeString := evType.String()
	if eventTypeString == "UnknownEvent" {
		// NOTE: error strings are lowercase per Go convention (staticcheck ST1005).
		return errors.New("unknown event type")
	}

	// Lazily initialize the handler map so the zero-value streamer works.
	if s.eventHandlers == nil {
		s.eventHandlers = make(map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error))
	}
	s.eventHandlers[eventTypeString] = eh
	return nil
}

// AddEventListener registers a callback that will be invoked with each batch
// of DMLEvents produced by the binlog streamer.
func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error) {
	updated := append(s.eventListeners, listener)
	s.eventListeners = updated
}
Expand Down
3 changes: 2 additions & 1 deletion sharding/test/trivial_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package test

import (
sql "github.com/Shopify/ghostferry/sqlwrapper"
"math/rand"
"testing"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/Shopify/ghostferry/sharding"
"github.com/Shopify/ghostferry/testhelpers"
"github.com/stretchr/testify/assert"
Expand Down
15 changes: 15 additions & 0 deletions test/go/binlog_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/testhelpers"

"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -195,6 +196,20 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsQueryEventOnRowsEvent
this.Require().True(eventAsserted)
}

func (this *BinlogStreamerTestSuite) TestBinlogStreamerAddEventHandlerEventTypes() {
	// A no-op handler: returns the query annotation unchanged.
	noopHandler := func(ev *replication.BinlogEvent, query []byte, es *ghostferry.BinlogEventState) ([]byte, error) {
		return query, nil
	}

	// Attaching a handler to a recognized event type must succeed.
	this.Require().NoError(this.binlogStreamer.AddBinlogEventHandler(replication.TABLE_MAP_EVENT, noopHandler))

	// Attaching a handler to an unrecognized event type must be rejected.
	this.Require().Error(this.binlogStreamer.AddBinlogEventHandler(replication.EventType(byte(0)), noopHandler))
}

func TestBinlogStreamerTestSuite(t *testing.T) {
testhelpers.SetupTest()
suite.Run(t, &BinlogStreamerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
Expand Down
47 changes: 47 additions & 0 deletions test/integration/ddl_events_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require "test_helper"

# Integration tests for binlog event handlers around DDL statements.
class DdlEventsTest < GhostferryTestCase
  # Name of the ghostferry test fixture that registers a DDL event handler.
  DDL_GHOSTFERRY = "ddl_ghostferry"

  # Sanity check: a run using only the default event handlers completes
  # successfully against a simple seeded database.
  def test_default_event_handler
    seed_simple_database_with_single_table

    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

    ghostferry.run_with_logs()

    assert_ghostferry_completed(ghostferry, times: 1)
  end

  # The DDL-handler fixture also completes normally when no DDL is executed
  # during the run.
  def test_ddl_event_handler
    seed_simple_database_with_single_table

    ghostferry = new_ghostferry(DDL_GHOSTFERRY)
    ghostferry.run_with_logs()

    assert_ghostferry_completed(ghostferry, times: 1)
  end

  # When a DDL statement (ALTER TABLE) arrives in the binlog stream mid-run,
  # the run fails — presumably the DDL handler in the fixture aborts on DDL;
  # confirm against the ddl_ghostferry fixture. Rows inserted after the DDL
  # must therefore be missing from the target.
  def test_ddl_event_handler_with_ddl_events
    seed_simple_database_with_single_table

    table_name = full_table_name(DEFAULT_DB, DEFAULT_TABLE)

    ghostferry = new_ghostferry(DDL_GHOSTFERRY)

    # Issue an insert, a DDL statement, then another insert once binlog
    # streaming has started, so the DDL lands between two row events.
    ghostferry.on_status(GhostferryHelper::Ghostferry::Status::BINLOG_STREAMING_STARTED) do
      source_db.query("INSERT INTO #{table_name} VALUES (9000, 'test')")
      source_db.query("ALTER TABLE #{table_name} ADD INDEX (data(100))")
      source_db.query("INSERT INTO #{table_name} (id, data) VALUES (9001, 'test')")
    end

    ghostferry.run_expecting_failure

    source, target = source_and_target_table_metrics
    source_count = source[DEFAULT_FULL_TABLE_NAME][:row_count]
    target_count = target[DEFAULT_FULL_TABLE_NAME][:row_count]

    refute_equal(source_count, target_count, "target should have fewer rows than source")

  end
end
Loading

0 comments on commit 05fb06b

Please sign in to comment.