Skip to content

Commit

Permalink
Market decentralized - broadcast offers (#290)
Browse files Browse the repository at this point in the history
* Add database most important tables

* Insert offer to database

* [Test] Check market state after subscribing offer

* Unsubscribe offer

* [Test] Add check on not existing subsctiption id

* Subscribe demand and unsubscribe [+Test]

* Add http endpoints

* Decentralized market running instruction

* SubscriptionId consists of random part and offer/demand hash

* rebased on service-ctx-sep-dbs

* Add timestamps to Offer/Demand

* Cargo fmt

* Try to bind to broadcast

* Binding to broadcast works in test

* Broadcasting offers. Receiving Offers fast implementation just to check - still doesn't work

* Get rid of dyn Discovery. Trait is not needed anymore, since we can mock on net level

* Stop Offer propagation if it already existed in db or error occured

* [Docs] Describe how to use logger with test suite

* Fix: BCastService returns correct value if topic is new

* Broadcasting Offers works with yagna service

* Fix unsubscribe Offer/Demand http response

* No need for additional Discovery functions with Arc<DiscoveryImpl>

* Review fixes: bind_gsb takes &str, Subscribe endpoint returns Ok string

* Offer/Demand borrowed in subscribe

* Fix: Offers/Demands serde_json to string conversion; Extend tests to compare full Offer/Demand

* Cosmetic change: use or_else instead of cimplicated matching

Co-authored-by: Piotr Chromiec <tworec@golem.network>
  • Loading branch information
nieznanysprawiciel and tworec authored Jun 9, 2020
1 parent 900863f commit ad92480
Show file tree
Hide file tree
Showing 28 changed files with 578 additions and 268 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions core/market/decentralized/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ market-test-suite = []

[dependencies]
ya-client = "0.3"
ya-core-model = { version = "0.1", features = ["market"] }
ya-core-model = { version = "0.1", features = ["market", "net"] }
ya-persistence = "0.2"
ya-service-api = "0.1"
ya-service-bus = "0.2"
Expand All @@ -18,7 +18,6 @@ ya-service-api-web = "0.1"

actix-web = "2.0"
anyhow = "1.0"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
derive_more = "0.99.5"
diesel = { version = "1.4", features = ["chrono", "sqlite", "r2d2"] }
Expand All @@ -41,6 +40,6 @@ actix-rt = "1.0.0"
env_logger = "0.7"
rand = "0.7.2"
serde_json = "1.0"
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "0.2", features = ["macros", "rt-core"] }

ya-core-model = { version = "0.1", features = ["net"] }
ya-net = { version = "0.1", features = ["service"] }
97 changes: 0 additions & 97 deletions core/market/decentralized/examples/discovery_usage.rs

This file was deleted.

9 changes: 9 additions & 0 deletions core/market/decentralized/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,12 @@ or for market crate only
cargo test -p ya-market-decentralized --features ya-market-decentralized/market-test-suite
```

### Running with logs enabled

It is very useful to see logs, if we want to debug test. We can do this as
always by adding RUST_LOG environment variable, but in test case we need to
add `env_logger::init();` on the beginning.

```
RUST_LOG=debug cargo test -p ya-market-decentralized --features ya-market-decentralized/market-test-suite
```
4 changes: 2 additions & 2 deletions core/market/decentralized/src/api/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn subscribe(
body: Json<Offer>,
id: Identity,
) -> HttpResponse {
match market.subscribe_offer(body.into_inner(), id).await {
match market.subscribe_offer(&body.into_inner(), id).await {
Ok(subscription_id) => response::created(subscription_id),
// TODO: Translate MarketError to better HTTP response.
Err(error) => response::server_error(&format!("{}", error)),
Expand All @@ -56,7 +56,7 @@ async fn unsubscribe(
) -> HttpResponse {
let subscription_id = path.into_inner().subscription_id;
match market.unsubscribe_offer(subscription_id.clone(), id).await {
Ok(()) => response::ok(subscription_id),
Ok(()) => response::ok("Ok"),
// TODO: Translate MatcherError to better HTTP response.
Err(error) => response::server_error(&format!("{}", error)),
}
Expand Down
4 changes: 2 additions & 2 deletions core/market/decentralized/src/api/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn subscribe(
body: Json<Demand>,
id: Identity,
) -> HttpResponse {
match market.subscribe_demand(body.into_inner(), id).await {
match market.subscribe_demand(&body.into_inner(), id).await {
Ok(subscription_id) => response::created(subscription_id),
// TODO: Translate MarketError to better HTTP response.
Err(error) => response::server_error(&format!("{}", error)),
Expand All @@ -58,7 +58,7 @@ async fn unsubscribe(
) -> HttpResponse {
let subscription_id = path.into_inner().subscription_id;
match market.unsubscribe_demand(subscription_id.clone(), id).await {
Ok(()) => response::ok(subscription_id),
Ok(()) => response::ok("Ok"),
// TODO: Translate MatcherError to better HTTP response.
Err(error) => response::server_error(&format!("{}", error)),
}
Expand Down
2 changes: 1 addition & 1 deletion core/market/decentralized/src/db/models/demand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Demand {
demand_id: Some(self.id.to_string()),
requestor_id: Some(self.node_id.clone()),
constraints: self.constraints.clone(),
properties: serde_json::to_value(&self.properties).map_err(|error| {
properties: serde_json::from_str(&self.properties).map_err(|error| {
format!(
"Can't serialize Demand properties from database!!! Error: {}",
error
Expand Down
2 changes: 1 addition & 1 deletion core/market/decentralized/src/db/models/offer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Offer {
offer_id: Some(self.id.to_string()),
provider_id: Some(self.node_id.clone()),
constraints: self.constraints.clone(),
properties: serde_json::to_value(&self.properties).map_err(|error| {
properties: serde_json::from_str(&self.properties).map_err(|error| {
format!(
"Can't serialize Offer properties from database!!! Error: {}",
error
Expand Down
35 changes: 22 additions & 13 deletions core/market/decentralized/src/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use crate::matcher::{Matcher, MatcherError, MatcherInitError};
use crate::migrations;
use crate::negotiation::{NegotiationError, NegotiationInitError};
use crate::negotiation::{ProviderNegotiationEngine, RequestorNegotiationEngine};
use crate::protocol::{DiscoveryBuilder, DiscoveryGSB};
use crate::protocol::DiscoveryBuilder;

use ya_client::error::Error::ModelError;
use ya_client::model::market::{Demand, Offer};
use ya_client::model::ErrorMessage;
use ya_core_model::market::BUS_ID;
use ya_core_model::market::{private, BUS_ID};
use ya_persistence::executor::DbExecutor;
use ya_service_api_interfaces::{Provider, Service};
use ya_service_api_web::middleware::Identity;
Expand Down Expand Up @@ -56,7 +56,7 @@ impl MarketService {
// TODO: Set Matcher independent parameters here or remove this todo.
let builder = DiscoveryBuilder::new();

let (matcher, listeners) = Matcher::new::<DiscoveryGSB>(builder, db)?;
let (matcher, listeners) = Matcher::new(builder, db)?;
let provider_engine = ProviderNegotiationEngine::new(db.clone())?;
let requestor_engine =
RequestorNegotiationEngine::new(db.clone(), listeners.proposal_receiver)?;
Expand All @@ -68,18 +68,24 @@ impl MarketService {
})
}

pub async fn bind_gsb(&self, prefix: String) -> Result<(), MarketInitError> {
self.matcher.bind_gsb(prefix.clone()).await?;
pub async fn bind_gsb(
&self,
public_prefix: &str,
private_prefix: &str,
) -> Result<(), MarketInitError> {
self.matcher.bind_gsb(public_prefix, private_prefix).await?;
self.provider_negotiation_engine
.bind_gsb(prefix.clone())
.bind_gsb(public_prefix, private_prefix)
.await?;
self.requestor_negotiation_engine
.bind_gsb(public_prefix, private_prefix)
.await?;
self.requestor_negotiation_engine.bind_gsb(prefix).await?;
Ok(())
}

pub async fn gsb<Context: Provider<Self, DbExecutor>>(ctx: &Context) -> anyhow::Result<()> {
let market = MARKET.get_or_init_market(&ctx.component())?;
Ok(market.bind_gsb(BUS_ID.to_string()).await?)
Ok(market.bind_gsb(BUS_ID, private::BUS_ID).await?)
}

pub fn rest<Context: Provider<Self, DbExecutor>>(ctx: &Context) -> actix_web::Scope {
Expand All @@ -97,15 +103,18 @@ impl MarketService {
.extend(requestor::register_endpoints)
}

pub async fn subscribe_offer(&self, offer: Offer, id: Identity) -> Result<String, MarketError> {
let offer = ModelOffer::from_new(&offer, &id);
pub async fn subscribe_offer(
&self,
offer: &Offer,
id: Identity,
) -> Result<String, MarketError> {
let offer = ModelOffer::from_new(offer, &id);
let subscription_id = offer.id.to_string();

self.matcher.subscribe_offer(&offer).await?;
self.provider_negotiation_engine
.subscribe_offer(&offer)
.await?;

Ok(subscription_id)
}

Expand All @@ -124,10 +133,10 @@ impl MarketService {

pub async fn subscribe_demand(
&self,
demand: Demand,
demand: &Demand,
id: Identity,
) -> Result<String, MarketError> {
let demand = ModelDemand::from_new(&demand, &id);
let demand = ModelDemand::from_new(demand, &id);
let subscription_id = demand.id.to_string();

self.matcher.subscribe_demand(&demand).await?;
Expand Down
Loading

0 comments on commit ad92480

Please sign in to comment.