Skip to content

Commit

Permalink
added progress bar and batch support for better performance of INSERT…
Browse files Browse the repository at this point in the history
…/UPDATE
  • Loading branch information
sunitparekh committed Nov 6, 2017
1 parent 00fcc1e commit 8333c35
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 12 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
<artifactId>kotlin-stdlib</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>me.tongfei</groupId>
<artifactId>progressbar</artifactId>
<version>0.5.5</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test-junit</artifactId>
Expand Down
5 changes: 3 additions & 2 deletions src/main/kotlin/com/github/dataanon/dsl/Blacklist.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import reactor.core.publisher.Flux
class Blacklist(private val dbConfig: Map<String, String>): Strategy() {

/**
 * Reads the configured table, anonymizes each record and writes the result
 * back through a [BlacklistTableWriter].
 *
 * A single [TableReader] is created and used twice: as the source of the
 * record stream, and to obtain the total record count that is handed to the
 * writer.
 */
override fun execute() {
    // One reader only: each TableReader opens its own JDBC connection, so
    // constructing a second, unused instance would leak a connection.
    val reader = TableReader(dbConfig, tableName, anonymizer.columns, anonymizer.primaryKey)
    Flux.fromIterable(Iterable { reader })
        .map(this::anonymize)
        .subscribe(BlacklistTableWriter(dbConfig, tableName, reader.totalNoOfRecords(), anonymizer.columns, anonymizer.primaryKey))
}


Expand Down
5 changes: 3 additions & 2 deletions src/main/kotlin/com/github/dataanon/dsl/Whitelist.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import reactor.core.publisher.Flux
/**
 * Strategy that reads records from a source database, anonymizes them and
 * inserts them into a destination database.
 *
 * @param sourceDbConfig connection settings passed to the [TableReader].
 * @param destDbConfig connection settings passed to the [WhitelistTableWriter].
 */
class Whitelist(private val sourceDbConfig: Map<String, Any>, private val destDbConfig: Map<String, Any>) : Strategy() {

    /**
     * Streams every record produced by the reader through `anonymize` and
     * subscribes a [WhitelistTableWriter] to the resulting sequence.
     */
    override fun execute() {
        // One reader only: each TableReader opens its own JDBC connection, so
        // constructing a second, unused instance would leak a connection.
        val reader = TableReader(sourceDbConfig, tableName, anonymizer.columns, anonymizer.whitelist)
        Flux.fromIterable(Iterable { reader })
            .map(this::anonymize)
            .subscribe(WhitelistTableWriter(destDbConfig, tableName, reader.totalNoOfRecords(), anonymizer.columns, anonymizer.whitelist))
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package com.github.dataanon.jdbc

import com.github.dataanon.Columns

class BlacklistTableWriter(dbConfig: Map<String, Any>, tableName: String,
class BlacklistTableWriter(dbConfig: Map<String, Any>, tableName: String, totalNoOfRecords: Long,
private val columns: Columns, private val primaryKey: Array<String>) :
TableWriter(dbConfig,tableName) {
TableWriter(dbConfig, tableName, totalNoOfRecords) {

override fun buildPreparedStatement(): String {
val sql = StringBuffer("UPDATE $tableName SET ")
Expand Down
12 changes: 11 additions & 1 deletion src/main/kotlin/com/github/dataanon/jdbc/TableReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet

class TableReader(dbConfig: Map<String, Any>, tableName: String, private val columns: Columns, private val whitelist: Array<String>) : Iterator<Record> {
class TableReader(protected val dbConfig: Map<String, Any>, protected val tableName: String, private val columns: Columns, private val whitelist: Array<String>) : Iterator<Record> {
private var conn: Connection = DriverManager.getConnection(dbConfig["url"] as String, dbConfig["user"] as String, dbConfig["password"] as String)
private var rs: ResultSet
private var index = 0
Expand All @@ -22,6 +22,16 @@ class TableReader(dbConfig: Map<String, Any>, tableName: String, private val col
rs = stmt.executeQuery(sql)
}

/**
 * Returns the number of records this reader is expected to produce.
 *
 * When the configuration contains a `"limit"` entry, that value is returned
 * as-is; otherwise the table is counted with `SELECT COUNT(*)`.
 *
 * @return the configured limit, or the row count of the table.
 * @throws ClassCastException if `"limit"` is present but is not a number.
 */
fun totalNoOfRecords(): Long {
    // Accept any numeric type: a plain integer literal in the config map is an
    // Int, and a direct cast to Long would fail for it.
    if (dbConfig.containsKey("limit")) return (dbConfig["limit"] as Number).toLong()

    val countStmt = conn.createStatement()
    try {
        val counts = countStmt.executeQuery("SELECT COUNT(*) FROM $tableName")
        counts.next()
        return counts.getLong(1)
    } finally {
        // Closing the statement also closes the result set it produced.
        countStmt.close()
    }
}

override fun hasNext(): Boolean {
val isNext = rs.next()
if (!isNext) {
Expand Down
26 changes: 24 additions & 2 deletions src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
package com.github.dataanon.jdbc

import com.github.dataanon.Record
import me.tongfei.progressbar.ProgressBar
import me.tongfei.progressbar.ProgressBarStyle
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.SignalType
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

abstract class TableWriter(dbConfig: Map<String, Any>, protected val tableName: String) : BaseSubscriber<Record>() {

abstract class TableWriter(dbConfig: Map<String, Any>, protected val tableName: String, totalNoOfRecords: Long) : BaseSubscriber<Record>() {
private var conn: Connection = DriverManager.getConnection(dbConfig["url"] as String, dbConfig["user"] as String, dbConfig["password"] as String)
private lateinit var stmt: PreparedStatement
private lateinit var fields: List<String>
val pb = ProgressBar(tableName, totalNoOfRecords, ProgressBarStyle.ASCII)
var batchIndex = 0

// Turn off auto-commit so that writes are only persisted by the explicit
// conn.commit() calls made after each executed batch.
init {
conn.autoCommit = false
}

abstract fun buildPreparedStatement(): String

override fun hookOnSubscribe(subscription: Subscription?) {
pb.start()
val sql = buildPreparedStatement()
println(sql)
this.stmt = conn.prepareStatement(sql)
Expand All @@ -24,15 +34,27 @@ abstract class TableWriter(dbConfig: Map<String, Any>, protected val tableName:
}

/**
 * Binds one record to the prepared statement and queues it in the current
 * JDBC batch.
 *
 * Every 1000 queued records the batch is executed and committed. The progress
 * bar advances by one step per record, and one more record is then requested
 * from upstream.
 *
 * @param record the record whose new field values are written.
 */
override fun hookOnNext(record: Record) {
    batchIndex++
    fields.forEachIndexed { i, f ->
        // JDBC parameter indexes are 1-based.
        stmt.setObject(i + 1, record.find(f).newValue)
    }
    // Queue only: executing the statement here as well would write the row
    // twice, once immediately and once more when the batch is flushed.
    stmt.addBatch()
    if (batchIndex % 1000 == 0) {
        stmt.executeBatch()
        conn.commit()
        stmt.clearBatch()
        batchIndex = 0
    }
    pb.step()
    request(1)
}

/**
 * Stops the progress bar, flushes and commits the records still queued in the
 * batch, then releases the JDBC statement and connection.
 *
 * @param type the signal that ended the subscription (not inspected here).
 */
override fun hookFinally(type: SignalType?) {
    try {
        pb.stop()
        // Records queued since the last full batch have not been executed yet.
        stmt.executeBatch()
        conn.commit()
        stmt.clearBatch()
    } finally {
        // Release JDBC resources even when the final flush fails; the nested
        // finally keeps the connection from leaking if closing the statement throws.
        try {
            stmt.close()
        } finally {
            conn.close()
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package com.github.dataanon.jdbc

import com.github.dataanon.Columns

class WhitelistTableWriter(dbConfig: Map<String, Any>, tableName: String,
class WhitelistTableWriter(dbConfig: Map<String, Any>, tableName: String, totalNoOfRecords: Long,
private val columns: Columns, private val whitelist: Array<String>) :
TableWriter(dbConfig,tableName) {
TableWriter(dbConfig, tableName, totalNoOfRecords) {

override fun buildPreparedStatement(): String {
val sql = StringBuffer("INSERT INTO $tableName(")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.github.dataanon.strategies.DefaultDoubleStrategy
*/
fun main(args: Array<String>) {
val source = hashMapOf("url" to "jdbc:postgresql://localhost:5432/movies", "user" to "sunitparekh", "password" to ""
, "limit" to 100000
, "limit" to 1000000L
)
val dest = hashMapOf("url" to "jdbc:postgresql://localhost:5432/moviesdest", "user" to "sunitparekh", "password" to "")

Expand Down

0 comments on commit 8333c35

Please sign in to comment.