
Commit

Added support for handling exceptions during execution; supports up to 5 batch errors in a table.
sunitparekh committed Feb 25, 2018
1 parent 7bf4923 commit 2288206
Showing 6 changed files with 77 additions and 33 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
.idea/
*.iml
target/**
.DS_Store
.DS_Store
dataanon.log
3 changes: 1 addition & 2 deletions pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.dataanon</groupId>
<artifactId>data-anon</artifactId>
<version>0.9.1</version>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>data-anon</name>
@@ -128,7 +128,6 @@
<build>
<sourceDirectory>src/main/kotlin</sourceDirectory>
<testSourceDirectory>src/test/kotlin</testSourceDirectory>

<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
12 changes: 9 additions & 3 deletions src/main/kotlin/com/github/dataanon/dsl/Strategy.kt
@@ -8,13 +8,17 @@ import com.github.dataanon.utils.ProgressBarGenerator
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CountDownLatch

import java.util.logging.LogManager
import java.util.logging.Logger


abstract class Strategy {
private val logger = Logger.getLogger(Strategy::class.java.name)
protected val tables = mutableListOf<Table>()

fun execute(progressBarEnabled: Boolean = true) {
val inputStream = Strategy::class.java.classLoader.getResourceAsStream("logging.properties")
LogManager.getLogManager().readConfiguration(inputStream)
val latch = CountDownLatch(tables.size)

Flux.fromIterable(tables)
@@ -27,11 +31,13 @@ abstract class Strategy {

private fun executeOnTable(table: Table, progressBarEnabled: Boolean, latch: CountDownLatch) {
try {
val reader = TableReader(sourceDbConfig(), table)
val reader = TableReader(sourceDbConfig(), table)
val progressBar = ProgressBarGenerator(progressBarEnabled, table.name, { reader.totalNoOfRecords() })
val writer = TableWriter(destDbConfig(), table, progressBar)
val writer = TableWriter(destDbConfig(), table, progressBar)

Flux.fromIterable(Iterable { reader }).map { table.execute(it) }.subscribe(writer)
} catch (t: Throwable) {
logger.severe { "Error processing table '${table.name}': ${t.message}" }
} finally {
latch.countDown()
}
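The new try/catch/finally in executeOnTable isolates a failure in one table and still counts the latch down, so a single failing table cannot leave the overall run waiting. A minimal sketch of that pattern, with illustrative names that are not part of the library API:

    import java.util.concurrent.CountDownLatch
    import java.util.logging.Logger

    // Minimal sketch (assumed simplification): each unit of work is logged on
    // failure but always counts down the latch, so await() returns even when
    // some tables fail.
    fun processAll(tables: List<String>, process: (String) -> Unit) {
        val logger = Logger.getLogger("sketch")
        val latch = CountDownLatch(tables.size)
        tables.forEach { name ->
            try {
                process(name)
            } catch (t: Throwable) {
                logger.severe("Error processing table '$name': ${t.message}")
            } finally {
                latch.countDown()
            }
        }
        latch.await()
    }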
7 changes: 5 additions & 2 deletions src/main/kotlin/com/github/dataanon/jdbc/TableReader.kt
@@ -2,8 +2,11 @@ package com.github.dataanon.jdbc

import com.github.dataanon.model.*
import java.sql.ResultSet
import java.util.logging.Logger

class TableReader(dbConfig: DbConfig, private val table: Table) : Iterator<Record> {
private val logger = Logger.getLogger(TableReader::class.java.name)

private val conn = dbConfig.connection()
private var rs: ResultSet
private var index = 0
@@ -13,11 +16,11 @@ class TableReader(dbConfig: DbConfig, private val table: Table) : Iterator<Recor
val stmt = conn.createStatement()
if (table.limit >= 1 ) stmt.maxRows = table.limit

println(sql)
logger.info { "READ SQL: $sql" }
rs = stmt.executeQuery(sql)
}

fun totalNoOfRecords(): Int = if (table.limit >= 1) table.limit else getTotalRecords()
fun totalNoOfRecords(): Int = if (table.limit >= 1 && getTotalRecords() > table.limit) table.limit else getTotalRecords()

override fun hasNext(): Boolean = if (rs.next()) true else closeConnection()

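The revised totalNoOfRecords() only caps the reported total at the configured limit when the table actually holds more rows than the limit; previously a limit of, say, 1000 on a 50-row table made the progress bar expect 1000 records. A small illustrative check (names here are hypothetical, not library API):

    // Illustrative only: the new behaviour is effectively min(limit, actualCount)
    // when a positive limit is set, otherwise the full count.
    fun expectedTotal(limit: Int, actualCount: Int): Int =
        if (limit >= 1 && actualCount > limit) limit else actualCount

    fun main() {
        println(expectedTotal(1000, 50))   // 50   – limit larger than the table
        println(expectedTotal(100, 5000))  // 100  – limit caps the total
        println(expectedTotal(0, 5000))    // 5000 – no limit configured
    }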
78 changes: 53 additions & 25 deletions src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt
@@ -8,18 +8,26 @@ import com.github.dataanon.utils.ProgressBarGenerator
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.SignalType
import java.sql.BatchUpdateException
import java.sql.PreparedStatement
import java.sql.Types
import java.util.logging.Level
import java.util.logging.Logger


class TableWriter(dbConfig: DbConfig, private val table: Table, private val progressBar: ProgressBarGenerator) : BaseSubscriber<Record>() {
private val logger = Logger.getLogger(TableWriter::class.java.name)

private val BATCH_COUNT = 1000
private val conn = dbConfig.connection()
private val TOTAL_ALLOWED_BATCH_ERRORS = 5

private val conn = dbConfig.connection()

private lateinit var stmt: PreparedStatement
private lateinit var stmt: PreparedStatement
private lateinit var fields: List<String>

private var batchIndex = 0
private var errorCount = 0

init {
conn.autoCommit = false
@@ -28,46 +36,66 @@ class TableWriter(dbConfig: DbConfig, private val table: Table, private val prog
override fun hookOnSubscribe(subscription: Subscription?) {
progressBar.start()
val sql = table.generateWriteQuery()
println(sql)
this.stmt = conn.prepareStatement(sql)
logger.info { "WRITE SQL: $sql" }
this.stmt = conn.prepareStatement(sql)
this.fields = table.allColumns()
request(1)
}

override fun hookOnNext(record: Record) {
batchIndex++

fun executeBatch() {
stmt.addBatch()
fields.map { record.find(it) }.forEachIndexed { index, field -> writeToStatement(index, field) }
stmt.addBatch()

if (batchIndex % BATCH_COUNT == 0) {
stmt.executeBatch()
conn.commit()
stmt.clearBatch()
batchIndex = 0
}
if (batchIndex % BATCH_COUNT == 0) {
executeBatchStmt()
}
fun setStatementParameters(record: Record) =
fields.map { record.find(it) }
.forEachIndexed { index, field -> writeToStatement(index, field) }

setStatementParameters(record)
executeBatch()
progressBar.step()

request(1)
}

private fun writeToStatement(index: Int, field: Field<Any>) {
when {
field.isNull() -> stmt.setNull(index + 1, Types.NULL)
else -> stmt.setObject(index + 1, field.newValue)
}
}

private fun executeBatchStmt() {
try {
stmt.executeBatch()
} catch (t: Throwable) {
handleError(t)
} finally {
conn.commit()
stmt.clearBatch()
batchIndex = 0
}
}


private fun handleError(t: Throwable) {
errorCount++
if (t is BatchUpdateException) {
logger.severe { "BatchUpdateException update counts: ${t.updateCounts}" }
t.forEach { logger.log(Level.SEVERE, "Individual error messages in BatchUpdateException: ${it.message}", it) }
} else {
logger.log(Level.SEVERE, "Error executing batch record: ${t.message}", t)
}
if (errorCount > TOTAL_ALLOWED_BATCH_ERRORS)
throw Exception("Too many errors while processing table ${table.name}, terminating table processing.")
}

override fun hookOnComplete() {
executeBatchStmt()
}

override fun hookFinally(type: SignalType?) {
progressBar.stop()
stmt.executeBatch()
conn.commit()
stmt.clearBatch()
stmt.close()
conn.close()
}

private fun writeToStatement(index: Int, field: Field<Any>) {
if ( field.isNull() ) stmt.setNull (index + 1, Types.NULL)
else stmt.setObject(index + 1, field.newValue)
}
}
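The core of the new error handling is a per-table error budget: a failed executeBatch() is logged (a BatchUpdateException is unwrapped through its chained exceptions, since SQLException is Iterable<Throwable>), the batch is cleared and committed in the finally block, and processing continues until more than TOTAL_ALLOWED_BATCH_ERRORS failures have accumulated. A standalone sketch of that counter, using illustrative names that are not part of the library API:

    import java.sql.BatchUpdateException
    import java.util.logging.Level
    import java.util.logging.Logger

    // Simplified sketch of the batch-error budget introduced above (assumed names).
    class BatchErrorBudget(private val tableName: String, private val maxErrors: Int = 5) {
        private val logger = Logger.getLogger(BatchErrorBudget::class.java.name)
        private var errorCount = 0

        fun onFailure(t: Throwable) {
            errorCount++
            if (t is BatchUpdateException) {
                // SQLException implements Iterable<Throwable>, so the chain of
                // per-statement errors can be walked directly.
                t.forEach { logger.log(Level.SEVERE, "Chained batch error: ${it.message}", it) }
            } else {
                logger.log(Level.SEVERE, "Error executing batch: ${t.message}", t)
            }
            if (errorCount > maxErrors)
                throw IllegalStateException("Too many errors while processing table $tableName, terminating.")
        }
    }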
7 changes: 7 additions & 0 deletions src/main/resources/logging.properties
@@ -0,0 +1,7 @@
.level=INFO
handlers=java.util.logging.FileHandler

java.util.logging.FileHandler.pattern=dataanon.log

java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL [%4$-7s] [%2$s] %5$s %6$s%n
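With this configuration every run writes to dataanon.log (which is why that file is also added to .gitignore above). In the SimpleFormatter pattern, %1$ is the timestamp, %2$ the source class and method, %4$ the level, %5$ the message and %6$ the throwable, so a line in dataanon.log would look roughly like this (illustrative output, not taken from a real run):

    2018-02-25 14:32:07.123 [INFO   ] [com.github.dataanon.jdbc.TableReader <init>] READ SQL: SELECT MOVIE_ID, TITLE FROM MOVIES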
