diff --git a/Cargo.lock b/Cargo.lock index 0b82c63c4e..12e556f1f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2749,18 +2749,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.17" +version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791" +checksum = "ba3a1acf4a3e70849f8a673497ef984f043f95d2d8252dcdf74d54e6a1e47e8a" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.17" +version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40" +checksum = "194e88048b71a3e02eb4ee36a6995fed9b8236c11a7bb9f7247a9d9835b3f265" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.6", @@ -4973,7 +4973,6 @@ dependencies = [ "actix-rt", "actix-web", "anyhow", - "async-trait", "chrono", "derive_more 0.99.7", "diesel", @@ -4994,6 +4993,7 @@ dependencies = [ "uuid 0.8.1", "ya-client", "ya-core-model", + "ya-net", "ya-persistence", "ya-service-api", "ya-service-api-interfaces", diff --git a/core/market/decentralized/Cargo.toml b/core/market/decentralized/Cargo.toml index c414e3b253..4164ab7719 100644 --- a/core/market/decentralized/Cargo.toml +++ b/core/market/decentralized/Cargo.toml @@ -9,7 +9,7 @@ market-test-suite = [] [dependencies] ya-client = "0.3" -ya-core-model = { version = "0.1", features = ["market"] } +ya-core-model = { version = "0.1", features = ["market", "net"] } ya-persistence = "0.2" ya-service-api = "0.1" ya-service-bus = "0.2" @@ -18,7 +18,6 @@ ya-service-api-web = "0.1" actix-web = "2.0" anyhow = "1.0" -async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } derive_more = "0.99.5" diesel = { version = "1.4", features = ["chrono", "sqlite", "r2d2"] } @@ -41,6 +40,6 @@ actix-rt = "1.0.0" env_logger = "0.7" rand = "0.7.2" serde_json = "1.0" -tokio = { version = "0.2", features = ["macros"] } +tokio = { version = "0.2", features = ["macros", "rt-core"] } -ya-core-model = { version = "0.1", features = ["net"] } +ya-net = { version = "0.1", features = ["service"] } diff --git a/core/market/decentralized/examples/discovery_usage.rs b/core/market/decentralized/examples/discovery_usage.rs deleted file mode 100644 index 28881280b0..0000000000 --- a/core/market/decentralized/examples/discovery_usage.rs +++ /dev/null @@ -1,97 +0,0 @@ -use anyhow; -use async_trait::async_trait; -use chrono::Utc; -use serde_json::json; -use std::sync::Arc; -use tokio::runtime::Runtime; - -use ya_client::model::market::Offer; -use ya_market_decentralized::protocol::callbacks::HandlerSlot; -use ya_market_decentralized::protocol::{ - Discovery, DiscoveryBuilder, DiscoveryError, DiscoveryFactory, DiscoveryInitError, - OfferReceived, RetrieveOffers, -}; - -// =========================================== // -// TODO: Remove this example after implementing Discovery -// =========================================== // - -/// Example implementation of Discovery. -struct DiscoveryExample { - offer_received: HandlerSlot, - retrieve_offers: HandlerSlot, -} - -impl DiscoveryFactory for DiscoveryExample { - fn new(mut builder: DiscoveryBuilder) -> Result, DiscoveryInitError> { - let offer_received = builder.offer_received_handler()?; - let retrieve_offers = builder.retrieve_offers_handler()?; - - let discovery = DiscoveryExample { - offer_received, - retrieve_offers, - }; - Ok(Arc::new(discovery)) - } -} - -#[async_trait] -impl Discovery for DiscoveryExample { - async fn bind_gsb(&self, _prefix: String) -> Result<(), DiscoveryInitError> { - Ok(()) - } - - async fn broadcast_offer(&self, offer: Offer) -> Result<(), DiscoveryError> { - Ok(self - .offer_received - .call(format!("caller"), OfferReceived { offer }) - .await?) - } - - async fn retrieve_offers(&self) -> Result, DiscoveryError> { - Ok(self - .retrieve_offers - .call( - format!("caller"), - RetrieveOffers { - newer_than: Utc::now(), - }, - ) - .await?) - } -} - -#[actix_rt::main] -async fn main() -> anyhow::Result<()> { - env_logger::init(); - - let builder = DiscoveryBuilder::new() - .bind_offer_received(move |msg: OfferReceived| async move { - log::info!("Offer from [{}] received.", msg.offer.offer_id.unwrap()); - Ok(()) - }) - .bind_retrieve_offers(move |_msg: RetrieveOffers| async move { - log::info!("Offers request received."); - Ok(vec![]) - }); - let discovery = DiscoveryExample::new(builder)?; - let dicovery_clone = discovery.clone(); - - std::thread::spawn(move || { - let offer = mock_offer(format!("Caller-thread")); - - let mut rt = Runtime::new().unwrap(); - rt.block_on(dicovery_clone.broadcast_offer(offer)) - }); - - let offer = mock_offer(format!("Caller-local")); - discovery.broadcast_offer(offer).await?; - - Ok(()) -} - -fn mock_offer(caller: String) -> Offer { - let mut offer = Offer::new(json!({}), format!("")); - offer.offer_id = Some(caller); - offer -} diff --git a/core/market/decentralized/readme.md b/core/market/decentralized/readme.md index 45e4349b48..14a9668fb7 100644 --- a/core/market/decentralized/readme.md +++ b/core/market/decentralized/readme.md @@ -19,3 +19,12 @@ or for market crate only cargo test -p ya-market-decentralized --features ya-market-decentralized/market-test-suite ``` +### Running with logs enabled + +It is very useful to see logs, if we want to debug test. We can do this as +always by adding RUST_LOG environment variable, but in test case we need to +add `env_logger::init();` on the beginning. + +``` +RUST_LOG=debug cargo test -p ya-market-decentralized --features ya-market-decentralized/market-test-suite +``` diff --git a/core/market/decentralized/src/api/provider.rs b/core/market/decentralized/src/api/provider.rs index d310a6e997..a2cfa35cc4 100644 --- a/core/market/decentralized/src/api/provider.rs +++ b/core/market/decentralized/src/api/provider.rs @@ -36,7 +36,7 @@ async fn subscribe( body: Json, id: Identity, ) -> HttpResponse { - match market.subscribe_offer(body.into_inner(), id).await { + match market.subscribe_offer(&body.into_inner(), id).await { Ok(subscription_id) => response::created(subscription_id), // TODO: Translate MarketError to better HTTP response. Err(error) => response::server_error(&format!("{}", error)), @@ -56,7 +56,7 @@ async fn unsubscribe( ) -> HttpResponse { let subscription_id = path.into_inner().subscription_id; match market.unsubscribe_offer(subscription_id.clone(), id).await { - Ok(()) => response::ok(subscription_id), + Ok(()) => response::ok("Ok"), // TODO: Translate MatcherError to better HTTP response. Err(error) => response::server_error(&format!("{}", error)), } diff --git a/core/market/decentralized/src/api/requestor.rs b/core/market/decentralized/src/api/requestor.rs index 9c305242bb..eeea804cae 100644 --- a/core/market/decentralized/src/api/requestor.rs +++ b/core/market/decentralized/src/api/requestor.rs @@ -38,7 +38,7 @@ async fn subscribe( body: Json, id: Identity, ) -> HttpResponse { - match market.subscribe_demand(body.into_inner(), id).await { + match market.subscribe_demand(&body.into_inner(), id).await { Ok(subscription_id) => response::created(subscription_id), // TODO: Translate MarketError to better HTTP response. Err(error) => response::server_error(&format!("{}", error)), @@ -58,7 +58,7 @@ async fn unsubscribe( ) -> HttpResponse { let subscription_id = path.into_inner().subscription_id; match market.unsubscribe_demand(subscription_id.clone(), id).await { - Ok(()) => response::ok(subscription_id), + Ok(()) => response::ok("Ok"), // TODO: Translate MatcherError to better HTTP response. Err(error) => response::server_error(&format!("{}", error)), } diff --git a/core/market/decentralized/src/db/models/demand.rs b/core/market/decentralized/src/db/models/demand.rs index c7c30dc7af..a4a48e17d2 100644 --- a/core/market/decentralized/src/db/models/demand.rs +++ b/core/market/decentralized/src/db/models/demand.rs @@ -83,7 +83,7 @@ impl Demand { demand_id: Some(self.id.to_string()), requestor_id: Some(self.node_id.clone()), constraints: self.constraints.clone(), - properties: serde_json::to_value(&self.properties).map_err(|error| { + properties: serde_json::from_str(&self.properties).map_err(|error| { format!( "Can't serialize Demand properties from database!!! Error: {}", error diff --git a/core/market/decentralized/src/db/models/offer.rs b/core/market/decentralized/src/db/models/offer.rs index 8ae356000f..f7dfbb4a2b 100644 --- a/core/market/decentralized/src/db/models/offer.rs +++ b/core/market/decentralized/src/db/models/offer.rs @@ -86,7 +86,7 @@ impl Offer { offer_id: Some(self.id.to_string()), provider_id: Some(self.node_id.clone()), constraints: self.constraints.clone(), - properties: serde_json::to_value(&self.properties).map_err(|error| { + properties: serde_json::from_str(&self.properties).map_err(|error| { format!( "Can't serialize Offer properties from database!!! Error: {}", error diff --git a/core/market/decentralized/src/market.rs b/core/market/decentralized/src/market.rs index 5c24b7445d..e2ee9b20a0 100644 --- a/core/market/decentralized/src/market.rs +++ b/core/market/decentralized/src/market.rs @@ -11,12 +11,12 @@ use crate::matcher::{Matcher, MatcherError, MatcherInitError}; use crate::migrations; use crate::negotiation::{NegotiationError, NegotiationInitError}; use crate::negotiation::{ProviderNegotiationEngine, RequestorNegotiationEngine}; -use crate::protocol::{DiscoveryBuilder, DiscoveryGSB}; +use crate::protocol::DiscoveryBuilder; use ya_client::error::Error::ModelError; use ya_client::model::market::{Demand, Offer}; use ya_client::model::ErrorMessage; -use ya_core_model::market::BUS_ID; +use ya_core_model::market::{private, BUS_ID}; use ya_persistence::executor::DbExecutor; use ya_service_api_interfaces::{Provider, Service}; use ya_service_api_web::middleware::Identity; @@ -56,7 +56,7 @@ impl MarketService { // TODO: Set Matcher independent parameters here or remove this todo. let builder = DiscoveryBuilder::new(); - let (matcher, listeners) = Matcher::new::(builder, db)?; + let (matcher, listeners) = Matcher::new(builder, db)?; let provider_engine = ProviderNegotiationEngine::new(db.clone())?; let requestor_engine = RequestorNegotiationEngine::new(db.clone(), listeners.proposal_receiver)?; @@ -68,18 +68,24 @@ impl MarketService { }) } - pub async fn bind_gsb(&self, prefix: String) -> Result<(), MarketInitError> { - self.matcher.bind_gsb(prefix.clone()).await?; + pub async fn bind_gsb( + &self, + public_prefix: &str, + private_prefix: &str, + ) -> Result<(), MarketInitError> { + self.matcher.bind_gsb(public_prefix, private_prefix).await?; self.provider_negotiation_engine - .bind_gsb(prefix.clone()) + .bind_gsb(public_prefix, private_prefix) + .await?; + self.requestor_negotiation_engine + .bind_gsb(public_prefix, private_prefix) .await?; - self.requestor_negotiation_engine.bind_gsb(prefix).await?; Ok(()) } pub async fn gsb>(ctx: &Context) -> anyhow::Result<()> { let market = MARKET.get_or_init_market(&ctx.component())?; - Ok(market.bind_gsb(BUS_ID.to_string()).await?) + Ok(market.bind_gsb(BUS_ID, private::BUS_ID).await?) } pub fn rest>(ctx: &Context) -> actix_web::Scope { @@ -97,15 +103,18 @@ impl MarketService { .extend(requestor::register_endpoints) } - pub async fn subscribe_offer(&self, offer: Offer, id: Identity) -> Result { - let offer = ModelOffer::from_new(&offer, &id); + pub async fn subscribe_offer( + &self, + offer: &Offer, + id: Identity, + ) -> Result { + let offer = ModelOffer::from_new(offer, &id); let subscription_id = offer.id.to_string(); self.matcher.subscribe_offer(&offer).await?; self.provider_negotiation_engine .subscribe_offer(&offer) .await?; - Ok(subscription_id) } @@ -124,10 +133,10 @@ impl MarketService { pub async fn subscribe_demand( &self, - demand: Demand, + demand: &Demand, id: Identity, ) -> Result { - let demand = ModelDemand::from_new(&demand, &id); + let demand = ModelDemand::from_new(demand, &id); let subscription_id = demand.id.to_string(); self.matcher.subscribe_demand(&demand).await?; diff --git a/core/market/decentralized/src/matcher/matcher.rs b/core/market/decentralized/src/matcher/matcher.rs index 8e61dfea36..a96f876a4f 100644 --- a/core/market/decentralized/src/matcher/matcher.rs +++ b/core/market/decentralized/src/matcher/matcher.rs @@ -11,10 +11,12 @@ use ya_persistence::executor::Error as DbError; use crate::db::dao::*; use crate::db::models::Demand as ModelDemand; use crate::db::models::Offer as ModelOffer; +use crate::db::models::SubscriptionId; use crate::db::*; use crate::migrations; use crate::protocol::{ - Discovery, DiscoveryBuilder, DiscoveryError, DiscoveryFactory, DiscoveryInitError, + Discovery, DiscoveryBuilder, DiscoveryError, DiscoveryInitError, PropagateOffer, + StopPropagateReason, }; use crate::protocol::{OfferReceived, RetrieveOffers}; @@ -66,30 +68,32 @@ pub struct EventsListeners { } /// Responsible for storing Offers and matching them with demands. +#[derive(Clone)] pub struct Matcher { db: DbExecutor, - discovery: Arc, + discovery: Discovery, proposal_emitter: UnboundedSender, } impl Matcher { - pub fn new( + pub fn new( builder: DiscoveryBuilder, db: &DbExecutor, ) -> Result<(Matcher, EventsListeners), MatcherInitError> { // TODO: Implement Discovery callbacks. + + let database = db.clone(); let builder = builder - .bind_offer_received(move |msg: OfferReceived| async move { - log::info!("Offer from [{}] received.", msg.offer.offer_id.unwrap()); - Ok(()) + .bind_offer_received(move |msg: OfferReceived| { + let database = database.clone(); + on_offer_received(database, msg) }) .bind_retrieve_offers(move |msg: RetrieveOffers| async move { - log::info!("Offers request received."); + log::info!("Offers request received. Unimplemented."); Ok(vec![]) }); - let discovery = Factory::new(builder)?; - + let discovery = builder.build()?; let (emitter, receiver) = unbounded_channel::(); let matcher = Matcher { @@ -104,8 +108,15 @@ impl Matcher { Ok((matcher, listeners)) } - pub async fn bind_gsb(&self, prefix: String) -> Result<(), MatcherInitError> { - Ok(self.discovery.bind_gsb(prefix).await?) + pub async fn bind_gsb( + &self, + public_prefix: &str, + private_prefix: &str, + ) -> Result<(), MatcherInitError> { + Ok(self + .discovery + .bind_gsb(public_prefix, private_prefix) + .await?) } pub async fn add_offer(&self, offer: Offer) { @@ -213,6 +224,40 @@ impl Matcher { } } +async fn on_offer_received(db: DbExecutor, msg: OfferReceived) -> Result { + async move { + // We shouldn't propagate Offer, if we already have it in our database. + // Note that when, we broadcast our Offer, it will reach us too, so it concerns + // not only Offers from other nodes. + if let Some(_) = db + .as_dao::() + .get_offer(msg.offer.offer_id()?) + .await? + { + return Ok(PropagateOffer::False(StopPropagateReason::AlreadyExists)); + } + + let model_offer = ModelOffer::from(&msg.offer)?; + db.as_dao::() + .create_offer(&model_offer) + .await + .map_err(OfferError::SaveOfferFailure)?; + + // TODO: Spawn matching with Demands. + + Result::<_, MatcherError>::Ok(PropagateOffer::True) + } + .await + .or_else(|error| { + let reason = StopPropagateReason::Error(format!("{}", error)); + Ok(PropagateOffer::False(reason)) + }) +} + +// =========================================== // +// Errors From impls +// =========================================== // + impl From for MatcherError { fn from(e: ErrorMessage) -> Self { MatcherError::InternalError(e.to_string()) diff --git a/core/market/decentralized/src/negotiation/provider.rs b/core/market/decentralized/src/negotiation/provider.rs index 054f5abaf3..33a95dcba9 100644 --- a/core/market/decentralized/src/negotiation/provider.rs +++ b/core/market/decentralized/src/negotiation/provider.rs @@ -16,7 +16,11 @@ impl ProviderNegotiationEngine { Ok(Arc::new(ProviderNegotiationEngine { db })) } - pub async fn bind_gsb(&self, prefix: String) -> Result<(), NegotiationInitError> { + pub async fn bind_gsb( + &self, + public_prefix: &str, + private_prefix: &str, + ) -> Result<(), NegotiationInitError> { Ok(()) } diff --git a/core/market/decentralized/src/negotiation/requestor.rs b/core/market/decentralized/src/negotiation/requestor.rs index e9beab2a75..f5f0584eba 100644 --- a/core/market/decentralized/src/negotiation/requestor.rs +++ b/core/market/decentralized/src/negotiation/requestor.rs @@ -26,7 +26,11 @@ impl RequestorNegotiationEngine { Ok(Arc::new(engine)) } - pub async fn bind_gsb(&self, prefix: String) -> Result<(), NegotiationInitError> { + pub async fn bind_gsb( + &self, + public_prefix: &str, + private_prefix: &str, + ) -> Result<(), NegotiationInitError> { Ok(()) } diff --git a/core/market/decentralized/src/protocol/discovery.rs b/core/market/decentralized/src/protocol/discovery.rs index 4c7a2cffa8..c2b30e10e9 100644 --- a/core/market/decentralized/src/protocol/discovery.rs +++ b/core/market/decentralized/src/protocol/discovery.rs @@ -1,12 +1,15 @@ -use async_trait::async_trait; use chrono::prelude::*; use derive_more::Display; use serde::{Deserialize, Serialize}; +use std::marker::Send; use std::sync::Arc; use thiserror::Error; use ya_client::model::market::Offer; -use ya_service_bus::{typed as bus, RpcMessage}; +use ya_client::model::ErrorMessage; +use ya_core_model::net; +use ya_core_model::net::local::{BroadcastMessage, SendBroadcastMessage, Subscribe, ToEndpoint}; +use ya_service_bus::{typed as bus, RpcEndpoint, RpcMessage}; use super::callbacks::{CallbackHandler, HandlerSlot}; @@ -14,9 +17,14 @@ use super::callbacks::{CallbackHandler, HandlerSlot}; // Errors // =========================================== // -#[derive(Error, Display, Debug, Serialize, Deserialize)] +#[derive(Error, Debug, Serialize, Deserialize)] pub enum DiscoveryError { + #[error(transparent)] RemoteError(#[from] DiscoveryRemoteError), + #[error("Failed to broadcast caused by gsb error: {}.", .0)] + GsbError(String), + #[error("Internal error: {}.", .0)] + InternalError(String), } #[derive(Error, Debug, Serialize, Deserialize)] @@ -26,6 +34,10 @@ pub enum DiscoveryRemoteError {} pub enum DiscoveryInitError { #[error("Uninitialized callback '{0}'.")] UninitializedCallback(String), + #[error("Failed to bind to gsb. Error: {}.", .0)] + BindingGsbFailed(String), + #[error("Failed to subscribe to broadcast. Error: {0}.")] + BroadcastSubscribeFailed(String), } // =========================================== // @@ -34,21 +46,170 @@ pub enum DiscoveryInitError { /// Responsible for communication with markets on other nodes /// during discovery phase. -#[async_trait] -pub trait Discovery: Send + Sync { - async fn bind_gsb(&self, prefix: String) -> Result<(), DiscoveryInitError>; +#[derive(Clone)] +pub struct Discovery { + inner: Arc, +} + +pub struct DiscoveryImpl { + offer_received: HandlerSlot, + retrieve_offers: HandlerSlot, +} + +impl Discovery { + fn new(mut builder: DiscoveryBuilder) -> Result { + let offer_received = builder.offer_received_handler()?; + let retrieve_offers = builder.retrieve_offers_handler()?; + + let inner = Arc::new(DiscoveryImpl { + offer_received, + retrieve_offers, + }); + Ok(Discovery { inner }) + } /// Broadcasts offer to other nodes in network. Connected nodes will /// get call to function bound in DiscoveryBuilder::bind_offer_received. - async fn broadcast_offer(&self, offer: Offer) -> Result<(), DiscoveryError>; - async fn retrieve_offers(&self) -> Result, DiscoveryError>; + pub async fn broadcast_offer(&self, offer: Offer) -> Result<(), DiscoveryError> { + log::info!("Broadcasting offer [{}] to the network.", offer.offer_id()?); + + let msg = OfferReceived { offer }; + let bcast_msg = SendBroadcastMessage::new(msg); + + let _ = bus::service(net::local::BUS_ID).send(bcast_msg).await?; + Ok(()) + } + + pub async fn retrieve_offers(&self) -> Result, DiscoveryError> { + unimplemented!() + } + + pub async fn bind_gsb( + &self, + public_prefix: &str, + private_prefix: &str, + ) -> Result<(), DiscoveryInitError> { + let myself = self.clone(); + + log::debug!("Creating broadcast topic {}.", OfferReceived::TOPIC); + + let offer_broadcast_address = format!("{}/{}", private_prefix, OfferReceived::TOPIC); + let subscribe_msg = OfferReceived::into_subscribe_msg(&offer_broadcast_address); + bus::service(net::local::BUS_ID) + .send(subscribe_msg) + .await??; + + log::debug!( + "Binding handler for broadcast topic {}.", + OfferReceived::TOPIC + ); + + let _ = bus::bind_with_caller( + &offer_broadcast_address, + move |caller, msg: SendBroadcastMessage| { + let myself = myself.clone(); + myself.on_offer_received(caller, msg.body().to_owned()) + }, + ); + + Ok(()) + } + + async fn on_offer_received(self, caller: String, msg: OfferReceived) -> Result<(), ()> { + let callback = self.inner.offer_received.clone(); + + let offer = msg.offer.clone(); + let offer_id = offer.offer_id().unwrap_or("{Empty id}").to_string(); + let provider_id = offer.provider_id().unwrap_or("{Empty id}").to_string(); + + log::info!( + "Received broadcasted Offer [{}] from provider [{}]. Sender: [{}].", + offer_id, + provider_id, + &caller, + ); + + match callback.call(caller, msg).await? { + PropagateOffer::True => { + log::info!("Propagating further Offer [{}].", offer_id,); + + // TODO: Should we retry in case of fail? + if let Err(error) = self.broadcast_offer(offer).await { + log::error!( + "Error propagating further Offer [{}] from provider [{}].", + offer_id, + provider_id + ); + } + } + PropagateOffer::False(reason) => { + log::info!( + "Not propagating Offer [{}] for reason: {}.", + offer_id, + reason + ); + } + } + Ok(()) + } +} + +// =========================================== // +// Discovery messages +// =========================================== // + +#[derive(Serialize, Deserialize, Display)] +pub enum StopPropagateReason { + #[display(fmt = "Offer already exists in database")] + AlreadyExists, + #[display(fmt = "Error adding offer: {}", "_0")] + Error(String), +} + +#[derive(Serialize, Deserialize)] +pub enum PropagateOffer { + True, + False(StopPropagateReason), +} + +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OfferReceived { + pub offer: Offer, } -/// Creates Discovery of specific type. -pub trait DiscoveryFactory { - fn new(builder: DiscoveryBuilder) -> Result, DiscoveryInitError>; +impl RpcMessage for OfferReceived { + const ID: &'static str = "OfferReceived"; + type Item = PropagateOffer; + type Error = (); +} + +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RetrieveOffers { + pub newer_than: chrono::DateTime, } +impl RpcMessage for RetrieveOffers { + const ID: &'static str = "RetrieveOffers"; + type Item = Vec; + type Error = DiscoveryRemoteError; +} + +// =========================================== // +// Internal Discovery messages used +// for communication between market instances +// of Discovery protocol +// =========================================== // + +impl BroadcastMessage for OfferReceived { + const TOPIC: &'static str = "market-protocol-mk1-offer"; +} + +// =========================================== // +// Discovery interface builder +// =========================================== // + /// Discovery API initialization. pub struct DiscoveryBuilder { offer_received: Option>, @@ -97,88 +258,35 @@ impl DiscoveryBuilder { Ok(handler) } - pub fn build( - self, - ) -> Result, DiscoveryInitError> { - Ok(Factory::new(self)?) + pub fn build(self) -> Result { + Ok(Discovery::new(self)?) } } // =========================================== // -// Discovery implementation +// Errors From impls // =========================================== // -/// Implementation of Discovery protocol using GSB. -pub struct DiscoveryGSB { - offer_received: HandlerSlot, - retrieve_offers: HandlerSlot, -} - -impl DiscoveryFactory for DiscoveryGSB { - fn new(mut builder: DiscoveryBuilder) -> Result, DiscoveryInitError> { - let offer_received = builder.offer_received_handler()?; - let retrieve_offers = builder.retrieve_offers_handler()?; - - Ok(Arc::new(DiscoveryGSB { - offer_received, - retrieve_offers, - })) +impl From for DiscoveryInitError { + fn from(err: net::local::SubscribeError) -> Self { + DiscoveryInitError::BroadcastSubscribeFailed(format!("{}", err)) } } -#[async_trait] -impl Discovery for DiscoveryGSB { - async fn broadcast_offer(&self, offer: Offer) -> Result<(), DiscoveryError> { - // TODO: Implement - Ok(()) - } - - async fn retrieve_offers(&self) -> Result, DiscoveryError> { - unimplemented!() +impl From for DiscoveryInitError { + fn from(err: ya_service_bus::error::Error) -> Self { + DiscoveryInitError::BindingGsbFailed(format!("{}", err)) } - - async fn bind_gsb(&self, prefix: String) -> Result<(), DiscoveryInitError> { - let retrive_handler = self.retrieve_offers.clone(); - let offer_received_handler = self.offer_received.clone(); - - let _ = bus::bind_with_caller(&prefix, move |caller, msg: RetrieveOffers| { - let handler = retrive_handler.clone(); - async move { handler.call(caller, msg).await } - }); - - let _ = bus::bind_with_caller(&prefix, move |caller, msg: OfferReceived| { - let handler = offer_received_handler.clone(); - async move { handler.call(caller, msg).await } - }); - - Ok(()) - } -} - -// =========================================== // -// Discovery messages -// =========================================== // - -#[derive(Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct OfferReceived { - pub offer: Offer, } -impl RpcMessage for OfferReceived { - const ID: &'static str = "OfferReceived"; - type Item = (); - type Error = DiscoveryRemoteError; -} - -#[derive(Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct RetrieveOffers { - pub newer_than: chrono::DateTime, +impl From for DiscoveryError { + fn from(err: ya_service_bus::error::Error) -> Self { + DiscoveryError::GsbError(format!("{}", err)) + } } -impl RpcMessage for RetrieveOffers { - const ID: &'static str = "RetrieveOffers"; - type Item = Vec; - type Error = DiscoveryRemoteError; +impl From for DiscoveryError { + fn from(e: ErrorMessage) -> Self { + DiscoveryError::InternalError(e.to_string()) + } } diff --git a/core/market/decentralized/src/protocol/mod.rs b/core/market/decentralized/src/protocol/mod.rs index 7c8a1caa9b..1af896e96b 100644 --- a/core/market/decentralized/src/protocol/mod.rs +++ b/core/market/decentralized/src/protocol/mod.rs @@ -2,7 +2,7 @@ pub mod callbacks; mod discovery; -pub use self::discovery::{Discovery, DiscoveryBuilder, DiscoveryFactory, DiscoveryGSB}; +pub use self::discovery::{Discovery, DiscoveryBuilder, PropagateOffer, StopPropagateReason}; pub use self::discovery::{DiscoveryError, DiscoveryInitError, DiscoveryRemoteError}; pub use self::discovery::{OfferReceived, RetrieveOffers}; diff --git a/core/market/decentralized/tests/test_market_initialization.rs b/core/market/decentralized/tests/test_market_initialization.rs index 899ed3fecc..f544b5637c 100644 --- a/core/market/decentralized/tests/test_market_initialization.rs +++ b/core/market/decentralized/tests/test_market_initialization.rs @@ -19,6 +19,7 @@ mod tests { #[actix_rt::test] async fn instantiate() -> Result<(), anyhow::Error> { let network = MarketsNetwork::new("instantiate") + .await .add_market_instance("Node-1") .await? .add_market_instance("Node-2") diff --git a/core/market/decentralized/tests/test_offer_broadcast.rs b/core/market/decentralized/tests/test_offer_broadcast.rs new file mode 100644 index 0000000000..87d920b245 --- /dev/null +++ b/core/market/decentralized/tests/test_offer_broadcast.rs @@ -0,0 +1,57 @@ +mod utils; + +#[cfg(test)] +mod tests { + use crate::utils::MarketsNetwork; + + use ya_client::model::market::Offer; + use ya_market_decentralized::MarketService; + + use serde_json::json; + use std::sync::Arc; + use std::time::Duration; + + /// Test adds offer. It should be broadcasted to other nodes in the network. + #[cfg_attr(not(feature = "market-test-suite"), ignore)] + #[actix_rt::test] + async fn test_broadcast_offer() -> Result<(), anyhow::Error> { + let network = MarketsNetwork::new("test_broadcast_offer") + .await + .add_market_instance("Node-1") + .await? + .add_market_instance("Node-2") + .await? + .add_market_instance("Node-3") + .await?; + + // Add Offer on Node-1. It should be propagated to remaining nodes. + let market1: Arc = network.get_market("Node-1"); + let identity1 = network.get_default_id("Node-1"); + + let mut offer = Offer::new(json!({}), "()".to_string()); + let subscription_id = market1.subscribe_offer(&offer, identity1.clone()).await?; + + // Fill expected values for further comparison. + offer.provider_id = Some(identity1.identity.to_string()); + offer.offer_id = Some(subscription_id.clone()); + + // Expect, that Offer will appear on other nodes. + let market2: Arc = network.get_market("Node-2"); + let market3: Arc = network.get_market("Node-3"); + + // Wait for Offer propagation. + // TODO: How to wait without assuming any number of seconds? + tokio::time::delay_for(Duration::from_secs(1)).await; + + assert_eq!( + offer, + market2.matcher.get_offer(&subscription_id).await?.unwrap() + ); + assert_eq!( + offer, + market3.matcher.get_offer(&subscription_id).await?.unwrap() + ); + + Ok(()) + } +} diff --git a/core/market/decentralized/tests/test_subscription.rs b/core/market/decentralized/tests/test_subscription.rs index bfae39c9ce..47b6b8ca0b 100644 --- a/core/market/decentralized/tests/test_subscription.rs +++ b/core/market/decentralized/tests/test_subscription.rs @@ -2,12 +2,11 @@ mod utils; #[cfg(test)] mod tests { + use crate::utils::mock_offer::{example_demand, example_offer}; use crate::utils::MarketsNetwork; - use ya_client::model::market::{Demand, Offer}; use ya_market_decentralized::MarketService; - use serde_json::json; use std::sync::Arc; /// Test subscribes offers, checks if offer is available @@ -16,37 +15,36 @@ mod tests { #[actix_rt::test] async fn test_subscribe_offer() -> Result<(), anyhow::Error> { let network = MarketsNetwork::new("test_subscribe_offer") + .await .add_market_instance("Node-1") .await?; let market1: Arc = network.get_market("Node-1"); let identity1 = network.get_default_id("Node-1"); - let offer = Offer::new(json!({}), "()".to_string()); - let subscription_id = market1.subscribe_offer(offer, identity1.clone()).await?; + let mut offer = example_offer(); + let subscription_id = market1.subscribe_offer(&offer, identity1.clone()).await?; + + // Fill expected values for further comparison. + offer.provider_id = Some(identity1.identity.to_string()); + offer.offer_id = Some(subscription_id.clone()); // Offer should be available in database after subscribe. - let offer = market1.matcher.get_offer(&subscription_id).await?.unwrap(); - assert_eq!(offer.offer_id, Some(subscription_id.clone())); + let got_offer = market1.matcher.get_offer(&subscription_id).await?.unwrap(); + assert_eq!(got_offer, offer); // Unsubscribe should fail on not existing subscription id. - assert_eq!( - market1 - .unsubscribe_offer("".to_string(), identity1.clone()) - .await - .is_err(), - true - ); + assert!(market1 + .unsubscribe_offer("".to_string(), identity1.clone()) + .await + .is_err()); market1 .unsubscribe_offer(subscription_id.to_string(), identity1.clone()) .await?; // Offer should be removed from database after unsubscribed. - assert_eq!( - market1.matcher.get_offer(&subscription_id).await?.is_none(), - true - ); + assert!(market1.matcher.get_offer(&subscription_id).await?.is_none()); Ok(()) } @@ -57,41 +55,40 @@ mod tests { #[actix_rt::test] async fn test_subscribe_demand() -> Result<(), anyhow::Error> { let network = MarketsNetwork::new("test_subscribe_demand") + .await .add_market_instance("Node-1") .await?; let market1: Arc = network.get_market("Node-1"); let identity1 = network.get_default_id("Node-1"); - let demand = Demand::new(json!({}), "()".to_string()); - let subscription_id = market1.subscribe_demand(demand, identity1.clone()).await?; + let mut demand = example_demand(); + let subscription_id = market1.subscribe_demand(&demand, identity1.clone()).await?; + + // Fill expected values for further comparison. + demand.requestor_id = Some(identity1.identity.to_string()); + demand.demand_id = Some(subscription_id.clone()); // Offer should be available in database after subscribe. - let demand = market1.matcher.get_demand(&subscription_id).await?.unwrap(); - assert_eq!(demand.demand_id, Some(subscription_id.clone())); + let got_demand = market1.matcher.get_demand(&subscription_id).await?.unwrap(); + assert_eq!(got_demand, demand); // Unsubscribe should fail on not existing subscription id. - assert_eq!( - market1 - .unsubscribe_demand("".to_string(), identity1.clone()) - .await - .is_err(), - true - ); + assert!(market1 + .unsubscribe_demand("".to_string(), identity1.clone()) + .await + .is_err()); market1 .unsubscribe_demand(subscription_id.to_string(), identity1.clone()) .await?; // Offer should be removed from database after unsubscribed. - assert_eq!( - market1 - .matcher - .get_demand(&subscription_id) - .await? - .is_none(), - true - ); + assert!(market1 + .matcher + .get_demand(&subscription_id) + .await? + .is_none()); Ok(()) } diff --git a/core/market/decentralized/tests/utils/mock_net.rs b/core/market/decentralized/tests/utils/mock_net.rs new file mode 100644 index 0000000000..8447a41a2f --- /dev/null +++ b/core/market/decentralized/tests/utils/mock_net.rs @@ -0,0 +1,53 @@ +use actix_rt::Arbiter; +use std::rc::Rc; + +use ya_core_model::net::{local as local_net, local::SendBroadcastMessage}; +use ya_net::bcast; +use ya_service_bus::{typed as bus, untyped as local_bus, RpcMessage}; + +pub struct MockNet; + +impl MockNet { + pub async fn gsb(bcast: bcast::BCastService) -> anyhow::Result<()> { + let bcast_service_id = as RpcMessage>::ID; + + { + let bcast = bcast.clone(); + let _ = bus::bind(local_net::BUS_ID, move |subscribe: local_net::Subscribe| { + let bcast = bcast.clone(); + async move { + let (_, id) = bcast.add(subscribe); + Ok(id) + } + }); + } + + { + let bcast = bcast.clone(); + let addr = format!("{}/{}", local_net::BUS_ID, bcast_service_id); + let resp: Rc<[u8]> = serde_json::to_vec(&Ok::<(), ()>(())).unwrap().into(); + let _ = local_bus::subscribe(&addr, move |caller: &str, _addr: &str, msg: &[u8]| { + let resp = resp.clone(); + let bcast = bcast.clone(); + + let msg_json: SendBroadcastMessage = + serde_json::from_slice(msg).unwrap(); + let caller = caller.to_string(); + + Arbiter::spawn(async move { + let msg = serde_json::to_vec(&msg_json).unwrap(); + let topic = msg_json.topic().to_owned(); + let endpoints = bcast.resolve(&topic); + + for endpoint in endpoints { + let addr = format!("{}/{}", endpoint, bcast_service_id); + let _ = local_bus::send(addr.as_ref(), &caller, msg.as_ref()).await; + } + }); + async move { Ok(Vec::from(resp.as_ref())) } + }); + } + + Ok(()) + } +} diff --git a/core/market/decentralized/tests/utils.rs b/core/market/decentralized/tests/utils/mock_node.rs similarity index 87% rename from core/market/decentralized/tests/utils.rs rename to core/market/decentralized/tests/utils/mock_node.rs index 516e95a84a..4f69a5f931 100644 --- a/core/market/decentralized/tests/utils.rs +++ b/core/market/decentralized/tests/utils/mock_node.rs @@ -6,11 +6,13 @@ use std::path::PathBuf; use std::sync::Arc; use ya_client::model::NodeId; -use ya_core_model::net; use ya_market_decentralized::MarketService; +use ya_net::bcast; use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::Identity; +use super::mock_net::MockNet; + /// Instantiates market test nodes inside one process. pub struct MarketsNetwork { markets: Vec, @@ -28,9 +30,12 @@ pub struct MarketNode { } impl MarketsNetwork { - pub fn new>(dir_name: Str) -> Self { + pub async fn new>(dir_name: Str) -> Self { let test_dir = prepare_test_dir(dir_name).unwrap(); + let bcast = bcast::BCastService::default(); + MockNet::gsb(bcast).await.unwrap(); + MarketsNetwork { markets: vec![], test_dir, @@ -44,8 +49,11 @@ impl MarketsNetwork { let db = self.init_database(name.as_ref())?; let market = Arc::new(MarketService::new(&db)?); - let gsb_prefix = format!("{}/{}/market", net::BUS_ID, name.as_ref()); - market.bind_gsb(gsb_prefix).await?; + let public_gsb_prefix = format!("/{}", name.as_ref()); + let local_gsb_prefix = format!("/{}", name.as_ref()); + market + .bind_gsb(&public_gsb_prefix, &local_gsb_prefix) + .await?; let market_node = MarketNode { name: name.as_ref().to_string(), diff --git a/core/market/decentralized/tests/utils/mock_offer.rs b/core/market/decentralized/tests/utils/mock_offer.rs new file mode 100644 index 0000000000..d3f092f135 --- /dev/null +++ b/core/market/decentralized/tests/utils/mock_offer.rs @@ -0,0 +1,24 @@ +use std::string::ToString; +use ya_client::model::market::{Demand, Offer}; + +#[allow(unused)] +pub fn example_offer() -> Offer { + let properties = serde_json::json!({ + "golem": { + "node.id.name": "itstest".to_string(), + "srv.comp.wasm.task_package": "test-package".to_string(), + }, + }); + Offer::new(properties, "(golem.node.debug.subnet=blaa)".to_string()) +} + +#[allow(unused)] +pub fn example_demand() -> Demand { + let properties = serde_json::json!({ + "golem": { + "node.id.name": "itstest".to_string(), + "srv.comp.wasm.task_package": "test-package".to_string(), + }, + }); + Demand::new(properties, "(golem.node.debug.subnet=blaa)".to_string()) +} diff --git a/core/market/decentralized/tests/utils/mod.rs b/core/market/decentralized/tests/utils/mod.rs new file mode 100644 index 0000000000..c12ff3435c --- /dev/null +++ b/core/market/decentralized/tests/utils/mod.rs @@ -0,0 +1,6 @@ +mod mock_net; +pub mod mock_node; +pub mod mock_offer; + +pub use mock_node::MarketsNetwork; +pub use mock_offer::{example_demand, example_offer}; diff --git a/core/model/src/market.rs b/core/model/src/market.rs index 832e9d361f..29dd0a4dd4 100644 --- a/core/model/src/market.rs +++ b/core/model/src/market.rs @@ -6,6 +6,11 @@ use ya_service_bus::RpcMessage; /// Public Market bus address. pub const BUS_ID: &str = "/public/market"; +/// Internal Market bus address. +pub mod private { + pub const BUS_ID: &str = "/private/market"; +} + /// Returns the Agreement. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/core/model/src/net.rs b/core/model/src/net.rs index 90c65bef97..b960779a13 100644 --- a/core/model/src/net.rs +++ b/core/model/src/net.rs @@ -61,12 +61,6 @@ pub mod local { } impl Subscribe { - pub fn with_endpoint(endpoint: impl Into) -> Self { - let topic = M::TOPIC.to_owned(); - let endpoint = endpoint.into(); - Self { topic, endpoint } - } - pub fn topic(&self) -> &str { self.topic.as_ref() } @@ -76,6 +70,18 @@ pub mod local { } } + pub trait ToEndpoint { + fn into_subscribe_msg(endpoint: impl Into) -> Subscribe; + } + + impl ToEndpoint for M { + fn into_subscribe_msg(endpoint: impl Into) -> Subscribe { + let topic = M::TOPIC.to_owned(); + let endpoint = endpoint.into(); + Subscribe { topic, endpoint } + } + } + impl RpcMessage for Subscribe { const ID: &'static str = "Subscribe"; type Item = u64; diff --git a/core/net/src/bcast.rs b/core/net/src/bcast.rs index d84810acd7..6e410cd773 100644 --- a/core/net/src/bcast.rs +++ b/core/net/src/bcast.rs @@ -25,7 +25,7 @@ impl BCastService { .entry(subscribe.topic().to_owned()) .or_insert_with(Default::default); - let is_new = !receivers.is_empty(); + let is_new = receivers.is_empty(); receivers.push((id, subscribe.endpoint().into())); me.last_id += 1; (is_new, id) diff --git a/core/net/src/lib.rs b/core/net/src/lib.rs index 9a5fe5b371..89245d15be 100644 --- a/core/net/src/lib.rs +++ b/core/net/src/lib.rs @@ -1,5 +1,5 @@ #[cfg(any(feature = "service", test))] -mod bcast; +pub mod bcast; #[cfg(any(feature = "service", test))] mod service; diff --git a/core/net/src/service.rs b/core/net/src/service.rs index b48ace6640..9c34716729 100644 --- a/core/net/src/service.rs +++ b/core/net/src/service.rs @@ -71,6 +71,7 @@ pub async fn bind_remote(default_node_id: NodeId, nodes: Vec) -> std::io let endpoints = bcast.resolve(&topic); let msg: Rc<[u8]> = msg.into(); Arbiter::spawn(async move { + log::debug!("Received broadcast to topic {} from [{}].", &topic, &caller); for endpoint in endpoints { let addr = format!("{}/{}", endpoint, bcast_service_id); let _ = local_bus::send(addr.as_ref(), &caller, msg.as_ref()).await; @@ -144,10 +145,12 @@ pub async fn bind_remote(default_node_id: NodeId, nodes: Vec) -> std::io let (is_new, id) = bcast.add(subscribe); let central_bus = central_bus.clone(); async move { + log::info!("Subscribe topic {} on central bus.", topic); if is_new { if let Err(e) = central_bus.subscribe(topic.clone()).await { log::error!("fail to subscribe to: {}, {}", topic, e); } + log::info!("Created new topic: {}", topic); } Ok(id) } @@ -161,6 +164,13 @@ pub async fn bind_remote(default_node_id: NodeId, nodes: Vec) -> std::io let _ = local_bus::subscribe(&addr, move |caller: &str, _addr: &str, msg: &[u8]| { // TODO: remove unwrap here. let ent: SendBroadcastMessage = serde_json::from_slice(msg).unwrap(); + + log::debug!( + "Broadcast msg related to topic {} from [{}].", + ent.topic(), + &caller + ); + let fut = central_bus.broadcast(caller.to_owned(), ent.topic().to_owned(), msg.into()); let resp = resp.clone(); async move { diff --git a/core/serv/Cargo.toml b/core/serv/Cargo.toml new file mode 100644 index 0000000000..7a6cca61b5 --- /dev/null +++ b/core/serv/Cargo.toml @@ -0,0 +1,57 @@ +[package] +name = "yagna" +version = "0.3.0" +description = "Yagna Service and CLI" +readme = "README.md" +authors = ["Golem Factory "] +homepage = "https://github.com/golemfactory/yagna/core/serv" +repository = "https://github.com/golemfactory/yagna" +license = "GPL-3.0" +edition = "2018" + +[features] +#default=['ya-market-forwarding'] +static-openssl=["openssl/vendored"] + +[dependencies] +ya-activity = "0.2" +ya-identity = "0.2" +ya-market-decentralized = { version = "0.1" } +#ya-market-forwarding = { version = "0.1", optional = true } +ya-net = { version = "0.1", features = ["service"] } +ya-payment = { version = "0.1", features = ["gnt-driver"] } +ya-persistence = "0.2" +ya-sb-proto = "0.1" +ya-sb-router = "0.1" +ya-service-api = "0.1" +ya-service-api-derive = "0.1" +ya-service-api-interfaces = "0.1" +ya-service-api-web = "0.1" +ya-service-bus = "0.2" +openssl="0.10" + +actix-rt = "1.0" +actix-service = "1.0" +actix-web = "2.0" +anyhow = "1.0" +directories = "2.0.2" +dotenv = "0.15.0" +env_logger = "0.7" +futures = "0.3" +lazy_static = "1.4" +log = "0.4" +structopt = "0.3" +url = "2.1.1" + +[package.metadata.deb] +assets = [ + ["target/release/yagna", "usr/bin/", "755"], + ["target/release/ya-requestor", "usr/bin/", "755"], + ["target/release/ya-provider", "usr/bin/", "755"], + ["target/release/exe-unit", "usr/lib/yagna/plugins/", "755"], + ["../../README.md", "usr/share/doc/yagna/", "644"], + ["README.md", "usr/share/doc/yagna/service.md", "644"], + ["../../agent/provider/readme.md", "usr/share/doc/yagna/ya-provider.md", "644"], +] +features=["static-openssl"] + diff --git a/diesel.toml b/diesel.toml new file mode 100644 index 0000000000..92267c829f --- /dev/null +++ b/diesel.toml @@ -0,0 +1,5 @@ +# For documentation on how to configure this file, +# see diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/schema.rs"