From 09b3806d563819c1b180f30a0fca4efff9aec47b Mon Sep 17 00:00:00 2001 From: ralvescosta Date: Sun, 23 Jun 2024 16:17:36 -0300 Subject: [PATCH] feat: update external crates, apply clippy recommendations --- Cargo.lock | 55 +++++++++--------- configs/src/mqtt.rs | 4 +- configs/src/sqlite.rs | 12 +++- configs_builder/src/configs_builder.rs | 11 ++-- health_http_server/Cargo.toml | 2 +- health_readiness/src/service.rs | 4 +- http_components/Cargo.toml | 4 +- http_components/README.md | 1 - .../src/middlewares/deserializer.rs | 2 +- .../src/middlewares/otel/http_metrics.rs | 6 -- http_server/Cargo.toml | 2 +- kafka/Cargo.toml | 1 - kafka/README.md | 1 - kafka/src/errors.rs | 7 --- kafka/src/lib.rs | 1 - kafka/src/otel.rs | 38 +++++------- kafka/src/publisher.rs | 6 +- messaging/README.md | 1 - messaging/src/publisher.rs | 22 +++---- mqtt/src/client.rs | 6 +- mqtt/src/dispatcher.rs | 22 +++---- otel/README.md | 1 - rabbitmq/README.md | 1 - rabbitmq/src/consumer.rs | 58 +++++++++---------- rabbitmq/src/dispatcher.rs | 20 ++++--- rabbitmq/src/exchange.rs | 2 +- secrets_manager/Cargo.toml | 8 +-- secrets_manager/README.md | 2 +- secrets_manager/src/aws_client.rs | 2 +- traces/src/injectors/grpc.rs | 2 +- traces/src/lib.rs | 2 +- 31 files changed, 147 insertions(+), 159 deletions(-) delete mode 100644 http_components/README.md delete mode 100644 kafka/README.md delete mode 100644 kafka/src/errors.rs delete mode 100644 messaging/README.md delete mode 100644 otel/README.md delete mode 100644 rabbitmq/README.md diff --git a/Cargo.lock b/Cargo.lock index ad60f5a..a43bef3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,9 +36,9 @@ dependencies = [ [[package]] name = "actix-http" -version = "3.7.0" +version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eb9843d84c775696c37d9a418bbb01b932629d01870722c0f13eb3f95e2536d" +checksum = "3ae682f693a9cd7b058f2b0b5d9a6d7728a8555779bedbbc35dd88528611d020" dependencies = [ "actix-codec", "actix-rt", @@ -148,9 +148,9 @@ dependencies = [ [[package]] name = "actix-web" -version = "4.7.0" +version = "4.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d6316df3fa569627c98b12557a8b6ff0674e5be4bb9b5e4ae2550ddb4964ed6" +checksum = "1988c02af8d2b718c05bc4aeb6a66395b7cdf32858c2c71131e5637a8c05a9ff" dependencies = [ "actix-codec", "actix-http", @@ -590,9 +590,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "aws-config" -version = "1.5.1" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ac9889352d632214df943e26740c46a0f3da6e329fbd28164fe7ae1b061da7b" +checksum = "2368fb843e9eec932f7789d64d0e05850f4a79067188c657e572f1f5a7589df0" dependencies = [ "aws-credential-types", "aws-runtime", @@ -660,9 +660,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.2.2" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75588e7ee5e8496eed939adac2035a6dbab9f7eb2acdd9ab2d31856dab6f3955" +checksum = "9a4a5e448145999d7de17bf44a886900ecb834953408dae8aaf90465ce91c1dd" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -683,9 +683,9 @@ dependencies = [ [[package]] name = "aws-sdk-secretsmanager" -version = "1.32.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cab2c5545704ba24ae6013393b8d93de1097a47adf16165aee65e880566f1420" +checksum = "93a081cd200c6bfe2245c75f0d6707c8773a0fac81fc028f6f9ab2f2d749142f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -706,9 +706,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.29.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da75cf91cbb46686a27436d639a720a3a198b148efa76dc2467b7e5374a67fc0" +checksum = "b8aee358b755b2738b3ffb8a5b54ee991b28c8a07483a0ff7d49a58305cc2609" dependencies = [ "aws-credential-types", "aws-runtime", @@ -728,9 +728,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.30.0" +version = "1.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf2ec8a6687299685ed0a4a3137c129cdb132b5235bc3aa3443f6cffe468b9ff" +checksum = "1d5ce026f0ae73e06b20be5932150dd0e9b063417fd7c3acf5ca97018b9cbd64" dependencies = [ "aws-credential-types", "aws-runtime", @@ -750,9 +750,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.29.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458f1031e094b1411b59b49b19e4118f069e1fe13a9c5b8888e933daaf7ffdd6" +checksum = "c820248cb02e4ea83630ad2e43d0721cdbccedba5ac902cd0b6fb84d7271f205" dependencies = [ "aws-credential-types", "aws-runtime", @@ -773,9 +773,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58b56f1cbe6fd4d0c2573df72868f20ab1c125ca9c9dbce17927a463433a2e57" +checksum = "31eed8d45759b2c5fe7fd304dd70739060e9e0de509209036eabea14d0720cce" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -846,9 +846,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.5.5" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0d3965f6417a92a6d1009c5958a67042f57e46342afb37ca58f9ad26744ec73" +checksum = "db83b08939838d18e33b5dbaf1a0f048f28c10bd28071ab7ce6f245451855414" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -860,6 +860,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "http-body 1.0.0", + "httparse", "hyper 0.14.29", "hyper-rustls 0.24.2", "once_cell", @@ -872,9 +873,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.6.2" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4179bd8a1c943e1aceb46c5b9fc014a561bd6c35a2153e816ba29076ee49d245" +checksum = "1b570ea39eb95bd32543f6e4032bce172cb6209b9bc8c83c770d08169e875afc" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -889,9 +890,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.1.10" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b6764ba7e1c5ede1c9f9e4046645534f06c2581402461c559b481a420330a83" +checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" dependencies = [ "base64-simd", "bytes", @@ -924,15 +925,14 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f734808d43702a67e57d478a12e227d4d038d0b90c9005a78c87890d3805922" +checksum = "2009a9733865d0ebf428a314440bbe357cc10d0c16d86a8e15d32e9b47c1e80e" dependencies = [ "aws-credential-types", "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "http 0.2.12", "rustc_version", "tracing", ] @@ -2429,7 +2429,6 @@ dependencies = [ "messaging", "opentelemetry", "rdkafka", - "thiserror", "tokio", "tracing", ] diff --git a/configs/src/mqtt.rs b/configs/src/mqtt.rs index 95ca459..a18730d 100644 --- a/configs/src/mqtt.rs +++ b/configs/src/mqtt.rs @@ -10,7 +10,7 @@ pub enum MQTTBrokerKind { impl From<&str> for MQTTBrokerKind { fn from(value: &str) -> Self { match value.to_uppercase().as_str() { - "AWSIOTCORE" => MQTTBrokerKind::AWSIoTCore, + "AWSIoTCore" => MQTTBrokerKind::AWSIoTCore, _ => MQTTBrokerKind::Default, } } @@ -19,7 +19,7 @@ impl From<&str> for MQTTBrokerKind { impl From<&String> for MQTTBrokerKind { fn from(value: &String) -> Self { match value.to_uppercase().as_str() { - "AWSIOTCORE" => MQTTBrokerKind::AWSIoTCore, + "AWSIoTCore" => MQTTBrokerKind::AWSIoTCore, _ => MQTTBrokerKind::Default, } } diff --git a/configs/src/sqlite.rs b/configs/src/sqlite.rs index 13cef08..a2667b6 100644 --- a/configs/src/sqlite.rs +++ b/configs/src/sqlite.rs @@ -1,4 +1,4 @@ -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct SqliteConfigs { ///Default: local.db pub file: String, @@ -7,3 +7,13 @@ pub struct SqliteConfigs { /// Default: postgres pub password: String, } + +impl Default for SqliteConfigs { + fn default() -> Self { + Self { + file: Default::default(), + user: Default::default(), + password: Default::default(), + } + } +} diff --git a/configs_builder/src/configs_builder.rs b/configs_builder/src/configs_builder.rs index f6af595..068e77c 100644 --- a/configs_builder/src/configs_builder.rs +++ b/configs_builder/src/configs_builder.rs @@ -189,15 +189,14 @@ impl ConfigBuilder { app_cfg: &AppConfigs, ) -> Result>, ConfigsError> { match app_cfg.secret_manager { - SecretsManagerKind::None => Ok(Some(Arc::new(FakeSecretClient::new()))), + SecretsManagerKind::None => { + return Ok(Some(Arc::new(FakeSecretClient::new()))); + } SecretsManagerKind::AWSSecretManager => { let secret_key = env::var(SECRET_KEY_ENV_KEY).unwrap_or_default(); - match AWSSecretClientBuilder::new(secret_key) - .build() - .await - { + match AWSSecretClientBuilder::new(secret_key).build().await { Ok(c) => Ok(Some(Arc::new(c))), Err(err) => { error!(error = err.to_string(), "error to create aws secret client"); @@ -647,7 +646,7 @@ impl ConfigBuilder { v.parse().unwrap_or_else(|_| { error!(key = key, value = v, "parse went wrong"); - default + return default; }) } diff --git a/health_http_server/Cargo.toml b/health_http_server/Cargo.toml index d2b11ec..74f20ed 100644 --- a/health_http_server/Cargo.toml +++ b/health_http_server/Cargo.toml @@ -14,7 +14,7 @@ health-readiness = { path = '../health_readiness' } thiserror = { workspace = true } tracing = { workspace = true } opentelemetry = { workspace = true } -actix-web = { version = "4.7.0" } +actix-web = { version = "4.8.0" } prometheus = { version = "0.13.4", optional = true } diff --git a/health_readiness/src/service.rs b/health_readiness/src/service.rs index 0bce261..07f83e0 100644 --- a/health_readiness/src/service.rs +++ b/health_readiness/src/service.rs @@ -34,11 +34,11 @@ pub struct HealthReadinessServiceImpl { impl HealthReadinessServiceImpl { pub fn empty() -> Arc { - Arc::new(HealthReadinessServiceImpl { checkers: vec![] }) + return Arc::new(HealthReadinessServiceImpl { checkers: vec![] }); } pub fn new(checkers: Vec>) -> Arc { - Arc::new(HealthReadinessServiceImpl { checkers }) + return Arc::new(HealthReadinessServiceImpl { checkers }); } #[cfg(feature = "mqtt")] diff --git a/http_components/Cargo.toml b/http_components/Cargo.toml index 6607685..46c05af 100644 --- a/http_components/Cargo.toml +++ b/http_components/Cargo.toml @@ -12,8 +12,8 @@ validator = ["dep:validator"] health = ["dep:health-readiness"] [dependencies] -actix-web = { version = "4.6.0" } -actix-http = { version = "3.7.0" } +actix-web = { version = "4.8.0" } +actix-http = { version = "3.8.0" } actix-cors = { version = "0.7.0" } futures-util = { version = "0.3.30" } serde = { workspace = true, features = ["derive"] } diff --git a/http_components/README.md b/http_components/README.md deleted file mode 100644 index e075e41..0000000 --- a/http_components/README.md +++ /dev/null @@ -1 +0,0 @@ -# HTTP Components Crate \ No newline at end of file diff --git a/http_components/src/middlewares/deserializer.rs b/http_components/src/middlewares/deserializer.rs index a41deb2..1389cf5 100644 --- a/http_components/src/middlewares/deserializer.rs +++ b/http_components/src/middlewares/deserializer.rs @@ -11,7 +11,7 @@ pub fn handler() -> JsonConfig { format!("JSON error: {:?}", err), HttpResponse::BadRequest().json(HTTPError::bad_request( "unformatted body", - format!("{}", err), + &format!("{}", err), )), ) .into() diff --git a/http_components/src/middlewares/otel/http_metrics.rs b/http_components/src/middlewares/otel/http_metrics.rs index 0df884a..f6f35c5 100644 --- a/http_components/src/middlewares/otel/http_metrics.rs +++ b/http_components/src/middlewares/otel/http_metrics.rs @@ -60,12 +60,6 @@ impl HTTPOtelMetrics { } } -impl Default for HTTPOtelMetrics { - fn default() -> Self { - Self::new() - } -} - impl dev::Transform for HTTPOtelMetrics where S: dev::Service< diff --git a/http_server/Cargo.toml b/http_server/Cargo.toml index aeee6a6..c0b9f4c 100644 --- a/http_server/Cargo.toml +++ b/http_server/Cargo.toml @@ -12,7 +12,7 @@ http-components = { path = '../http_components', features = ["tracing", "metrics health-readiness = { path = '../health_readiness' } auth = { path = "../auth" } -actix-web = { version = "4.6.0" } +actix-web = { version = "4.8.0" } thiserror = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/kafka/Cargo.toml b/kafka/Cargo.toml index 3f732c7..6630656 100644 --- a/kafka/Cargo.toml +++ b/kafka/Cargo.toml @@ -12,4 +12,3 @@ async-trait = { workspace = true } opentelemetry = { workspace = true } tracing = { workspace = true } tokio = { workspace = true, features = ["default"] } -thiserror = { workspace = true } diff --git a/kafka/README.md b/kafka/README.md deleted file mode 100644 index 547bf45..0000000 --- a/kafka/README.md +++ /dev/null @@ -1 +0,0 @@ -# Kafka Crate \ No newline at end of file diff --git a/kafka/src/errors.rs b/kafka/src/errors.rs deleted file mode 100644 index ecb240b..0000000 --- a/kafka/src/errors.rs +++ /dev/null @@ -1,7 +0,0 @@ -use thiserror::Error; - -#[derive(Error, Debug, PartialEq, Eq)] -pub enum KafkaError { - #[error("internal error")] - InternalError, -} diff --git a/kafka/src/lib.rs b/kafka/src/lib.rs index 306cdfb..0bc4e49 100644 --- a/kafka/src/lib.rs +++ b/kafka/src/lib.rs @@ -1,5 +1,4 @@ pub mod connection; pub mod dispatcher; -pub mod errors; pub mod otel; pub mod publisher; diff --git a/kafka/src/otel.rs b/kafka/src/otel.rs index 5d404c5..1cc2b3e 100644 --- a/kafka/src/otel.rs +++ b/kafka/src/otel.rs @@ -1,4 +1,5 @@ -use super::errors::KafkaError; +use std::str::FromStr; + use opentelemetry::{ global::BoxedTracer, trace::{ @@ -8,7 +9,6 @@ use opentelemetry::{ Context, }; use rdkafka::message::{BorrowedHeaders, Header, Headers, OwnedHeaders}; -use std::str::FromStr; use tracing::error; const SUPPORTED_VERSION: u8 = 0; @@ -63,53 +63,45 @@ pub fn inject_context( }) } -pub fn extract_context(kafka_headers: &BorrowedHeaders) -> Result { +pub fn extract_context(kafka_headers: &BorrowedHeaders) -> Result { let Some((header_value, stats)) = extract_trace_from_header(kafka_headers) else { - return Err(KafkaError::InternalError); + return Err(()); }; let parts = header_value.split_terminator('-').collect::>(); // Ensure parts are not out of range. if parts.len() < 4 { - return Err(KafkaError::InternalError); + return Err(()); } // Ensure version is within range, for version 0 there must be 4 parts. - let Ok(version) = u8::from_str_radix(parts[0], 16) else { - return Err(KafkaError::InternalError); - }; + let version = u8::from_str_radix(parts[0], 16).map_err(|_| ())?; if version > MAX_VERSION || version == 0 && parts.len() != 4 { - return Err(KafkaError::InternalError); + return Err(()); } // Ensure trace id is lowercase if parts[1].chars().any(|c| c.is_ascii_uppercase()) { - return Err(KafkaError::InternalError); + return Err(()); } // Parse trace id section - let Ok(trace_id) = TraceId::from_hex(parts[1]) else { - return Err(KafkaError::InternalError); - }; + let trace_id = TraceId::from_hex(parts[1]).map_err(|_| ())?; // Ensure span id is lowercase if parts[2].chars().any(|c| c.is_ascii_uppercase()) { - return Err(KafkaError::InternalError); + return Err(()); } // Parse span id section - let Ok(span_id) = SpanId::from_hex(parts[2]) else { - return Err(KafkaError::InternalError); - }; + let span_id = SpanId::from_hex(parts[2]).map_err(|_| ())?; // Parse trace flags section - let Ok(opts) = u8::from_str_radix(parts[3], 16) else { - return Err(KafkaError::InternalError); - }; + let opts = u8::from_str_radix(parts[3], 16).map_err(|_| ())?; // Ensure opts are valid for version 0 if version == 0 && opts > 2 { - return Err(KafkaError::InternalError); + return Err(()); } // Build trace flags clearing all flags other than the trace-context @@ -117,7 +109,7 @@ pub fn extract_context(kafka_headers: &BorrowedHeaders) -> Result Result Option<(&str, &str)> { +fn extract_trace_from_header<'e>(kafka_headers: &'e BorrowedHeaders) -> Option<(&'e str, &'e str)> { let mut trace_parent = ""; let mut trace_state = ""; let mut founded = 0; diff --git a/kafka/src/publisher.rs b/kafka/src/publisher.rs index 33c8ad8..4ae7c7c 100644 --- a/kafka/src/publisher.rs +++ b/kafka/src/publisher.rs @@ -122,7 +122,7 @@ impl KafkaPublisher { let partition = match headers.get(PARTITION_HEADER_KEY) { Some(v) => { if let HeaderValues::LongInt(p) = v { - Some(p.to_owned()) + Some(p.clone()) } else { None } @@ -133,7 +133,7 @@ impl KafkaPublisher { let timestamp = match headers.get(TIMESTAMP_HEADER_KEY) { Some(v) => { if let HeaderValues::LongLongInt(t) = v { - t.to_owned() + t.clone() } else { now() } @@ -144,7 +144,7 @@ impl KafkaPublisher { let queue_timeout = match headers.get(QUEUE_TIMEOUT_KEY) { Some(v) => { if let HeaderValues::LongLongUint(t) = v { - Duration::from_millis(t.to_owned()) + Duration::from_millis(t.clone()) } else { Duration::from_secs(0) } diff --git a/messaging/README.md b/messaging/README.md deleted file mode 100644 index d10bb01..0000000 --- a/messaging/README.md +++ /dev/null @@ -1 +0,0 @@ -# Messaging Crate \ No newline at end of file diff --git a/messaging/src/publisher.rs b/messaging/src/publisher.rs index d1dd8da..294e4e5 100644 --- a/messaging/src/publisher.rs +++ b/messaging/src/publisher.rs @@ -18,17 +18,17 @@ pub enum HeaderValues { LongLongUint(u64), } -impl From for String { - fn from(val: HeaderValues) -> Self { - match val { - HeaderValues::ShortString(v) => v, - HeaderValues::LongString(v) => v, - HeaderValues::Int(v) => v.to_string(), - HeaderValues::LongInt(v) => v.to_string(), - HeaderValues::LongLongInt(v) => v.to_string(), - HeaderValues::Uint(v) => v.to_string(), - HeaderValues::LongUint(v) => v.to_string(), - HeaderValues::LongLongUint(v) => v.to_string(), +impl Into for HeaderValues { + fn into(self) -> String { + match self { + Self::ShortString(v) => v, + Self::LongString(v) => v, + Self::Int(v) => v.to_string(), + Self::LongInt(v) => v.to_string(), + Self::LongLongInt(v) => v.to_string(), + Self::Uint(v) => v.to_string(), + Self::LongUint(v) => v.to_string(), + Self::LongLongUint(v) => v.to_string(), } } } diff --git a/mqtt/src/client.rs b/mqtt/src/client.rs index 62cc9f0..3657681 100644 --- a/mqtt/src/client.rs +++ b/mqtt/src/client.rs @@ -32,10 +32,10 @@ impl MQTTClient { MQTTBrokerKind::Default => password_connection_opts(&cfgs.mqtt), }; - MQTTClient { + return MQTTClient { crate_opts, connection_opts, - } + }; } pub async fn connect( @@ -66,7 +66,7 @@ where T: DynamicConfigs, { CreateOptionsBuilder::new() - .server_uri(format!( + .server_uri(&format!( "{}://{}:{}", cfgs.mqtt.transport, cfgs.mqtt.host, cfgs.mqtt.port )) diff --git a/mqtt/src/dispatcher.rs b/mqtt/src/dispatcher.rs index 766d371..2697453 100644 --- a/mqtt/src/dispatcher.rs +++ b/mqtt/src/dispatcher.rs @@ -60,10 +60,12 @@ impl Dispatcher for MQTTDispatcher { let mut cloned_stream = self.stream.clone(); while let Some(delivery) = cloned_stream.next().await { - if let Some(msg) = delivery { - if let Err(err) = self.consume(&Context::new(), &msg).await { - error!(error = err.to_string(), "failure to consume msg"); - } + match delivery { + Some(msg) => match self.consume(&Context::new(), &msg).await { + Err(e) => error!(error = e.to_string(), "failure to consume msg"), + _ => {} + }, + _ => {} } } @@ -89,7 +91,7 @@ impl MQTTDispatcher { let msg = ConsumerMessage::new(msg.topic(), "", msg.payload(), None); - match handler.exec(&ctx, &msg).await { + return match handler.exec(&ctx, &msg).await { Ok(_) => { debug!( trace.id = traces::trace_id(&ctx), @@ -111,7 +113,7 @@ impl MQTTDispatcher { }); Err(e) } - } + }; } fn get_handler_index( @@ -134,8 +136,8 @@ impl MQTTDispatcher { Err(err) => { error!( error = err.to_string(), - trace.id = traces::trace_id(ctx), - span.id = traces::span_id(ctx), + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), topic = received_topic, "bad topic" ); @@ -146,8 +148,8 @@ impl MQTTDispatcher { if p == usize::MAX { warn!( - trace.id = traces::trace_id(ctx), - span.id = traces::span_id(ctx), + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), topic = received_topic, "cant find dispatch for this topic" ); diff --git a/otel/README.md b/otel/README.md deleted file mode 100644 index baeae38..0000000 --- a/otel/README.md +++ /dev/null @@ -1 +0,0 @@ -# Otel Crate \ No newline at end of file diff --git a/rabbitmq/README.md b/rabbitmq/README.md deleted file mode 100644 index 41c60a2..0000000 --- a/rabbitmq/README.md +++ /dev/null @@ -1 +0,0 @@ -# RabbitMQ Crate \ No newline at end of file diff --git a/rabbitmq/src/consumer.rs b/rabbitmq/src/consumer.rs index 09e980b..54cb11d 100644 --- a/rabbitmq/src/consumer.rs +++ b/rabbitmq/src/consumer.rs @@ -25,7 +25,7 @@ pub(crate) async fn consume<'c>( ) -> Result<(), AmqpError> { let (msg_type, count) = extract_header_properties(&delivery.properties); - let (ctx, mut span) = otel::new_span(&delivery.properties, tracer, &msg_type); + let (ctx, mut span) = otel::new_span(&delivery.properties, &tracer, &msg_type); debug!( trace.id = traces::trace_id(&ctx), @@ -41,22 +41,22 @@ pub(crate) async fn consume<'c>( span.set_status(Status::Error { description: Cow::from(msg), }); - debug!( trace.id = traces::trace_id(&ctx), span.id = traces::span_id(&ctx), "{}", msg ); - - if let Err(e) = delivery.ack(BasicAckOptions { multiple: false }).await { - error!("error whiling ack msg"); - span.record_error(&e); - span.set_status(Status::Error { - description: Cow::from("error to ack msg"), - }); + match delivery.ack(BasicAckOptions { multiple: false }).await { + Err(e) => { + error!("error whiling ack msg"); + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("error to ack msg"), + }); + } + _ => {} }; - return Err(AmqpError::InternalError {}); }; @@ -172,25 +172,25 @@ pub(crate) async fn consume<'c>( span.set_status(Status::Error { description: Cow::from("msg was sent to dlq"), }); - - Err(AmqpError::PublishingToDQLError {}) + return Err(AmqpError::PublishingToDQLError {}); + } + _ => { + match delivery.ack(BasicAckOptions { multiple: false }).await { + Err(e) => { + error!( + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), + "error whiling ack msg to default queue" + ); + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("msg was sent to dlq"), + }); + return Err(AmqpError::AckMessageError {}); + } + _ => return Ok(()), + }; } - _ => match delivery.ack(BasicAckOptions { multiple: false }).await { - Err(e) => { - error!( - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), - "error whiling ack msg to default queue" - ); - span.record_error(&e); - span.set_status(Status::Error { - description: Cow::from("msg was sent to dlq"), - }); - - Err(AmqpError::AckMessageError {}) - } - _ => Ok(()), - }, } } @@ -202,7 +202,7 @@ fn extract_header_properties(props: &AMQPProperties) -> (String, i64) { let count = match headers.inner().get(AMQP_HEADERS_X_DEATH) { Some(value) => match value.as_array() { - Some(arr) => match arr.as_slice().first() { + Some(arr) => match arr.as_slice().get(0) { Some(value) => match value.as_field_table() { Some(table) => match table.inner().get(AMQP_HEADERS_COUNT) { Some(value) => match value.as_long_long_int() { diff --git a/rabbitmq/src/dispatcher.rs b/rabbitmq/src/dispatcher.rs index 734c809..560e569 100644 --- a/rabbitmq/src/dispatcher.rs +++ b/rabbitmq/src/dispatcher.rs @@ -91,7 +91,7 @@ impl RabbitMQDispatcher { while let Some(result) = consumer.next().await { match result { Ok(delivery) => { - if let Err(err) = consume( + match consume( &global::tracer("amqp consumer"), &delivery, &defs, @@ -99,7 +99,10 @@ impl RabbitMQDispatcher { ) .await { - error!(error = err.to_string(), "error consume msg") + Err(err) => { + error!(error = err.to_string(), "error consume msg") + } + _ => {} } } @@ -114,18 +117,18 @@ impl RabbitMQDispatcher { return Err(MessagingError::ConsumerError("some error occur".to_owned())); } - Ok(()) + return Ok(()); } pub async fn consume_blocking_multi(&self) -> Result<(), MessagingError> { let mut spawns = vec![]; - for (consumer_tag, def) in &self.dispatchers_def { + for (msg_type, def) in &self.dispatchers_def { let mut consumer = match self .channel .basic_consume( &def.queue_def.name, - consumer_tag, + &msg_type, BasicConsumeOptions { no_local: false, no_ack: false, @@ -151,7 +154,7 @@ impl RabbitMQDispatcher { while let Some(result) = consumer.next().await { match result { Ok(delivery) => { - if let Err(err) = consume( + match consume( &global::tracer("amqp consumer"), &delivery, &defs, @@ -159,7 +162,10 @@ impl RabbitMQDispatcher { ) .await { - error!(error = err.to_string(), "error consume msg"); + Err(err) => { + error!(error = err.to_string(), "error consume msg") + } + _ => {} } } diff --git a/rabbitmq/src/exchange.rs b/rabbitmq/src/exchange.rs index 81379d2..84a75fc 100644 --- a/rabbitmq/src/exchange.rs +++ b/rabbitmq/src/exchange.rs @@ -110,7 +110,7 @@ impl<'ex> ExchangeDefinition<'ex> { } pub fn passive(mut self) -> Self { - self.passive = true; + self.passive = self.passive; self } diff --git a/secrets_manager/Cargo.toml b/secrets_manager/Cargo.toml index f843c7f..0439a13 100644 --- a/secrets_manager/Cargo.toml +++ b/secrets_manager/Cargo.toml @@ -9,14 +9,14 @@ mocks = ["dep:mockall"] [dependencies] async-trait = { workspace = true } tracing = { workspace = true } -aws-config = { version = "1.5.1" } -aws-sdk-secretsmanager = { version = "1.32.0" } +aws-config = { version = "1.5.3" } +aws-sdk-secretsmanager = { version = "1.37.0" } serde_json = { workspace = true } thiserror = { workspace = true } # Used only with feature mock -mockall = { version = "0.12.1", optional = true } +mockall = { version = "0.12", optional = true } [dev-dependencies] -mockall = { version = "0.12.1" } +mockall = { version = "0.12" } tokio = { workspace = true, features = ["macros"] } diff --git a/secrets_manager/README.md b/secrets_manager/README.md index 7b941a7..d46643f 100644 --- a/secrets_manager/README.md +++ b/secrets_manager/README.md @@ -1 +1 @@ -# Secrets Manager Crate \ No newline at end of file +# Secrets Manager \ No newline at end of file diff --git a/secrets_manager/src/aws_client.rs b/secrets_manager/src/aws_client.rs index 3928284..e25bb90 100644 --- a/secrets_manager/src/aws_client.rs +++ b/secrets_manager/src/aws_client.rs @@ -15,7 +15,7 @@ pub struct AWSSecretClient { #[cfg_attr(mock, automock)] impl SecretClient for AWSSecretClient { fn get_by_key(&self, key: &str) -> Result { - let key = key.strip_prefix('!').unwrap_or_default(); + let key = key.strip_prefix("!").unwrap_or_default(); let value = self.secrets[key].clone(); let Value::String(secret) = value else { diff --git a/traces/src/injectors/grpc.rs b/traces/src/injectors/grpc.rs index 9578f99..466440a 100644 --- a/traces/src/injectors/grpc.rs +++ b/traces/src/injectors/grpc.rs @@ -25,6 +25,6 @@ impl<'a> Injector for GRPCInjector<'a> { pub fn inject(ctx: &Context, meta: &mut tonic::metadata::MetadataMap) { global::get_text_map_propagator(|propagator| { - propagator.inject_context(ctx, &mut GRPCInjector(meta)) + propagator.inject_context(&ctx, &mut GRPCInjector(meta)) }); } diff --git a/traces/src/lib.rs b/traces/src/lib.rs index 578d77d..70c5f68 100644 --- a/traces/src/lib.rs +++ b/traces/src/lib.rs @@ -23,7 +23,7 @@ where } let sampler = Sampler::TraceIdRatioBased(cfg.trace.export_rate_base); - Sampler::ParentBased(Box::new(sampler)) + return Sampler::ParentBased(Box::new(sampler)); } pub fn span_ctx(tracer: &BoxedTracer, kind: SpanKind, name: &str) -> Context {