Skip to content

Commit

Permalink
feat: Supports event publishing and handling (#60)
Browse files Browse the repository at this point in the history
* feat: Add event field to transaction

* build: sync idl repository
  • Loading branch information
devxb authored Mar 2, 2024
1 parent 80121c3 commit 1cb3b95
Show file tree
Hide file tree
Showing 14 changed files with 390 additions and 79 deletions.
2 changes: 1 addition & 1 deletion idl
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ class TransactionCommitEvent(
transactionId: String,
nodeName: String,
group: String,
): TransactionEvent(transactionId, nodeName, group)
event: String?,
codec: Codec,
): TransactionEvent(transactionId, nodeName, group, event, codec)
14 changes: 12 additions & 2 deletions src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package org.rooftop.netx.api

abstract class TransactionEvent(
import kotlin.reflect.KClass

sealed class TransactionEvent(
val transactionId: String,
val nodeName: String,
val group: String,
)
private val event: String?,
private val codec: Codec,
) {
fun <T : Any> decodeEvent(type: KClass<T>): T =
codec.decode(
event ?: throw NullPointerException("Cannot decode event cause event is null"),
type
)
}
4 changes: 3 additions & 1 deletion src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ class TransactionJoinEvent(
transactionId: String,
nodeName: String,
group: String,
): TransactionEvent(transactionId, nodeName, group)
event: String?,
codec: Codec,
): TransactionEvent(transactionId, nodeName, group, event, codec)
16 changes: 16 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,38 @@ interface TransactionManager {

fun <T> start(undo: T): Mono<String>

fun <T, S> start(undo: T, event: S): Mono<String>

fun <T> syncStart(undo: T): String

fun <T, S> syncStart(undo: T, event: S): String

fun <T> join(transactionId: String, undo: T): Mono<String>

fun <T, S> join(transactionId: String, undo: T, event: S): Mono<String>

fun <T> syncJoin(transactionId: String, undo: T): String

fun <T, S> syncJoin(transactionId: String, undo: T, event: S): String

fun exists(transactionId: String): Mono<String>

fun syncExists(transactionId: String): String

fun commit(transactionId: String): Mono<String>

fun <T> commit(transactionId: String, event: T): Mono<String>

fun syncCommit(transactionId: String): String

fun <T> syncCommit(transactionId: String, event: T): String

fun rollback(transactionId: String, cause: String): Mono<String>

fun <T> rollback(transactionId: String, cause: String, event: T): Mono<String>

fun syncRollback(transactionId: String, cause: String): String

fun <T> syncRollback(transactionId: String, cause: String, event: T): String

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ class TransactionRollbackEvent(
transactionId: String,
nodeName: String,
group: String,
event: String?,
val cause: String?,
private val codec: Codec,
private val undo: String,
) : TransactionEvent(transactionId, nodeName, group) {
private val codec: Codec,
) : TransactionEvent(transactionId, nodeName, group, event, codec) {

fun <T : Any> decodeUndo(type: KClass<T>): T = codec.decode(undo, type)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ class TransactionStartEvent(
transactionId: String,
nodeName: String,
group: String,
) : TransactionEvent(transactionId, nodeName, group)
event: String?,
codec: Codec,
) : TransactionEvent(transactionId, nodeName, group, event, codec)
Original file line number Diff line number Diff line change
Expand Up @@ -94,45 +94,62 @@ abstract class AbstractTransactionDispatcher(
return when (transaction.state) {
TransactionState.TRANSACTION_STATE_START -> Mono.just(
TransactionStartEvent(
transaction.id,
transaction.serverId,
transaction.group
transactionId = transaction.id,
nodeName = transaction.serverId,
group = transaction.group,
event = extractEvent(transaction),
codec = codec,
)
)

TransactionState.TRANSACTION_STATE_COMMIT -> Mono.just(
TransactionCommitEvent(
transaction.id,
transaction.serverId,
transaction.group,
transactionId = transaction.id,
nodeName = transaction.serverId,
group = transaction.group,
event = extractEvent(transaction),
codec = codec
)
)

TransactionState.TRANSACTION_STATE_JOIN -> Mono.just(
TransactionJoinEvent(
transaction.id,
transaction.serverId,
transaction.group,
transactionId = transaction.id,
nodeName = transaction.serverId,
group = transaction.group,
event = extractEvent(transaction),
codec = codec,
)
)

TransactionState.TRANSACTION_STATE_ROLLBACK -> findOwnUndo(transaction)
.warningOnError("Error occurred when findOwnUndo transaction \n{\n$transaction}")
.map {
TransactionRollbackEvent(
transaction.id,
transaction.serverId,
transaction.group,
transaction.cause,
codec,
it,
transactionId = transaction.id,
nodeName = transaction.serverId,
group = transaction.group,
event = extractEvent(transaction),
cause = when (transaction.hasCause()) {
true -> transaction.cause
false -> null
},
undo = it,
codec = codec,
)
}

else -> throw cannotFindMatchedTransactionEventException
}
}

private fun extractEvent(transaction: Transaction): String? {
return when (transaction.hasEvent()) {
true -> transaction.event
false -> null
}
}

protected abstract fun findOwnUndo(transaction: Transaction): Mono<String>

@PostConstruct
Expand Down
129 changes: 104 additions & 25 deletions src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.rooftop.netx.engine

import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.infoOnError
import org.rooftop.netx.engine.logging.warningOnError
Expand All @@ -19,14 +19,23 @@ abstract class AbstractTransactionManager(
) : TransactionManager {

final override fun <T> syncStart(undo: T): String {
return start(undo).block() ?: error("Cannot start transaction")
return start(undo).block() ?: error("Cannot start transaction \"$undo\"")
}

override fun <T, S> syncStart(undo: T, event: S): String {
return start(undo, event).block() ?: error("Cannot start transaction \"$undo\" \"$event\"")
}

final override fun <T> syncJoin(transactionId: String, undo: T): String {
return join(transactionId, undo).block()
?: error("Cannot join transaction \"$transactionId\", \"$undo\"")
}

override fun <T, S> syncJoin(transactionId: String, undo: T, event: S): String {
return join(transactionId, undo, event).block()
?: error("Cannot join transaction \"$transactionId\", \"$undo\", \"$event\"")
}

final override fun syncExists(transactionId: String): String {
return exists(transactionId).block()
?: error("Cannot exists transaction \"$transactionId\"")
Expand All @@ -37,21 +46,41 @@ abstract class AbstractTransactionManager(
?: error("Cannot commit transaction \"$transactionId\"")
}

override fun <T> syncCommit(transactionId: String, event: T): String {
return commit(transactionId, event).block()
?: error("Cannot commit transaction \"$transactionId\" \"$event\"")
}

final override fun syncRollback(transactionId: String, cause: String): String {
return rollback(transactionId, cause).block()
?: error("Cannot rollback transaction \"$transactionId\", \"$cause\"")
}

override fun <T> syncRollback(transactionId: String, cause: String, event: T): String {
return rollback(transactionId, cause, event).block()
?: error("Cannot rollback transaction \"$transactionId\", \"$cause\" \"$event\"")
}

final override fun <T> start(undo: T): Mono<String> {
return Mono.fromCallable { codec.encode(undo) }
.flatMap { encodedUndo ->
startTransaction(encodedUndo)
startTransaction(encodedUndo, null)
.info("Start transaction undo \"$undo\"")
}
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
}

override fun <T, S> start(undo: T, event: S): Mono<String> {
return Mono.fromCallable { codec.encode(undo) }
.map { it to codec.encode(event) }
.flatMap { (encodedUndo, encodedEvent) ->
startTransaction(encodedUndo, encodedEvent)
.info("Start transaction undo \"$undo\"")
}
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
}

private fun startTransaction(undo: String): Mono<String> {
private fun startTransaction(undo: String, event: String?): Mono<String> {
return Mono.deferContextual<String> { Mono.just(it[CONTEXT_TX_KEY]) }
.flatMap { transactionId ->
publishTransaction(transactionId, transaction {
Expand All @@ -60,6 +89,7 @@ abstract class AbstractTransactionManager(
group = nodeGroup
this.state = TransactionState.TRANSACTION_STATE_START
this.undo = undo
event?.let { this.event = it }
})
}
}
Expand All @@ -75,48 +105,103 @@ abstract class AbstractTransactionManager(
.warningOnError("Cannot join transaction cause, transaction \"$transactionId\" already Rollback state")
.map { codec.encode(undo) }
.flatMap {
joinTransaction(transactionId, it)
joinTransaction(transactionId, it, null)
.info("Join transaction transactionId \"$transactionId\", undo \"$undo\"")
}
}

private fun joinTransaction(transactionId: String, undo: String): Mono<String> {
override fun <T, S> join(transactionId: String, undo: T, event: S): Mono<String> {
return getAnyTransaction(transactionId)
.map {
if (it == TransactionState.TRANSACTION_STATE_COMMIT) {
error("Cannot join transaction cause, transaction \"$transactionId\" already \"${it.name}\"")
}
transactionId
}
.warningOnError("Cannot join transaction cause, transaction \"$transactionId\" already Rollback state")
.map { codec.encode(undo) to codec.encode(event) }
.flatMap { (encodedUndo, encodedEvent) ->
joinTransaction(transactionId, encodedUndo, encodedEvent)
.info("Join transaction transactionId \"$transactionId\", undo \"$undo\"")
}
}

private fun joinTransaction(transactionId: String, undo: String, event: String?): Mono<String> {
return publishTransaction(transactionId, transaction {
id = transactionId
serverId = nodeName
group = nodeGroup
state = TransactionState.TRANSACTION_STATE_JOIN
this.undo = undo
event?.let { this.event = it }
})
}

final override fun rollback(transactionId: String, cause: String): Mono<String> {
return exists(transactionId)
.infoOnError("Cannot rollback transaction cause, transaction \"$transactionId\" is not exists")
.publishTransaction(transaction {
id = transactionId
serverId = nodeName
group = nodeGroup
state = TransactionState.TRANSACTION_STATE_ROLLBACK
this.cause = cause
})
.flatMap {
rollbackTransaction(transactionId, cause, null)
}
.info("Rollback transaction \"$transactionId\"")
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}

override fun <T> rollback(transactionId: String, cause: String, event: T): Mono<String> {
return exists(transactionId)
.infoOnError("Cannot rollback transaction cause, transaction \"$transactionId\" is not exists")
.map { codec.encode(event) }
.flatMap { encodedEvent ->
rollbackTransaction(transactionId, cause, encodedEvent)
}
.info("Rollback transaction \"$transactionId\"")
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}

private fun rollbackTransaction(
transactionId: String,
cause: String,
event: String?
): Mono<String> {
return publishTransaction(transactionId, transaction {
id = transactionId
serverId = nodeName
group = nodeGroup
state = TransactionState.TRANSACTION_STATE_ROLLBACK
this.cause = cause
event?.let { this.event = it }
})
}

final override fun commit(transactionId: String): Mono<String> {
return exists(transactionId)
.infoOnError("Cannot commit transaction cause, transaction \"$transactionId\" is not exists")
.publishTransaction(transaction {
id = transactionId
serverId = nodeName
group = nodeGroup
state = TransactionState.TRANSACTION_STATE_COMMIT
})
.flatMap { commitTransaction(transactionId, null) }
.info("Commit transaction \"$transactionId\"")
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}

override fun <T> commit(transactionId: String, event: T): Mono<String> {
return exists(transactionId)
.infoOnError("Cannot commit transaction cause, transaction \"$transactionId\" is not exists")
.map { codec.encode(event) }
.flatMap { encodedEvent ->
commitTransaction(transactionId, encodedEvent)
}
.info("Commit transaction \"$transactionId\"")
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}

private fun commitTransaction(transactionId: String, event: String?): Mono<String> {
return publishTransaction(transactionId, transaction {
id = transactionId
serverId = nodeName
group = nodeGroup
state = TransactionState.TRANSACTION_STATE_COMMIT
event?.let { this.event = it }
})
}

final override fun exists(transactionId: String): Mono<String> {
return getAnyTransaction(transactionId)
.infoOnError("There is no transaction corresponding to transactionId \"$transactionId\"")
Expand All @@ -132,12 +217,6 @@ abstract class AbstractTransactionManager(
}
}

private fun Mono<String>.publishTransaction(transaction: Transaction): Mono<String> {
return this.flatMap {
publishTransaction(it, transaction)
}
}

protected abstract fun publishTransaction(
transactionId: String,
transaction: Transaction,
Expand Down
Loading

0 comments on commit 1cb3b95

Please sign in to comment.