diff --git a/.java-version b/.java-version
new file mode 100644
index 0000000..03b6389
--- /dev/null
+++ b/.java-version
@@ -0,0 +1 @@
+17.0
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
old mode 100755
new mode 100644
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
old mode 100755
new mode 100644
diff --git a/README.md b/README.md
index 4230456..717d633 100644
--- a/README.md
+++ b/README.md
@@ -26,11 +26,14 @@ There are two main use cases:
[![Watch the video](docs/user/commerce-db-sync-demo.png)](https://video.sap.com/embed/secure/iframe/entryId/1_7bhihtlz/uiConfId/30317401/st/0)
+## Release v1.2
+
+Features and changes: see the [presentation video](https://sapvideoa35699dc5.hana.ondemand.com/?entry_id=1_sipgb1l8).
# Features Overview
- Database Connectivity
- - Multipe supported databases: Oracle, MySQL, HANA, MSSQL
+ - Multiple supported databases: Oracle, MySQL, HANA, MSSQL, PostgreSQL
- UI based connection validation
- Schema Differences
- UI based schema differences detector
@@ -49,6 +52,7 @@ There are two main use cases:
- Table exclusions/inclusions
- Incremental mode (delta)
- Custom tables
+ - Resume failed migration
- Staged approach using table prefix
- View usage instead of table
- Reporting / Audit
@@ -56,6 +60,7 @@ There are two main use cases:
- Automated reporting for copy processes
- Stored on blob storage
- Logging of all actions triggered from the UI
+ - JDBC logging
# Requirements
@@ -63,20 +68,22 @@ There are two main use cases:
- Tested with source databases:
- Azure SQL
- MySQL (5.7)
- - Oracle (XE 11g)
+ - Oracle (XE 11g, XE 18c)
- HANA (express 2.0) and HANA Cloud
+ - PostgreSQL 15.x
- Tested with target databases:
- Azure SQL
- - Oracle (XE 11g)
+ - Oracle (XE 11g, XE 18c)
- HANA (express 2.0) and HANA Cloud
+ - PostgreSQL 15.x
# Performance
Commerce DB Sync has been built to offer reasonable performance with large amounts of data using the following design:
- Table to table replication using JDBC (low level)
-- Selection of tables so we do not need a full synchronization in particular for large technical table (task logs, audit logs...)
-- Multi-threaded and can manage multiple tables at the same time
+- Selection of tables so we do not need a full synchronization, in particular for large technical tables (task logs, audit logs...)
+- Multi-threaded and can manage multiple tables at the same time
- Using UPSERT (INSERT/UPDATE)
- Use read replica Commerce database as a source database
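+
+As a rough sketch of the UPSERT behaviour (illustrative only; table and column names are placeholders and the actual SQL is generated per database dialect), each batch conceptually corresponds to a `MERGE` statement:
+
+```sql
+MERGE INTO users AS t
+USING staged_users AS s ON t.PK = s.PK
+WHEN MATCHED THEN UPDATE SET t.uid = s.uid, t.modifiedTS = s.modifiedTS
+WHEN NOT MATCHED THEN INSERT (PK, uid, modifiedTS) VALUES (s.PK, s.uid, s.modifiedTS);
+```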
diff --git a/commercedbsync/.project b/commercedbsync/.project
deleted file mode 100644
index 3efecdf..0000000
--- a/commercedbsync/.project
+++ /dev/null
@@ -1,28 +0,0 @@
-
-
- commercedbsync
-
-
-
-
-
- org.eclipse.jdt.core.javabuilder
-
-
-
-
- org.eclipse.ui.externaltools.ExternalToolBuilder
- full,incremental,
-
-
- LaunchConfigHandle
- <project>/.externalToolBuilders/com.hybris.hyeclipse.tsv.builder.launch
-
-
-
-
-
- com.hybris.hyeclipse.tsv.hybris
- org.eclipse.jdt.core.javanature
-
-
diff --git a/commercedbsync/.springBeans b/commercedbsync/.springBeans
deleted file mode 100644
index e476d04..0000000
--- a/commercedbsync/.springBeans
+++ /dev/null
@@ -1,15 +0,0 @@
-
-
- 1
-
-
-
-
-
-
- resources/commercedbsync-spring.xml
- web/webroot/WEB-INF/commercedbsync-web-spring.xml
-
-
-
-
diff --git a/commercedbsync/external-dependencies.xml b/commercedbsync/external-dependencies.xml
index 4abd5ae..7f1c4e8 100644
--- a/commercedbsync/external-dependencies.xml
+++ b/commercedbsync/external-dependencies.xml
@@ -36,7 +36,7 @@
com.zaxxerHikariCP
- 3.4.5
+ 5.0.1com.github.freva
diff --git a/commercedbsync/project.properties b/commercedbsync/project.properties
index 948700f..d5f563b 100644
--- a/commercedbsync/project.properties
+++ b/commercedbsync/project.properties
@@ -4,106 +4,538 @@
#
#
commercedbsync.application-context=commercedbsync-spring.xml
-
-################################
-# Migration specific properties
-################################
+##
+# Specifies the profile name of the data source that serves as migration input
+#
+# @values name of the data source profile
+# @optional true
+##
+migration.input.profiles=source
+##
+# Specifies the profile name of the data source that serves as migration output
+#
+# @values name of the data source profile
+# @optional true
+##
+migration.output.profiles=target
+##
+# Specifies the driver class for the source jdbc connection
+#
+# @values any valid jdbc driver class
+# @optional false
+##
migration.ds.source.db.driver=
+##
+# Specifies the url for the source jdbc connection
+#
+# @values any valid jdbc url
+# @optional false
+##
migration.ds.source.db.url=
+##
+# Specifies the user name for the source jdbc connection
+#
+# @values any valid user name for the jdbc connection
+# @optional false
+##
migration.ds.source.db.username=
+##
+# Specifies the password for the source jdbc connection
+#
+# @values any valid password for the jdbc connection
+# @optional false
+##
migration.ds.source.db.password=
+##
+# Specifies the table prefix used on the source commerce database.
+# This may be relevant if a commerce installation was initialized using 'db.tableprefix'.
+#
+# @values any valid commerce database table prefix.
+# @optional true
+##
migration.ds.source.db.tableprefix=
+##
+# Specifies the schema the respective commerce installation is deployed to.
+#
+# @values any valid schema name for the commerce installation
+# @optional false
+##
migration.ds.source.db.schema=
+##
+# Specifies the name of the type system that should be taken into account
+#
+# @values any valid type system name
+# @optional true
+##
migration.ds.source.db.typesystemname=DEFAULT
+##
+# Specifies the suffix which is used for the source typesystem
+#
+# @values the suffix used for the type system, e.g. 'attributedescriptors1' means the suffix is '1'
+# @optional true
+# @dependency migration.ds.source.db.typesystemname
+##
migration.ds.source.db.typesystemsuffix=
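+# Example (based on the suffix convention above, assuming the source type system tables are named like 'attributedescriptors1'):
+#   migration.ds.source.db.typesystemsuffix=1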
-migration.ds.source.db.connection.removeabandoned=true
+##
+# Specifies the minimum number of idle connections available in the source db pool
+#
+# @values integer value
+# @optional false
+##
migration.ds.source.db.connection.pool.size.idle.min=${db.pool.minIdle}
+##
+# Specifies the maximum number of idle connections available in the source db pool
+#
+# @values integer value
+# @optional false
+##
migration.ds.source.db.connection.pool.size.idle.max=${db.pool.maxIdle}
+##
+# Specifies the maximum number of active connections in the source db pool
+#
+# @values integer value
+# @optional false
+##
migration.ds.source.db.connection.pool.size.active.max=${db.pool.maxActive}
+##
+# Specifies the driver class for the target jdbc connection
+#
+# @values any valid jdbc driver class
+# @optional false
+##
migration.ds.target.db.driver=${db.driver}
+##
+# Specifies the url for the target jdbc connection
+#
+# @values any valid jdbc url
+# @optional false
+##
migration.ds.target.db.url=${db.url}
+##
+# Specifies the user name for the target jdbc connection
+#
+# @values any valid user name for the jdbc connection
+# @optional false
+##
migration.ds.target.db.username=${db.username}
+##
+# Specifies the password for the target jdbc connection
+#
+# @values any valid password for the jdbc connection
+# @optional false
+##
migration.ds.target.db.password=${db.password}
+##
+# Specifies the table prefix used on the target commerce database.
+# This may be relevant if a commerce installation was initialized using `${db.tableprefix}` / staged approach.
+#
+# @values any valid commerce database table prefix.
+# @optional true
+##
migration.ds.target.db.tableprefix=${db.tableprefix}
migration.ds.target.db.catalog=
+##
+# Specifies the schema the target commerce installation is deployed to.
+#
+# @values any valid schema name for the commerce installation
+# @optional false
+##
migration.ds.target.db.schema=dbo
+##
+# Specifies the name of the type system that should be taken into account
+#
+# @values any valid type system name
+# @optional true
+##
migration.ds.target.db.typesystemname=DEFAULT
+##
+# Specifies the suffix which is used for the target typesystem
+#
+# @values the suffix used for the type system, e.g. 'attributedescriptors1' means the suffix is '1'
+# @optional true
+# @dependency migration.ds.target.db.typesystemname
+##
migration.ds.target.db.typesystemsuffix=
-migration.ds.target.db.connection.removeabandoned=true
+##
+# Specifies the minimum number of idle connections available in the target db pool
+#
+# @values integer value
+# @optional false
+##
migration.ds.target.db.connection.pool.size.idle.min=${db.pool.minIdle}
+##
+# Specifies the maximum number of idle connections available in the target db pool
+#
+# @values integer value
+# @optional false
+##
migration.ds.target.db.connection.pool.size.idle.max=${db.pool.maxIdle}
+##
+# Specifies the maximum number of active connections in the target db pool
+#
+# @values integer value
+# @optional false
+##
migration.ds.target.db.connection.pool.size.active.max=${db.pool.maxActive}
+##
+# When using the staged approach, multiple sets of commerce tables may exist (each having its own table prefix).
+# To prevent cluttering the db, this property specifies the maximum number of table sets that can exist;
+# if this limit is exceeded, the schema migrator will complain and suggest a cleanup.
+#
+# @values integer value
+# @optional true
+##
migration.ds.target.db.max.stage.migrations=5
-#triggered by updatesystem process or manually by hac
+##
+# Specifies whether the data migration shall be triggered by the 'update running system' operation.
+#
+# @values true or false
+# @optional true
+##
migration.trigger.updatesystem=false
-# Schema migration section - parameters for copying schema from source to target
+##
+# Globally enables / disables schema migration. If set to false, no schema changes will be applied.
+#
+# @values true or false
+# @optional true
+##
migration.schema.enabled=true
+##
+# Specifies if tables which are missing in the target should be added by schema migration.
+#
+# @values true or false
+# @optional true
+# @dependency migration.schema.enabled
+##
migration.schema.target.tables.add.enabled=true
+##
+# Specifies if extra tables in target (compared to source schema) should be removed by schema migration.
+#
+# @values true or false
+# @optional true
+# @dependency migration.schema.enabled
+##
migration.schema.target.tables.remove.enabled=false
+##
+# Specifies if columns which are missing in the target tables should be added by schema migration.
+#
+# @values true or false
+# @optional true
+# @dependency migration.schema.enabled
+##
migration.schema.target.columns.add.enabled=true
+##
+# Specifies if extra columns in target tables (compared to source schema) should be removed by schema migration.
+#
+# @values true or false
+# @optional true
+# @dependency migration.schema.enabled
+##
migration.schema.target.columns.remove.enabled=true
-# automatically trigger schema migrator before data copy process is started
+##
+# Specifies if the schema migrator should be automatically triggered before the data copy process is started
+#
+# @values true or false
+# @optional true
+# @dependency migration.schema.enabled
+##
migration.schema.autotrigger.enabled=false
-# the number of rows read per iteration
+##
+# Activate data export to external DB via cron jobs
+#
+# @values true or false
+# @optional true
+##
+migration.data.export.enabled=false
+##
+# Specifies the number of rows to read per batch. This only affects tables which can be batched.
+#
+# @values integer value
+# @optional true
+##
migration.data.reader.batchsize=1000
-# delete rows in target table before inserting new records
+##
+# Specifies if the target tables should be truncated before data is copied over.
+#
+# @values true or false
+# @optional true
+##
migration.data.truncate.enabled=true
-# These tables will not be emptied before records are inserted
+##
+# If truncation of target tables is enabled, this property specifies tables that should be excluded from truncation.
+#
+# @values comma separated list of table names
+# @optional true
+# @dependency migration.data.truncate.enabled
+##
migration.data.truncate.excluded=
-# maximum number of writer workers per table that can be executed in parallel within a single node in the cluster
+##
+# Specifies the number of threads used per table to write data to target.
+# Note that this value applies per table, so in total the number of threads will depend on
+# 'migration.data.maxparalleltablecopy'.
+# [total number of writer threads] = [migration.data.workers.writer.maxtasks] * [migration.data.maxparalleltablecopy]
+#
+# @values integer value
+# @optional true
+# @dependency migration.data.maxparalleltablecopy
+##
migration.data.workers.writer.maxtasks=10
-# maximum number of reader workers per table that can be executed in parallel within a single node in the cluster
+##
+# Specifies the number of threads used per table to read data from source.
+# Note that this value applies per table, so in total the number of threads will depend on
+# 'migration.data.maxparalleltablecopy'.
+# [total number of reader threads] = [migration.data.workers.reader.maxtasks] * [migration.data.maxparalleltablecopy]
+#
+# @values integer value
+# @optional true
+# @dependency migration.data.maxparalleltablecopy
+##
migration.data.workers.reader.maxtasks=3
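+# Example (using the defaults in this file): with 'migration.data.maxparalleltablecopy=2',
+# up to 3 * 2 = 6 reader threads and 10 * 2 = 20 writer threads may be active per node.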
-# max retry attempts of a worker in case there is a problem
+##
+# Specifies the number of retries in case a worker task fails.
+#
+# @values integer value
+# @optional true
+##
migration.data.workers.retryattempts=0
-# maximum number of table that can be copied in parallel within a single node in the cluster
+##
+# Specifies the number of tables that are copied over in parallel.
+#
+# @values integer value
+# @optional true
+##
migration.data.maxparalleltablecopy=2
-# ignores data insertion errors and continues to the next records
+##
+# If set to true, the migration will abort as soon as an error occurs.
+# If set to false, the migration will try to continue if the state of the runtime allows.
+#
+# @values true or false
+# @optional true
+##
migration.data.failonerror.enabled=true
-# columns to be excluded. format: migration.data.columns.excluded.=
+##
+# Specifies the columns to be excluded from the data copy for a given table
+#
+# @values migration.data.columns.excluded.[tablename]=[comma separated list of column names]
+# @optional true
+##
migration.data.columns.excluded.attributedescriptors=
+##
+# Specifies the columns to be nullified. Whatever value there was will be replaced with NULL in the target column.
+#
+# @values migration.data.columns.nullify.[tablename]=[comma separated list of column names]
+# @optional true
+##
migration.data.columns.nullify.attributedescriptors=
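+# Example (hypothetical column names, for illustration only):
+#   migration.data.columns.excluded.attributedescriptors=extensionname
+#   migration.data.columns.nullify.attributedescriptors=defaultvalue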
-#remove all indices
+##
+# If set to true, all indices in the target table will be removed before copying over the data.
+#
+# @values true or false
+# @optional true
+##
migration.data.indices.drop.enabled=false
-#do not recreate following indices after the migration. Comma separated values
+##
+# Do not recreate the following indices after the migration.
+#
+# @values comma separated values
+# @optional true
+##
migration.data.indices.drop.recreate.exclude=
-#disable indices during migration
+##
+# If set to true, all indices in the target table will be disabled (NOT removed) before copying over the data.
+# After the data copy, the indices will be enabled and rebuilt.
+#
+# @values true or false
+# @optional true
+##
migration.data.indices.disable.enabled=false
-#if empty, disable indices on all tables. If table specified, only disable for this one.
+##
+# If disabling of indices is enabled, this property specifies the tables that should be included.
+# If no tables are specified, indices for all tables will be disabled.
+#
+# @values comma separated list of tables
+# @optional true
+# @dependency migration.data.indices.disable.enabled
+##
migration.data.indices.disable.included=
-#flag to enable the migration of audit tables
+##
+# Flag to enable the migration of audit tables.
+#
+# @values true or false
+# @optional true
+##
migration.data.tables.audit.enabled=true
-#custom tables to migrate (use comma-separated list)
+##
+# Specifies a list of custom tables to migrate. Custom tables are tables that are not part of the commerce type system.
+#
+# @values comma separated list of table names.
+# @optional true
+##
migration.data.tables.custom=
-#tables to exclude (use table names name without prefix)
-migration.data.tables.excluded=SYSTEMINIT,StoredHttpSessions
-#tables to include (use table names name without prefix)
+##
+# Tables to exclude from migration (use table names without prefix)
+#
+# @values comma separated list of table names.
+# @optional true
+##
+migration.data.tables.excluded=SYSTEMINIT,StoredHttpSessions,itemdeletionmarkers
+##
+# Tables to include (use table names without prefix)
+#
+# @values comma separated list of table names.
+# @optional true
+##
migration.data.tables.included=
+##
+# Run migration in the cluster (based on commerce cluster config). The 'HAC' node will be the primary one.
+# A scheduling algorithm decides which table will run on which node. Nodes are notified using cluster events.
+#
+# @values true or false
+# @optional true
+##
migration.cluster.enabled=false
-#enable the incremental database migration.
+##
+# If set to true, the migration will resume from where it stopped (either due to errors or cancellation).
+#
+# @values true or false
+# @optional true
+##
+migration.scheduler.resume.enabled=false
+##
+# If set to true, the migration will run in incremental mode. Only rows that were modified after a given timestamp
+# will be taken into account.
+#
+# @values true or false
+# @optional true
+##
migration.data.incremental.enabled=false
-#Only these tables will be taken into account for incremental migration.
+##
+# Only these tables will be taken into account for incremental migration.
+#
+# @values comma separated list of tables.
+# @optional true
+# @dependency migration.data.incremental.enabled
+##
migration.data.incremental.tables=
-#The timestamp in ISO-8601 ISO_ZONED_DATE_TIME format. Records created or modified after this timestamp will be copied only.
+##
+# Only records created or modified after this timestamp will be copied.
+#
+# @values The timestamp in ISO-8601 ISO_ZONED_DATE_TIME format
+# @optional true
+# @dependency migration.data.incremental.enabled
+##
migration.data.incremental.timestamp=
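+# Example (ISO_ZONED_DATE_TIME as defined by java.time):
+#   migration.data.incremental.timestamp=2023-05-01T00:00:00+02:00[Europe/Berlin]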
-#EXPERIMENTAL: Enable bulk copy for better performance
-migration.data.bulkcopy.enabled=false
+##
+# Specifies the timeout of the data pipe.
+#
+# @values integer value
+# @optional true
+##
migration.data.pipe.timeout=7200
+##
+# Specifies the capacity of the data pipe.
+#
+# @values integer value
+# @optional true
+##
migration.data.pipe.capacity=100
-# No activity? -> migration aborted and marked as stalled
+##
+# Specifies the timeout of the migration monitor.
+# If there is no activity for too long, the migration will be marked as 'stalled' and aborted.
+#
+# @values integer value
+# @optional true
+##
migration.stalled.timeout=7200
-migration.data.timeout=60
+##
+# Specifies the blob storage connection string for storing reporting files.
+#
+# @values any azure blob storage connection string
+# @optional true
+##
migration.data.report.connectionstring=${media.globalSettings.cloudAzureBlobStorageStrategy.connection}
-# Properties that will be masked in the report
+##
+# Specifies the properties that should be masked in HAC.
+#
+# @values any property key
+# @optional true
+##
migration.properties.masked=migration.data.report.connectionstring,migration.ds.source.db.password,migration.ds.target.db.password
+##
+# Specifies the default locale used.
+#
+# @values any locale
+# @optional true
+##
migration.locale.default=en-US
-# support views during data migration
-## string pattern for view naming convention with '%s' as table name. e.g. v_%s
-migration.data.view.name.pattern=v_%s
-# DDL View Generation
-# more information on https://github.tools.sap/cx-boosters/sap-commerce-db-sync/wiki/Dynamic-View-Generation
-migration.data.view.t.{table}.enabled=false
-migration.data.view.t.{table}.joinWhereClause={table}
-migration.data.view.t.{table}.columnTransformation.{column}=GETDATE()
+##
+# Support views during data migration. String pattern for the view naming convention, with `'%s'` as the table name, e.g. `v_%s`
+#
+# @values any string
+# @optional true
+##
+migration.data.view.name.pattern=v_%s
+##
+# Activate DDL view generation for a specific table (TABLE placeholder in the property key)
+#
+# @values any string
+# @optional true
+##
+migration.data.view.t.TABLE.enabled=false
+##
+# Activate DDL view generation for a specific _TABLE_, with an additional join/where clause
+#
+# @values any string
+# @optional true
+# @dependency migration.data.view.t.TABLE.enabled
+##
+migration.data.view.t.TABLE.joinWhereClause={table}
+##
+# Possibility to use custom functions to obfuscate values for specific columns
+#
+# @values any valid SQL function call
+# @optional true
+# @dependency migration.data.view.t.TABLE.enabled
+##
+migration.data.view.t.TABLE.columnTransformation.COLUMN=GETDATE()
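+# Illustrative example (hypothetical 'users' table and columns; with the default name pattern the generated view would be 'v_users'):
+#   migration.data.view.t.users.enabled=true
+#   migration.data.view.t.users.joinWhereClause={table}
+#   migration.data.view.t.users.columnTransformation.passwd='***'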
+
+##
+# If set to true, the JDBC queries run against the source and target data sources will be logged to the storage pointed to by the property {migration.data.report.connectionstring}
+#
+# @values true or false
+# @optional false
+##
+migration.log.sql=false
+##
+# Specifies the number of JDBC log entries to keep in the in-memory collection of a JDBC queries store before flushing its contents to the blob file storage associated with the store's data source and clearing the collection to free memory
+#
+# @values an integer number
+# @optional true
+##
+migration.log.sql.memory.flush.threshold.nbentries=10000000
+##
+# If set to true, the values of the parameters of the JDBC queries run against the source data source will be logged in the JDBC queries logs (migration.log.sql has to be true to enable this type of logging). For security reasons, the tool will never log parameter values for the queries run against the target data source.
+#
+# @values true or false
+# @optional true
+##
+migration.log.sql.source.showparameters=true
+##
+# Specifies the name of the container where the tool will store the files related to migration in the blob storage pointed to by the property {migration.data.report.connectionstring}
+#
+# @values any string
+# @optional false
+##
+migration.data.filestorage.container.name=migration
+migration.data.fulldatabase.enabled=true
+
+# Enhanced Logging
+log4j2.appender.migrationAppender.type=Console
+log4j2.appender.migrationAppender.name=MigrationAppender
+log4j2.appender.migrationAppender.layout.type=PatternLayout
+log4j2.appender.migrationAppender.layout.pattern=%-5p [%t] [%c{1}] %X{migrationID,pipeline,clusterID} %m%n
+log4j2.logger.migrationToolkit.name=com.sap.cx.boosters.commercedbsync
+log4j2.logger.migrationToolkit.level=INFO
+log4j2.logger.migrationToolkit.appenderRef.migration.ref=MigrationAppender
+log4j2.logger.migrationToolkit.additivity=false
diff --git a/commercedbsync/resources/commercedbsync-beans.xml b/commercedbsync/resources/commercedbsync-beans.xml
index 72741cf..226f07c 100644
--- a/commercedbsync/resources/commercedbsync-beans.xml
+++ b/commercedbsync/resources/commercedbsync-beans.xml
@@ -29,6 +29,7 @@
+
@@ -36,6 +37,16 @@
+
+
+
+
+
+
+
+
+
+
@@ -48,11 +59,13 @@
+
+
@@ -108,6 +121,7 @@
+
@@ -116,16 +130,20 @@
+
+
-
+
+
+
@@ -146,4 +164,8 @@
+
+
+
+
diff --git a/commercedbsync/resources/commercedbsync-items.xml b/commercedbsync/resources/commercedbsync-items.xml
index 88f37fc..6d1629e 100644
--- a/commercedbsync/resources/commercedbsync-items.xml
+++ b/commercedbsync/resources/commercedbsync-items.xml
@@ -50,6 +50,30 @@
false
+
+ maximum number of tables that can be copied in parallel within a single node in the cluster
+
+
+ 2
+
+
+ maximum number of reader workers per table that can be executed in parallel within a single node in the cluster
+
+
+ 3
+
+
+ maximum number of writer workers per table that can be executed in parallel within a single node in the cluster
+
+
+ 10
+
+
+ the number of rows read per iteration
+
+
+ 1000
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
+
+
+
+ class="com.sap.cx.boosters.commercedbsync.concurrent.impl.DefaultDataThreadPoolFactory">
+
+
-
-
-
-
+
+
-
-
-
-
-
-
-
-
+
+
@@ -81,7 +76,6 @@
-
@@ -100,6 +94,12 @@
+
+
+
+
+
@@ -113,6 +113,8 @@
+
+
@@ -127,8 +129,15 @@
-
+
+
+
+
+
+
+
@@ -136,7 +145,7 @@
class="com.sap.cx.boosters.commercedbsync.service.impl.PipeDatabaseMigrationCopyService">
-
+
@@ -150,7 +159,6 @@
-
@@ -172,6 +180,7 @@
parent="abstractEventListener">
+
@@ -200,12 +209,13 @@
+
+
-
-
-
+
@@ -228,6 +238,7 @@
+
@@ -289,20 +300,10 @@
-
-
-
-
-
-
+
@@ -333,10 +334,4 @@
-
-
-
-
-
-
diff --git a/commercedbsync/resources/impex/essentialdata-commercemigration-jobs.impex b/commercedbsync/resources/impex/projectdata-commercemigration-jobs.impex
similarity index 88%
rename from commercedbsync/resources/impex/essentialdata-commercemigration-jobs.impex
rename to commercedbsync/resources/impex/projectdata-commercemigration-jobs.impex
index a30c758..9299434 100644
--- a/commercedbsync/resources/impex/essentialdata-commercemigration-jobs.impex
+++ b/commercedbsync/resources/impex/projectdata-commercemigration-jobs.impex
@@ -4,12 +4,12 @@ INSERT_UPDATE ServicelayerJob;code[unique=true];springId[unique=true]
;fullMigrationJob;fullMigrationJob
# Update details for incremental migration
-INSERT_UPDATE IncrementalMigrationCronJob;code[unique=true];active;job(code)[default=incrementalMigrationJob];sessionLanguage(isoCode)[default=en]
+INSERT_UPDATE IncrementalMigrationCronJob;code[unique=true];active;job(code)[default=incrementalMigrationJob];sessionLanguage(isoCode)[default=en];maxParallelTableCopy[default=2];maxReaderWorkers[default=3];maxWriterWorkers[default=10];batchSize[default=1000]
;incrementalMigrationJob;true;
-INSERT_UPDATE IncrementalMigrationCronJob;code[unique=true];migrationItems
+INSERT_UPDATE IncrementalMigrationCronJob;code[unique=true];migrationItems;maxParallelTableCopy[default=2];maxReaderWorkers[default=3];maxWriterWorkers[default=10];batchSize[default=1000]
;incrementalMigrationJob;paymentmodes,addresses,users,cat2prodrel,consignments,orders
-INSERT_UPDATE FullMigrationCronJob;code[unique=true];job(code)[default=fullMigrationJob];active;truncateEnabled;fullDatabaseMigration;schemaAutotrigger;sessionLanguage(isoCode)[default=en];migrationItems;
+INSERT_UPDATE FullMigrationCronJob;code[unique=true];job(code)[default=fullMigrationJob];active;truncateEnabled;fullDatabaseMigration;schemaAutotrigger;sessionLanguage(isoCode)[default=en];migrationItems;maxParallelTableCopy[default=2];maxReaderWorkers[default=3];maxWriterWorkers[default=10];batchSize[default=1000]
;fullDatabaseMigrationJob;;true;true;true;true;;mediaformatmapping,cat2attrrellp,categories,compositeentries,mediafolders,mediacontextlp,validationconstraintslp,validationconstraints,catalogslp,units,genericitems,pcp2wrtblecvrel,renderertemplate,dynamiccontent,userrightslp,backofficesearchcond,metainformations,unitslp,workflowactions,productprops,scripts,systemsetupaudit,gentestitems,cat2princrel,jalovelocityrenderer,paymentmodeslp,usergroupprops,orderprops,userrights,workflowactionitemsrel,parserproperty,productfeatures,productreferences,commentcompreadrels,languageslp,syncjob2pcplrel,commentitemrelations,jobs,themes,discounts,catalogversionsyncjob,cat2catrel,categorieslp,syncjob2langrel,currencieslp,impexdocumentids,userprofiles,stdpaymmodevals,links,workflowitematts,products,backofficesavedquery,productslp,workflowtemplatelinkrel,previewtickets,backofficecollections,props,retentionrule,syncjob2typerel,commentcompremoverels,genericitemslp,addresses,catalogs,languages,taxeslp,discountslp,distributedbatches,backofficesavedquerylp,searchrestrictions,aclentries,format2medforrel,keywords,paymentmodes,whereparts,commentassignrelations,commentattachments,discountrows,mediacontainerlp,commentdomains,synattcfg,mediacontext,impbatchcontent,classificationattrslp,commenttypes,globaldiscountrows,mediacontainer,searchrestrictionslp,mediaformatlp,catverdiffs,cmptype2covgrprels,workflowtemplprincrel,clattruntlp,jobslp,titles,pendingstepsrelation,themeslp,countries,commentcompwriterels,processedstepsrelation,slactions,productreferenceslp,usergroups,regionslp,userprops,exportslp,numberseries,distributedprocesses,catalogversions,externalimportkey,usergroupslp,cat2attrrel,medias,jobsearchrestriction,triggerscj,addressprops,openidexternalscopes,attr2valuerel,constraintgroup,renderertemplatelp,titleslp,indextestitem,workflowactionlinkrel,workflowactionslp,catalogversionslp,commentwatchrelations,configitems,pcpl2rdblecvrel,abstrcfgproductinfo,users,workflowitemattslp,commentcompcreaterels,derivedmedias,cat2medrel,scriptslp,regions,currencies,steps,deliverymodeslp,classattrvalueslp,mediaformat,zonedeliverymodevalues,configuratorsettings,prod2keywordrel,cat2prodrel,taxes,cat2keywordrel,classattrvalues,ydeployments,cstrgr2abscstrrel,mediaprops,pgrels,zone2country,classificationattrs,taxrows,renderersproperty,cronjobs,commentcomponents,exports,deliverymodes,comments,workflowactioncomments,countrieslp,commentusersettings,format2comtyprel,corsconfigproperty,backofficecollitemrefs,pricerows,agreements,workflowactionsrel,clattrunt,format,changedescriptors,formatlp,zones
-;fullTableMigrationJob;;true;true;false;false;;products;paymentmodes
+;fullTableMigrationJob;;true;true;false;false;;products,paymentmodes
diff --git a/commercedbsync/resources/localization/commercedbsync-locales_en.properties b/commercedbsync/resources/localization/commercedbsync-locales_en.properties
index e9a5825..010258d 100644
--- a/commercedbsync/resources/localization/commercedbsync-locales_en.properties
+++ b/commercedbsync/resources/localization/commercedbsync-locales_en.properties
@@ -17,4 +17,16 @@ type.MigrationCronJob.migrationItems.name=Migration Tables
type.MigrationCronJob.migrationItems.description=
type.FullMigrationCronJob.fullDatabaseMigration.name=Full Database Migration
-type.FullMigrationCronJob.fullDatabaseMigration.description=
\ No newline at end of file
+type.FullMigrationCronJob.fullDatabaseMigration.description=
+
+type.MigrationCronJob.maxParallelTableCopy.name=Parallel Tables
+type.MigrationCronJob.maxParallelTableCopy.description=Number of tables to be copied in parallel
+
+type.MigrationCronJob.maxReaderWorkers.name=Reader Workers
+type.MigrationCronJob.maxReaderWorkers.description=Number of reader workers to be used for each table
+
+type.MigrationCronJob.maxWriterWorkers.name=Writer Workers
+type.MigrationCronJob.maxWriterWorkers.description=Number of writer workers to be used for each table
+
+type.MigrationCronJob.batchSize.name=Batch Size
+type.MigrationCronJob.batchSize.description=Batch size used to query data
diff --git a/commercedbsync/resources/sql/createSchedulerTablesHana.sql b/commercedbsync/resources/sql/createSchedulerTablesHANA.sql
similarity index 92%
rename from commercedbsync/resources/sql/createSchedulerTablesHana.sql
rename to commercedbsync/resources/sql/createSchedulerTablesHANA.sql
index bf15d86..d2a9c74 100644
--- a/commercedbsync/resources/sql/createSchedulerTablesHana.sql
+++ b/commercedbsync/resources/sql/createSchedulerTablesHANA.sql
@@ -14,6 +14,11 @@ IF tablename = 'MIGRATIONTOOLKIT_TABLECOPYSTATUS' AND :found > 0
THEN
DROP TABLE MIGRATIONTOOLKIT_TABLECOPYSTATUS;
END IF;
+
+IF tablename = 'MIGRATIONTOOLKIT_TABLECOPYBATCHES' AND :found > 0
+ THEN
+DROP TABLE MIGRATIONTOOLKIT_TABLECOPYBATCHES;
+END IF;
END;
#
CALL MIGRATION_PROCEDURE('MIGRATIONTOOLKIT_TABLECOPYTASKS');
@@ -32,15 +37,32 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
failure char(1) NOT NULL DEFAULT '0',
error NVARCHAR(5000) NULL,
published char(1) NOT NULL DEFAULT '0',
+ truncated char(1) NOT NULL DEFAULT '0',
lastupdate Timestamp NOT NULL DEFAULT '0001-01-01 00:00:00',
avgwriterrowthroughput numeric(10,2) NULL DEFAULT 0,
avgreaderrowthroughput numeric(10,2) NULL DEFAULT 0,
+ copymethod NVARCHAR(255) NULL,
+ keycolumns NVARCHAR(255) NULL,
durationinseconds numeric(10,2) NULL DEFAULT 0,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
);
#
+CALL MIGRATION_PROCEDURE('MIGRATIONTOOLKIT_TABLECOPYBATCHES');
+#
+
+CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYBATCHES (
+ migrationId NVARCHAR(255) NOT NULL,
+ batchId int NOT NULL DEFAULT 0,
+ pipelinename NVARCHAR(255) NOT NULL,
+ lowerBoundary NVARCHAR(255) NOT NULL,
+ upperBoundary NVARCHAR(255) NULL,
+ PRIMARY KEY (migrationid, batchId, pipelinename)
+);
+
+#
+
CALL MIGRATION_PROCEDURE('MIGRATIONTOOLKIT_TABLECOPYSTATUS');
#
diff --git a/commercedbsync/resources/sql/createSchedulerTables.sql b/commercedbsync/resources/sql/createSchedulerTablesMSSQL.sql
similarity index 87%
rename from commercedbsync/resources/sql/createSchedulerTables.sql
rename to commercedbsync/resources/sql/createSchedulerTablesMSSQL.sql
index bad4a15..a34bc37 100644
--- a/commercedbsync/resources/sql/createSchedulerTables.sql
+++ b/commercedbsync/resources/sql/createSchedulerTablesMSSQL.sql
@@ -14,13 +14,27 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
failure char(1) NOT NULL DEFAULT '0',
error NVARCHAR(MAX) NULL,
published char(1) NOT NULL DEFAULT '0',
+ truncated char(1) NOT NULL DEFAULT '0',
lastupdate DATETIME2 NOT NULL DEFAULT '0001-01-01 00:00:00',
avgwriterrowthroughput numeric(10,2) NULL DEFAULT 0,
avgreaderrowthroughput numeric(10,2) NULL DEFAULT 0,
+ copymethod NVARCHAR(255) NULL,
+ keycolumns NVARCHAR(255) NULL,
durationinseconds numeric(10,2) NULL DEFAULT 0,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
);
+DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYBATCHES;
+
+CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYBATCHES (
+ migrationId NVARCHAR(255) NOT NULL,
+ batchId int NOT NULL DEFAULT 0,
+ pipelinename NVARCHAR(255) NOT NULL,
+ lowerBoundary NVARCHAR(255) NOT NULL,
+ upperBoundary NVARCHAR(255) NULL,
+ PRIMARY KEY (migrationid, batchId, pipelinename)
+);
+
DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYSTATUS;
CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYSTATUS (
@@ -45,12 +59,6 @@ AS
BEGIN
DECLARE @relevant_count integer = 0
SET NOCOUNT ON
- /*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
- * License: Apache-2.0
- *
- */
-
-- latest update overall = latest update timestamp of updated tasks
UPDATE s
SET s.lastUpdate = t.latestUpdate
diff --git a/commercedbsync/resources/sql/createSchedulerTablesMYSQL.sql b/commercedbsync/resources/sql/createSchedulerTablesMYSQL.sql
new file mode 100644
index 0000000..1838fe4
--- /dev/null
+++ b/commercedbsync/resources/sql/createSchedulerTablesMYSQL.sql
@@ -0,0 +1,112 @@
+DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYTASKS;
+#
+CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS
+(
+ targetnodeId int NOT NULL,
+ migrationId VARCHAR(255) NOT NULL,
+ pipelinename VARCHAR(255) NOT NULL,
+ sourcetablename VARCHAR(255) NOT NULL,
+ targettablename VARCHAR(255) NOT NULL,
+ columnmap TEXT NULL,
+ duration VARCHAR(255) NULL,
+ sourcerowcount int NOT NULL DEFAULT 0,
+ targetrowcount int NOT NULL DEFAULT 0,
+ failure char(1) NOT NULL DEFAULT '0',
+ error TEXT NULL,
+ published char(1) NOT NULL DEFAULT '0',
+ truncated char(1) NOT NULL DEFAULT '0',
+ lastupdate DATETIME NOT NULL DEFAULT '0001-01-01 00:00:00',
+ avgwriterrowthroughput numeric(10, 2) NULL DEFAULT 0,
+ avgreaderrowthroughput numeric(10, 2) NULL DEFAULT 0,
+ copymethod VARCHAR(255) NULL,
+ keycolumns VARCHAR(255) NULL,
+ durationinseconds numeric(10, 2) NULL DEFAULT 0,
+ PRIMARY KEY (migrationid, targetnodeid, pipelinename)
+);
+#
+DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYBATCHES;
+#
+CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYBATCHES
+(
+ migrationId VARCHAR(255) NOT NULL,
+ batchId int NOT NULL DEFAULT 0,
+ pipelinename VARCHAR(255) NOT NULL,
+ lowerBoundary VARCHAR(255) NOT NULL,
+ upperBoundary VARCHAR(255) NULL,
+ PRIMARY KEY (migrationid, batchId, pipelinename)
+);
+#
+DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYSTATUS;
+#
+CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYSTATUS
+(
+ migrationId VARCHAR(255) NOT NULL,
+ startAt datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ endAt datetime,
+ lastUpdate datetime,
+ total int NOT NULL DEFAULT 0,
+ completed int NOT NULL DEFAULT 0,
+ failed int NOT NULL DEFAULT 0,
+ status VARCHAR(255) NOT NULL DEFAULT 'RUNNING',
+ PRIMARY KEY (migrationid)
+);
+#
+DROP TRIGGER IF EXISTS MIGRATIONTOOLKIT_TABLECOPYSTATUS_Insert;
+DROP TRIGGER IF EXISTS MIGRATIONTOOLKIT_TABLECOPYSTATUS_Update;
+#
+CREATE TRIGGER MIGRATIONTOOLKIT_TABLECOPYSTATUS_Insert
+ AFTER INSERT
+ ON MIGRATIONTOOLKIT_TABLECOPYTASKS
+ FOR EACH ROW
+BEGIN
+ -- latest update overall = latest update timestamp of updated tasks
+ UPDATE MIGRATIONTOOLKIT_TABLECOPYSTATUS s
+ SET s.lastUpdate = NEW.lastUpdate
+ WHERE s.migrationId = NEW.migrationId;
+END;
+#
+CREATE TRIGGER MIGRATIONTOOLKIT_TABLECOPYSTATUS_Update
+ AFTER UPDATE
+ ON MIGRATIONTOOLKIT_TABLECOPYTASKS
+ FOR EACH ROW
+BEGIN
+ -- latest update overall = latest update timestamp of updated tasks
+ UPDATE MIGRATIONTOOLKIT_TABLECOPYSTATUS s
+ SET s.lastUpdate = NEW.lastUpdate
+ WHERE s.migrationId = OLD.migrationId;
+
+ IF NEW.failure = '1' OR NEW.duration IS NOT NULL THEN
+ UPDATE MIGRATIONTOOLKIT_TABLECOPYSTATUS s
+ INNER JOIN (
+ SELECT migrationId, COUNT(pipelinename) AS completed
+ FROM MIGRATIONTOOLKIT_TABLECOPYTASKS
+ WHERE duration IS NOT NULL
+ GROUP BY migrationId
+ ) AS t
+ ON s.migrationId = t.migrationId
+ SET s.completed = t.completed;
+
+ -- update failed count when tasks failed
+ UPDATE MIGRATIONTOOLKIT_TABLECOPYSTATUS s
+ INNER JOIN (
+ SELECT migrationId, COUNT(pipelinename) AS failed
+ FROM MIGRATIONTOOLKIT_TABLECOPYTASKS
+ WHERE failure = '1'
+ GROUP BY migrationId
+ ) AS t
+ ON s.migrationId = t.migrationId
+ SET s.failed = t.failed;
+
+ UPDATE MIGRATIONTOOLKIT_TABLECOPYSTATUS
+ SET endAt = UTC_TIMESTAMP()
+ WHERE migrationId = OLD.migrationId
+ AND total = completed
+ AND endAt IS NULL;
+
+ UPDATE MIGRATIONTOOLKIT_TABLECOPYSTATUS
+ SET status = 'PROCESSED'
+ WHERE migrationId = OLD.migrationId
+ AND status = 'RUNNING'
+ AND total = completed;
+ END IF;
+END;
diff --git a/commercedbsync/resources/sql/createSchedulerTablesOracle.sql b/commercedbsync/resources/sql/createSchedulerTablesORACLE.sql
similarity index 88%
rename from commercedbsync/resources/sql/createSchedulerTablesOracle.sql
rename to commercedbsync/resources/sql/createSchedulerTablesORACLE.sql
index 7286422..1b13a17 100644
--- a/commercedbsync/resources/sql/createSchedulerTablesOracle.sql
+++ b/commercedbsync/resources/sql/createSchedulerTablesORACLE.sql
@@ -1,22 +1,3 @@
-/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
- * License: Apache-2.0
- *
- */
-
-/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
- * License: Apache-2.0
- *
- */
-
-/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
- * License: Apache-2.0
- *
- */
-
-
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS';
EXCEPTION
@@ -39,9 +20,12 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
failure char(1) DEFAULT '0' NOT NULL,
error CLOB NULL,
published char(1) DEFAULT '0' NOT NULL,
+ truncated char(1) DEFAULT '0' NOT NULL,
lastupdate Timestamp NOT NULL,
avgwriterrowthroughput number(10,2) DEFAULT 0 NULL,
avgreaderrowthroughput number(10,2) DEFAULT 0 NULL,
+ copymethod NVARCHAR2(255) NULL,
+ keycolumns NVARCHAR2(255) NULL,
durationinseconds number(10,2) DEFAULT 0 NULL,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
)
@@ -49,6 +33,26 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
+BEGIN
+ EXECUTE IMMEDIATE 'DROP TABLE MIGRATIONTOOLKIT_TABLECOPYBATCHES';
+EXCEPTION
+ WHEN OTHERS THEN NULL;
+END;
+/
+
+
+CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYBATCHES (
+ migrationId NVARCHAR2(255) NOT NULL,
+ batchId number(10) DEFAULT 0 NOT NULL,
+ pipelinename NVARCHAR2(255) NOT NULL,
+ lowerBoundary NVARCHAR2(255) NOT NULL,
+ upperBoundary NVARCHAR2(255) NULL,
+ PRIMARY KEY (migrationid, batchId, pipelinename)
+)
+/
+
+
+
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE MIGRATIONTOOLKIT_TABLECOPYSTATUS';
diff --git a/commercedbsync/resources/sql/createSchedulerTablesPostGres.sql b/commercedbsync/resources/sql/createSchedulerTablesPOSTGRESQL.sql
similarity index 87%
rename from commercedbsync/resources/sql/createSchedulerTablesPostGres.sql
rename to commercedbsync/resources/sql/createSchedulerTablesPOSTGRESQL.sql
index 451451b..d5440aa 100644
--- a/commercedbsync/resources/sql/createSchedulerTablesPostGres.sql
+++ b/commercedbsync/resources/sql/createSchedulerTablesPOSTGRESQL.sql
@@ -1,23 +1,3 @@
-/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
- * License: Apache-2.0
- *
- */
-
-/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
- * License: Apache-2.0
- *
- */
-
-/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
- * License: Apache-2.0
- *
- */
-
-
-
DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYTASKS;
#
@@ -35,15 +15,33 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
failure char(1) NOT NULL DEFAULT '0',
error text NULL,
published char(1) NOT NULL DEFAULT '0',
+ truncated char(1) NOT NULL DEFAULT '0',
lastupdate timestamp NOT NULL DEFAULT '0001-01-01 00:00:00',
avgwriterrowthroughput numeric(10,2) NULL DEFAULT 0,
avgreaderrowthroughput numeric(10,2) NULL DEFAULT 0,
+ copymethod VARCHAR(255) NULL,
+ keycolumns VARCHAR(255) NULL,
durationinseconds numeric(10,2) NULL DEFAULT 0,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
);
#
+DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYBATCHES;
+
+#
+
+CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYBATCHES (
+ migrationId VARCHAR(255) NOT NULL,
+ batchId int NOT NULL DEFAULT 0,
+ pipelinename VARCHAR(255) NOT NULL,
+ lowerBoundary VARCHAR(255) NOT NULL,
+ upperBoundary VARCHAR(255) NULL,
+ PRIMARY KEY (migrationid, batchId, pipelinename)
+);
+
+#
+
DROP TABLE IF EXISTS MIGRATIONTOOLKIT_TABLECOPYSTATUS;
#
diff --git a/commercedbsync/resources/sql/transformationFunctions/hsqldb-general.sql b/commercedbsync/resources/sql/transformationFunctions/hsqldb-general.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/hsqldb-typeinfotable.sql b/commercedbsync/resources/sql/transformationFunctions/hsqldb-typeinfotable.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/mssql-general.sql b/commercedbsync/resources/sql/transformationFunctions/mssql-general.sql
index 174a62d..ce4b833 100644
--- a/commercedbsync/resources/sql/transformationFunctions/mssql-general.sql
+++ b/commercedbsync/resources/sql/transformationFunctions/mssql-general.sql
@@ -34,25 +34,4 @@ IF @uid = 'admin'
RETURN 'plain'
RETURN '*'
-END;
-
-CREATE OR ALTER FUNCTION mask_custom(@Prefix int, @Mask varchar(max), @Suffix int, @Original varchar(MAX))
-RETURNS VARCHAR(max)
-AS
-BEGIN
-
- RETURN SUBSTRING(@Original,1,@Prefix) +
- @Mask +
- SUBSTRING(@Original,LEN(@Original) - @Suffix + 1, LEN(@Original))
-END;
-
-CREATE OR ALTER FUNCTION mask_email(@String varchar(MAX))
-RETURNS VARCHAR(max)
-AS
-BEGIN
-
- RETURN LEFT(@String, 3) + '*****@'
- + REVERSE(LEFT(RIGHT(REVERSE(@String) , CHARINDEX('@', @String) +2), 2))
- + '******'
- + RIGHT(@String, 4)
END;
\ No newline at end of file
diff --git a/commercedbsync/resources/sql/transformationFunctions/mysql-general.sql b/commercedbsync/resources/sql/transformationFunctions/mysql-general.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/mysql-typeinfotable.sql b/commercedbsync/resources/sql/transformationFunctions/mysql-typeinfotable.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/oracle-general.sql b/commercedbsync/resources/sql/transformationFunctions/oracle-general.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/oracle-typeinfotable.sql b/commercedbsync/resources/sql/transformationFunctions/oracle-typeinfotable.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/postgresql-general.sql b/commercedbsync/resources/sql/transformationFunctions/postgresql-general.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/postgresql-typeinfotable.sql b/commercedbsync/resources/sql/transformationFunctions/postgresql-typeinfotable.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/sap-general.sql b/commercedbsync/resources/sql/transformationFunctions/sap-general.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/resources/sql/transformationFunctions/sap-typeinfotable.sql b/commercedbsync/resources/sql/transformationFunctions/sap-typeinfotable.sql
deleted file mode 100644
index e69de29..0000000
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/CommercedbsyncStandalone.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/CommercedbsyncStandalone.java
index 6e3e587..f9a74b9 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/CommercedbsyncStandalone.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/CommercedbsyncStandalone.java
@@ -1,8 +1,9 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
+
package com.sap.cx.boosters.commercedbsync;
import de.hybris.platform.core.Registry;
@@ -10,21 +11,22 @@
import de.hybris.platform.util.RedeployUtilities;
import de.hybris.platform.util.Utilities;
-
/**
- * Demonstration of how to write a standalone application that can be run directly from within eclipse or from the
- * commandline.
+ * Demonstration of how to write a standalone application that can be run
+ * directly from within eclipse or from the commandline.
* To run this from commandline, just use the following command:
*
* java -jar bootstrap/bin/ybootstrap.jar "new CommercedbsyncStandalone().run();"
- * From eclipse, just run as Java Application. Note that you maybe need to add all other projects like
- * ext-commerce, ext-pim to the Launch configuration classpath.
+ * From eclipse, just run as Java Application. Note that you may need
+ * to add all other projects like ext-commerce, ext-pim to the Launch
+ * configuration classpath.
*/
public class CommercedbsyncStandalone {
/**
* Main class to be able to run it directly as a java program.
*
- * @param args the arguments from commandline
+ * @param args
+ * the arguments from commandline
*/
public static void main(final String[] args) {
new CommercedbsyncStandalone().run();
@@ -35,8 +37,8 @@ public void run() {
Registry.activateMasterTenant();
final JaloSession jaloSession = JaloSession.getCurrentSession();
- System.out.println("Session ID: " + jaloSession.getSessionID()); //NOPMD
- System.out.println("User: " + jaloSession.getUser()); //NOPMD
+ System.out.println("Session ID: " + jaloSession.getSessionID()); // NOPMD
+ System.out.println("User: " + jaloSession.getUser()); // NOPMD
Utilities.printAppInfo();
RedeployUtilities.shutdown();
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/DataRepositoryAdapter.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/DataRepositoryAdapter.java
index e235c34..d0fb03b 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/DataRepositoryAdapter.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/DataRepositoryAdapter.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
@@ -21,6 +21,7 @@ public interface DataRepositoryAdapter {
DataSet getBatchOrderedByColumn(MigrationContext context, SeekQueryDefinition queryDefinition) throws Exception;
- DataSet getBatchMarkersOrderedByColumn(MigrationContext context, MarkersQueryDefinition queryDefinition) throws Exception;
+ DataSet getBatchMarkersOrderedByColumn(MigrationContext context, MarkersQueryDefinition queryDefinition)
+ throws Exception;
}
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/impl/ContextualDataRepositoryAdapter.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/impl/ContextualDataRepositoryAdapter.java
index b0aae83..0049ae8 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/impl/ContextualDataRepositoryAdapter.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/adapter/impl/ContextualDataRepositoryAdapter.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
@@ -18,12 +18,12 @@
import java.time.Instant;
/**
- * Controls the way the repository is accessed by adapting the most common reading
- * operations based on the configured context
+ * Controls the way the repository is accessed by adapting the most common
+ * reading operations based on the configured context
*/
public class ContextualDataRepositoryAdapter implements DataRepositoryAdapter {
- private DataRepository repository;
+ private final DataRepository repository;
public ContextualDataRepositoryAdapter(DataRepository repository) {
this.repository = repository;
@@ -31,15 +31,15 @@ public ContextualDataRepositoryAdapter(DataRepository repository) {
@Override
public long getRowCount(MigrationContext context, String table) throws Exception {
- if(context.isDeletionEnabled() || context.isLpTableMigrationEnabled()){
- return repository.getRowCountModifiedAfter(table, getIncrementalTimestamp(context),context.isDeletionEnabled(), context.isLpTableMigrationEnabled());
- }
- else{
- if (context.isIncrementalModeEnabled()) {
- return repository.getRowCountModifiedAfter(table, getIncrementalTimestamp(context));
+ if (context.isDeletionEnabled() || context.isLpTableMigrationEnabled()) {
+ return repository.getRowCountModifiedAfter(table, getIncrementalTimestamp(context),
+ context.isDeletionEnabled(), context.isLpTableMigrationEnabled());
} else {
- return repository.getRowCount(table);
- }
+ if (context.isIncrementalModeEnabled()) {
+ return repository.getRowCountModifiedAfter(table, getIncrementalTimestamp(context));
+ } else {
+ return repository.getRowCount(table);
+ }
}
}
@@ -53,7 +53,8 @@ public DataSet getAll(MigrationContext context, String table) throws Exception {
}
@Override
- public DataSet getBatchWithoutIdentifier(MigrationContext context, OffsetQueryDefinition queryDefinition) throws Exception {
+ public DataSet getBatchWithoutIdentifier(MigrationContext context, OffsetQueryDefinition queryDefinition)
+ throws Exception {
if (context.isIncrementalModeEnabled()) {
return repository.getBatchWithoutIdentifier(queryDefinition, getIncrementalTimestamp(context));
} else {
@@ -62,7 +63,8 @@ public DataSet getBatchWithoutIdentifier(MigrationContext context, OffsetQueryDe
}
@Override
- public DataSet getBatchOrderedByColumn(MigrationContext context, SeekQueryDefinition queryDefinition) throws Exception {
+ public DataSet getBatchOrderedByColumn(MigrationContext context, SeekQueryDefinition queryDefinition)
+ throws Exception {
if (context.isIncrementalModeEnabled()) {
return repository.getBatchOrderedByColumn(queryDefinition, getIncrementalTimestamp(context));
} else {
@@ -71,7 +73,8 @@ public DataSet getBatchOrderedByColumn(MigrationContext context, SeekQueryDefini
}
@Override
- public DataSet getBatchMarkersOrderedByColumn(MigrationContext context, MarkersQueryDefinition queryDefinition) throws Exception {
+ public DataSet getBatchMarkersOrderedByColumn(MigrationContext context, MarkersQueryDefinition queryDefinition)
+ throws Exception {
if (context.isIncrementalModeEnabled()) {
return repository.getBatchMarkersOrderedByColumn(queryDefinition, getIncrementalTimestamp(context));
} else {
@@ -82,7 +85,9 @@ public DataSet getBatchMarkersOrderedByColumn(MigrationContext context, MarkersQ
private Instant getIncrementalTimestamp(MigrationContext context) {
Instant incrementalTimestamp = context.getIncrementalTimestamp();
if (incrementalTimestamp == null) {
- throw new IllegalStateException("Timestamp cannot be null in incremental mode. Set a timestamp using the property " + CommercedbsyncConstants.MIGRATION_DATA_INCREMENTAL_TIMESTAMP);
+ throw new IllegalStateException(
+ "Timestamp cannot be null in incremental mode. Set a timestamp using the property "
+ + CommercedbsyncConstants.MIGRATION_DATA_INCREMENTAL_TIMESTAMP);
}
return incrementalTimestamp;
}
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataCopyMethod.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataCopyMethod.java
new file mode 100644
index 0000000..f409f2d
--- /dev/null
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataCopyMethod.java
@@ -0,0 +1,11 @@
+/*
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * License: Apache-2.0
+ *
+ */
+
+package com.sap.cx.boosters.commercedbsync.concurrent;
+
+public enum DataCopyMethod {
+ SEEK, OFFSET, DEFAULT
+}
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipe.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipe.java
index fd00539..d47de79 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipe.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipe.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
@@ -9,8 +9,9 @@
import javax.annotation.concurrent.ThreadSafe;
/**
- * Used to separate database reading and writing operations, after reading data from the DB, the result
- * is put to the pipe and can be used by the database writer later on -> asynchronously
+ * Used to separate database reading and writing operations, after reading data
+ * from the DB, the result is put to the pipe and can be used by the database
+ * writer later on -> asynchronously
*
* @param
*/
@@ -21,4 +22,8 @@ public interface DataPipe {
void put(MaybeFinished value) throws Exception;
MaybeFinished get() throws Exception;
+
+ int size();
+
+ int getWaitersCount();
}
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipeFactory.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipeFactory.java
index 77bdcca..381964c 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipeFactory.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataPipeFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
@@ -7,11 +7,10 @@
package com.sap.cx.boosters.commercedbsync.concurrent;
import com.sap.cx.boosters.commercedbsync.context.CopyContext;
-import com.sap.cx.boosters.commercedbsync.dataset.DataSet;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public interface DataPipeFactory {
- DataPipe create(CopyContext context, CopyContext.DataCopyItem item) throws Exception;
+ DataPipe create(CopyContext context, CopyContext.DataCopyItem item) throws Exception;
}
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolConfigBuilder.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolConfigBuilder.java
new file mode 100644
index 0000000..5c56806
--- /dev/null
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolConfigBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * License: Apache-2.0
+ *
+ */
+
+package com.sap.cx.boosters.commercedbsync.concurrent;
+
+import com.sap.cx.boosters.commercedbsync.DataThreadPoolConfig;
+import com.sap.cx.boosters.commercedbsync.context.MigrationContext;
+
+public class DataThreadPoolConfigBuilder {
+
+ private final DataThreadPoolConfig config;
+
+ public DataThreadPoolConfigBuilder(MigrationContext context) {
+ config = new DataThreadPoolConfig();
+ }
+
+ public DataThreadPoolConfigBuilder withPoolSize(int poolSize) {
+ config.setPoolSize(poolSize);
+ return this;
+ }
+
+ public DataThreadPoolConfig build() {
+ return config;
+ }
+}
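The builder currently only carries the pool size, but it keeps pool configuration extensible without changing the factory signature. Its intended usage appears further down in DefaultDataPipeFactory, where the reader pool is sized from the migration context:

DataThreadPoolConfig threadPoolConfig = new DataThreadPoolConfigBuilder(context.getMigrationContext())
        .withPoolSize(context.getMigrationContext().getMaxParallelReaderWorkers()).build();
ThreadPoolTaskExecutor taskExecutor = dataReadWorkerPoolFactory.create(context, threadPoolConfig);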
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolFactory.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolFactory.java
new file mode 100644
index 0000000..aea73f7
--- /dev/null
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * License: Apache-2.0
+ *
+ */
+
+package com.sap.cx.boosters.commercedbsync.concurrent;
+
+import com.sap.cx.boosters.commercedbsync.context.CopyContext;
+import com.sap.cx.boosters.commercedbsync.DataThreadPoolConfig;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public interface DataThreadPoolFactory {
+ ThreadPoolTaskExecutor create(CopyContext context, DataThreadPoolConfig config);
+
+ void destroy(ThreadPoolTaskExecutor executor);
+
+ DataThreadPoolMonitor getMonitor();
+}
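The factory owns the executor lifecycle: create() builds a pool for a copy context and destroy() hands it back, so the associated monitor can keep an accurate view of active pools. A minimal sketch of a possible implementation; the shipped DefaultDataThreadPoolFactory is not part of this diff and may differ, and getPoolSize() on the config is assumed to match the builder's withPoolSize(..):

import com.sap.cx.boosters.commercedbsync.DataThreadPoolConfig;
import com.sap.cx.boosters.commercedbsync.concurrent.DataThreadPoolFactory;
import com.sap.cx.boosters.commercedbsync.concurrent.DataThreadPoolMonitor;
import com.sap.cx.boosters.commercedbsync.context.CopyContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

class SimpleDataThreadPoolFactory implements DataThreadPoolFactory {
    private final DataThreadPoolMonitor monitor;

    SimpleDataThreadPoolFactory(DataThreadPoolMonitor monitor) {
        this.monitor = monitor;
    }

    @Override
    public ThreadPoolTaskExecutor create(CopyContext context, DataThreadPoolConfig config) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getPoolSize()); // assumed accessor for the configured pool size
        executor.setMaxPoolSize(config.getPoolSize());
        executor.initialize();
        monitor.subscribe(executor); // make the new pool visible to the monitor
        return executor;
    }

    @Override
    public void destroy(ThreadPoolTaskExecutor executor) {
        monitor.unsubscribe(executor);
        executor.shutdown();
    }

    @Override
    public DataThreadPoolMonitor getMonitor() {
        return monitor;
    }
}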
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolMonitor.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolMonitor.java
new file mode 100644
index 0000000..d4c262f
--- /dev/null
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataThreadPoolMonitor.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * License: Apache-2.0
+ *
+ */
+
+package com.sap.cx.boosters.commercedbsync.concurrent;
+
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+public interface DataThreadPoolMonitor {
+ void subscribe(ThreadPoolTaskExecutor executor);
+
+ void unsubscribe(ThreadPoolTaskExecutor executor);
+
+ int getActiveCount();
+
+ int getMaxPoolSize();
+}
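The monitor aggregates utilization across all pools subscribed by the factory, which can be used to report how saturated the reader and writer workers are. An illustrative implementation that relies only on the Spring ThreadPoolTaskExecutor API:

import com.sap.cx.boosters.commercedbsync.concurrent.DataThreadPoolMonitor;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

class AggregatingDataThreadPoolMonitor implements DataThreadPoolMonitor {
    private final List<ThreadPoolTaskExecutor> executors = new CopyOnWriteArrayList<>();

    @Override
    public void subscribe(ThreadPoolTaskExecutor executor) {
        executors.add(executor);
    }

    @Override
    public void unsubscribe(ThreadPoolTaskExecutor executor) {
        executors.remove(executor);
    }

    @Override
    public int getActiveCount() {
        return executors.stream().mapToInt(ThreadPoolTaskExecutor::getActiveCount).sum();
    }

    @Override
    public int getMaxPoolSize() {
        return executors.stream().mapToInt(ThreadPoolTaskExecutor::getMaxPoolSize).sum();
    }
}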
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerExecutor.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerExecutor.java
index 019ee9a..d9e1a8c 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerExecutor.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerExecutor.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerPoolFactory.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerPoolFactory.java
index 64e7f0c..8a34d7a 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerPoolFactory.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/DataWorkerPoolFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MDCTaskDecorator.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MDCTaskDecorator.java
index 8d37302..2c06238 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MDCTaskDecorator.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MDCTaskDecorator.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MaybeFinished.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MaybeFinished.java
index e6458da..d1354cc 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MaybeFinished.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/MaybeFinished.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
@@ -7,12 +7,13 @@
package com.sap.cx.boosters.commercedbsync.concurrent;
/**
- * MaybeFinished keeps track status of the data set that is currently being processed -> if all is ok,
- * then status will be done, if theres an exception, it will be poison
+ * MaybeFinished keeps track of the status of the data set that is currently
+ * being processed: if all is ok, the status will be done; if there is an
+ * exception, it will be poison
*
 * @param <T>
*/
-public class MaybeFinished<T> {
+public final class MaybeFinished<T> {
private final T value;
private final boolean done;
private final boolean poison;
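The two flags encode three states: in flight (neither done nor poison), done (the final element for a copy item), and poison (an abort marker that wakes up a blocked consumer). The abort path in DefaultDataPipe used to offer that marker directly, as the removed lines further down show; condensed, the pattern is:

// Inject the poison marker with a bounded wait so the abort itself cannot block forever.
try {
    queue.offer(MaybeFinished.poison(), defaultTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    LOG.warn("Could not flush pipe with poison", e);
}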
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/PipeAbortedException.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/PipeAbortedException.java
index 8fe142b..3ad3605 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/PipeAbortedException.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/PipeAbortedException.java
@@ -1,5 +1,5 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipe.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipe.java
index 6d07f3f..bff9002 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipe.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipe.java
@@ -1,24 +1,25 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
package com.sap.cx.boosters.commercedbsync.concurrent.impl;
+import com.sap.cx.boosters.commercedbsync.concurrent.DataPipe;
import com.sap.cx.boosters.commercedbsync.concurrent.MaybeFinished;
+import com.sap.cx.boosters.commercedbsync.concurrent.PipeAbortedException;
import com.sap.cx.boosters.commercedbsync.constants.CommercedbsyncConstants;
+import com.sap.cx.boosters.commercedbsync.context.CopyContext;
import com.sap.cx.boosters.commercedbsync.scheduler.DatabaseCopyScheduler;
import com.sap.cx.boosters.commercedbsync.service.DatabaseCopyTaskRepository;
-import com.sap.cx.boosters.commercedbsync.concurrent.DataPipe;
-import com.sap.cx.boosters.commercedbsync.concurrent.PipeAbortedException;
-import com.sap.cx.boosters.commercedbsync.context.CopyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class DefaultDataPipe<T> implements DataPipe<T> {
@@ -27,12 +28,14 @@ public class DefaultDataPipe<T> implements DataPipe<T> {
    private final BlockingQueue<MaybeFinished<T>> queue;
    private final int defaultTimeout;
    private final AtomicReference<Exception> abortException = new AtomicReference<>();
+ private final AtomicInteger size = new AtomicInteger();
private final CopyContext context;
private final CopyContext.DataCopyItem copyItem;
private final DatabaseCopyTaskRepository taskRepository;
private final DatabaseCopyScheduler scheduler;
- public DefaultDataPipe(DatabaseCopyScheduler scheduler, DatabaseCopyTaskRepository taskRepository, CopyContext context, CopyContext.DataCopyItem copyItem, int timeoutInSeconds, int capacity) {
+ public DefaultDataPipe(DatabaseCopyScheduler scheduler, DatabaseCopyTaskRepository taskRepository,
+ CopyContext context, CopyContext.DataCopyItem copyItem, int timeoutInSeconds, int capacity) {
this.taskRepository = taskRepository;
this.scheduler = scheduler;
this.context = context;
@@ -57,25 +60,39 @@ public void requestAbort(Exception cause) {
LOG.warn("could not update error status!", e);
}
try {
- this.queue.offer(MaybeFinished.poison(), defaultTimeout, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.warn("Could not flush pipe with poison", e);
+ flushPipe();
+ } catch (Exception e) {
+ LOG.warn("Could not flush pipe", e);
}
}
}
+ private void flushPipe() throws Exception {
+ // make sure waiting queue offers can be flushed
+ while (getWaitersCount() > 0) {
+ queue.poll(defaultTimeout, TimeUnit.SECONDS);
+ size.decrementAndGet();
+ }
+ queue.clear();
+ }
+
private boolean isAborted() throws Exception {
if (this.abortException.get() == null && scheduler.isAborted(this.context)) {
- this.requestAbort(new PipeAbortedException("Migration aborted"));
+ requestAbort(new PipeAbortedException("Migration aborted"));
}
return this.abortException.get() != null;
}
- @Override
- public void put(MaybeFinished<T> value) throws Exception {
+ private void assertPipeNotAborted() throws Exception {
if (isAborted()) {
- throw new PipeAbortedException("pipe aborted", this.abortException.get());
+ throw new PipeAbortedException("Pipe aborted", this.abortException.get());
}
+ }
+
+ @Override
+ public void put(MaybeFinished<T> value) throws Exception {
+ assertPipeNotAborted();
+ size.incrementAndGet();
if (!queue.offer(value, defaultTimeout, TimeUnit.SECONDS)) {
throw new RuntimeException("cannot put new item in time");
}
@@ -83,16 +100,25 @@ public void put(MaybeFinished<T> value) throws Exception {
@Override
public MaybeFinished<T> get() throws Exception {
- if (isAborted()) {
- throw new PipeAbortedException("pipe aborted", this.abortException.get());
- }
+ assertPipeNotAborted();
MaybeFinished<T> element = queue.poll(defaultTimeout, TimeUnit.SECONDS);
- if (isAborted()) {
- throw new PipeAbortedException("pipe aborted", this.abortException.get());
- }
+ size.decrementAndGet();
if (element == null) {
- throw new RuntimeException(String.format("cannot get new item in time. Consider increasing the value of the property '%s' or '%s'", CommercedbsyncConstants.MIGRATION_DATA_PIPE_TIMEOUT, CommercedbsyncConstants.MIGRATION_DATA_PIPE_CAPACITY));
+ throw new RuntimeException(String.format(
+ "cannot get new item in time. Consider increasing the value of the property '%s' or '%s'",
+ CommercedbsyncConstants.MIGRATION_DATA_PIPE_TIMEOUT,
+ CommercedbsyncConstants.MIGRATION_DATA_PIPE_CAPACITY));
}
return element;
}
+
+ @Override
+ public int size() {
+ return size.get();
+ }
+
+ @Override
+ public int getWaitersCount() {
+ return size.get() - queue.size();
+ }
}
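The new counters make the pipe observable: size is incremented just before put() tries to enqueue and decremented after every poll, both in get() and while flushing, so it counts elements logically in flight; getWaitersCount() is the difference between that counter and the queue's actual length, i.e. the number of producers currently blocked inside offer() because the queue is full. For example, with a capacity of 10, if readers have started 12 puts and the writer has not polled yet, queue.size() is 10, size is 12, and getWaitersCount() is 2. flushPipe() keeps polling while that count is positive so the blocked offers can complete, and only then clears the remaining elements, which lets requestAbort() terminate promptly instead of leaving readers stuck on a full queue.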
diff --git a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipeFactory.java b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipeFactory.java
index 2a37aa0..fd59dcf 100644
--- a/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipeFactory.java
+++ b/commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/DefaultDataPipeFactory.java
@@ -1,38 +1,45 @@
/*
- * Copyright: 2022 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
+ * Copyright: 2023 SAP SE or an SAP affiliate company and commerce-db-synccontributors.
* License: Apache-2.0
*
*/
package com.sap.cx.boosters.commercedbsync.concurrent.impl;
+import com.google.common.collect.Lists;
+import com.sap.cx.boosters.commercedbsync.adapter.DataRepositoryAdapter;
+import com.sap.cx.boosters.commercedbsync.adapter.impl.ContextualDataRepositoryAdapter;
+import com.sap.cx.boosters.commercedbsync.concurrent.DataCopyMethod;
+import com.sap.cx.boosters.commercedbsync.concurrent.DataPipe;
+import com.sap.cx.boosters.commercedbsync.concurrent.DataPipeFactory;
+import com.sap.cx.boosters.commercedbsync.concurrent.DataThreadPoolConfigBuilder;
+import com.sap.cx.boosters.commercedbsync.concurrent.DataThreadPoolFactory;
import com.sap.cx.boosters.commercedbsync.concurrent.DataWorkerExecutor;
import com.sap.cx.boosters.commercedbsync.concurrent.MaybeFinished;
+import com.sap.cx.boosters.commercedbsync.concurrent.impl.task.BatchMarkerDataReaderTask;
+import com.sap.cx.boosters.commercedbsync.concurrent.impl.task.BatchOffsetDataReaderTask;
+import com.sap.cx.boosters.commercedbsync.concurrent.impl.task.DataReaderTask;
+import com.sap.cx.boosters.commercedbsync.concurrent.impl.task.DefaultDataReaderTask;
+import com.sap.cx.boosters.commercedbsync.concurrent.impl.task.PipeTaskContext;
+import com.sap.cx.boosters.commercedbsync.context.CopyContext;
import com.sap.cx.boosters.commercedbsync.dataset.DataSet;
import com.sap.cx.boosters.commercedbsync.performance.PerformanceCategory;
import com.sap.cx.boosters.commercedbsync.performance.PerformanceRecorder;
-import com.sap.cx.boosters.commercedbsync.performance.PerformanceUnit;
import com.sap.cx.boosters.commercedbsync.scheduler.DatabaseCopyScheduler;
import com.sap.cx.boosters.commercedbsync.service.DatabaseCopyTaskRepository;
import com.sap.cx.boosters.commercedbsync.views.TableViewGenerator;
import org.apache.commons.lang3.tuple.Pair;
+import org.fest.util.Collections;
+import com.sap.cx.boosters.commercedbsync.DataThreadPoolConfig;
import com.sap.cx.boosters.commercedbsync.MarkersQueryDefinition;
-import com.sap.cx.boosters.commercedbsync.OffsetQueryDefinition;
-import com.sap.cx.boosters.commercedbsync.SeekQueryDefinition;
-import com.sap.cx.boosters.commercedbsync.adapter.DataRepositoryAdapter;
-import com.sap.cx.boosters.commercedbsync.adapter.impl.ContextualDataRepositoryAdapter;
-import com.sap.cx.boosters.commercedbsync.concurrent.DataPipe;
-import com.sap.cx.boosters.commercedbsync.concurrent.DataPipeFactory;
-import com.sap.cx.boosters.commercedbsync.concurrent.DataWorkerPoolFactory;
-import com.sap.cx.boosters.commercedbsync.concurrent.RetriableTask;
-import com.sap.cx.boosters.commercedbsync.context.CopyContext;
-import com.sap.cx.boosters.commercedbsync.context.MigrationContext;
+import com.sap.cx.boosters.commercedbsync.service.DatabaseCopyBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
@@ -46,9 +53,10 @@ public class DefaultDataPipeFactory implements DataPipeFactory<DataSet> {
private final DatabaseCopyTaskRepository taskRepository;
private final DatabaseCopyScheduler scheduler;
private final AsyncTaskExecutor executor;
- private final DataWorkerPoolFactory dataReadWorkerPoolFactory;
+ private final DataThreadPoolFactory dataReadWorkerPoolFactory;
- public DefaultDataPipeFactory(DatabaseCopyScheduler scheduler, DatabaseCopyTaskRepository taskRepository, AsyncTaskExecutor executor, DataWorkerPoolFactory dataReadWorkerPoolFactory) {
+ public DefaultDataPipeFactory(DatabaseCopyScheduler scheduler, DatabaseCopyTaskRepository taskRepository,
+ AsyncTaskExecutor executor, DataThreadPoolFactory dataReadWorkerPoolFactory) {
this.scheduler = scheduler;
this.taskRepository = taskRepository;
this.executor = executor;
@@ -59,8 +67,11 @@ public DefaultDataPipeFactory(DatabaseCopyScheduler scheduler, DatabaseCopyTaskR
public DataPipe<DataSet> create(CopyContext context, CopyContext.DataCopyItem item) throws Exception {
int dataPipeTimeout = context.getMigrationContext().getDataPipeTimeout();
int dataPipeCapacity = context.getMigrationContext().getDataPipeCapacity();
- DataPipe<DataSet> pipe = new DefaultDataPipe<>(scheduler, taskRepository, context, item, dataPipeTimeout, dataPipeCapacity);
- ThreadPoolTaskExecutor taskExecutor = dataReadWorkerPoolFactory.create(context);
+ DataPipe<DataSet> pipe = new DefaultDataPipe<>(scheduler, taskRepository, context, item, dataPipeTimeout,
+ dataPipeCapacity);
+ DataThreadPoolConfig threadPoolConfig = new DataThreadPoolConfigBuilder(context.getMigrationContext())
+ .withPoolSize(context.getMigrationContext().getMaxParallelReaderWorkers()).build();
+ final ThreadPoolTaskExecutor taskExecutor = dataReadWorkerPoolFactory.create(context, threadPoolConfig);
DataWorkerExecutor<Boolean> workerExecutor = new DefaultDataWorkerExecutor<>(taskExecutor);
try {
executor.submit(() -> {
@@ -75,9 +86,12 @@ public DataPipe<DataSet> create(CopyContext context, CopyContext.DataCopyItem it
} catch (Exception p) {
LOG.error("Cannot contaminate pipe ", p);
}
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
} finally {
if (taskExecutor != null) {
- taskExecutor.shutdown();
+ dataReadWorkerPoolFactory.destroy(taskExecutor);
}
}
});
@@ -87,59 +101,110 @@ public DataPipe<DataSet> create(CopyContext context, CopyContext.DataCopyItem it
return pipe;
}
- private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> workerExecutor, DataPipe<DataSet> pipe, CopyContext.DataCopyItem copyItem) throws Exception {
- DataRepositoryAdapter dataRepositoryAdapter = new ContextualDataRepositoryAdapter(context.getMigrationContext().getDataSourceRepository());
+ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> workerExecutor,
+ DataPipe<DataSet> pipe, CopyContext.DataCopyItem copyItem) throws Exception {
+ DataRepositoryAdapter dataRepositoryAdapter = new ContextualDataRepositoryAdapter(
+ context.getMigrationContext().getDataSourceRepository());
String table = copyItem.getSourceItem();
long totalRows = copyItem.getRowCount();
long pageSize = context.getMigrationContext().getReaderBatchSize();
try {
- PerformanceRecorder recorder = context.getPerformanceProfiler().createRecorder(PerformanceCategory.DB_READ, table);
+ PerformanceRecorder recorder = context.getPerformanceProfiler().createRecorder(PerformanceCategory.DB_READ,
+ table);
recorder.start();
- PipeTaskContext pipeTaskContext = new PipeTaskContext(context, pipe, table, dataRepositoryAdapter, pageSize, recorder);
+ PipeTaskContext pipeTaskContext = new PipeTaskContext(context, pipe, table, dataRepositoryAdapter, pageSize,
+ recorder, taskRepository);
String batchColumn = "";
// help.sap.com/viewer/d0224eca81e249cb821f2cdf45a82ace/LATEST/en-US/08a27931a21441b59094c8a6aa2a880e.html
- if (context.getMigrationContext().getDataSourceRepository().isAuditTable(table) &&
- context.getMigrationContext().getDataSourceRepository().getAllColumnNames(table).contains("ID")) {
- batchColumn = "ID";
- } else if (context.getMigrationContext().getDataSourceRepository().getAllColumnNames(table).contains("PK")) {
+ final Set<String> allColumnNames = context.getMigrationContext().getDataSourceRepository()
+ .getAllColumnNames(table);
+ if (allColumnNames.contains("PK")) {
batchColumn = "PK";
+ } else if (allColumnNames.contains("ID")
+ && context.getMigrationContext().getDataSourceRepository().isAuditTable(table)) {
+ batchColumn = "ID";
}
LOG.debug("Using batchColumn: {}", batchColumn.isEmpty() ? "NONE" : batchColumn);
if (batchColumn.isEmpty()) {
// trying offset queries with unique index columns
Set<String> batchColumns;
- DataSet uniqueColumns = context.getMigrationContext().getDataSourceRepository().getUniqueColumns(TableViewGenerator.getTableNameForView(table, context.getMigrationContext()));
+ DataSet uniqueColumns = context.getMigrationContext().getDataSourceRepository()
+ .getUniqueColumns(TableViewGenerator.getTableNameForView(table, context.getMigrationContext()));
if (uniqueColumns.isNotEmpty()) {
if (uniqueColumns.getColumnCount() == 0) {
- throw new IllegalStateException("Corrupt dataset retrieved. Dataset should have information about unique columns");
+ throw new IllegalStateException(
+ "Corrupt dataset retrieved. Dataset should have information about unique columns");
}
- batchColumns = uniqueColumns.getAllResults().stream().map(row -> String.valueOf(row.get(0))).collect(Collectors.toCollection(LinkedHashSet::new));
- for (int offset = 0; offset < totalRows; offset += pageSize) {
- DataReaderTask dataReaderTask = new BatchOffsetDataReaderTask(pipeTaskContext, offset, batchColumns);
+ batchColumns = uniqueColumns.getAllResults().stream().map(row -> String.valueOf(row.get(0)))
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ taskRepository.updateTaskCopyMethod(context, copyItem, DataCopyMethod.OFFSET.toString());
+ taskRepository.updateTaskKeyColumns(context, copyItem, batchColumns);
+
+ List<Long> batches = null;
+ if (context.getMigrationContext().isSchedulerResumeEnabled()) {
+ Set<DatabaseCopyBatch> pendingBatchesForPipeline = taskRepository
+ .findPendingBatchesForPipeline(context, copyItem);
+ batches = pendingBatchesForPipeline.stream()
+ .map(b -> Long.valueOf(b.getLowerBoundary().toString())).collect(Collectors.toList());
+ taskRepository.resetPipelineBatches(context, copyItem);
+ } else {
+ batches = new ArrayList<>();
+ for (long offset = 0; offset < totalRows; offset += pageSize) {
+ batches.add(offset);
+ }
+ }
+
+ for (int batchId = 0; batchId < batches.size(); batchId++) {
+ long offset = batches.get(batchId);
+ DataReaderTask dataReaderTask = new BatchOffsetDataReaderTask(pipeTaskContext, batchId, offset,
+ batchColumns);
+ taskRepository.scheduleBatch(context, copyItem, batchId, offset, offset + pageSize);
workerExecutor.safelyExecute(dataReaderTask);
}
} else {
- //If no unique columns available to do batch sorting, fallback to read all
- LOG.warn("Reading all rows at once without batching for table {}. Memory consumption might be negatively affected", table);
+ // If no unique columns available to do batch sorting, fallback to read all
+ LOG.warn(
+ "Reading all rows at once without batching for table {}. Memory consumption might be negatively affected",
+ table);
+ taskRepository.updateTaskCopyMethod(context, copyItem, DataCopyMethod.DEFAULT.toString());
+ if (context.getMigrationContext().isSchedulerResumeEnabled()) {
+ taskRepository.resetPipelineBatches(context, copyItem);
+ }
+ taskRepository.scheduleBatch(context, copyItem, 0, 0, totalRows);
DataReaderTask dataReaderTask = new DefaultDataReaderTask(pipeTaskContext);
workerExecutor.safelyExecute(dataReaderTask);
}
} else {
// do the pagination by value comparison
- MarkersQueryDefinition queryDefinition = new MarkersQueryDefinition();
- queryDefinition.setTable(table);
- queryDefinition.setColumn(batchColumn);
- queryDefinition.setBatchSize(pageSize);
- queryDefinition.setDeletionEnabled(context.getMigrationContext().isDeletionEnabled());
- queryDefinition.setLpTableEnabled(context.getMigrationContext().isLpTableMigrationEnabled());
- DataSet batchMarkers = dataRepositoryAdapter.getBatchMarkersOrderedByColumn(context.getMigrationContext(), queryDefinition);
- List<List<Object>> batchMarkersList = batchMarkers.getAllResults();
- if (batchMarkersList.isEmpty()) {
- throw new RuntimeException("Could not retrieve batch values for table " + table);
+ taskRepository.updateTaskCopyMethod(context, copyItem, DataCopyMethod.SEEK.toString());
+ taskRepository.updateTaskKeyColumns(context, copyItem, Lists.newArrayList(batchColumn));
+
+ List<List<Object>> batchMarkersList = null;
+ if (context.getMigrationContext().isSchedulerResumeEnabled()) {
+ batchMarkersList = new ArrayList<>();
+ Set<DatabaseCopyBatch> pendingBatchesForPipeline = taskRepository
+ .findPendingBatchesForPipeline(context, copyItem);
+ batchMarkersList.addAll(pendingBatchesForPipeline.stream()
+ .map(b -> Collections.list(b.getLowerBoundary())).collect(Collectors.toList()));
+ taskRepository.resetPipelineBatches(context, copyItem);
+ } else {
+ MarkersQueryDefinition queryDefinition = new MarkersQueryDefinition();
+ queryDefinition.setTable(table);
+ queryDefinition.setColumn(batchColumn);
+ queryDefinition.setBatchSize(pageSize);
+ queryDefinition.setDeletionEnabled(context.getMigrationContext().isDeletionEnabled());
+ queryDefinition.setLpTableEnabled(context.getMigrationContext().isLpTableMigrationEnabled());
+ DataSet batchMarkers = dataRepositoryAdapter
+ .getBatchMarkersOrderedByColumn(context.getMigrationContext(), queryDefinition);
+ batchMarkersList = batchMarkers.getAllResults();
+ if (batchMarkersList.isEmpty()) {
+ throw new RuntimeException("Could not retrieve batch values for table " + table);
+ }
}
+
for (int i = 0; i < batchMarkersList.size(); i++) {
List