diff --git a/Cargo.lock b/Cargo.lock index 825a5f4..f9c33d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1890,13 +1890,10 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" name = "metrics" version = "0.1.0" dependencies = [ - "actix-web", "env", "errors", "opentelemetry", "opentelemetry-otlp", - "opentelemetry-prometheus", - "prometheus", "tokio", "tonic", "tracing", @@ -2109,17 +2106,6 @@ dependencies = [ "tonic", ] -[[package]] -name = "opentelemetry-prometheus" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06c3d833835a53cf91331d2cfb27e9121f5a95261f31f08a1f79ab31688b8da8" -dependencies = [ - "opentelemetry", - "prometheus", - "protobuf", -] - [[package]] name = "opentelemetry-proto" version = "0.1.0" @@ -2440,21 +2426,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prometheus" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" -dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", - "parking_lot", - "protobuf", - "thiserror", -] - [[package]] name = "prost" version = "0.11.3" @@ -2510,12 +2481,6 @@ dependencies = [ "prost", ] -[[package]] -name = "protobuf" -version = "2.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" - [[package]] name = "quote" version = "1.0.23" @@ -2818,18 +2783,18 @@ checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" [[package]] name = "serde" -version = "1.0.151" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0" +checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.151" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", diff --git a/amqp/Cargo.toml b/amqp/Cargo.toml index 4f8d68c..9bdf4c8 100644 --- a/amqp/Cargo.toml +++ b/amqp/Cargo.toml @@ -14,10 +14,10 @@ traces = { path = "../traces" } lapin = { version = "2.1.1" } opentelemetry = { version = "0.18.0" } uuid = { version = "1.2.2", features = ["v4"] } -async-trait = { version = "0.1.59" } +async-trait = { version = "0.1.60" } tracing = { version = "0.1.37" } -serde_json = { version = "1.0.89" } -serde = { version = "1.0.150", features = ["derive"] } +serde_json = { version = "1.0.91" } +serde = { version = "1.0.152", features = ["derive"] } tokio = { version = "1.23.0", features = ["default"] } futures-util = { version = "0.3.25"} diff --git a/amqp/src/consumer.rs b/amqp/src/consumer.rs index 557c864..bffb13e 100644 --- a/amqp/src/consumer.rs +++ b/amqp/src/consumer.rs @@ -4,99 +4,18 @@ use crate::{ types::{new_span, Metadata}, }; use errors::amqp::AmqpError; -use futures_util::{future::join_all, StreamExt}; use lapin::{ message::Delivery, options::{BasicAckOptions, BasicNackOptions, BasicPublishOptions}, }; use opentelemetry::{ - global::{self, BoxedTracer}, + global::BoxedTracer, trace::{FutureExt, Span, Status}, }; -use std::{borrow::Cow, sync::Arc, vec}; -use tokio::task::JoinError; +use std::{borrow::Cow, sync::Arc}; use tracing::{debug, error, warn}; -pub struct Dispatches { - amqp: Arc, - pub queues: Vec, - pub msgs_types: Vec, - pub handlers: Vec>, -} - -impl Dispatches { - pub fn new(amqp: Arc) -> Dispatches { - Dispatches { - amqp, - queues: vec![], - msgs_types: vec![], - handlers: vec![], - } - } - - pub fn declare( - &mut self, - queue: QueueDefinition, - msg_type: String, - handler: Arc, - ) -> Result<(), AmqpError> { - if msg_type.is_empty() { - return Err(AmqpError::ConsumerDeclarationError {}); - } - - self.queues.push(queue); - self.msgs_types.push(msg_type); - self.handlers.push(handler); - - Ok(()) - } - - pub async fn consume_blocking(&self) -> Vec> { - let mut spawns = vec![]; - - for i in 0..self.queues.len() { - spawns.push(tokio::spawn({ - let m_amqp = self.amqp.clone(); - let msgs_allowed = self.msgs_types.clone(); - - let queue = self.queues[i].clone(); - let msg_type = self.msgs_types[i].clone(); - let handler = self.handlers[i].clone(); - - let mut consumer = m_amqp - .consumer(&queue.name, &msg_type) - .await - .expect("unexpected error while creating the consumer"); - - async move { - while let Some(result) = consumer.next().await { - match result { - Ok(delivery) => match consume( - &global::tracer("amqp consumer"), - &queue, - &msg_type, - &msgs_allowed, - &delivery, - m_amqp.clone(), - handler.clone(), - ) - .await - { - Err(e) => error!(error = e.to_string(), "errors consume msg"), - _ => {} - }, - Err(e) => error!(error = e.to_string(), "error receiving delivery msg"), - }; - } - } - })); - } - - join_all(spawns).await - } -} - -async fn consume<'c>( +pub(crate) async fn consume<'c>( tracer: &'c BoxedTracer, queue: &'c QueueDefinition, msg_type: &'c str, @@ -117,6 +36,7 @@ async fn consume<'c>( queue.name, ); + //check if the received msg contains type and if the type match with the expectation if metadata.msg_type.is_empty() || metadata.msg_type != msg_type.to_string() { let msg = "unexpected or empty type - removing message"; span.record_error(&AmqpError::ConsumerError(msg.to_string())); @@ -127,15 +47,16 @@ async fn consume<'c>( msg ); match delivery.ack(BasicAckOptions { multiple: false }).await { - Ok(_) => {} Err(e) => { error!("error whiling nack msg"); span.record_error(&e); } - }; + _ => {} + } return Ok(()); - } + }; + //check if the message received is expected for other consumers if !msgs_allowed.contains(&metadata.msg_type) { let msg = "remove message - reason: unsupported msg type"; span.record_error(&AmqpError::ConsumerError(msg.to_string())); @@ -146,25 +67,22 @@ async fn consume<'c>( msg ); match delivery.ack(BasicAckOptions { multiple: false }).await { - Ok(_) => {} Err(e) => { error!("error whiling nack msg"); span.record_error(&e); } + _ => {} }; return Ok(()); } - match handler + //ack msg and remove from queue if the handler execute correctly + if let Ok(_) = handler .exec(&ctx, delivery.data.as_slice()) .with_context(ctx.clone()) .await { - Ok(_) => match delivery.ack(BasicAckOptions { multiple: false }).await { - Ok(_) => { - span.set_status(Status::Ok); - return Ok(()); - } + match delivery.ack(BasicAckOptions { multiple: false }).await { Err(e) => { error!( trace.id = traces::trace_id(&ctx), @@ -177,106 +95,153 @@ async fn consume<'c>( }); return Err(AmqpError::AckMessageError {}); } - }, - _ if queue.with_retry => { - if metadata.count < queue.retries.unwrap() { - warn!( + _ => { + span.set_status(Status::Ok); + return Ok(()); + } + } + }; + + //ack msg and remove from queue if handler failure and there are no fallback configured + if !queue.with_retry && !queue.with_dlq { + match delivery.ack(BasicAckOptions { multiple: false }).await { + Ok(_) => return Ok(()), + Err(e) => { + error!( trace.id = traces::trace_id(&ctx), span.id = traces::span_id(&ctx), - "error whiling handling msg, requeuing for latter" + "error whiling nack msg" ); - match delivery - .nack(BasicNackOptions { - multiple: false, - requeue: false, - }) - .await - { - Ok(_) => return Ok(()), - Err(e) => { - error!( - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), - "error whiling requeuing" - ); - span.record_error(&e); - span.set_status(Status::Error { - description: Cow::from("error to requeuing msg"), - }); - return Err(AmqpError::RequeuingMessageError {}); - } - } - } else { + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("error to nack msg"), + }); + return Err(AmqpError::NackMessageError {}); + } + } + } + + //send msg to dlq if handler failure and there is no retry configured + if !queue.with_retry && queue.with_dlq { + match amqp + .channel() + .basic_publish( + "", + &queue.dlq_name, + BasicPublishOptions::default(), + &delivery.data, + delivery.properties.clone(), + ) + .await + { + Err(e) => { error!( trace.id = traces::trace_id(&ctx), span.id = traces::span_id(&ctx), - "too many attempts, sending to dlq" + "error whiling sending to dlq" ); - match amqp - .channel() - .basic_publish( - "", - &queue.dlq_name, - BasicPublishOptions::default(), - &delivery.data, - delivery.properties.clone(), - ) - .await - { - Ok(_) => { - match delivery.ack(BasicAckOptions { multiple: false }).await { - Ok(_) => return Ok(()), - Err(e) => { - error!( - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), - "error whiling ack msg to default queue" - ); - span.record_error(&e); - span.set_status(Status::Error { - description: Cow::from("msg was sent to dlq"), - }); - return Err(AmqpError::AckMessageError {}); - } - }; - } + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("msg was sent to dlq"), + }); + return Err(AmqpError::PublishingToDQLError {}); + } + _ => { + match delivery.ack(BasicAckOptions { multiple: false }).await { Err(e) => { error!( trace.id = traces::trace_id(&ctx), span.id = traces::span_id(&ctx), - "error whiling sending to dlq" + "error whiling ack msg to default queue" ); span.record_error(&e); span.set_status(Status::Error { description: Cow::from("msg was sent to dlq"), }); - return Err(AmqpError::PublishingToDQLError {}); + return Err(AmqpError::AckMessageError {}); } + _ => return Ok(()), }; } + }; + } + + //send msg to retry when handler failure and the retry count Dont active the max of the retries configured + if metadata.count < queue.retries.unwrap() { + warn!( + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), + "error whiling handling msg, requeuing for latter" + ); + match delivery + .nack(BasicNackOptions { + multiple: false, + requeue: false, + }) + .await + { + Ok(_) => return Ok(()), + Err(e) => { + error!( + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), + "error whiling requeuing" + ); + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("error to requeuing msg"), + }); + return Err(AmqpError::RequeuingMessageError {}); + } + } + } + + //send msg to dlq when count active the max retries + error!( + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), + "too many attempts, sending to dlq" + ); + + match amqp + .channel() + .basic_publish( + "", + &queue.dlq_name, + BasicPublishOptions::default(), + &delivery.data, + delivery.properties.clone(), + ) + .await + { + Err(e) => { + error!( + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), + "error whiling sending to dlq" + ); + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("msg was sent to dlq"), + }); + return Err(AmqpError::PublishingToDQLError {}); } _ => { - match delivery - .nack(BasicNackOptions { - multiple: false, - requeue: false, - }) - .await - { - Ok(_) => return Ok(()), + match delivery.ack(BasicAckOptions { multiple: false }).await { Err(e) => { error!( trace.id = traces::trace_id(&ctx), span.id = traces::span_id(&ctx), - "error whiling nack msg" + "error whiling ack msg to default queue" ); span.record_error(&e); span.set_status(Status::Error { - description: Cow::from("error to nack msg"), + description: Cow::from("msg was sent to dlq"), }); - return Err(AmqpError::NackMessageError {}); + return Err(AmqpError::AckMessageError {}); } - } + _ => return Ok(()), + }; } } } @@ -287,41 +252,7 @@ mod tests { use crate::mocks::MockAmqpImpl; use async_trait::async_trait; use lapin::{acker::Acker, protocol::basic::AMQPProperties, types::ShortString}; - use opentelemetry::Context; - - #[test] - fn test_dispatch_declare_successfully() { - let mut dispatcher = Dispatches::new(Arc::new(MockAmqpImpl::new())); - let handler = MockedHandler { mock_error: None }; - - let res = dispatcher.declare( - QueueDefinition::name("queue"), - "msg_type".to_owned(), - Arc::new(handler), - ); - - assert!(res.is_ok()); - assert_eq!(dispatcher.handlers.len(), 1); - assert_eq!(dispatcher.msgs_types.len(), 1); - assert_eq!(dispatcher.queues.len(), 1); - } - - #[test] - fn test_dispatch_declare_error() { - let mut dispatcher = Dispatches::new(Arc::new(MockAmqpImpl::new())); - let handler = MockedHandler { mock_error: None }; - - let res = dispatcher.declare( - QueueDefinition::name("queue"), - "".to_owned(), - Arc::new(handler), - ); - - assert!(res.is_err()); - assert_eq!(dispatcher.handlers.len(), 0); - assert_eq!(dispatcher.msgs_types.len(), 0); - assert_eq!(dispatcher.queues.len(), 0); - } + use opentelemetry::{global, Context}; #[tokio::test] async fn test_consume_msg_correctly() { @@ -435,9 +366,6 @@ mod tests { assert!(res.is_ok()) } - pub struct MockedHandler { - pub mock_error: Option, - } #[tokio::test] async fn test_consume_msg_with_handler_error_without_retry() { @@ -470,6 +398,10 @@ mod tests { assert!(res.is_ok()) } + pub struct MockedHandler { + pub mock_error: Option, + } + #[async_trait] impl ConsumerHandler for MockedHandler { async fn exec(&self, _ctx: &Context, _data: &[u8]) -> Result<(), AmqpError> { diff --git a/amqp/src/dispatcher.rs b/amqp/src/dispatcher.rs new file mode 100644 index 0000000..9c96020 --- /dev/null +++ b/amqp/src/dispatcher.rs @@ -0,0 +1,148 @@ +use crate::{ + client::Amqp, + consumer::consume, + topology::{ConsumerHandler, QueueDefinition}, +}; +use errors::amqp::AmqpError; +use futures_util::{future::join_all, StreamExt}; +use opentelemetry::global; +use std::sync::Arc; +use tokio::task::JoinError; +use tracing::error; + +pub struct Dispatcher { + amqp: Arc, + pub(crate) queues: Vec, + pub(crate) msgs_types: Vec, + pub(crate) handlers: Vec>, +} + +impl Dispatcher { + pub fn new(amqp: Arc) -> Dispatcher { + Dispatcher { + amqp, + queues: vec![], + msgs_types: vec![], + handlers: vec![], + } + } + + pub fn declare( + &mut self, + queue: QueueDefinition, + msg_type: String, + handler: Arc, + ) -> Result<(), AmqpError> { + if msg_type.is_empty() { + return Err(AmqpError::ConsumerDeclarationError {}); + } + + self.queues.push(queue); + self.msgs_types.push(msg_type); + self.handlers.push(handler); + + Ok(()) + } + + pub async fn consume_blocking(&self) -> Vec> { + let mut spawns = vec![]; + + for i in 0..self.queues.len() { + spawns.push(tokio::spawn({ + let m_amqp = self.amqp.clone(); + let msgs_allowed = self.msgs_types.clone(); + + let queue = self.queues[i].clone(); + let msg_type = self.msgs_types[i].clone(); + let handler = self.handlers[i].clone(); + + let mut consumer = m_amqp + .consumer(&queue.name, &msg_type) + .await + .expect("unexpected error while creating the consumer"); + + async move { + while let Some(result) = consumer.next().await { + match result { + Ok(delivery) => match consume( + &global::tracer("amqp consumer"), + &queue, + &msg_type, + &msgs_allowed, + &delivery, + m_amqp.clone(), + handler.clone(), + ) + .await + { + Err(e) => error!(error = e.to_string(), "errors consume msg"), + _ => {} + }, + Err(e) => error!(error = e.to_string(), "error receiving delivery msg"), + }; + } + } + })); + } + + join_all(spawns).await + } +} + +#[cfg(test)] +mod tests { + use async_trait::async_trait; + use opentelemetry::Context; + + use super::*; + use crate::mocks::MockAmqpImpl; + + #[test] + fn test_dispatch_declare_successfully() { + let mut dispatcher = Dispatcher::new(Arc::new(MockAmqpImpl::new())); + let handler = MockedHandler { mock_error: None }; + + let res = dispatcher.declare( + QueueDefinition::name("queue"), + "msg_type".to_owned(), + Arc::new(handler), + ); + + assert!(res.is_ok()); + assert_eq!(dispatcher.handlers.len(), 1); + assert_eq!(dispatcher.msgs_types.len(), 1); + assert_eq!(dispatcher.queues.len(), 1); + } + + #[test] + fn test_dispatch_declare_error() { + let mut dispatcher = Dispatcher::new(Arc::new(MockAmqpImpl::new())); + let handler = MockedHandler { mock_error: None }; + + let res = dispatcher.declare( + QueueDefinition::name("queue"), + "".to_owned(), + Arc::new(handler), + ); + + assert!(res.is_err()); + assert_eq!(dispatcher.handlers.len(), 0); + assert_eq!(dispatcher.msgs_types.len(), 0); + assert_eq!(dispatcher.queues.len(), 0); + } + + struct MockedHandler { + pub mock_error: Option, + } + + #[async_trait] + impl ConsumerHandler for MockedHandler { + async fn exec(&self, _ctx: &Context, _data: &[u8]) -> Result<(), AmqpError> { + if self.mock_error.is_none() { + return Ok(()); + } + + Err(AmqpError::InternalError {}) + } + } +} diff --git a/amqp/src/lib.rs b/amqp/src/lib.rs index b01840c..af737d3 100644 --- a/amqp/src/lib.rs +++ b/amqp/src/lib.rs @@ -1,6 +1,8 @@ +mod consumer; + pub mod client; -pub mod consumer; pub mod defs; +pub mod dispatcher; #[cfg(test)] pub mod mocks; #[cfg(feature = "mocks")] diff --git a/errors/Cargo.toml b/errors/Cargo.toml index 2a7a78a..715e7fd 100644 --- a/errors/Cargo.toml +++ b/errors/Cargo.toml @@ -4,4 +4,4 @@ version = "0.1.0" edition = "2021" [dependencies] -thiserror = { version = "1.0.37" } \ No newline at end of file +thiserror = { version = "1.0.38" } \ No newline at end of file diff --git a/health_readiness/Cargo.toml b/health_readiness/Cargo.toml index 52714a8..dd13492 100644 --- a/health_readiness/Cargo.toml +++ b/health_readiness/Cargo.toml @@ -8,7 +8,7 @@ errors = { path = "../errors" } httpw = { path = "../httpw" } env = { path = "../env" } -async-trait = { version = "0.1.59" } +async-trait = { version = "0.1.60" } tracing = { version = "0.1.37" } deadpool-postgres = { version = "0.10.3" } lapin = { version = "2.1.1" } diff --git a/httpw/Cargo.toml b/httpw/Cargo.toml index 3999ff5..604951a 100644 --- a/httpw/Cargo.toml +++ b/httpw/Cargo.toml @@ -11,5 +11,5 @@ env = { path = '../env' } thiserror = { version = "1.0.37" } actix-web = { version = "4.2.1" } actix-cors = { version = "0.6.3" } -serde = { version = "1.0.150", features = ["derive"] } +serde = { version = "1.0.152", features = ["derive"] } tracing = { version = "0.1.37" } diff --git a/httpw/README.md b/httpw/README.md index 274d8c3..fc5c3bf 100644 --- a/httpw/README.md +++ b/httpw/README.md @@ -1 +1 @@ -# Http Crate \ No newline at end of file +# HTTP Crate \ No newline at end of file diff --git a/httpw/src/server/server.rs b/httpw/src/server/server.rs index 68482ff..c57ff65 100644 --- a/httpw/src/server/server.rs +++ b/httpw/src/server/server.rs @@ -26,7 +26,7 @@ impl HttpwServerImpl { self } - pub async fn server(&self) -> Result<(), HttpServerError> { + pub async fn start(&self) -> Result<(), HttpServerError> { HttpServer::new({ let services = self.services.to_vec(); move || { @@ -49,38 +49,7 @@ impl HttpwServerImpl { error = e.to_string(), "error to binding the http server addr" ); - HttpServerError::HttpPortBindError {} - })? - .run() - .await - .map_err(|e| { - error!(error = e.to_string(), "error to start http server"); HttpServerError::ServerError {} - })?; - - Ok(()) - } - - pub async fn simple_server(&self) -> Result<(), HttpServerError> { - HttpServer::new({ - let services = self.services.to_vec(); - move || { - let mut app = App::new(); - - for svc in services.clone() { - app = app.configure(svc); - } - - app - } - }) - .bind(&self.addr) - .map_err(|e| { - error!( - error = e.to_string(), - "error to binding the http server addr" - ); - HttpServerError::HttpPortBindError {} })? .run() .await diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 55f2ef4..7fc84ce 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -6,14 +6,8 @@ edition = "2021" [dependencies] env = { path = "../env" } errors = { path = "../errors" } - -tokio = { version = "1.23.0", features = ["default"] } opentelemetry = { version = "0.18.0", features = ["rt-tokio", "metrics"] } +opentelemetry-otlp = { version = "0.11.0", features = ["tonic", "metrics", "grpc-tonic" , "tls", "tls-roots"] } tracing = { version = "0.1.37" } - tonic = { version = "0.8.3", features = ["tls"] } -opentelemetry-otlp = { version = "0.11.0", features = ["tonic", "metrics", "grpc-tonic" , "tls", "tls-roots"] } - -opentelemetry-prometheus = { version = "0.11.0" } -prometheus = { version = "0.13.3" } -actix-web = { version = "4.2.1" } \ No newline at end of file +tokio = { version = "1.23.0", features = ["default"] } \ No newline at end of file diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 6cb6e15..2fb1ccc 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -1,2 +1,3 @@ pub mod otlp; +// pub mod process; pub mod prometheus; diff --git a/metrics/src/prometheus.rs b/metrics/src/prometheus.rs index 349ca89..caba069 100644 --- a/metrics/src/prometheus.rs +++ b/metrics/src/prometheus.rs @@ -1,47 +1,5 @@ -use actix_web::{web::Data, HttpResponse, Responder}; use env::Config; -use opentelemetry::sdk::{ - export::metrics::aggregation, - metrics::{controllers, processors, selectors}, -}; -use opentelemetry_prometheus::PrometheusExporter; -use prometheus::{Encoder, TextEncoder}; -use std::error::Error; -use std::sync::Arc; -use tracing::debug; -pub fn setup(cfg: &Config) -> Result, Box> { - if !cfg.otlp.enable_metrics { - debug!("metrics::setup skipping metrics export setup"); - return Ok(None); - } - - debug!("metrics::setup configure prometheus..."); - - let controller = controllers::basic( - processors::factory( - selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), - aggregation::cumulative_temporality_selector(), - ) - .with_memory(true), - ) - .build(); - - let exporter = opentelemetry_prometheus::exporter(controller).init(); - - debug!("metrics::setup installed"); - - Ok(Some(exporter)) -} - -pub async fn metrics_handler(exporter: Data>) -> impl Responder { - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let metric_families = exporter.registry().gather(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - - HttpResponse::Ok() - .content_type(encoder.format_type()) - .body(buffer) +pub fn setup(_cfg: &Config) -> Result<(), ()> { + Ok(()) } diff --git a/migrator/Cargo.toml b/migrator/Cargo.toml index c37453d..955f5e0 100644 --- a/migrator/Cargo.toml +++ b/migrator/Cargo.toml @@ -12,7 +12,7 @@ env = { path = "../env" } logging = { path = "../logging" } errors = { path = "../errors" } -async-trait = { version = "0.1.59" } +async-trait = { version = "0.1.60" } tracing = { version = "0.1.37" } deadpool-postgres = { version = "0.10.2", optional = true } diff --git a/mqtt/Cargo.toml b/mqtt/Cargo.toml index 8710fad..78e81b4 100644 --- a/mqtt/Cargo.toml +++ b/mqtt/Cargo.toml @@ -12,11 +12,11 @@ env = { path = "../env" } traces = { path = "../traces" } opentelemetry = { version = "0.18.0" } tracing = { version = "0.1.37" } -async-trait = { version = "0.1.59" } +async-trait = { version = "0.1.60" } bytes = { version = "1.2.1", features = ["serde"] } paho-mqtt = { version = "0.11" } -serde = { version = "1.0.150", features = ["derive"] } -serde_json = { version = "1.0.89" } +serde = { version = "1.0.152", features = ["derive"] } +serde_json = { version = "1.0.91" } futures-util = { version = "0.3.25" } # Used only with feature mock diff --git a/mqtt/src/dispatcher.rs b/mqtt/src/dispatcher.rs index c187a45..074dccf 100644 --- a/mqtt/src/dispatcher.rs +++ b/mqtt/src/dispatcher.rs @@ -6,11 +6,11 @@ use errors::mqtt::MqttError; use futures_util::StreamExt; use opentelemetry::{ global::{self, BoxedTracer}, - trace::{SpanKind, TraceContextExt}, + trace::{SpanKind, Status, TraceContextExt}, Context, }; -use paho_mqtt::{Message, TopicFilter}; -use std::sync::Arc; +use paho_mqtt::Message; +use std::{borrow::Cow, sync::Arc}; use tracing::{debug, error, warn}; pub struct MqttDispatcher { @@ -44,35 +44,7 @@ impl MqttDispatcher { } async fn consume(&self, ctx: &Context, msg: &Message) -> Result<(), MqttError> { - let mut p = -1; - for (i, tp) in self.topics.clone().into_iter().enumerate() { - let filter = TopicFilter::new(tp).map_err(|e| { - error!( - error = e.to_string(), - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), - "error to create mqtt topic filter", - ); - MqttError::InternalError {} - })?; - - if filter.is_match(msg.topic()) { - p = i as i8; - break; - } - } - - if p == -1 { - warn!( - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), - "cant find dispatch for this topic" - ); - - return Err(MqttError::UnregisteredDispatchForThisTopicError( - msg.topic().to_owned(), - )); - } + let dispatch_index = self.get_dispatch_index(ctx, msg.topic())?; let metadata = TopicMessage::new(msg.topic())?; @@ -86,7 +58,7 @@ impl MqttDispatcher { msg.topic() ); - let dispatch = self.dispatches.get(p as usize).unwrap(); + let dispatch = self.dispatches.get(dispatch_index).unwrap(); return match dispatch.exec(&ctx, msg.payload(), &metadata).await { Ok(_) => { @@ -105,6 +77,9 @@ impl MqttDispatcher { e ); span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("failed to handle the event"), + }); Err(e) } }; @@ -131,6 +106,51 @@ impl MqttDispatcher { } } +impl MqttDispatcher { + fn get_dispatch_index(&self, ctx: &Context, received_topic: &str) -> Result { + let mut p: i16 = -1; + for handler_topic_index in 0..self.topics.len() { + let handler_topic = self.topics[handler_topic_index].clone(); + + if received_topic == handler_topic { + p = handler_topic_index as i16; + break; + } + + if received_topic.len() > received_topic.len() { + break; + } + + let handler_fields: Vec<_> = handler_topic.split('/').collect(); + let received_fields: Vec<_> = received_topic.split('/').collect(); + + for i in 0..handler_fields.len() { + if handler_fields[i] == "#" { + p = handler_topic_index as i16; + break; + } + + if handler_fields[i] != "+" && handler_fields[i] != received_fields[i] { + break; + } + } + } + + if p == -1 { + warn!( + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), + "cant find dispatch for this topic" + ); + return Err(MqttError::UnregisteredDispatchForThisTopicError( + received_topic.to_owned(), + )); + } + + Ok(p as usize) + } +} + #[cfg(test)] mod tests { use std::vec; diff --git a/secrets_manager/Cargo.toml b/secrets_manager/Cargo.toml index 2d85cd5..b3d7815 100644 --- a/secrets_manager/Cargo.toml +++ b/secrets_manager/Cargo.toml @@ -9,11 +9,11 @@ mocks = ["dep:mockall"] [dependencies] errors = { path = "../errors" } -async-trait = { version = "0.1.59" } +async-trait = { version = "0.1.60" } tracing = { version = "0.1.37" } aws-config = { version = "0.51.0" } aws-sdk-secretsmanager = { version = "0.21.0" } -serde_json = { version = "1.0.89" } +serde_json = { version = "1.0.91" } # Used only with feature mock mockall = { version = "0.11.3", optional = true } diff --git a/sql_pool/Cargo.toml b/sql_pool/Cargo.toml index 4a0071e..ce43b03 100644 --- a/sql_pool/Cargo.toml +++ b/sql_pool/Cargo.toml @@ -20,5 +20,5 @@ rusqlite = { version = "0.28.0", optional = true } tracing = { version = "0.1.37", optional = true } # chrono = { version = "0.4.22" } -# async-trait = { version = "0.1.59" } +# async-trait = { version = "0.1.60" } # tokio = { version = "1.23.0", features = ["default"] } \ No newline at end of file diff --git a/traces/Cargo.toml b/traces/Cargo.toml index 2500085..c17a779 100644 --- a/traces/Cargo.toml +++ b/traces/Cargo.toml @@ -8,6 +8,6 @@ env = { path = "../env" } opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.11.0", features = ["tonic", "tls", "tls-roots"] } tracing = { version = "0.1.37" } -serde = { version = "1.0.150", features = ["derive"] } +serde = { version = "1.0.152", features = ["derive"] } tonic = { version = "0.8.1", features = ["tls"] } tokio = { version = "1.23.0", features = ["default"] } \ No newline at end of file