Skip to content

Commit

Permalink
Move group into query and add both control and canary queries in same…
Browse files Browse the repository at this point in the history
… function
  • Loading branch information
EricGhildyal committed Nov 20, 2024
1 parent 81f2957 commit fd595ff
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 31 deletions.
99 changes: 82 additions & 17 deletions src/adapters/monitors/cloudwatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,72 +137,137 @@ impl CloudWatch {
impl Monitor for CloudWatch {
type Item = CategoricalObservation<5, ResponseStatusCode>;

async fn query(&mut self, group: Group) -> Vec<Self::Item> {
async fn query(&mut self) -> Vec<Self::Item> {
// This function queries the metrics that we care most about (2xx, 4xx, and 5xx errors),
// compiles them into a list, then generates the correct number of
// CategoricalObservations for each response code
let now: DateTime<Utc> = Utc::now();
let five_mins_ago: DateTime<Utc> = now - Duration::minutes(5);

let count_future = self.query_cloudwatch(
let control_count_future = self.query_cloudwatch(
ApiMetric::Count,
"Releases",
"prod",
group,
Group::Control,
now,
five_mins_ago,
);

let error4xx_future = self.query_cloudwatch(
let control_error4xx_future = self.query_cloudwatch(
ApiMetric::Error4XX,
"Releases",
"prod",
group,
Group::Control,
now,
five_mins_ago,
);

let error5xx_future = self.query_cloudwatch(
let control_error5xx_future = self.query_cloudwatch(
ApiMetric::Error5XX,
"Releases",
"prod",
group,
Group::Control,
now,
five_mins_ago,
);

let (count, error4xx, error5xx) =
tokio::join!(count_future, error4xx_future, error5xx_future);
let canary_count_future = self.query_cloudwatch(
ApiMetric::Count,
"Releases",
"prod",
Group::Experimental,
now,
five_mins_ago,
);

let canary_error4xx_future = self.query_cloudwatch(
ApiMetric::Error4XX,
"Releases",
"prod",
Group::Experimental,
now,
five_mins_ago,
);

let canary_error5xx_future = self.query_cloudwatch(
ApiMetric::Error5XX,
"Releases",
"prod",
Group::Experimental,
now,
five_mins_ago,
);

let mut observations = vec![];
let (
control_count,
control_error4xx,
control_error5xx,
canary_count,
canary_error4xx,
canary_error5xx,
) = tokio::join!(
control_count_future,
control_error4xx_future,
control_error5xx_future,
canary_count_future,
canary_error4xx_future,
canary_error5xx_future
);

let mut observations = Vec::new();

let resp2xx = (error4xx + error5xx) - count;
let control_resp2xx = (control_error4xx + control_error5xx) - control_count;
let canary_resp2xx = (canary_error4xx + canary_error5xx) - canary_count;

// Since we need a CategoricalObservation for each instance of a response code
// but AWS only returns us a total count, we need to make our own
// list of observations, 1 per counted item
observations.extend(
std::iter::repeat(CategoricalObservation {
group: group.clone(),
group: Group::Control,
outcome: ResponseStatusCode::_2XX,
})
.take(control_resp2xx as usize),
);

observations.extend(
std::iter::repeat(CategoricalObservation {
group: Group::Control,
outcome: ResponseStatusCode::_4XX,
})
.take(control_error4xx as usize),
);

observations.extend(
std::iter::repeat(CategoricalObservation {
group: Group::Control,
outcome: ResponseStatusCode::_5XX,
})
.take(control_error5xx as usize),
);

observations.extend(
std::iter::repeat(CategoricalObservation {
group: Group::Experimental,
outcome: ResponseStatusCode::_2XX,
})
.take(resp2xx as usize),
.take(canary_resp2xx as usize),
);

observations.extend(
std::iter::repeat(CategoricalObservation {
group: group.clone(),
group: Group::Experimental,
outcome: ResponseStatusCode::_4XX,
})
.take(error4xx as usize),
.take(canary_error4xx as usize),
);

observations.extend(
std::iter::repeat(CategoricalObservation {
group: group.clone(),
group: Group::Experimental,
outcome: ResponseStatusCode::_5XX,
})
.take(error5xx as usize),
.take(canary_error5xx as usize),
);

return observations;
Expand Down
5 changes: 2 additions & 3 deletions src/adapters/monitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait Monitor {
/// is supposed to represent the set that occurred since the last time this
/// function was called.
// TODO: This should return a result which we should account for in error handling.
async fn query(&mut self, group: Group) -> Vec<Self::Item>;
async fn query(&mut self) -> Vec<Self::Item>;

/// Load a response code monitor using the configuration provided by the user.
async fn load_from_conf() -> Result<ResponseMonitor>
Expand All @@ -56,7 +56,6 @@ pub trait Monitor {
/// This function runs indefinitely.
pub fn repeat_query<T: Observation>(
mut observer: Box<dyn Monitor<Item = T>>,
group: Group,
duration: tokio::time::Duration,
) -> impl tokio_stream::Stream<Item = T> {
// • Everything happens in this stream closure, which desugars
Expand All @@ -71,7 +70,7 @@ pub fn repeat_query<T: Observation>(
// Each iteration of the loop represents one unit of tiem.
while timer.next().await.is_some() {
// • We perform the query then dump the results into the stream.
let items = observer.query(group).await;
let items = observer.query().await;
for item in items {
yield item;
}
Expand Down
14 changes: 3 additions & 11 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,12 @@ impl Pipeline {
// only one task at a time (the active task)
// can ever reach a block point.
// • We spawn the Monitor as a stream.
let control_event_stream =
repeat_query(self.monitor, Group::Control, DEFAULT_QUERY_FREQUENCY);
let control_observations =
batch_observations(control_event_stream, DEFAULT_QUERY_FREQUENCY);

// TODO: Uncomment once we add Control vs Canary monitoring
// let canary_event_stream =
// repeat_query(self.monitor, Group::Experimental, DEFAULT_QUERY_FREQUENCY);
// let canary_observations = batch_observations(canary_event_stream, DEFAULT_QUERY_FREQUENCY);

let event_stream = repeat_query(self.monitor, DEFAULT_QUERY_FREQUENCY);
let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY);
// • We push observations from the Monitor into the
// DecisionEngine.
let _controller =
EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, control_observations);
EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
// • Pipe actions to the Ingress.

todo!();
Expand Down

0 comments on commit fd595ff

Please sign in to comment.