Skip to content

Commit

Permalink
refactored table writer code for blacklist and whitelist
Browse files Browse the repository at this point in the history
  • Loading branch information
sunitparekh committed Nov 6, 2017
1 parent e85a1b8 commit 00fcc1e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 64 deletions.
41 changes: 9 additions & 32 deletions src/main/kotlin/com/github/dataanon/jdbc/BlacklistTableWriter.kt
Original file line number Diff line number Diff line change
@@ -1,46 +1,23 @@
package com.github.dataanon.jdbc

import com.github.dataanon.Columns
import com.github.dataanon.Record
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.SignalType
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

class BlacklistTableWriter(dbConfig: Map<String, Any>, tableName: String, private val columns: Columns, private val primaryKey: Array<String>) : BaseSubscriber<Record>() {
private var conn: Connection = DriverManager.getConnection(dbConfig["url"] as String, dbConfig["user"] as String, dbConfig["password"] as String)
private val stmt: PreparedStatement
class BlacklistTableWriter(dbConfig: Map<String, Any>, tableName: String,
private val columns: Columns, private val primaryKey: Array<String>) :
TableWriter(dbConfig,tableName) {

init {
override fun buildPreparedStatement(): String {
val sql = StringBuffer("UPDATE $tableName SET ")
sql.append(columns.joinToString(", ") { c -> " ${c.name} = ? " })
sql.append(" WHERE ")
sql.append(primaryKey.joinToString(" AND ") { k -> " $k = ? " })
println(sql)
stmt = conn.prepareStatement(sql.toString())
return sql.toString()
}

override fun hookOnSubscribe(subscription: Subscription?) {
request(1)
override fun orderedFieldsInStmt(): List<String> {
val fields = columns.map { c -> c.name }.toMutableList()
primaryKey.forEach { p -> fields.add(p) }
return fields
}

override fun hookOnNext(record: Record) {
columns.forEachIndexed { i, c ->
val field = record.find(c.name)
stmt.setObject(i+1, field.newValue)
}
primaryKey.forEachIndexed { i, p ->
val field = record.find(p)
stmt.setObject(columns.size + i + 1, field.newValue)
}
stmt.executeUpdate()
request(1)
}

override fun hookFinally(type: SignalType?) {
stmt.close()
conn.close()
}
}
41 changes: 41 additions & 0 deletions src/main/kotlin/com/github/dataanon/jdbc/TableWriter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.github.dataanon.jdbc

import com.github.dataanon.Record
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.SignalType
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

abstract class TableWriter(dbConfig: Map<String, Any>, protected val tableName: String) : BaseSubscriber<Record>() {
private var conn: Connection = DriverManager.getConnection(dbConfig["url"] as String, dbConfig["user"] as String, dbConfig["password"] as String)
private lateinit var stmt: PreparedStatement
private lateinit var fields: List<String>

abstract fun buildPreparedStatement(): String

override fun hookOnSubscribe(subscription: Subscription?) {
val sql = buildPreparedStatement()
println(sql)
this.stmt = conn.prepareStatement(sql)
this.fields = orderedFieldsInStmt()
request(1)
}

override fun hookOnNext(record: Record) {
fields.forEachIndexed { i, f ->
val field = record.find(f)
stmt.setObject(i + 1, field.newValue)
}
stmt.executeUpdate()
request(1)
}

override fun hookFinally(type: SignalType?) {
stmt.close()
conn.close()
}

abstract fun orderedFieldsInStmt(): List<String>
}
41 changes: 9 additions & 32 deletions src/main/kotlin/com/github/dataanon/jdbc/WhitelistTableWriter.kt
Original file line number Diff line number Diff line change
@@ -1,49 +1,26 @@
package com.github.dataanon.jdbc

import com.github.dataanon.Columns
import com.github.dataanon.Record
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.SignalType
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

class WhitelistTableWriter(dbConfig: Map<String, Any>, tableName: String, private val columns: Columns, private val whitelist: Array<String>) : BaseSubscriber<Record>() {
private var conn: Connection = DriverManager.getConnection(dbConfig["url"] as String, dbConfig["user"] as String, dbConfig["password"] as String)
private val stmt: PreparedStatement
class WhitelistTableWriter(dbConfig: Map<String, Any>, tableName: String,
private val columns: Columns, private val whitelist: Array<String>) :
TableWriter(dbConfig,tableName) {

init {
override fun buildPreparedStatement(): String {
val sql = StringBuffer("INSERT INTO $tableName(")
sql.append(whitelist.joinToString(", ")).append(", ")
sql.append(columns.joinToString(", ") { c -> c.name })
sql.append(") VALUES(")
sql.append(whitelist.joinToString(",") { "?" }).append(",")
sql.append(columns.joinToString(",") { "?" })
sql.append(")")
println(sql)
stmt = conn.prepareStatement(sql.toString())
return sql.toString()
}

override fun hookOnSubscribe(subscription: Subscription?) {
request(1)
override fun orderedFieldsInStmt(): List<String> {
val fields = whitelist.toMutableList()
columns.forEach { c -> fields.add(c.name) }
return fields
}

override fun hookOnNext(record: Record) {
whitelist.forEachIndexed { i, p ->
val field = record.find(p)
stmt.setObject(i + 1, field.newValue)
}
columns.forEachIndexed { i, c ->
val field = record.find(c.name)
stmt.setObject(whitelist.size + i+1, field.newValue)
}
stmt.executeUpdate()
request(1)
}

override fun hookFinally(type: SignalType?) {
stmt.close()
conn.close()
}
}

0 comments on commit 00fcc1e

Please sign in to comment.