Skip to content

Commit

Permalink
fixed issue with threads not waiting using CountDownLatch
Browse files Browse the repository at this point in the history
  • Loading branch information
sunitparekh committed Feb 24, 2018
1 parent faed96c commit c80d592
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Sample Maven based project are available at...
* [Java](https://github.com/dataanon/dataanon-java-sample)

----------------------
## Tips
## Notes

1. In Whitelist approach provide source database connection user with READONLY access.
2. Use `where` and `limit` to limit the number of rows during anonymization. Very useful for testing purpose.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.dataanon</groupId>
<artifactId>data-anon</artifactId>
<version>0.9.0</version>
<version>0.9.1</version>
<packaging>jar</packaging>

<name>data-anon</name>
Expand Down
24 changes: 16 additions & 8 deletions src/main/kotlin/com/github/dataanon/dsl/Strategy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,32 @@ import com.github.dataanon.model.Table
import com.github.dataanon.utils.ProgressBarGenerator
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CountDownLatch



abstract class Strategy {
protected val tables = mutableListOf<Table>()

fun execute(progressBarEnabled: Boolean = true) {
val latch = CountDownLatch(tables.size)
Flux.fromIterable(tables)
.parallel()
.parallel(tables.size)
.runOn(Schedulers.parallel())
.log()
.subscribe { table -> executeOnTable(table, progressBarEnabled) }
.subscribe { table -> executeOnTable(table, progressBarEnabled, latch) }
latch.await()
}

private fun executeOnTable(table: Table, progressBarEnabled: Boolean) {
val reader = TableReader(sourceDbConfig(), table)
val progressBar = ProgressBarGenerator(progressBarEnabled, table.name, { reader.totalNoOfRecords() })
val writer = TableWriter(destDbConfig(), table, progressBar)
private fun executeOnTable(table: Table, progressBarEnabled: Boolean, latch: CountDownLatch) {
try {
val reader = TableReader(sourceDbConfig(), table)
val progressBar = ProgressBarGenerator(progressBarEnabled, table.name, { reader.totalNoOfRecords() })
val writer = TableWriter(destDbConfig(), table, progressBar)

Flux.fromIterable(Iterable { reader }).map { table.execute(it) }.subscribe(writer)
Flux.fromIterable(Iterable { reader }).map { table.execute(it) }.subscribe(writer)
} finally {
latch.countDown()
}
}

abstract protected fun sourceDbConfig(): DbConfig
Expand Down

0 comments on commit c80d592

Please sign in to comment.