Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mt/add configuration to allow keeping float trailing 0 #369

Draft
wants to merge 5 commits into
base: mt/patch-trailing-zero-json-serialisation
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *BinlogStreamer) createBinlogSyncer() error {
Password: s.DBConfig.Pass,
TLSConfig: tlsConfig,
UseDecimal: true,
KeepFloatTrailingZero: true,
TimestampStringLocation: time.UTC,
}

Expand Down
17 changes: 14 additions & 3 deletions test/go/dml_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (this *DMLEventsTestSuite) SetupTest() {

columns := []schema.TableColumn{
{Name: "col1"},
{Name: "col2"},
{Name: "col2", Type: schema.TYPE_JSON},
{Name: "col3"},
}

Expand Down Expand Up @@ -62,12 +62,13 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
Rows: [][]interface{}{
{1000, []byte("val1"), true},
{1001, []byte("val2"), false},
{1002, "{\"val\": 42.0}", false},
},
}

dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent)
this.Require().Nil(err)
this.Require().Equal(2, len(dmlEvents))
this.Require().Equal(3, len(dmlEvents))

q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
Expand All @@ -76,6 +77,10 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1001,_binary'val2',0)", q2)

q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3)
}

func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() {
Expand Down Expand Up @@ -117,12 +122,14 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() {
{1000, []byte("val2"), false},
{1001, []byte("val3"), false},
{1001, []byte("val4"), true},
{1002, "{\"val\": 42.0}", false},
{1002, "{\"val\": 43.0}", false},
},
}

dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.eventBase, rowsEvent)
this.Require().Nil(err)
this.Require().Equal(2, len(dmlEvents))
this.Require().Equal(3, len(dmlEvents))

q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
Expand All @@ -131,6 +138,10 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() {
q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("UPDATE `target_schema`.`target_table` SET `col1`=1001,`col2`=_binary'val4',`col3`=1 WHERE `col1`=1001 AND `col2`=_binary'val3' AND `col3`=0", q2)

q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("UPDATE `target_schema`.`target_table` SET `col1`=1002,`col2`=CAST('{\"val\": 43.0}' AS JSON),`col3`=0 WHERE `col1`=1002 AND `col2`=CAST('{\"val\": 42.0}' AS JSON) AND `col3`=0", q3)
}

func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithWrongColumnsReturnsError() {
Expand Down
69 changes: 43 additions & 26 deletions test/integration/types_test.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
require "test_helper"

class TypesTest < GhostferryTestCase
JSON_OBJ = '{"data": {"quote": "\\\'", "value": [1]}}'
JSON_OBJ = '{"data": {"quote": "\\\'", "value": [1, 12.13]}}'
JSON_OBJ_WITH_TRAILING_ZERO = '{"data": {"float": 32.0}}'
EMPTY_JSON = '{}'
JSON_ARRAY = '[\"test_data\", \"test_data_2\"]'
JSON_NULL = 'null'
JSON_TRUE = 'true'
JSON_FALSE = 'false'
JSON_NUMBER = '42'
JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART = '52.0'
JSON_FLOATING_POINT_WITH_NON_ZERO_FRACTIONAL_PART = '52.13'

def test_json_colum_not_null_with_no_default_is_invalid_this_is_fine
# See: https://bugs.mysql.com/bug.php?id=98496
Expand Down Expand Up @@ -103,28 +106,30 @@ def test_json_data_insert
# with a JSON column is broken on 5.7.
# See: https://bugs.mysql.com/bug.php?id=87847
res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 16, res.first["cnt"]
assert_equal 22, res.first["cnt"]

expected = [
{"id"=>1, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"},
{"id"=>1, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1, 12.13]}}"},
{"id"=>2, "data"=>"[\"test_data\", \"test_data_2\"]"},
{"id"=>3, "data"=>"{}"},
{"id"=>4, "data"=>nil},
{"id"=>5, "data"=>"null"},
{"id"=>6, "data"=>"true"},
{"id"=>7, "data"=>"false"},
{"id"=>8, "data"=>"42"},

{"id"=>9, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"},
{"id"=>10, "data"=>"[\"test_data\", \"test_data_2\"]"},
{"id"=>11, "data"=>"{}"},
{"id"=>12, "data"=>nil},
{"id"=>13, "data"=>"null"},
{"id"=>14, "data"=>"true"},
{"id"=>15, "data"=>"false"},
{"id"=>16, "data"=>"42"},
{"id"=>9, "data"=>"52.13"},
{"id" => 10, "data" => format_float_based_on_mysql_version("52.0")},
{"id" => 11, "data" => "{\"data\": {\"float\": #{format_float_based_on_mysql_version("32.0")}}}"}
]

expected_length = expected.length
expected_for_second_insert = Marshal.load(Marshal.dump(expected)) # makes deep copy of the original array

expected += expected_for_second_insert.map do |row|
row["id"] += expected_length
row
end

res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME} ORDER BY id ASC")
res.zip(expected).each do |row, expected_row|
assert_equal expected_row, row
Expand Down Expand Up @@ -152,8 +157,8 @@ def test_json_data_delete
loop do
sleep 0.1
res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
if res.first["cnt"] == 8
1.upto(8) do |i|
if res.first["cnt"] == 11
1.upto(11) do |i|
source_db.query("DELETE FROM #{DEFAULT_FULL_TABLE_NAME} WHERE id = #{i}")
end
break
Expand Down Expand Up @@ -194,15 +199,18 @@ def test_json_data_update
loop do
sleep 0.1
res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
if res.first["cnt"] == 8
if res.first["cnt"] == 11
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{EMPTY_JSON}' WHERE id = 1")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_ARRAY}' WHERE id = 2")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = NULL WHERE id = 3")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_OBJ}' WHERE id = 4")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_OBJ_WITH_TRAILING_ZERO}' WHERE id = 4")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_TRUE}' WHERE id = 5")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FALSE}' WHERE id = 6")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NUMBER}' WHERE id = 7")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NULL}' WHERE id = 8")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NULL}' WHERE id = 7")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART}' WHERE id = 8")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FLOATING_POINT_WITH_NON_ZERO_FRACTIONAL_PART}' WHERE id = 9")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NUMBER}' WHERE id = 10")
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_OBJ}' WHERE id = 11")
break
end

Expand All @@ -217,17 +225,20 @@ def test_json_data_update
refute timedout, "failed due to time out while waiting for the 4 insert binlogs to be written to the target"

res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 8, res.first["cnt"]
assert_equal 11, res.first["cnt"]

expected = [
{"id"=>1, "data"=>"{}"},
{"id"=>2, "data"=>"[\"test_data\", \"test_data_2\"]"},
{"id"=>3, "data"=>nil},
{"id"=>4, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"},
{"id"=>4, "data"=>"{\"data\": {\"float\": #{format_float_based_on_mysql_version("32.0")}}}"},
{"id"=>5, "data"=>"true"},
{"id"=>6, "data"=>"false"},
{"id"=>7, "data"=>"42"},
{"id"=>8, "data"=>"null"},
{"id"=>7, "data"=>"null"},
{"id"=>8, "data"=>format_float_based_on_mysql_version("52.0")},
{"id"=>9, "data"=>"52.13"},
{"id" => 10, "data" => "42"},
{"id" => 11, "data" => "{\"data\": {\"quote\": \"'\", \"value\": [1, 12.13]}}"},
]

res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME} ORDER BY id ASC")
Expand Down Expand Up @@ -319,7 +330,7 @@ def test_copy_data_in_fixed_size_binary_column

def test_copy_data_in_fixed_size_binary_column__value_completely_filled
# Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
#
#
# NOTE: This test is interesting (beyond what is covered above already),
# because it seems the server strips the trailing 0-bytes before sending
# them to the binlog even when the trailing 0-bytes are inserted by the user.
Expand All @@ -334,7 +345,7 @@ def test_copy_data_in_fixed_size_binary_column__value_completely_filled

def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1
# Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
#
#
# slight variation to cover the corner-case where there is no data in the
# column at all and the entire value is 0-padded (here, only 1 byte)
execute_copy_data_in_fixed_size_binary_column(
Expand Down Expand Up @@ -393,10 +404,13 @@ def test_decimal
end
end



private

def format_float_based_on_mysql_version(value)
  # Returns the string form of +value+ that the test expects to read back
  # from a JSON column, depending on ENV["MYSQL_VERSION"].
  #
  # MySQL 5.7 removes the trailing zeros when `cast...as json` is used, so
  # for any version other than "8.0" the value is reduced to its integer
  # part; on "8.0" the value is returned as-is (stringified).
  return value.to_s if ENV["MYSQL_VERSION"] == "8.0"

  value.to_i.to_s
end

def insert_json_on_source
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_OBJ}')")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_ARRAY}')")
Expand All @@ -406,6 +420,9 @@ def insert_json_on_source
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_TRUE}')")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FALSE}')")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_NUMBER}')")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FLOATING_POINT_WITH_NON_ZERO_FRACTIONAL_PART}')")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART}')")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_OBJ_WITH_TRAILING_ZERO}')")
end

def execute_copy_data_in_fixed_size_binary_column(column_size:, inserted_data:, expected_inserted_data:, updated_data:)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 27 additions & 5 deletions vendor/github.com/go-mysql-org/go-mysql/replication/json_binary.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions vendor/github.com/go-mysql-org/go-mysql/replication/parser.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading