Skip to content

Commit

Permalink
Use external locking for model
Browse files Browse the repository at this point in the history
  • Loading branch information
rufusutt committed Aug 30, 2023
1 parent 9120682 commit 978ff94
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 114 deletions.
1 change: 0 additions & 1 deletion model/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ rust-version = "1.56"
nmos-schema = { path = "../schema" }
serde = "1"
serde_json = "1"
tokio = { version = "1", features = ["sync"] }
uuid = { version = "1", features = ["v4"] }
76 changes: 20 additions & 56 deletions model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ pub mod version;
use std::collections::HashMap;

use resource::{Device, Flow, Node, Receiver, ResourceBundle, Sender, Source};
use tokio::sync::{RwLock, RwLockReadGuard};
use uuid::Uuid;

#[derive(Debug, Default)]
pub struct Model {
// IS-04 resources
nodes: RwLock<HashMap<Uuid, Node>>,
devices: RwLock<HashMap<Uuid, Device>>,
sources: RwLock<HashMap<Uuid, Source>>,
flows: RwLock<HashMap<Uuid, Flow>>,
senders: RwLock<HashMap<Uuid, Sender>>,
receivers: RwLock<HashMap<Uuid, Receiver>>,
pub nodes: HashMap<Uuid, Node>,
pub devices: HashMap<Uuid, Device>,
pub sources: HashMap<Uuid, Source>,
pub flows: HashMap<Uuid, Flow>,
pub senders: HashMap<Uuid, Sender>,
pub receivers: HashMap<Uuid, Receiver>,
}

impl Model {
Expand Down Expand Up @@ -81,74 +80,39 @@ impl Model {
});

Self {
nodes: RwLock::new(nodes),
devices: RwLock::new(devices),
sources: RwLock::new(sources),
flows: RwLock::new(flows),
senders: RwLock::new(senders),
receivers: RwLock::new(receivers),
nodes,
devices,
sources,
flows,
senders,
receivers,
}
}

// Get nodes
pub async fn nodes(&self) -> RwLockReadGuard<'_, HashMap<Uuid, Node>> {
self.nodes.read().await
}

// Get devices
pub async fn devices(&self) -> RwLockReadGuard<'_, HashMap<Uuid, Device>> {
self.devices.read().await
}

// Get receivers
pub async fn receivers(&self) -> RwLockReadGuard<'_, HashMap<Uuid, Receiver>> {
self.receivers.read().await
}

// Get senders
pub async fn senders(&self) -> RwLockReadGuard<'_, HashMap<Uuid, Sender>> {
self.senders.read().await
}

// Get sources
pub async fn sources(&self) -> RwLockReadGuard<'_, HashMap<Uuid, Source>> {
self.sources.read().await
}

// Get flows
pub async fn flows(&self) -> RwLockReadGuard<'_, HashMap<Uuid, Flow>> {
self.flows.read().await
}

pub async fn insert_node(&self, node: Node) -> Option<()> {
let mut nodes = self.nodes.write().await;
nodes.insert(node.core.id, node);
pub async fn insert_node(&mut self, node: Node) -> Option<()> {
self.nodes.insert(node.core.id, node);

Some(())
}

pub async fn insert_device(&self, device: Device) -> Option<()> {
pub async fn insert_device(&mut self, device: Device) -> Option<()> {
// Check node id in model
let nodes = self.nodes.read().await;
if !nodes.contains_key(&device.node_id) {
if !self.nodes.contains_key(&device.node_id) {
return None;
}

let mut devices = self.devices.write().await;
devices.insert(device.core.id, device);
self.devices.insert(device.core.id, device);

Some(())
}

pub async fn insert_receiver(&self, receiver: Receiver) -> Option<()> {
pub async fn insert_receiver(&mut self, receiver: Receiver) -> Option<()> {
// Check device id in model
let devices = self.devices.read().await;
if !devices.contains_key(&receiver.device_id) {
if !self.devices.contains_key(&receiver.device_id) {
return None;
}

let mut receivers = self.receivers.write().await;
receivers.insert(receiver.core.id, receiver);
self.receivers.insert(receiver.core.id, receiver);

Some(())
}
Expand Down
3 changes: 2 additions & 1 deletion node/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use error::ServiceError;
use futures::Future;
use nmos_model::Model;
use serde_json::json;
use tokio::sync::RwLock;
use tower::Service;

use self::node::{
Expand All @@ -31,7 +32,7 @@ pub struct NodeApi {
}

impl NodeApi {
pub fn new(model: Arc<Model>) -> Self {
pub fn new(model: Arc<RwLock<Model>>) -> Self {
let router = Router::new()
.route(
"/",
Expand Down
95 changes: 52 additions & 43 deletions node/src/api/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use nmos_model::resource::{DeviceJson, FlowJson, NodeJson, ReceiverJson, SenderJ
use nmos_model::version::is_04::V1_0;
use nmos_model::version::APIVersion;
use nmos_model::Model;
use tokio::sync::RwLock;
use uuid::Uuid;

use super::ServiceError;
Expand Down Expand Up @@ -37,47 +38,48 @@ fn parse_api_version(api: &str) -> Result<APIVersion, ServiceError> {

pub async fn get_self(
Path(api): Path<String>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<NodeJson>, ServiceError> {
let api = parse_api_version(&api)?;

let nodes = model.nodes().await;
let model = model.read().await;

let node = nodes
.iter()
let node = model
.nodes
.values()
.next()
.expect("Missing self resource")
.1
.to_json(&api);

Ok(Json(node))
}

pub async fn get_devices(
Path(api): Path<String>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<Vec<DeviceJson>>, ServiceError> {
let api = parse_api_version(&api)?;

let devices = model.devices().await;
let model = model.read().await;

let devices: Vec<_> = devices
.iter()
.map(|(_, device)| device.to_json(&api))
let devices: Vec<_> = model
.devices
.values()
.map(|device| device.to_json(&api))
.collect();

Ok(Json(devices))
}

pub async fn get_device(
Path((api, id)): Path<(String, Uuid)>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<DeviceJson>, ServiceError> {
let api = parse_api_version(&api)?;

let devices = model.devices().await;
let model = model.read().await;

let device = match devices.get(&id) {
let device = match model.devices.get(&id) {
Some(d) => d.to_json(&api),

None => {
Expand All @@ -93,29 +95,30 @@ pub async fn get_device(

pub async fn get_receivers(
Path(api): Path<String>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<Vec<ReceiverJson>>, ServiceError> {
let api = parse_api_version(&api)?;

let receivers = model.receivers().await;
let model = model.read().await;

let receivers: Vec<_> = receivers
.iter()
.map(|(_, receiver)| receiver.to_json(&api))
let receivers: Vec<_> = model
.receivers
.values()
.map(|receiver| receiver.to_json(&api))
.collect();

Ok(Json(receivers))
}

pub async fn get_receiver(
Path((api, id)): Path<(String, Uuid)>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<ReceiverJson>, ServiceError> {
let api = parse_api_version(&api)?;

let receivers = model.receivers().await;
let model = model.read().await;

let receiver = match receivers.get(&id) {
let receiver = match model.receivers.get(&id) {
Some(r) => r.to_json(&api),
None => {
return Err(ServiceError::new(
Expand All @@ -130,29 +133,30 @@ pub async fn get_receiver(

pub async fn get_senders(
Path(api): Path<String>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<Vec<SenderJson>>, ServiceError> {
let api = parse_api_version(&api)?;

let senders = model.senders().await;
let model = model.read().await;

let senders: Vec<_> = senders
.iter()
.map(|(_, sender)| sender.to_json(&api))
let senders: Vec<_> = model
.senders
.values()
.map(|sender| sender.to_json(&api))
.collect();

Ok(Json(senders))
}

pub async fn get_sender(
Path((api, id)): Path<(String, Uuid)>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<SenderJson>, ServiceError> {
let api = parse_api_version(&api)?;

let senders = model.senders().await;
let model = model.read().await;

let sender = match senders.get(&id) {
let sender = match model.senders.get(&id) {
Some(s) => s.to_json(&api),
None => {
return Err(ServiceError::new(
Expand All @@ -167,29 +171,30 @@ pub async fn get_sender(

pub async fn get_sources(
Path(api): Path<String>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<Vec<SourceJson>>, ServiceError> {
let api = parse_api_version(&api)?;

let sources = model.sources().await;
let model = model.read().await;

let sources: Vec<_> = sources
.iter()
.map(|(_, source)| source.to_json(&api))
let sources: Vec<_> = model
.sources
.values()
.map(|source| source.to_json(&api))
.collect();

Ok(Json(sources))
}

pub async fn get_source(
Path((api, id)): Path<(String, Uuid)>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<SourceJson>, ServiceError> {
let api = parse_api_version(&api)?;

let sources = model.sources().await;
let model = model.read().await;

let source = match sources.get(&id) {
let source = match model.sources.get(&id) {
Some(s) => s.to_json(&api),
None => {
return Err(ServiceError::new(
Expand All @@ -204,26 +209,30 @@ pub async fn get_source(

pub async fn get_flows(
Path(api): Path<String>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<Vec<FlowJson>>, ServiceError> {
let api = parse_api_version(&api)?;

let flows = model.flows().await;
let model = model.read().await;

let flows: Vec<_> = flows.iter().map(|(_, flow)| flow.to_json(&api)).collect();
let flows: Vec<_> = model
.flows
.values()
.map(|flow| flow.to_json(&api))
.collect();

Ok(Json(flows))
}

pub async fn get_flow(
Path((api, id)): Path<(String, Uuid)>,
Extension(model): Extension<Arc<Model>>,
Extension(model): Extension<Arc<RwLock<Model>>>,
) -> Result<Json<FlowJson>, ServiceError> {
let api = parse_api_version(&api)?;

let flows = model.flows().await;
let model = model.read().await;

let flow = match flows.get(&id) {
let flow = match model.flows.get(&id) {
Some(f) => f.to_json(&api),
None => {
return Err(ServiceError::new(
Expand Down
Loading

0 comments on commit 978ff94

Please sign in to comment.