diff --git a/src/adapters/monitors/cloudwatch.rs b/src/adapters/monitors/cloudwatch.rs index a8815a6..635dfec 100644 --- a/src/adapters/monitors/cloudwatch.rs +++ b/src/adapters/monitors/cloudwatch.rs @@ -137,72 +137,137 @@ impl CloudWatch { impl Monitor for CloudWatch { type Item = CategoricalObservation<5, ResponseStatusCode>; - async fn query(&mut self, group: Group) -> Vec { + async fn query(&mut self) -> Vec { // This function queries the metrics that we care most about (2xx, 4xx, and 5xx errors), // compiles them into a list, then generates the correct number of // CategoricalObservations for each response code let now: DateTime = Utc::now(); let five_mins_ago: DateTime = now - Duration::minutes(5); - let count_future = self.query_cloudwatch( + let control_count_future = self.query_cloudwatch( ApiMetric::Count, "Releases", "prod", - group, + Group::Control, now, five_mins_ago, ); - let error4xx_future = self.query_cloudwatch( + let control_error4xx_future = self.query_cloudwatch( ApiMetric::Error4XX, "Releases", "prod", - group, + Group::Control, now, five_mins_ago, ); - let error5xx_future = self.query_cloudwatch( + let control_error5xx_future = self.query_cloudwatch( ApiMetric::Error5XX, "Releases", "prod", - group, + Group::Control, now, five_mins_ago, ); - let (count, error4xx, error5xx) = - tokio::join!(count_future, error4xx_future, error5xx_future); + let canary_count_future = self.query_cloudwatch( + ApiMetric::Count, + "Releases", + "prod", + Group::Experimental, + now, + five_mins_ago, + ); + + let canary_error4xx_future = self.query_cloudwatch( + ApiMetric::Error4XX, + "Releases", + "prod", + Group::Experimental, + now, + five_mins_ago, + ); + + let canary_error5xx_future = self.query_cloudwatch( + ApiMetric::Error5XX, + "Releases", + "prod", + Group::Experimental, + now, + five_mins_ago, + ); - let mut observations = vec![]; + let ( + control_count, + control_error4xx, + control_error5xx, + canary_count, + canary_error4xx, + canary_error5xx, + ) = tokio::join!( + control_count_future, + control_error4xx_future, + control_error5xx_future, + canary_count_future, + canary_error4xx_future, + canary_error5xx_future + ); + + let mut observations = Vec::new(); - let resp2xx = (error4xx + error5xx) - count; + let control_resp2xx = (control_error4xx + control_error5xx) - control_count; + let canary_resp2xx = (canary_error4xx + canary_error5xx) - canary_count; // Since we need a CategoricalObservation for each instance of a response code // but AWS only returns us a total count, we need to make our own // list of observations, 1 per counted item observations.extend( std::iter::repeat(CategoricalObservation { - group: group.clone(), + group: Group::Control, + outcome: ResponseStatusCode::_2XX, + }) + .take(control_resp2xx as usize), + ); + + observations.extend( + std::iter::repeat(CategoricalObservation { + group: Group::Control, + outcome: ResponseStatusCode::_4XX, + }) + .take(control_error4xx as usize), + ); + + observations.extend( + std::iter::repeat(CategoricalObservation { + group: Group::Control, + outcome: ResponseStatusCode::_5XX, + }) + .take(control_error5xx as usize), + ); + + observations.extend( + std::iter::repeat(CategoricalObservation { + group: Group::Experimental, outcome: ResponseStatusCode::_2XX, }) - .take(resp2xx as usize), + .take(canary_resp2xx as usize), ); observations.extend( std::iter::repeat(CategoricalObservation { - group: group.clone(), + group: Group::Experimental, outcome: ResponseStatusCode::_4XX, }) - .take(error4xx as usize), + .take(canary_error4xx as usize), ); observations.extend( std::iter::repeat(CategoricalObservation { - group: group.clone(), + group: Group::Experimental, outcome: ResponseStatusCode::_5XX, }) - .take(error5xx as usize), + .take(canary_error5xx as usize), ); return observations; diff --git a/src/adapters/monitors/mod.rs b/src/adapters/monitors/mod.rs index 8bf4c47..04ac4d4 100644 --- a/src/adapters/monitors/mod.rs +++ b/src/adapters/monitors/mod.rs @@ -30,7 +30,7 @@ pub trait Monitor { /// is supposed to represent the set that occurred since the last time this /// function was called. // TODO: This should return a result which we should account for in error handling. - async fn query(&mut self, group: Group) -> Vec; + async fn query(&mut self) -> Vec; /// Load a response code monitor using the configuration provided by the user. async fn load_from_conf() -> Result @@ -56,7 +56,6 @@ pub trait Monitor { /// This function runs indefinitely. pub fn repeat_query( mut observer: Box>, - group: Group, duration: tokio::time::Duration, ) -> impl tokio_stream::Stream { // • Everything happens in this stream closure, which desugars @@ -71,7 +70,7 @@ pub fn repeat_query( // Each iteration of the loop represents one unit of tiem. while timer.next().await.is_some() { // • We perform the query then dump the results into the stream. - let items = observer.query(group).await; + let items = observer.query().await; for item in items { yield item; } diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 961d23b..6010b7a 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -52,20 +52,12 @@ impl Pipeline { // only one task at a time (the active task) // can ever reach a block point. // • We spawn the Monitor as a stream. - let control_event_stream = - repeat_query(self.monitor, Group::Control, DEFAULT_QUERY_FREQUENCY); - let control_observations = - batch_observations(control_event_stream, DEFAULT_QUERY_FREQUENCY); - - // TODO: Uncomment once we add Control vs Canary monitoring - // let canary_event_stream = - // repeat_query(self.monitor, Group::Experimental, DEFAULT_QUERY_FREQUENCY); - // let canary_observations = batch_observations(canary_event_stream, DEFAULT_QUERY_FREQUENCY); - + let event_stream = repeat_query(self.monitor, DEFAULT_QUERY_FREQUENCY); + let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY); // • We push observations from the Monitor into the // DecisionEngine. let _controller = - EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, control_observations); + EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations); // • Pipe actions to the Ingress. todo!();