-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCoordinator.scala
58 lines (50 loc) · 1.64 KB
/
Coordinator.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package TwoPhaseCommit
import org.apache.zookeeper.common.Time
import org.apache.zookeeper._
import java.util.concurrent.TimeUnit
import scala.util.Random
/** Coordinator of a two-phase commit round driven through ZooKeeper.
  *
  * On construction the coordinator opens a ZooKeeper session (registering itself
  * as the watcher) and creates the znode `<root>/coordinator`. Every watch event
  * then advances the round:
  *
  *  1. While fewer/more than `n_workers` children exist under the coordinator
  *     znode, it only reports the currently registered children and waits for
  *     the next child-change event.
  *  1. Once exactly `n_workers` children exist, each child's data is read as
  *     that worker's vote (`"commit"` or `"abort"`), the global decision is
  *     written back into every child, and the decision is printed.
  *  1. When all children have disappeared after the decision was published,
  *     the coordinator znode is deleted and the session is closed.
  *
  * @param host      ZooKeeper connection string handed to the client
  * @param root      parent path under which the `coordinator` znode is created
  * @param n_workers number of child znodes (votes) required before deciding
  */
case class Coordinator(host:String, root:String, n_workers:Integer) extends Watcher {

  // Initialised before the client is constructed: `this` is handed to ZooKeeper
  // as the watcher, so `process` must never observe an uninitialised path.
  val coordinatorPath = root + "/coordinator"

  // Protocol state; only touched from `process`, which runs under `mutex`.
  private var decisionPublished = false
  private var finished = false

  // 3000 is the session timeout in milliseconds.
  val zk = new ZooKeeper(host, 3000, this)

  // The node has to be PERSISTENT: ZooKeeper does not allow ephemeral znodes to
  // have children, and the workers register themselves as children of this
  // node. It is removed explicitly at the end of the round (see `process`).
  zk.create(coordinatorPath,
    Array.emptyByteArray,
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT
  )

  /** Handles a watch/session event by advancing the commit round one step.
    *
    * The method never blocks waiting for workers: the child watch registered by
    * `getChildren` is one-shot and is re-armed on every call, so ZooKeeper calls
    * back again whenever the set of children changes.
    *
    * @param event the event delivered by the ZooKeeper client (its content is
    *              not inspected; any event triggers a re-check of the children)
    */
  override def process(event: WatchedEvent): Unit = {
    // NOTE(review): `mutex` is not declared in this file; presumably a
    // package-level lock shared with other participants - confirm.
    mutex.synchronized {
      if (!finished) {
        val workers = zk.getChildren(coordinatorPath, this)

        if (!decisionPublished) {
          if (workers.size == n_workers) {
            println("All workers voted")
            publishDecision((0 until workers.size).map(i => workers.get(i)))
            decisionPublished = true
          } else {
            println(s"register nodes $workers")
          }
        }

        // After the decision is out, wait (event-driven) for every worker to
        // remove its znode, then clean up our own node and the session.
        if (decisionPublished && workers.isEmpty) {
          zk.delete(coordinatorPath, -1)
          zk.close()
          finished = true
        }
      }
    }
  }

  /** Reads the vote stored in the given child znode; a znode without data
    * yields the empty string, which counts as neither commit nor abort.
    */
  private def readVote(worker: String): String =
    Option(zk.getData(s"$coordinatorPath/$worker", false, null))
      .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
      .getOrElse("")

  /** Collects all votes, derives the global decision and writes it into every
    * worker znode (version -1 = overwrite regardless of the znode version).
    */
  private def publishDecision(workers: Seq[String]): Unit = {
    val votes = workers.map(readVote)
    // Two-phase commit requires unanimity: a single "abort" (or a missing or
    // unrecognised vote) must abort the whole transaction.
    val decision =
      if (votes.nonEmpty && votes.forall(_ == "commit")) "commit" else "abort"
    val payload = decision.getBytes(java.nio.charset.StandardCharsets.UTF_8)
    workers.foreach(worker => zk.setData(s"$coordinatorPath/$worker", payload, -1))
    println(decision)
  }
}