Skip to content

Commit

Permalink
Orchestrating State Stored Aggregate added.
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Dec 21, 2023
1 parent 1da26c6 commit e7200aa
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 4 deletions.
114 changes: 113 additions & 1 deletion src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::marker::PhantomData;

use async_trait::async_trait;

use crate::decider::{EventComputation, StateComputation};
use crate::decider::{Decider, EventComputation, StateComputation};
use crate::saga::{ActionComputation, Saga};

/// Event Repository trait
///
Expand Down Expand Up @@ -230,3 +231,114 @@ where
}
}
}

/// Orchestrating State Stored Aggregate.
///
/// It is using a [Decider] and [Saga] to compute new state based on the current state and the command.
/// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
/// It is using a [StateRepository] to fetch the current state and to save the new state.
///
/// Generic parameters:
///
/// - `C` - Command
/// - `S` - State
/// - `E` - Event
/// - `Repository` - State repository
/// - `Version` - Version
/// - `Error` - Error
pub struct StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
{
repository: Repository,
decider: Decider<'a, C, S, E>,
saga: Saga<'a, E, C>,
_marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<'a, C, S, E, Repository, Version, Error> StateComputation<C, S, E>
for StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
S: Clone,
{
/// Computes new state based on the current state and the command.
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> S {
let effective_current_state =
current_state.unwrap_or_else(|| (self.decider.initial_state)());
let events = (self.decider.decide)(command, &effective_current_state);
let mut new_state = events.iter().fold(effective_current_state, |state, event| {
(self.decider.evolve)(&state, event)
});
let commands = events
.iter()
.flat_map(|event: &E| self.saga.compute_new_actions(event))
.collect::<Vec<C>>();
commands.iter().for_each(|action| {
new_state = self.compute_new_state(Some(new_state.clone()), action);
});
new_state
}
}

#[async_trait]
impl<'a, C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
for StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
/// Fetches current state, based on the command.
async fn fetch_state(&self, command: &C) -> Result<Option<(S, Version)>, Error> {
self.repository.fetch_state(command).await
}
/// Saves state.
async fn save(&self, state: &S, version: &Option<Version>) -> Result<(S, Version), Error> {
self.repository.save(state, version).await
}
}

impl<'a, C, S, E, Repository, Version, Error>
StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
C: Sync,
S: Sync + Clone,
E: Sync,
Version: Sync,
Error: Sync,
{
/// Creates a new instance of [StateStoredAggregate].
pub fn new(
repository: Repository,
decider: Decider<'a, C, S, E>,
saga: Saga<'a, E, C>,
) -> Self {
StateStoredOrchestratingAggregate {
repository,
decider,
saga,
_marker: PhantomData,
}
}
/// Handles the command by fetching the state from the repository, computing new state based on the current state and the command, and saving the new state to the repository.
pub async fn handle(&self, command: &C) -> Result<(S, Version), Error> {
let state_version = self.fetch_state(command).await?;
match state_version {
None => {
let new_state = self.compute_new_state(None, command);
let saved_state = self.save(&new_state, &None).await?;
Ok(saved_state)
}
Some((state, version)) => {
let new_state = self.compute_new_state(Some(state), command);
let saved_state = self.save(&new_state, &Some(version)).await?;
Ok(saved_state)
}
}
}
}
217 changes: 214 additions & 3 deletions tests/aggregate_combined_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use derive_more::Display;

use fmodel_rust::aggregate::{
EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate,
StateStoredOrchestratingAggregate,
};
use fmodel_rust::decider::Decider;
use fmodel_rust::saga::Saga;
use fmodel_rust::Sum;

use crate::api::{
CancelOrderCommand, CreateOrderCommand, OrderCancelledEvent, OrderCommand, OrderCreatedEvent,
OrderEvent, OrderUpdatedEvent, ShipmentCommand, ShipmentCreatedEvent, ShipmentEvent,
UpdateOrderCommand,
CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent,
OrderCommand, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, ShipmentCommand,
ShipmentCreatedEvent, ShipmentEvent, UpdateOrderCommand,
};

mod api;
Expand Down Expand Up @@ -268,6 +270,40 @@ fn shipment_decider<'a>() -> Decider<'a, ShipmentCommand, ShipmentState, Shipmen
}
}

fn order_saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> {
Saga {
react: Box::new(|event| match event {
OrderEvent::Created(created_event) => {
vec![ShipmentCommand::Create(CreateShipmentCommand {
shipment_id: created_event.order_id,
order_id: created_event.order_id,
customer_name: created_event.customer_name.to_owned(),
items: created_event.items.to_owned(),
})]
}
OrderEvent::Updated(_updated_event) => {
vec![]
}
OrderEvent::Cancelled(_cancelled_event) => {
vec![]
}
}),
}
}

fn shipment_saga<'a>() -> Saga<'a, ShipmentEvent, OrderCommand> {
Saga {
react: Box::new(|event| match event {
ShipmentEvent::Created(created_event) => {
vec![OrderCommand::Update(api::UpdateOrderCommand {
order_id: created_event.order_id,
new_items: created_event.items.to_owned(),
})]
}
}),
}
}

#[tokio::test]
async fn es_test() {
let combined_decider = order_decider().combine(shipment_decider());
Expand Down Expand Up @@ -545,3 +581,178 @@ async fn ss_test() {
handle1.join().unwrap().await;
handle2.join().unwrap().await;
}

#[tokio::test]
async fn ss_combined_test() {
let combined_decider = order_decider().combine(shipment_decider());
let combined_saga = order_saga().combine(shipment_saga());

let repository = InMemoryStateRepository::new();
let aggregate = Arc::new(StateStoredOrchestratingAggregate::new(
repository,
combined_decider,
combined_saga,
));
let aggregate2 = Arc::clone(&aggregate);

let handle1 = thread::spawn(|| async move {
let command = Sum::First(OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}));
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(
(
OrderState {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
is_cancelled: false,
},
ShipmentState {
shipment_id: 1,
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}
),
0
)
);
let command = Sum::First(OrderCommand::Update(UpdateOrderCommand {
order_id: 1,
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
}));
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(
(
OrderState {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 3".to_string(), "Item 4".to_string()],
is_cancelled: false,
},
ShipmentState {
shipment_id: 1,
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}
),
1
)
);
let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 1 }));
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(
(
OrderState {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 3".to_string(), "Item 4".to_string()],
is_cancelled: true,
},
ShipmentState {
shipment_id: 1,
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}
),
2
)
);
});

let handle2 = thread::spawn(|| async move {
let command = Sum::First(OrderCommand::Create(CreateOrderCommand {
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}));
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(
(
OrderState {
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
is_cancelled: false,
},
ShipmentState {
shipment_id: 2,
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}
),
0
)
);
let command = Sum::First(OrderCommand::Update(UpdateOrderCommand {
order_id: 2,
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
}));
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(
(
OrderState {
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 3".to_string(), "Item 4".to_string()],
is_cancelled: false,
},
ShipmentState {
shipment_id: 2,
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}
),
1
)
);
let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 2 }));
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(
(
OrderState {
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 3".to_string(), "Item 4".to_string()],
is_cancelled: true,
},
ShipmentState {
shipment_id: 2,
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}
),
2
)
);
});

handle1.join().unwrap().await;
handle2.join().unwrap().await;
}

0 comments on commit e7200aa

Please sign in to comment.