diff --git a/pom.xml b/pom.xml index da20d6f..9faa904 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ UTF-8 - 1.2.21 + 1.2.30 2.0.7 0.8.0 @@ -82,7 +82,7 @@ io.projectreactor reactor-core - RELEASE + 3.1.5.RELEASE org.jetbrains.kotlin diff --git a/src/main/kotlin/com/github/dataanon/dsl/Strategy.kt b/src/main/kotlin/com/github/dataanon/dsl/Strategy.kt index 6a7bb14..a8f6ba2 100644 --- a/src/main/kotlin/com/github/dataanon/dsl/Strategy.kt +++ b/src/main/kotlin/com/github/dataanon/dsl/Strategy.kt @@ -8,6 +8,7 @@ import com.github.dataanon.utils.ProgressBarGenerator import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers import java.util.concurrent.CountDownLatch +import java.util.logging.Level import java.util.logging.LogManager import java.util.logging.Logger @@ -37,7 +38,7 @@ abstract class Strategy { Flux.fromIterable(Iterable { reader }).map { table.execute(it) }.subscribe(writer) } catch (t: Throwable) { - logger.severe { "Error processing table '${table.name}': ${t.message}" } + logger.log(Level.SEVERE,"Error processing table '${table.name}': ${t.message}",t) } finally { latch.countDown() } diff --git a/src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt b/src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt index 4bf3f45..d46bf3e 100644 --- a/src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt +++ b/src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt @@ -18,39 +18,39 @@ import java.util.logging.Logger class TableWriter(dbConfig: DbConfig, private val table: Table, private val progressBar: ProgressBarGenerator) : BaseSubscriber() { private val logger = Logger.getLogger(TableWriter::class.java.name) - private val BATCH_COUNT = 1000 - private val TOTAL_ALLOWED_ERRORS = 1000 - private val conn = dbConfig.connection() - private lateinit var stmt: PreparedStatement - private lateinit var fields: List + private var stmt: PreparedStatement + private var fields: List private var batchIndex = 0 private var errorCount = 0 init { conn.autoCommit = false - } - - override fun hookOnSubscribe(subscription: Subscription?) { progressBar.start() val sql = table.generateWriteQuery() logger.info { "WRITE SQL: $sql" } this.stmt = conn.prepareStatement(sql) this.fields = table.allColumns() + } + + override fun hookOnSubscribe(subscription: Subscription?) { request(1) } - override fun hookOnNext(record: Record) { + override fun hookOnError(throwable: Throwable) { + throw throwable + } + override fun hookOnNext(record: Record) { fields.map { record.find(it) }.forEachIndexed { index, field -> writeToStatement(index, field) } stmt.addBatch() batchIndex++ progressBar.step() - if (batchIndex % BATCH_COUNT == 0) executeBatchStmt() + if (batchIndex % table.batchSize == 0) executeBatchStmt() request(1) } @@ -84,11 +84,11 @@ class TableWriter(dbConfig: DbConfig, private val table: Table, private val prog } } else { errorCount++ - logger.log(Level.SEVERE, "Error executing batch record: ${t.message}", t) + logger.log(Level.SEVERE, "Error executing batch: ${t.message}", t) } - if (errorCount > TOTAL_ALLOWED_ERRORS) { - logger.severe { "Total number of errors occurred is $errorCount for table ${table.name} exceeds allowed $TOTAL_ALLOWED_ERRORS." } - throw Exception("Too many errors while processing table ${table.name} exceeds $TOTAL_ALLOWED_ERRORS hence terminating table processing.") + if (errorCount > table.allowedErrors) { + logger.severe { "Total number of errors occurred is $errorCount for table ${table.name} exceeds allowed ${table.allowedErrors} ." } + throw Exception("Too many errors while processing table ${table.name} exceeds ${table.allowedErrors} hence terminating table processing.") } } @@ -98,9 +98,9 @@ class TableWriter(dbConfig: DbConfig, private val table: Table, private val prog } override fun hookFinally(type: SignalType?) { - progressBar.stop() - stmt.close() - conn.close() + progressBar?.stop() + if (stmt != null) stmt.close() + if (conn != null) conn.close() } } \ No newline at end of file diff --git a/src/main/kotlin/com/github/dataanon/model/Table.kt b/src/main/kotlin/com/github/dataanon/model/Table.kt index 5a308dd..3a14a2a 100644 --- a/src/main/kotlin/com/github/dataanon/model/Table.kt +++ b/src/main/kotlin/com/github/dataanon/model/Table.kt @@ -8,6 +8,8 @@ abstract class Table(val name: String) { private val columnStrategyContainer = mutableMapOf() internal var whereCondition = "" internal var limit = -1 + internal var allowedErrors = 1000 + internal var batchSize = 1000 fun anonymize(columnName: String): ColumnStrategy { val columnStrategy = ColumnStrategy() @@ -25,6 +27,16 @@ abstract class Table(val name: String) { return this } + fun writeBatchSize(size: Int): Table { + this.batchSize = size + return this + } + + fun allowedErrors(count: Int): Table { + this.allowedErrors = count + return this + } + protected fun columnNames() = columnStrategyContainer.keys.toList() internal fun execute(record: Record): Record { diff --git a/src/test/kotlin/com/github/dataanon/integration/SpecialFeaturesIntegrationTest.kt b/src/test/kotlin/com/github/dataanon/integration/SpecialFeaturesIntegrationTest.kt index 05aa855..c74b195 100644 --- a/src/test/kotlin/com/github/dataanon/integration/SpecialFeaturesIntegrationTest.kt +++ b/src/test/kotlin/com/github/dataanon/integration/SpecialFeaturesIntegrationTest.kt @@ -5,11 +5,13 @@ import com.github.dataanon.dsl.Whitelist import com.github.dataanon.model.DbConfig import com.github.dataanon.strategy.string.FixedString import com.github.dataanon.support.MoviesTable +import com.github.dataanon.support.MoviesTableHavingGenreSize10 +import com.github.dataanon.support.RatingsTable +import com.github.dataanon.utils.DataAnonTestLogHandler +import io.kotlintest.matchers.* import io.kotlintest.specs.FunSpec import java.sql.Date -import java.util.regex.Pattern -import kotlin.test.assertEquals -import kotlin.test.assertTrue +import java.util.logging.Level class SpecialFeaturesIntegrationTest : FunSpec() { @@ -29,15 +31,17 @@ class SpecialFeaturesIntegrationTest : FunSpec() { whitelist("MOVIE_ID", "RELEASE_DATE") anonymize("TITLE").using(FixedString("MY VALUE")) anonymize("GENRE").using(FixedString("Action")) - }.execute(progressBarEnabled = false) + }.execute(false) val records = destTable.findAll() - assertEquals(1, records.size) - assertEquals(1, records[0]["MOVIE_ID"]) - assertEquals("MY VALUE", records[0]["TITLE"]) - assertEquals(Date(1999, 5, 2), records[0]["RELEASE_DATE"]) - assertTrue(Pattern.compile("[a-zA-Z]+").matcher(records[0]["GENRE"].toString()).matches()) + records.size shouldBe 1 + + val anonymizedRecord = records[0] + anonymizedRecord["MOVIE_ID"] shouldBe 1 + anonymizedRecord["TITLE"] shouldBe "MY VALUE" + anonymizedRecord["RELEASE_DATE"] shouldBe Date(1999, 5, 2) + anonymizedRecord["GENRE"].toString() should match("[a-zA-Z]+") sourceTable.close() destTable.close() @@ -52,15 +56,17 @@ class SpecialFeaturesIntegrationTest : FunSpec() { .table("MOVIES", listOf("MOVIE_ID")) { anonymize("TITLE").using(FixedString("MY VALUE")) anonymize("GENRE") - }.execute(progressBarEnabled = true) + }.execute(true) val records = moviesTable.findAll() - assertEquals(1, records.size) - assertEquals(1, records[0]["MOVIE_ID"]) - assertEquals("MY VALUE", records[0]["TITLE"]) - assertEquals(Date(1999, 5, 2), records[0]["RELEASE_DATE"]) - assertTrue(Pattern.compile("[a-zA-Z]+").matcher(records[0]["GENRE"].toString()).matches()) + records.size shouldBe 1 + + val anonymizedRecord = records[0] + anonymizedRecord["MOVIE_ID"] shouldBe 1 + anonymizedRecord["TITLE"] shouldBe "MY VALUE" + anonymizedRecord["RELEASE_DATE"] shouldBe Date(1999, 5, 2) + anonymizedRecord["GENRE"].toString() should match("[a-zA-Z]+") moviesTable.close() } @@ -80,19 +86,111 @@ class SpecialFeaturesIntegrationTest : FunSpec() { whitelist("MOVIE_ID", "RELEASE_DATE") anonymize("TITLE").using(FixedString("MY VALUE")) anonymize("GENRE").using(FixedString("Action")) - }.execute(progressBarEnabled = false) + }.execute(false) val records = destTable.findAll() - assertEquals(1, records.size) + records.size shouldBe 1 + val anonymizedRecord = records[0] - assertEquals(1, anonymizedRecord["MOVIE_ID"]) - assertEquals("MY VALUE", anonymizedRecord["TITLE"]) - assertEquals(Date(1999, 5, 2), anonymizedRecord["RELEASE_DATE"]) - assertTrue(Pattern.compile("[a-zA-Z]+").matcher(anonymizedRecord["GENRE"].toString()).matches()) + anonymizedRecord["MOVIE_ID"] shouldBe 1 + anonymizedRecord["TITLE"] shouldBe "MY VALUE" + anonymizedRecord["RELEASE_DATE"] shouldBe Date(1999, 5, 2) + anonymizedRecord["GENRE"].toString() should match("[a-zA-Z]+") sourceTable.close() destTable.close() } + + test("error handling in case of destination table doesn't exists") { + DataAnonTestLogHandler.records.clear() + + val sourceDbConfig = DbConfig("jdbc:h2:mem:movies", "", "") + val sourceTable = MoviesTable(sourceDbConfig) + .insert(1, "Movie 1", "Drama", Date(1999, 5, 2)) + .insert(2, "Movie 2", "Action", Date(2005, 5, 2)) + + val destDbConfig = DbConfig("jdbc:h2:mem:movies_new", "", "") + + Whitelist(sourceDbConfig, destDbConfig) + .table("MOVIES") { + whitelist("MOVIE_ID", "RELEASE_DATE") + anonymize("TITLE").using(FixedString("MY VALUE")) + anonymize("GENRE").using(FixedString("Action")) + }.execute(false) + + val errors = DataAnonTestLogHandler.records.filter { it.level.intValue() > Level.INFO.intValue() } + errors.size shouldBe 1 + errors[0].message should haveSubstring("Table \"MOVIES\" not found") + + sourceTable.close() + } + + + test("error handling in case of source table doesn't exists") { + DataAnonTestLogHandler.records.clear() + + val sourceDbConfig = DbConfig("jdbc:h2:mem:movies_source", "", "") + + val destDbConfig = DbConfig("jdbc:h2:mem:movies_dest", "", "") + val destTable = MoviesTable(destDbConfig) + + Whitelist(sourceDbConfig, destDbConfig) + .table("MOVIES") { + whitelist("MOVIE_ID", "RELEASE_DATE") + anonymize("TITLE").using(FixedString("MY VALUE")) + anonymize("GENRE").using(FixedString("Action")) + }.execute(false) + + val records = destTable.findAll() + records.size shouldBe 0 + + val errors = DataAnonTestLogHandler.records.filter { it.level.intValue() > Level.INFO.intValue() } + errors.size shouldBe 1 + errors[0].message should haveSubstring("Table \"MOVIES\" not found") + + destTable.close() + } + + test("error handling in case of INSERT statement error exceeds allowed errors") { + DataAnonTestLogHandler.records.clear() + + val sourceDbConfig = DbConfig("jdbc:h2:mem:movies", "", "") + val sourceTable = MoviesTable(sourceDbConfig) + .insert(1, "Movie 1", "Really Long Genre To be fail 1", Date(1999, 5, 2)) + .insert(2, "Movie 2", "Really Long Genre To be fail 2", Date(2005, 5, 2)) + .insert(3, "Movie 3", "Action", Date(2005, 5, 2)) + .insert(4, "Movie 4", "Really Long Genre To be fail 3", Date(2005, 5, 2)) + .insert(5, "Movie 5", "Really Long Genre To be fail 4", Date(2005, 5, 2)) + .insert(6, "Movie 6", "Action", Date(2005, 5, 2)) + + val destDbConfig = DbConfig("jdbc:h2:mem:movies_dest", "", "") + val destTable = MoviesTableHavingGenreSize10(destDbConfig) + + Whitelist(sourceDbConfig, destDbConfig) + .table("MOVIES") { + writeBatchSize(2) + allowedErrors(3) + whitelist("MOVIE_ID", "RELEASE_DATE", "GENRE") + anonymize("TITLE").using(FixedString("MY VALUE")) + }.execute(false) + + val records = destTable.findAll() + records.size should beLessThan(3) + + val errors = DataAnonTestLogHandler.records.filter { it.level.intValue() > Level.INFO.intValue() } + errors.size shouldBe 6 + errors[0].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"") + errors[1].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"") + errors[2].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"") + errors[3].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"") + errors[4].message should haveSubstring("Total number of errors occurred is 4 for table MOVIES exceeds allowed 3") + errors[5].message should haveSubstring("Too many errors while processing table MOVIES exceeds 3 hence terminating table processing") + + destTable.close() + sourceTable.close() + } + + } } \ No newline at end of file diff --git a/src/test/kotlin/com/github/dataanon/support/MoviesTableHavingGenreSize10.kt b/src/test/kotlin/com/github/dataanon/support/MoviesTableHavingGenreSize10.kt new file mode 100644 index 0000000..0069892 --- /dev/null +++ b/src/test/kotlin/com/github/dataanon/support/MoviesTableHavingGenreSize10.kt @@ -0,0 +1,50 @@ +package com.github.dataanon.support + +import com.github.dataanon.model.DbConfig +import java.sql.Connection +import java.sql.Date + +class MoviesTableHavingGenreSize10(dbConfig: DbConfig) { + private val conn: Connection = dbConfig.connection() + + init { + conn.createStatement().executeUpdate("DROP TABLE IF EXISTS MOVIES") + val createMovieTable = "CREATE TABLE MOVIES( " + + "MOVIE_ID INT, " + + "TITLE VARCHAR2(255), " + + "GENRE VARCHAR2(10), " + + "RELEASE_DATE DATE, " + + "PRIMARY KEY(MOVIE_ID) )" + conn.createStatement().executeUpdate(createMovieTable) + } + + fun insert(movieId: Int, title: String, genre: String?, releaseDate: Date): MoviesTableHavingGenreSize10 { + val stmt = conn.prepareStatement("INSERT INTO MOVIES(MOVIE_ID,TITLE,GENRE,RELEASE_DATE) VALUES(?,?,?,?)") + stmt.setInt(1, movieId) + stmt.setString(2, title) + stmt.setString(3, genre) + stmt.setDate(4, releaseDate) + stmt.executeUpdate() + stmt.close() + return this + } + + fun findAll(): List> { + val records = mutableListOf>() + val rs = conn.createStatement().executeQuery("SELECT * FROM MOVIES") + while (rs.next()) { + val record = hashMapOf() + record["MOVIE_ID"] = rs.getInt("MOVIE_ID") + record["TITLE"] = rs.getString("TITLE") + record["GENRE"] = rs.getString("GENRE") + record["RELEASE_DATE"] = rs.getDate("RELEASE_DATE") + records.add(record) + } + rs.close() + return records + } + + fun close() { + conn.close() + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/github/dataanon/utils/DataAnonTestLogHandler.kt b/src/test/kotlin/com/github/dataanon/utils/DataAnonTestLogHandler.kt new file mode 100644 index 0000000..6b45f89 --- /dev/null +++ b/src/test/kotlin/com/github/dataanon/utils/DataAnonTestLogHandler.kt @@ -0,0 +1,22 @@ +package com.github.dataanon.utils + +import java.util.logging.Handler +import java.util.logging.LogRecord + +class DataAnonTestLogHandler: Handler() { + companion object { + val records = mutableListOf() + } + + override fun publish(record: LogRecord) { + records.add(record) + } + + override fun flush() { + } + + override fun close() { + } + + +} \ No newline at end of file diff --git a/src/test/resources/logging.properties b/src/test/resources/logging.properties new file mode 100644 index 0000000..1a410e0 --- /dev/null +++ b/src/test/resources/logging.properties @@ -0,0 +1,2 @@ +.level=INFO +handlers=com.github.dataanon.utils.DataAnonTestLogHandler \ No newline at end of file