completed error handling with default stop at 1000 errors
sunitparekh committed Mar 2, 2018
1 parent 8b70dfd commit aa594ff
Showing 8 changed files with 226 additions and 41 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -18,7 +18,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.version>1.2.21</kotlin.version>
<kotlin.version>1.2.30</kotlin.version>
<kotlin.test.version>2.0.7</kotlin.test.version>
<version.jacoco>0.8.0</version.jacoco>
</properties>
@@ -82,7 +82,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>RELEASE</version>
<version>3.1.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
3 changes: 2 additions & 1 deletion src/main/kotlin/com/github/dataanon/dsl/Strategy.kt
@@ -8,6 +8,7 @@ import com.github.dataanon.utils.ProgressBarGenerator
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.logging.Level
import java.util.logging.LogManager
import java.util.logging.Logger

@@ -37,7 +38,7 @@ abstract class Strategy {

Flux.fromIterable(Iterable { reader }).map { table.execute(it) }.subscribe(writer)
} catch (t: Throwable) {
logger.severe { "Error processing table '${table.name}': ${t.message}" }
logger.log(Level.SEVERE,"Error processing table '${table.name}': ${t.message}",t)
} finally {
latch.countDown()
}
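The Strategy.kt change replaces logger.severe { ... } with logger.log(Level.SEVERE, ..., t), so the caught Throwable is handed to the log handlers and its stack trace is preserved, not just its message. A minimal java.util.logging sketch of the difference, using a made-up exception rather than anything from this commit:

import java.util.logging.Level
import java.util.logging.Logger

fun main() {
    val logger = Logger.getLogger("demo")
    val t = RuntimeException("Table \"MOVIES\" not found")

    // Message only: handlers never see the Throwable, so no stack trace is recorded.
    logger.severe { "Error processing table 'MOVIES': ${t.message}" }

    // Message plus Throwable: handlers can include the full stack trace.
    logger.log(Level.SEVERE, "Error processing table 'MOVIES': ${t.message}", t)
}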
34 changes: 17 additions & 17 deletions src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt
@@ -18,39 +18,39 @@ import java.util.logging.Logger
class TableWriter(dbConfig: DbConfig, private val table: Table, private val progressBar: ProgressBarGenerator) : BaseSubscriber<Record>() {
private val logger = Logger.getLogger(TableWriter::class.java.name)

private val BATCH_COUNT = 1000
private val TOTAL_ALLOWED_ERRORS = 1000

private val conn = dbConfig.connection()

private lateinit var stmt: PreparedStatement
private lateinit var fields: List<String>
private var stmt: PreparedStatement
private var fields: List<String>

private var batchIndex = 0
private var errorCount = 0

init {
conn.autoCommit = false
}

override fun hookOnSubscribe(subscription: Subscription?) {
progressBar.start()
val sql = table.generateWriteQuery()
logger.info { "WRITE SQL: $sql" }
this.stmt = conn.prepareStatement(sql)
this.fields = table.allColumns()
}

override fun hookOnSubscribe(subscription: Subscription?) {
request(1)
}

override fun hookOnNext(record: Record) {
override fun hookOnError(throwable: Throwable) {
throw throwable
}

override fun hookOnNext(record: Record) {
fields.map { record.find(it) }.forEachIndexed { index, field -> writeToStatement(index, field) }

stmt.addBatch()
batchIndex++
progressBar.step()

if (batchIndex % BATCH_COUNT == 0) executeBatchStmt()
if (batchIndex % table.batchSize == 0) executeBatchStmt()

request(1)
}
@@ -84,11 +84,11 @@ class TableWriter(dbConfig: DbConfig, private val table: Table, private val prog
}
} else {
errorCount++
logger.log(Level.SEVERE, "Error executing batch record: ${t.message}", t)
logger.log(Level.SEVERE, "Error executing batch: ${t.message}", t)
}
if (errorCount > TOTAL_ALLOWED_ERRORS) {
logger.severe { "Total number of errors occurred is $errorCount for table ${table.name} exceeds allowed $TOTAL_ALLOWED_ERRORS." }
throw Exception("Too many errors while processing table ${table.name} exceeds $TOTAL_ALLOWED_ERRORS hence terminating table processing.")
if (errorCount > table.allowedErrors) {
logger.severe { "Total number of errors occurred is $errorCount for table ${table.name} exceeds allowed ${table.allowedErrors} ." }
throw Exception("Too many errors while processing table ${table.name} exceeds ${table.allowedErrors} hence terminating table processing.")
}
}

@@ -98,9 +98,9 @@ class TableWriter(dbConfig: DbConfig, private val table: Table, private val prog
}

override fun hookFinally(type: SignalType?) {
progressBar.stop()
stmt.close()
conn.close()
progressBar?.stop()
if (stmt != null) stmt.close()
if (conn != null) conn.close()
}

}
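The TableWriter changes swap the hard-coded BATCH_COUNT and TOTAL_ALLOWED_ERRORS constants for the per-table batchSize and allowedErrors settings: the prepared-statement batch is executed every table.batchSize records, and the writer gives up once the accumulated errorCount exceeds table.allowedErrors. A standalone sketch of that flush-every-N / stop-after-M-errors pattern, with hypothetical names (BatchingWriter, insertBatch) that are not part of this project:

class BatchingWriter(
    private val batchSize: Int = 1000,
    private val allowedErrors: Int = 1000,
    private val insertBatch: (List<String>) -> Unit
) {
    private val pending = mutableListOf<String>()
    private var errorCount = 0

    fun write(record: String) {
        pending.add(record)
        // Flush whenever a full batch has accumulated, mirroring batchIndex % batchSize == 0.
        if (pending.size == batchSize) flush()
    }

    fun flush() {
        if (pending.isEmpty()) return
        try {
            insertBatch(pending.toList())
        } catch (t: Throwable) {
            errorCount++
            // Stop once the error budget for this writer is exhausted.
            if (errorCount > allowedErrors) {
                throw IllegalStateException(
                    "Too many errors ($errorCount > $allowedErrors), aborting", t)
            }
        } finally {
            pending.clear()
        }
    }
}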
12 changes: 12 additions & 0 deletions src/main/kotlin/com/github/dataanon/model/Table.kt
@@ -8,6 +8,8 @@ abstract class Table(val name: String) {
private val columnStrategyContainer = mutableMapOf<String, ColumnStrategy>()
internal var whereCondition = ""
internal var limit = -1
internal var allowedErrors = 1000
internal var batchSize = 1000

fun anonymize(columnName: String): ColumnStrategy {
val columnStrategy = ColumnStrategy()
@@ -25,6 +27,16 @@
return this
}

fun writeBatchSize(size: Int): Table {
this.batchSize = size
return this
}

fun allowedErrors(count: Int): Table {
this.allowedErrors = count
return this
}

protected fun columnNames() = columnStrategyContainer.keys.toList()

internal fun execute(record: Record): Record {
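The two new Table options are exposed through the anonymization DSL, so callers can tune the JDBC batch size and the error budget per table (both default to 1000). A minimal whitelist sketch using these options; the H2 URLs, table, and column names are illustrative placeholders, not part of this commit:

import com.github.dataanon.dsl.Whitelist
import com.github.dataanon.model.DbConfig
import com.github.dataanon.strategy.string.FixedString

fun main() {
    val source = DbConfig("jdbc:h2:mem:movies", "", "")
    val dest = DbConfig("jdbc:h2:mem:movies_anon", "", "")

    Whitelist(source, dest)
        .table("MOVIES") {
            writeBatchSize(500)   // execute the JDBC batch every 500 records instead of the default 1000
            allowedErrors(100)    // stop processing this table after 100 write errors instead of 1000
            whitelist("MOVIE_ID", "RELEASE_DATE")
            anonymize("TITLE").using(FixedString("MY VALUE"))
        }.execute(false)
}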
SpecialFeaturesIntegrationTest.kt
@@ -5,11 +5,13 @@ import com.github.dataanon.dsl.Whitelist
import com.github.dataanon.model.DbConfig
import com.github.dataanon.strategy.string.FixedString
import com.github.dataanon.support.MoviesTable
import com.github.dataanon.support.MoviesTableHavingGenreSize10
import com.github.dataanon.support.RatingsTable
import com.github.dataanon.utils.DataAnonTestLogHandler
import io.kotlintest.matchers.*
import io.kotlintest.specs.FunSpec
import java.sql.Date
import java.util.regex.Pattern
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import java.util.logging.Level

class SpecialFeaturesIntegrationTest : FunSpec() {

@@ -29,15 +31,17 @@ class SpecialFeaturesIntegrationTest : FunSpec() {
whitelist("MOVIE_ID", "RELEASE_DATE")
anonymize("TITLE").using(FixedString("MY VALUE"))
anonymize("GENRE").using(FixedString("Action"))
}.execute(progressBarEnabled = false)
}.execute(false)

val records = destTable.findAll()

assertEquals(1, records.size)
assertEquals(1, records[0]["MOVIE_ID"])
assertEquals("MY VALUE", records[0]["TITLE"])
assertEquals(Date(1999, 5, 2), records[0]["RELEASE_DATE"])
assertTrue(Pattern.compile("[a-zA-Z]+").matcher(records[0]["GENRE"].toString()).matches())
records.size shouldBe 1

val anonymizedRecord = records[0]
anonymizedRecord["MOVIE_ID"] shouldBe 1
anonymizedRecord["TITLE"] shouldBe "MY VALUE"
anonymizedRecord["RELEASE_DATE"] shouldBe Date(1999, 5, 2)
anonymizedRecord["GENRE"].toString() should match("[a-zA-Z]+")

sourceTable.close()
destTable.close()
@@ -52,15 +56,17 @@
.table("MOVIES", listOf("MOVIE_ID")) {
anonymize("TITLE").using(FixedString("MY VALUE"))
anonymize("GENRE")
}.execute(progressBarEnabled = true)
}.execute(true)

val records = moviesTable.findAll()

assertEquals(1, records.size)
assertEquals(1, records[0]["MOVIE_ID"])
assertEquals("MY VALUE", records[0]["TITLE"])
assertEquals(Date(1999, 5, 2), records[0]["RELEASE_DATE"])
assertTrue(Pattern.compile("[a-zA-Z]+").matcher(records[0]["GENRE"].toString()).matches())
records.size shouldBe 1

val anonymizedRecord = records[0]
anonymizedRecord["MOVIE_ID"] shouldBe 1
anonymizedRecord["TITLE"] shouldBe "MY VALUE"
anonymizedRecord["RELEASE_DATE"] shouldBe Date(1999, 5, 2)
anonymizedRecord["GENRE"].toString() should match("[a-zA-Z]+")

moviesTable.close()
}
@@ -80,19 +86,111 @@
whitelist("MOVIE_ID", "RELEASE_DATE")
anonymize("TITLE").using(FixedString("MY VALUE"))
anonymize("GENRE").using(FixedString("Action"))
}.execute(progressBarEnabled = false)
}.execute(false)

val records = destTable.findAll()

assertEquals(1, records.size)
records.size shouldBe 1

val anonymizedRecord = records[0]
assertEquals(1, anonymizedRecord["MOVIE_ID"])
assertEquals("MY VALUE", anonymizedRecord["TITLE"])
assertEquals(Date(1999, 5, 2), anonymizedRecord["RELEASE_DATE"])
assertTrue(Pattern.compile("[a-zA-Z]+").matcher(anonymizedRecord["GENRE"].toString()).matches())
anonymizedRecord["MOVIE_ID"] shouldBe 1
anonymizedRecord["TITLE"] shouldBe "MY VALUE"
anonymizedRecord["RELEASE_DATE"] shouldBe Date(1999, 5, 2)
anonymizedRecord["GENRE"].toString() should match("[a-zA-Z]+")

sourceTable.close()
destTable.close()
}

test("error handling in case of destination table doesn't exists") {
DataAnonTestLogHandler.records.clear()

val sourceDbConfig = DbConfig("jdbc:h2:mem:movies", "", "")
val sourceTable = MoviesTable(sourceDbConfig)
.insert(1, "Movie 1", "Drama", Date(1999, 5, 2))
.insert(2, "Movie 2", "Action", Date(2005, 5, 2))

val destDbConfig = DbConfig("jdbc:h2:mem:movies_new", "", "")

Whitelist(sourceDbConfig, destDbConfig)
.table("MOVIES") {
whitelist("MOVIE_ID", "RELEASE_DATE")
anonymize("TITLE").using(FixedString("MY VALUE"))
anonymize("GENRE").using(FixedString("Action"))
}.execute(false)

val errors = DataAnonTestLogHandler.records.filter { it.level.intValue() > Level.INFO.intValue() }
errors.size shouldBe 1
errors[0].message should haveSubstring("Table \"MOVIES\" not found")

sourceTable.close()
}


test("error handling in case of source table doesn't exists") {
DataAnonTestLogHandler.records.clear()

val sourceDbConfig = DbConfig("jdbc:h2:mem:movies_source", "", "")

val destDbConfig = DbConfig("jdbc:h2:mem:movies_dest", "", "")
val destTable = MoviesTable(destDbConfig)

Whitelist(sourceDbConfig, destDbConfig)
.table("MOVIES") {
whitelist("MOVIE_ID", "RELEASE_DATE")
anonymize("TITLE").using(FixedString("MY VALUE"))
anonymize("GENRE").using(FixedString("Action"))
}.execute(false)

val records = destTable.findAll()
records.size shouldBe 0

val errors = DataAnonTestLogHandler.records.filter { it.level.intValue() > Level.INFO.intValue() }
errors.size shouldBe 1
errors[0].message should haveSubstring("Table \"MOVIES\" not found")

destTable.close()
}

test("error handling in case of INSERT statement error exceeds allowed errors") {
DataAnonTestLogHandler.records.clear()

val sourceDbConfig = DbConfig("jdbc:h2:mem:movies", "", "")
val sourceTable = MoviesTable(sourceDbConfig)
.insert(1, "Movie 1", "Really Long Genre To be fail 1", Date(1999, 5, 2))
.insert(2, "Movie 2", "Really Long Genre To be fail 2", Date(2005, 5, 2))
.insert(3, "Movie 3", "Action", Date(2005, 5, 2))
.insert(4, "Movie 4", "Really Long Genre To be fail 3", Date(2005, 5, 2))
.insert(5, "Movie 5", "Really Long Genre To be fail 4", Date(2005, 5, 2))
.insert(6, "Movie 6", "Action", Date(2005, 5, 2))

val destDbConfig = DbConfig("jdbc:h2:mem:movies_dest", "", "")
val destTable = MoviesTableHavingGenreSize10(destDbConfig)

Whitelist(sourceDbConfig, destDbConfig)
.table("MOVIES") {
writeBatchSize(2)
allowedErrors(3)
whitelist("MOVIE_ID", "RELEASE_DATE", "GENRE")
anonymize("TITLE").using(FixedString("MY VALUE"))
}.execute(false)

val records = destTable.findAll()
records.size should beLessThan(3)

val errors = DataAnonTestLogHandler.records.filter { it.level.intValue() > Level.INFO.intValue() }
errors.size shouldBe 6
errors[0].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"")
errors[1].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"")
errors[2].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"")
errors[3].message should haveSubstring("Value too long for column \"GENRE VARCHAR2(10)\"")
errors[4].message should haveSubstring("Total number of errors occurred is 4 for table MOVIES exceeds allowed 3")
errors[5].message should haveSubstring("Too many errors while processing table MOVIES exceeds 3 hence terminating table processing")

destTable.close()
sourceTable.close()
}


}
}
MoviesTableHavingGenreSize10.kt
@@ -0,0 +1,50 @@
package com.github.dataanon.support

import com.github.dataanon.model.DbConfig
import java.sql.Connection
import java.sql.Date

class MoviesTableHavingGenreSize10(dbConfig: DbConfig) {
private val conn: Connection = dbConfig.connection()

init {
conn.createStatement().executeUpdate("DROP TABLE IF EXISTS MOVIES")
val createMovieTable = "CREATE TABLE MOVIES( " +
"MOVIE_ID INT, " +
"TITLE VARCHAR2(255), " +
"GENRE VARCHAR2(10), " +
"RELEASE_DATE DATE, " +
"PRIMARY KEY(MOVIE_ID) )"
conn.createStatement().executeUpdate(createMovieTable)
}

fun insert(movieId: Int, title: String, genre: String?, releaseDate: Date): MoviesTableHavingGenreSize10 {
val stmt = conn.prepareStatement("INSERT INTO MOVIES(MOVIE_ID,TITLE,GENRE,RELEASE_DATE) VALUES(?,?,?,?)")
stmt.setInt(1, movieId)
stmt.setString(2, title)
stmt.setString(3, genre)
stmt.setDate(4, releaseDate)
stmt.executeUpdate()
stmt.close()
return this
}

fun findAll(): List<Map<String, Any?>> {
val records = mutableListOf<Map<String, Any?>>()
val rs = conn.createStatement().executeQuery("SELECT * FROM MOVIES")
while (rs.next()) {
val record = hashMapOf<String, Any?>()
record["MOVIE_ID"] = rs.getInt("MOVIE_ID")
record["TITLE"] = rs.getString("TITLE")
record["GENRE"] = rs.getString("GENRE")
record["RELEASE_DATE"] = rs.getDate("RELEASE_DATE")
records.add(record)
}
rs.close()
return records
}

fun close() {
conn.close()
}
}