This library is compatible with Go 1.11+
Please refer to CHANGELOG.md
if you encounter breaking changes.
- Motivation
- Introduction
- Contract
- Usage
- Data comparision strategy
- Applying custom filters
- Partitioned synchronization
- Pseudo columns
- Chunked synchronization
- Query based synchronization
- Non PK tables synchronization
- Managing transfer
- Running end to end test
- Supported databases
- Deployment
- Checking data sync quality
- GoCover
- License
- Credits and Acknowledgements
With cloud echo system, data synchronization between various database and cloud vendor becomes more and more frequent task.
Brute force method can be accomplished simply by coping data from source to destination followed by removal records from dest table that do not exist in source and merging the rest.
While dealing with large dataset or high sync frequency , data sync cost (ingress bandwidth, actual data transfer cost) can not be simply ignored, thus brute force approach may not be the best strategy.
This project provides SQL based cross database vendor data synchronization for small and large(billions+ records) tables/views in a cost effective way.
This is achieved by both determining the smallest changed dataset and by dividing transferable dataset in the partitioned/chunked segments. Both read and writes can be easily parallelized.
Synchronization process
In this step, synchronizer uses aggregation function to compare source and destination table. In case of data discrepancy, the process narrows down source dataset to the one that has been changed. When chunks or partition or both are used, on top of narrowing source dataset, only out of sync data segments are transferred to destination database. Sync process uses transient table for each partition, chunk or partition chunk combination which creates prior transfer based on dest table. This provides additional sync safety, while it is possible that data transfer may fail half-way through, with this approach only successful data segment is merged with dest table.
Changed dataset is moved from source to transient table in destination database with transfer service. Transfer service streamlines data copy with parallel writes and compacted collections. It uses batched inserts or batched load job (BigQuery) to reduce unnecessary round trips. On top of that large dataset can be divided in to partition or/and smaller transferable chunks, which provides additional level of the read paralelization.
By default all columns from destination table are used with sync process, you can override this behaviour by supplying explicit columns list.
During checking synchronization status, sync process determines merge strategy based on the changed dataset which is one of the following:
- insert - append data to destination table
- merge - append or update data in destination table
- delete merge - remove data from destination table if it does not exist in transferred transient table, then merge
- delete insert - remove data from destination table if it does not exist in transferred transient table, then append
- --url URL location with scheduled synchronization
- --urlRefresh scehdule location refresh in ms
- --port service port
- --debug enabled debug
- --statsHistory number of completed sync info stored in memory (/v1/api/history/{ID})
JSON format
curl -d @request.json -X POST -H "Content-Type: application/json" http://127.0.0.1:8081/v1/api/sync
{
"Table": "events",
"IDColumns": [
"id"
],
"Source": {
"Credentials": "mysql-e2e",
"Descriptor": "[username]:[password]@tcp(127.0.0.1:3306)/[dbname]?parseTime=true",
"DriverName": "mysql",
"Parameters": {
"dbname": "db1"
}
},
"Dest": {
"Credentials": "mysql-e2e",
"Descriptor": "[username]:[password]@tcp(127.0.0.1:3306)/[dbname]?parseTime=true",
"DriverName": "mysql",
"Parameters": {
"dbname": "db2"
}
},
"Transfer": {
"EndpointIP": "127.0.0.1:8080",
"WriterThreads": 2,
"BatchSize": 2048
}
}
YAML format
curl --data-binary @request.yaml -X POST -H "Content-Type: application/yaml" http://127.0.0.1:8081/v1/api/sync
table: events
idColumns:
- id
source:
credentials: mysql-e2e
descriptor: [username]:[password]@tcp(127.0.0.1:3306)/[dbname]?parseTime=true
driverName: mysql
parameters:
dbname: db1
dest:
credentials: mysql-e2e
descriptor: [username]:[password]@tcp(127.0.0.1:3306)/[dbname]?parseTime=true
driverName: mysql
parameters:
dbname: db2
transfer:
endpointIP: 127.0.0.1:8080
writerThreads: 2
batchSize: 2048
Place scheduled request in url specified with sync service
table: events
idColumns:
- id
columns:
- id
- event_type
- timestamp
diff:
countOnly: true
source:
...
dest:
...
transfer:
...
schedule:
frequency:
value: 1
unit: hour
Schedule can use either frequency or at schedule, if both specified frequency take precedence
table: events
idColumns:
- id
diff:
countOnly: true
source:
...
dest:
...
transfer:
...
schedule:
at:
minute: 0,30
hour: *
weekDay: *
- /v1/api/jobs: list all recently active sync jobs
- /v1/api/schedules list all scheduled jobs
- /v1/api/job/{ids}: progress info for specified jobs id(s)
- /v1/api/job/history/{ids}: history info for specified jobs id(s)
- /v1/api/status: overall service status, with all transfers, and errors for the x past run
curl http://127.0.0.1:8081//v1/api/jobs
- /v1/api/tasks: list all recently active transfer tasks
Detecting data discrepancy uses aggregate function on all or just specified columns. Data comparision can be applied on the whole table, virtual partition(s) or a chunk level.
By default all dest table columns are used to identified data discrepancy, the following aggregate function rules apply:
- for any numeric data type SUM aggregate function is used
- for any time/date data type MAX aggregate function is used
- for other data type COUNT DISTINCT is used
When countOnly option is selected, total rows COUNT is used, this is especially useful when source table uses data appends only.
In either case for single column ID based table the following aggregates are also added:
- max ID: MAX(id)
- min ID: MIN(id)
- total rows count: COUNT(1)
- unique distinct count: COUNT(distinct ID)
- unique not null sum: SUM(CASE WHEN ID IS NULL THEN 1 ELSE 0 END)
The last three are used to check if data inconsistency, duplication, id constraint violation.
Narrowing process try to find max ID in destination dataset which is in sync with the source. Note that this process is only applicable for single numeric ID based table.
In case when source and dest dataset are discrepant and source ID is greater than dest ID, synchronizer takes dest max ID, to check if up to that ID both dataset are equal, if so it uses INSERT strategy and transfer only source data where source ID is greater then dest max ID.
When source ID is greater then dest ID and insert strategy can not be applied, synchronizer would try to reduce/expand dest dataset range where upper bound is limited by dest max ID and delta defined as half dataset ID distance (max id +/- delta), if probed data is in sync, narrowed ID is used and delta is increased by half, otherwise decrease for next try. Number of iteration in this process is controlled by depth parameter (0 by default).
When narrowed dataset is determined, merge(inser/update) strategy is used, and synchronizer transfers only source data where source ID is greater then narrowed ID.
When source ID is lower than dest ID, or source row count is lower than dest, delete/merge strategy is used. In case of non-chunked transfer all source dataset is copied to dest transient table, followed by deletion of any dest table records which are not be found in transient table, then data is merged.
When chunked-transfer is used only discrepant chunk are transferred, thus deletion is reduced to discrepant chunks.
- diff.columns: custom columns with aggregate function used to compare source and destination
- diff.countOnly: flag to use only row COUNT comparision
- diff.depth: specifies number attempts to find max dest ID synchronized with the source
- diff.batchSize: number of partition (512)
- diff.numericPrecision: default required decimal precision when comparing decimal data (can be also specified on diff.columns level)
- diff.dateFormat: default date format used to compare date/time data type (can be also specified on diff.columns level)
- diff.dateLayout: default date layout used to compare date/time data type (can be also specified on diff.columns level)
- diff.newIDOnly: flag to sync only source where dest.ID > source.ID
In scenario where each source data mutation also updates specified column(s) i.e UPDATED, it is more effective to just use column(s) in question instead of all of them. Note that, since table is single ID based beside COUNT(1), MAX(ID), MIN(ID) is also used in data comparision.
table: sites
idColumns:
- id
source:
...
dest:
...
diff:
depth: 2
columns:
- name: UPDATED
func: MAX
Sometimes instead of sync the whole table, one might be interested in getting only table subset sync, in that case you can use criteria
table: events
criteria:
event_type: ' > 4'
diff:
countOnly: true
source:
...
dest:
...
transfer:
...
Partition synchronization uses set of values provided by SQL as filter to divide dataset into a smaller partition bound segments. Partition column values are generated by a sqlProvider. Only out of syn partition are synchronized, diff computation could be batched into one SQL for number of partitions which is controlled with batchSize on strategy diff level (512 by default).
While each discrepant partition is synchronized individually, multiple partition can be processed concurrently which is controlled with threads strategy partition level setting (2 by default)
- source|dest.partitionSQL: SQL providing partition values
- partition.columns: partition column or pseudo columns
- partition.threads: number of threads processing partition sync
- partition.syncMode: batch | individual (individual by default)
Partition based sync example.
table: events
tempDatabase: transfer
idColumns:
- id
partition:
threads: 10
columns:
- date
- event_type
diff:
countOnly: true
batchSize: 1024
source:
datastore: db
driverName: odbc
descriptor: driver=Vertica;Database=[database];ServerName=[server];port=5433;user=[username];password=[password]
positionReference: true
pseudoColumns:
- name: date
expression: TO_CHAR(t.timestamp, 'YYYY-MM-DD')
partitionSQL: SELECT DATE(timestamp)::varchar AS date, event_type
FROM db.events
WHERE DATE(timestamp) > sysdate - 3
GROUP BY 1, 2
ORDER BY 1 DESC, 2 DESC
dest:
driverName: bigquery
positionReference: true
parameters:
datasetId: db
pseudoColumns:
- name: date
expression: DATE(t.timestamp)
transfer:
endpointIP: 10.55.1.181:8080
writerThreads: 3
writerCount: 3
batchSize: 524287
Various database vendor provide SQL specific function set like DATE/TIME formatting, or event dialect expression i.e data type casting.
Pseudo column is SQL expression based column to leverage partition or criteria, defined separately source or destination databases.
Each actual column reference has to use t. alias.
Pseudo column example
pseudoColumns:
- name: date
expression: TO_CHAR(t.timestamp, 'YYYY-MM-DD')
- name: date2
expression: DATE(t.timestamp)::varchar
Chunk synchronization uses ID based range to divide a dataset into a smaller segments. Each segement is compared and synchronized with destination table. All non deleteMerge segments transfers are combined into final transient table before merging with destination table. This approach is especially handy with database vendors that have insert/load operation limits per table i.e. BigQuery.
Node that only table with single numeric ID based table can use chunked data sync.
SELECT min_value, max_value, count_value
FROM (
SELECT
MIN($id) min_value,
MAX($id) AS max_value,
COUNT(1) AS count_value
FROM $table t
$whereClause
ORDER BY $id
LIMIT $limit
)
WHERE: - $table: sync table - $id: table ID - $limit: chunk.size - $whereClause is dynamic where clause it looks like (WHERE $id > $max) - $where is dynamic clause fragment without WHERE keyword it looks like ( $id > $max)
For database that do not support LIMIT yuu have provide custom SQL
Oracle chunk SQL example
SELECT
MIN(ID) AS MIN_VALUE,
MAX(ID) AS MAX_VALUE,
COUNT(1) AS COUNT_VALUE
FROM (
SELECT /*+ INDEX_ASC(t PK_ID)*/ ID
FROM events t
WHERE ROWNUM <= $limit $where
) t
- chunk.size - chunk max size
- chunk.threads - number of threads processing chunk sync
- chunk.syncMode: batch | individual (batch by default)
- resource(dest|source).chunkSQL - custom chunking SQL
Chunk sync example
table: events
tempDatabase: transfer
idColumns:
- id
diff:
depth: 2
countOnly: true
source:
datastore: db
driverName: odbc
descriptor: driver=Vertica;Database=[database];ServerName=[server];port=5433;user=[username];password=[password]
positionReference: true
dest:
driverName: bigquery
positionReference: true
parameters:
datasetId: db
transfer:
endpointIP: 127.0.0.1:8080
writerThreads: 3
writerCount: 3
batchSize: 524287
chunk:
size: 1048576
threads: 10
In some cases view or actual SQL can be source for data sync, in that scenario SQL can be used as source.
Query based sync example
table: events
idColumns:
- id
source:
drom: SELECT * FROM events WHERE ID > 5
descriptor: "[username]:[password]@tcp(127.0.0.1:3306)/[dbname]?parseTime=true"
driverName: mysql
parameters:
dbname: db1
dest:
descriptor: "[username]:[password]@tcp(127.0.0.1:3306)/[dbname]?parseTime=true"
driverName: mysql
parameters:
dbname: db2
transfer:
EndpointIP: 127.0.0.1:8080
writerThreads: 1
tempDatabase: transfer
batchSize: 2048
Force option, instruct synchronizer to copy entire source dataset with insert sync strategy.
Make sure that destination table is empty.
Since force does not optimize data sync, you can also use with some NoSQL stores (dynamodb, mongodb, firebase, firestore, aerospike) on the source side.
table: events
force: true
source:
driverName: aerospike
descriptor: tcp(127.0.0.1:3000)/test
parameters:
dateFormat: yyyy-MM-dd hh:mm:ss
keyColumn: id
namespace: test
dest:
credentials: mysql-e2e
descriptor: "[username]:[password]@tcp(127.0.0.1:3306)/[dbname]?parseTime=true"
driverName: mysql
parameters:
dbname: db1
transfer:
endpointIP: 127.0.0.1:8080
writerThreads: 2
batchSize: 2048
Sometimes there is need to move data that does not have unique/pk contraint (i.e aggregated data), in that case synchronizer moves all source data to transient table followed by delete all from dest and insert all from transient table. Partitioned sync and custom criteria can be used in conjunction with non ID based data synchronization.
Non ID based sync example non_id.yaml
table: events
diff:
CountOnly: true
source:
...
dest:
...
transfer:
...
Transfer process is delegated to the transfer service, the following parameter can be used:
-
id: sync job id, dest table name by default
-
table: name of source and dest table, otherwise override on source,dest level
-
force: flag to brute force data sync: (can be used with NoSQL database type)
-
mergeStyle: merge SQL dialect style valid option are:
- merge
- mergeInto
- insertOrReplace
- insertOnDuplicateUpdate
- insertOnConflictUpdate
-
appendOnly: flag to use append new entries
-
appendUpdateOnly: flag to use sync with merge (without delete)
-
directAppend: flag to insert data directly to dest table (without transfer table, only recommended with streaming API, or where dest has unique constraints)
-
transfer.batchSize: batch size for insert/load operation
-
transfer.writerThreads: number of writing threads
-
transfer.endpointID: ip:port of transfer service
-
transfer.maxRetries
-
transfer.tempDatabase: name of database used for transient tables
-
transfer.suffix: transient table suffix fragment
Some database do not support GROUP/ORDER BY position, in that case actual unaliased expression has to be used resource.positionReference informs query builder if databae vendor support this option.
- source.positionReference flags if source database support GROUP/ORDER BY position
- dest.positionReference flags if dest database support GROUP/ORDER BY position
Many tables use create or last modification time, in that case you can use time base signature as unix timestamp with SUM aggregates
diff:
depth: 3
columns:
- name: MODIFIED
func: SUM
You can use the following pseudo column expressions:
source|dest:
pseudoColumns:
- name: MODIFIED
expression: xxxx
- MySQL: UNIX_TIMESTAMP(COALESCE(t.updated, t.created))
- BigQuery: UNIX_SECONDS(COALESCE(t.updated, t.created))
- Oracle: ROUND((CAST(COALESCE(t.updated, t.created) AS DATE) - to_date('19700101 000000', 'YYYYMMDD HH24MISS')) *86400)
Cross database timestamp representation varies, thus event after db sync there could be timezone based discrepancy. In this scenario pseudo column with custom diff column can be used, where each pseudo column specify expression to convert a date/timestamp column to unix timestamp.
table: events
idColumns:
- id
diff:
depth: 3
columns:
- name: MODIFIED
func: SUM
source:
credentials: ora-e2e
descriptor: '[username]/[password]@${dbIP.ora}:1521/xe'
driverName: oci8
parameters:
dbname: oradb
pseudoColumns:
- name: MODIFIED
expression: ROUND((CAST(COALESCE(t.updated, t.created) AS DATE) - to_date('19700101 000000', 'YYYYMMDD HH24MISS')) *86400)
dest:
credentials: gcp-e2e
driverName: bigquery
parameters:
datasetId: bqdb
pseudoColumns:
- name: MODIFIED
expression: UNIX_SECONDS(COALESCE(t.updated, t.created))
transfer:
endpointIP: 127.0.0.1:8080
writerThreads: 2
batchSize: 2048
End to end testing provide practical examples with how to configure data sync between various database vendor and scenarios
This project uses endly e2e test runner
docker run --name endly -v /var/run/docker.sock:/var/run/docker.sock -v ~/e2e:/e2e -v ~/e2e/.secret/:/root/.secret/ -p 7722:22 -d endly/endly:latest-ubuntu16.04
ssh root@127.0.0.1 -p 7722 ## password is dev
### this instruction would work with endly 0.37.4+
endly -v
### create secrets for endly with root/dev credentials
endly -c=localhost
### create secrets for mysql database with root/dev credentials
endly -c=mysql-e2e
### create secrets for postgress database with root/dev credentials
endly -c=pg-e2e
### create secrets for oracle database with oradb/oracle credentials
endly -c=ora-e2e
### create secrets for oracle admin database with system/oracle credentials
endly -c=ora-admin-e2e
##### BigQuery secrets setup is optional
### create secrets ~/.secret/gcp-e2e.json for BigQuery dedicated e2e project (https://github.com/viant/endly/tree/master/doc/secrets#google-cloud-credentials)
### when configured remove skip.txt from BigQuery (bq) related use cases and uncomment #bqdb tasks from data.yaml and datastore.yaml)
### check all created secrets files
ls -al ~/.secret/
## clone dbsync project and run tests
cd /e2e
git clone https://github.com/viant/dbsync.git
cd dbsync/e2e
endly
## or to run individual use case run
endly -i=partitioned_merge
All drivers implementing database/sql should be supported, in addition some NoSQL database supported by viant/dsc can be used as source with brute-force sync method (force:true)
- BigQuery
- Oracle
- MySQL
- Postgress
- ODBC
- Aerospike
- Standalone services
export sourcePath=/tmp/build/dbsync
export GO111MODULE=on
unset GOPATH
export LD_LIBRARY_PATH=${env.HOME}/Downloads/instantclient_12_2
export PKG_CONFIG_PATH=${env.HOME}/Downloads/instantclient_12_2
cd /tmp/build/
git clone https://github.com/viant/dbsync.git
rm $sourcePath/transfer/app/vtransfer
rm $sourcePath/transfer/app/nohup.out
cd $sourcePath/transfer/app
go build -o dbtransfer
rm $sourcePath/sync/app/nohup.out
rm $sourcePath/sync/app/vsync
cd $sourcePath/sync/app
rm -rf $sourcePath/sync/app/cron
go build -o dbsync
- Building docker images
This project use endly workflow to build docker size optimized service images
cd /tmp/build/
git clone https://github.com/viant/dbsync.git
cd dbsync/docker
endly -r=build
- Docker compose
-
Cloud run - TODO provide examples
-
Kubernetes - TODO provide examples
If database/sql driver is not listed in the in the currently imported list, you can add import for additional drivers and customize a service build.
Add default import to the following service entry points:
endly -r=run
@run.yaml
init:
i: 1
pipeline:
loop:
info:
action: print
init:
dstamp: $FormatTime('${i}daysAgo', 'yyyy-MM-dd')
message: processing $dstamp
post:
when: $HasResource(request.json)
action: rest/runner:send
request: '@sync @request'
inc:
action: nop
logging: false
init:
_: ${self.i++}
until:
action: goto
logging: false
task: loop
when: $self.i <= 35
done:
action: print
comments: done
@sync.json
{
"Method": "post",
"Request": $arg0,
"URL": "http://127.0.0.1:8081/v1/api/sync",
"Expect": {
"Status": "done"
}
}
@request.json
{
"Id":"event_${dstamp}",
"Table": "events",
"IDColumns": [
"id"
],
"Source":{
"PartitionSQL": "SELECT event_type FROM db1.events WHERE DATE(timestamp) = '$dstamp' GROUP BY 1 ORDER BY 1",
"...": "..."
},
"Dest": "...",
"Partition": {
"Columns": [
"event_type"
],
"Threads": 5
},
"Transfer": {
"EndpointIP": "127.0.0.1:8080",
"WriterThreads": 2,
"BatchSize": 2048
}
}
All utlity workflows use endly automation/e2e runner.
In order to compare dataset between source and dest database, you can use endly runner with compare workflow. It uses dsunit and asserly testing framework for comprehensive data validation.
endly -r=compare
init:
date: '2019-01-01'
pipeline:
register:
verticadb:
action: dsunit:register
datastore: db1
config:
driverName: odbc
descriptor: driver=Vertica;Database=[database];ServerName=[server];port=5433;user=[username];password=[password]
bigquerydb:
action: dsunit:register
datastore: db2
config:
driverName: bigquery
parameters:
datasetId: db
compare:
action: dsunit:compare
maxRowDiscrepancy: 1000000
directives:
"@indexBy@": date
"@numericPrecisionPoint@": 1
"@coalesceWithZero@": true
"@caseSensitive@": false
"@timeFormat@date": 'yyyy-MM-dd'
omitEmpty: true
source1:
datastore: db1
SQL: SELECT
DATE(timestamp) AS date,
COUNT(*) AS cnt,
SUM(revenue) AS revenue,
SUM(payment) AS payment,
SUM(charges) AS charges
FROM db.events
WHERE DATE(timestamp) >= '$date'
GROUP BY 1
ORDER BY 1
source2:
datastore: db2
SQL: SELECT
DATE(timestamp) AS date,
COUNT(*) AS cnt,
SUM(revenue) AS revenue,
SUM(payment) AS payment,
SUM(charges) AS charges
FROM db.events
WHERE DATE(timestamp) >= '$date'
GROUP BY 1
ORDER BY 1
endly check_schema.yaml
(endly 0.40.0+)
pipeline:
register:
mysqldb:
action: dsunit:register
datastore: mysqldb
config:
credentials: my-mysql-credentials
Descriptor: "[username]:[password]@tcp(127.0.0.1:3306)/mydb?parseTime=true&charset=utf8mb4,utf8"
DriverName: mysql
parameters:
dbname: mydb
bigquerydb:
action: dsunit:register
datastore: bigquerydb
config:
driverName: bigquery
credentials: mybq-credentials
parameters:
datasetId: mydb
checkSchema:
action: dsunit:checkSchema
source:
datastore: mysqldb
target: bigquery
dest:
datastore: bigquerydb
target: bigquery
endly reverse_schema.yaml
pipeline:
register:
mysqldb:
action: dsunit:register
datastore: mysqldb
config:
credentials: my-mysql-credentials
Descriptor: "[username]:[password]@tcp(127.0.0.1:3306)/mydb?parseTime=true&charset=utf8mb4,utf8"
DriverName: mysql
parameters:
dbname: mydb
reverseSchema:
action: dsunit:dump
datastore: mysqldb
target: bigquery
destURL: schema.sql
tables:
- table1
- table2
The source code is made available under the terms of the Apache License, Version 2, as stated in the file LICENSE
.
Individual files may be made available under their own specific license, all compatible with Apache License, Version 2. Please see individual files for details.
Library Author: Adrian Witas
Contributors: