Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Sync API #39

Merged
merged 6 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@

> Distributed transaction library based on Choreography

<img src = "https://github.com/rooftop-MSA/Netx/assets/62425964/08ed9050-1923-42b5-803f-5b7ea37a263f" width="330" align="right"/>
<img src = "https://github.com/rooftop-MSA/Netx/assets/62425964/08ed9050-1923-42b5-803f-5b7ea37a263f" width="360" align="right"/>

<br>

![version 0.1.9](https://img.shields.io/badge/version-0.1.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
![version 0.2.0](https://img.shields.io/badge/version-0.2.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

Choreography 방식으로 구현된 분산 트랜잭션 라이브러리 입니다.
`Netx` 는 다음 기능을 제공합니다.

1. [Reactor](https://projectreactor.io/) 기반의 완전한 비동기 트랜잭션 관리
2. 처리되지 않은 트랜잭션을 찾아 자동으로 재실행
3. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지
4. `At Least Once` 방식의 메시지 전달 보장
5. 처리되지 않은 메시지를 자동으로 재실행
5. 비동기 API와 동기 API 지원

## How to use

Expand Down Expand Up @@ -56,6 +57,20 @@ class Application {
#### Scenario1. Start pay transaction

```kotlin
// Sync
fun pay(param: Any): Any {
val transactionId = transactionManager.syncStart("paid=1000") // start transaction

runCatching { // This is kotlin try catch, not netx library spec
// Do your bussiness logic
}.fold(
onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction
onFailure = { transactionManager.syncRollback(transactionId, it.message) } // rollback transaction
)
}


// Async
fun pay(param: Any): Mono<Any> {
return transactionManager.start("paid=1000") // Start distributed transaction and publish transaction start event
.flatMap { transactionId ->
Expand All @@ -75,6 +90,19 @@ fun pay(param: Any): Mono<Any> {
#### Scenario2. Join order transaction

```kotlin
//Sync
fun order(param: Any): Any {
val transactionId = transactionManager.syncJoin(param.transactionId, "orderId=1:state=PENDING") // join transaction

runCatching { // This is kotlin try catch, not netx library spec
// Do your bussiness logic
}.fold(
onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction
onFailure = { transactionManager.syncRollback(transactionId, it.message) } // rollback transaction
)
}

// Async
fun order(param: Any): Mono<Any> {
return transactionManager.join(
param.transactionId,
Expand All @@ -94,6 +122,12 @@ fun order(param: Any): Mono<Any> {
#### Scenario3. Check exists transaction

```kotlin
// Sync
fun exists(param: Any): Any {
return transactionManager.syncExists(param.transactionId)
}

// Async
fun exists(param: Any): Mono<Any> {
return transactionManager.exists(param.transactionId) // Find any transaction has ever been started
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@ interface TransactionManager {

fun start(undo: String): Mono<String>

fun syncStart(undo: String): String

fun join(transactionId: String, undo: String): Mono<String>

fun syncJoin(transactionId: String, undo: String): String

fun exists(transactionId: String): Mono<String>

fun syncExists(transactionId: String): String

fun commit(transactionId: String): Mono<String>

fun syncCommit(transactionId: String): String

fun rollback(transactionId: String, cause: String): Mono<String>

fun syncRollback(transactionId: String, cause: String): String

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,30 @@ abstract class AbstractTransactionManager(
private val transactionIdGenerator: TransactionIdGenerator = TransactionIdGenerator(nodeId),
) : TransactionManager {

override fun syncStart(undo: String): String {
return start(undo).block() ?: error("Cannot start transaction")
}

override fun syncJoin(transactionId: String, undo: String): String {
return join(transactionId, undo).block()
?: error("Cannot join transaction \"$transactionId\", \"$undo\"")
}

override fun syncExists(transactionId: String): String {
return exists(transactionId).block()
?: error("Cannot exists transaction \"$transactionId\"")
}

override fun syncCommit(transactionId: String): String {
return commit(transactionId).block()
?: error("Cannot commit transaction \"$transactionId\"")
}

override fun syncRollback(transactionId: String, cause: String): String {
return rollback(transactionId, cause).block()
?: error("Cannot rollback transaction \"$transactionId\", \"$cause\"")
}

final override fun start(undo: String): Mono<String> {
return startTransaction(undo)
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.rooftop.netx.redis

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.assertions.throwables.shouldThrowMessage
import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.shouldBe
import org.rooftop.netx.api.*
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.springframework.test.context.ContextConfiguration
Expand Down Expand Up @@ -32,9 +34,9 @@ internal class RedisStreamTransactionManagerTest(
}

describe("start 메소드는") {
context("replay 를 입력받으면,") {
context("UNDO 를 입력받으면,") {
it("트랜잭션을 시작하고 transaction-id를 반환한다.") {
transactionManager.start(REPLAY).subscribe()
transactionManager.start(UNDO).subscribe()

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(1)
Expand All @@ -45,8 +47,33 @@ internal class RedisStreamTransactionManagerTest(

context("서로 다른 id의 트랜잭션이 여러번 시작되어도") {
it("모두 읽을 수 있다.") {
transactionManager.start(REPLAY).block()
transactionManager.start(REPLAY).block()
transactionManager.start(UNDO).block()
transactionManager.start(UNDO).block()

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(2)
noPublisherTransactionHandlerAssertions.startCountShouldBe(2)
}
}
}
}

describe("syncStart 메소드는") {
context("UNDO 를 입력받으면,") {
it("트랜잭션을 시작하고 transaction-id를 반환한다.") {
transactionManager.syncStart(UNDO)

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(1)
noPublisherTransactionHandlerAssertions.startCountShouldBe(1)
}
}
}

context("서로 다른 id의 트랜잭션이 여러번 시작되어도") {
it("모두 읽을 수 있다.") {
transactionManager.syncStart(UNDO)
transactionManager.syncStart(UNDO)

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(2)
Expand All @@ -58,10 +85,10 @@ internal class RedisStreamTransactionManagerTest(

describe("join 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("트랜잭션에 참여한다.") {
transactionManager.join(transactionId, REPLAY).subscribe()
transactionManager.join(transactionId, UNDO).subscribe()

eventually(5.seconds) {
monoTransactionHandlerAssertions.joinCountShouldBe(1)
Expand All @@ -72,17 +99,40 @@ internal class RedisStreamTransactionManagerTest(

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
val result = transactionManager.join(NOT_EXIST_TX_ID, REPLAY)
val result = transactionManager.join(NOT_EXIST_TX_ID, UNDO)

StepVerifier.create(result)
.verifyErrorMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"")
}
}
}

describe("syncJoin 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("트랜잭션에 참여한다.") {
transactionManager.syncJoin(transactionId, UNDO)

eventually(5.seconds) {
monoTransactionHandlerAssertions.joinCountShouldBe(1)
noPublisherTransactionHandlerAssertions.joinCountShouldBe(1)
}
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncJoin(NOT_EXIST_TX_ID, UNDO)
}
}
}
}

describe("exists 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("트랜잭션 id를 반환한다.") {
val result = transactionManager.exists(transactionId)
Expand All @@ -103,9 +153,29 @@ internal class RedisStreamTransactionManagerTest(
}
}

describe("syncExists 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("트랜잭션 id를 반환한다.") {
val result = transactionManager.syncExists(transactionId)

result shouldBe transactionId
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncExists(NOT_EXIST_TX_ID)
}
}
}
}

describe("commit 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("commit 메시지를 publish 한다") {
transactionManager.commit(transactionId).block()
Expand All @@ -127,12 +197,35 @@ internal class RedisStreamTransactionManagerTest(
}
}

describe("syncCommit 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("commit 메시지를 publish 한다") {
transactionManager.syncCommit(transactionId)

eventually(5.seconds) {
monoTransactionHandlerAssertions.commitCountShouldBe(1)
noPublisherTransactionHandlerAssertions.commitCountShouldBe(1)
}
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncCommit(NOT_EXIST_TX_ID)
}
}
}
}

describe("rollback 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("rollback 메시지를 publish 한다") {
transactionManager.rollback(transactionId, "rollback occured for test").block()
transactionManager.rollback(transactionId, "rollback for test").block()

eventually(5.seconds) {
monoTransactionHandlerAssertions.rollbackCountShouldBe(1)
Expand All @@ -143,17 +236,40 @@ internal class RedisStreamTransactionManagerTest(

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
val result = transactionManager.commit(NOT_EXIST_TX_ID)
val result = transactionManager.rollback(NOT_EXIST_TX_ID, "rollback for test")

StepVerifier.create(result)
.verifyErrorMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"")
}
}
}

describe("syncRollback 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("rollback 메시지를 publish 한다") {
transactionManager.syncRollback(transactionId, "rollback for test")

eventually(5.seconds) {
monoTransactionHandlerAssertions.rollbackCountShouldBe(1)
noPublisherTransactionHandlerAssertions.rollbackCountShouldBe(1)
}
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncRollback(NOT_EXIST_TX_ID, "rollback for test")
}
}
}
}
}) {

private companion object {
private const val REPLAY = "REPLAY"
private const val UNDO = "UNDO"
private const val NOT_EXIST_TX_ID = "NOT_EXISTS_TX_ID"
}
}
Loading