Skip to content

Commit

Permalink
feat: Event-based transaction handling (#61)
Browse files Browse the repository at this point in the history
* refactor: Chage Transaction...Handler to Transaction...Listener

* feat: Dispatch transaction event based on event-type
  • Loading branch information
devxb authored Mar 3, 2024
1 parent 1cb3b95 commit 8f5e9e5
Show file tree
Hide file tree
Showing 15 changed files with 328 additions and 65 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.rooftop.netx.api

import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionCommitListener(
val event: KClass<*> = Any::class
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.rooftop.netx.api

import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionJoinListener(
val event: KClass<*> = Any::class
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.rooftop.netx.api

import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionRollbackListener(
val event: KClass<*> = Any::class
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.rooftop.netx.api

import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionStartListener(
val event: KClass<*> = Any::class
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ abstract class AbstractTransactionDispatcher(
) {

private val monoTransactionHandleFunctions =
mutableMapOf<TransactionState, MutableList<Pair<KFunction<Mono<*>>, Any>>>()
mutableMapOf<TransactionState, MutableList<MonoFunction>>()

private val notPublisherTransactionHandlerFunctions =
mutableMapOf<TransactionState, MutableList<Pair<KFunction<*>, Any>>>()
mutableMapOf<TransactionState, MutableList<NotPublisherFunction>>()

fun dispatch(transaction: Transaction, messageId: String): Boolean {
var isSuccess = true
Expand Down Expand Up @@ -68,11 +68,10 @@ abstract class AbstractTransactionDispatcher(
Flux.fromIterable(monoTransactionHandleFunctions[state] ?: listOf())
}
.publishOn(Schedulers.boundedElastic())
.flatMap { (function, instance) ->
.flatMap { monoFunction ->
mapToTransactionEvent(transaction)
.info("Call Mono TransactionHandler \"${function.name}\" with transaction \n{\n$transaction}")
.flatMap { function.call(instance, it) }
.warningOnError("Error occurred in TransactionHandler function \"${function.name}\" with transaction \n{\n$transaction}")
.flatMap { monoFunction.call(it) }
.warningOnError("Error occurred in TransactionHandler function \"${monoFunction.name()}\" with transaction \n{\n$transaction}")
}
}

Expand All @@ -82,11 +81,10 @@ abstract class AbstractTransactionDispatcher(
Flux.fromIterable(notPublisherTransactionHandlerFunctions[state] ?: listOf())
}
.publishOn(Schedulers.boundedElastic())
.flatMap { (function, instance) ->
.flatMap { notPublisherFunction ->
mapToTransactionEvent(transaction)
.info("Call Not publisher TransactionHandler \"${function.name}\" with transaction \n{\n$transaction}")
.map { function.call(instance, it) }
.warningOnError("Error occurred in TransactionHandler function \"${function.name}\" with transaction \n{\n$transaction}")
.map { notPublisherFunction.call(it) }
.warningOnError("Error occurred in TransactionHandler function \"${notPublisherFunction.name()}\" with transaction \n{\n$transaction}")
}
}

Expand Down Expand Up @@ -155,32 +153,35 @@ abstract class AbstractTransactionDispatcher(
@PostConstruct
fun initHandler() {
val transactionHandler = findHandlers(TransactionHandler::class)
val monoFunctions = getFunctions(transactionHandler, Mono::class)
val monoFunctions = getMonoFunctions(transactionHandler)
monoTransactionHandleFunctions.putAll(monoFunctions)
val notPublisherFunctions = getNotPublisherFunctions(transactionHandler)
notPublisherTransactionHandlerFunctions.putAll(notPublisherFunctions)
}

@Suppress("unchecked_cast")
private fun <T : Any> getFunctions(
@Suppress("UNCHECKED_CAST")
private fun getMonoFunctions(
foundHandlers: List<Any>,
returnType: KClass<T>
): MutableMap<TransactionState, MutableList<Pair<KFunction<T>, Any>>> {
val handlers = mutableMapOf<TransactionState, MutableList<Pair<KFunction<T>, Any>>>()
): MutableMap<TransactionState, MutableList<MonoFunction>> {
val handlers =
mutableMapOf<TransactionState, MutableList<MonoFunction>>()

for (handler in foundHandlers) {
val returnTypeMatchedHandlers = handler::class.declaredMemberFunctions
.filter { it.returnType.classifier == returnType }
.filter { it.returnType.classifier == Mono::class }

returnTypeMatchedHandlers.forEach { function ->
function.annotations
.forEach { annotation ->
runCatching {
val transactionState = matchedTransactionState(annotation)
val transactionState = getMatchedTransactionState(annotation)
val eventType = getEventType(annotation)
handlers.putIfAbsent(transactionState, mutableListOf())
handlers[transactionState]?.add(function as KFunction<T> to handler)
handlers[transactionState]?.add(
MonoFunction(eventType, function as KFunction<Mono<*>>, handler)
)
}.onFailure {
throw IllegalStateException("Cannot add TransactionHandler", it)
throw IllegalStateException("Cannot add Mono TransactionHandler", it)
}
}
}
Expand All @@ -191,8 +192,9 @@ abstract class AbstractTransactionDispatcher(

private fun getNotPublisherFunctions(
foundHandlers: List<Any>
): MutableMap<TransactionState, MutableList<Pair<KFunction<*>, Any>>> {
val handlers = mutableMapOf<TransactionState, MutableList<Pair<KFunction<*>, Any>>>()
): MutableMap<TransactionState, MutableList<NotPublisherFunction>> {
val handlers =
mutableMapOf<TransactionState, MutableList<NotPublisherFunction>>()

for (handler in foundHandlers) {
val returnTypeMatchedHandlers = handler::class.declaredMemberFunctions
Expand All @@ -202,9 +204,12 @@ abstract class AbstractTransactionDispatcher(
function.annotations
.forEach { annotation ->
runCatching {
val transactionState = matchedTransactionState(annotation)
val transactionState = getMatchedTransactionState(annotation)
val eventType = getEventType(annotation)
handlers.putIfAbsent(transactionState, mutableListOf())
handlers[transactionState]?.add(function to handler)
handlers[transactionState]?.add(
NotPublisherFunction(eventType, function, handler)
)
}.onFailure {
throw IllegalStateException("Cannot add TransactionHandler", it)
}
Expand All @@ -217,16 +222,79 @@ abstract class AbstractTransactionDispatcher(

protected abstract fun <T : Annotation> findHandlers(type: KClass<T>): List<Any>

private fun matchedTransactionState(annotation: Annotation): TransactionState {
private fun getEventType(annotation: Annotation): KClass<*> {
return when (annotation) {
is TransactionStartListener -> annotation.event
is TransactionCommitListener -> annotation.event
is TransactionJoinListener -> annotation.event
is TransactionRollbackListener -> annotation.event
else -> throw notMatchedTransactionHandlerException
}
}

private fun getMatchedTransactionState(annotation: Annotation): TransactionState {
return when (annotation) {
is TransactionStartHandler -> TransactionState.TRANSACTION_STATE_START
is TransactionCommitHandler -> TransactionState.TRANSACTION_STATE_COMMIT
is TransactionJoinHandler -> TransactionState.TRANSACTION_STATE_JOIN
is TransactionRollbackHandler -> TransactionState.TRANSACTION_STATE_ROLLBACK
is TransactionStartListener -> TransactionState.TRANSACTION_STATE_START
is TransactionCommitListener -> TransactionState.TRANSACTION_STATE_COMMIT
is TransactionJoinListener -> TransactionState.TRANSACTION_STATE_JOIN
is TransactionRollbackListener -> TransactionState.TRANSACTION_STATE_ROLLBACK
else -> throw notMatchedTransactionHandlerException
}
}

private class MonoFunction(
private val eventType: KClass<*>,
private val function: KFunction<Mono<*>>,
private val handler: Any,
) {

fun name(): String = function.name
fun call(transactionEvent: TransactionEvent): Mono<*> {
runCatching { transactionEvent.decodeEvent(eventType) }
.fold(
onSuccess = {
return function.call(handler, transactionEvent)
.info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
},
onFailure = {
if (it is NullPointerException && eventType == Any::class) {
return function.call(handler, transactionEvent)
.info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
}
}
)
return Mono.empty<String>()
}
}

private class NotPublisherFunction(
private val eventType: KClass<*>,
private val function: KFunction<*>,
private val handler: Any,
) {
fun name(): String = function.name

fun call(transactionEvent: TransactionEvent): Any? {
runCatching {
transactionEvent.decodeEvent(eventType)
}.fold(
onSuccess = {
val result = function.call(handler, transactionEvent)
info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
return result
},
onFailure = {
if (it is NullPointerException && eventType == Any::class) {
val result = function.call(handler, transactionEvent)
info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
return result
}
}
)
return Unit
}
}

protected abstract fun ack(
transaction: Transaction,
messageId: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ class TransactionReceiveStorage(
(storage["ROLLBACK"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}

@TransactionRollbackHandler
@TransactionRollbackListener
fun logRollback(transaction: TransactionRollbackEvent): Mono<Unit> {
return Mono.fromCallable { log("ROLLBACK", transaction) }
}

@TransactionStartHandler
@TransactionStartListener
fun logStart(transaction: TransactionStartEvent): Mono<Unit> {
return Mono.fromCallable { log("START", transaction) }
}

@TransactionJoinHandler
@TransactionJoinListener
fun logJoin(transaction: TransactionJoinEvent): Mono<Unit> {
return Mono.fromCallable { log("JOIN", transaction) }
}

@TransactionCommitHandler
@TransactionCommitListener
fun logCommit(transaction: TransactionCommitEvent): Mono<Unit> {
return Mono.fromCallable { log("COMMIT", transaction) }
}
Expand Down
Loading

0 comments on commit 8f5e9e5

Please sign in to comment.