Skip to content

Commit

Permalink
Refactor app to use const generics.
Browse files Browse the repository at this point in the history
This commit replaces the old, dynamic initialization approach with
const generics, ensuring at compile-time that the number of categories
is correct.

I disabled some tests, but I'll re-enable them in a follow-on PR.
Ultimately, this PR represents a transitory period for the computation
of test statistics, so the code is pretty rough, but I'll be cleaning
it up immediately.
  • Loading branch information
RobbieMcKinstry committed Nov 8, 2024
1 parent b1005ff commit fa0e46d
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 372 deletions.
27 changes: 13 additions & 14 deletions src/adapters/engines/chi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,36 @@ use super::DecisionEngine;
/// significance test to determine whether the canary should be promoted or not.
#[derive(Default)]
pub struct ChiSquareEngine {
control_data: ExpectationTable<ResponseStatusCode>,
experimental_data: EmpiricalTable<ResponseStatusCode>,
control_data: ExpectationTable<5, ResponseStatusCode>,
experimental_data: EmpiricalTable<5, ResponseStatusCode>,
}

impl DecisionEngine<ResponseStatusCode> for ChiSquareEngine {
impl ChiSquareEngine {
pub fn new() -> Self {
Self::default()
}
}

impl DecisionEngine<CategoricalObservation<5, ResponseStatusCode>> for ChiSquareEngine {
// TODO: From writing this method, it's apparent there should be a Vec implementation
// that adds Vec::len() to the total and concats the vectors together, because
// otherwise we're wasting a ton of cycles just incrementing counters.
fn add_observation(&mut self, observation: CategoricalObservation<ResponseStatusCode>) {
fn add_observation(&mut self, observation: CategoricalObservation<5, ResponseStatusCode>) {
match observation.group {
Group::Control => {
// • Increment the number of observations for this category.
self.control_data.increment(observation.outcome);
self.control_data.increment(&observation.outcome);
}
Group::Experimental => {
// • Increment the number of observations in the canary contingency table.
self.experimental_data.increment(observation.outcome);
// • Then, let the control contingency table know that there was
// another experimental observation.
self.control_data.increment_experimental_total();
}
}
}

fn compute(&mut self) -> Option<super::Action> {
// TODO: Remember to call self.control_data.set_experimental_total
// for calculating the expected values.
todo!()
}
}

impl ChiSquareEngine {
pub fn new() -> Self {
Self::default()
}
}
8 changes: 4 additions & 4 deletions src/adapters/engines/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use tokio::{pin, select, time::interval};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

use crate::stats::CategoricalObservation;
use crate::stats::Observation;

use super::{Action, DecisionEngine, HashableCategory};
use super::{Action, DecisionEngine};

/// An [EngineController] is a wrapper around a DecisionEngine that
/// controls how and when its called. It essentially converts the
Expand All @@ -31,10 +31,10 @@ pub struct EngineController {

impl EngineController {
/// Convert this controller into a stream that emits [Action]s from the Engine.
pub fn run<T: HashableCategory>(
pub fn run<T: Observation>(
self,
mut engine: impl DecisionEngine<T>,
observations: impl Stream<Item = Vec<CategoricalObservation<T>>>,
observations: impl Stream<Item = Vec<T>>,
) -> impl Stream<Item = Action> {
stream! {
// TODO: Set the MissedTickBehavior on the internal.
Expand Down
23 changes: 9 additions & 14 deletions src/adapters/engines/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
use crate::stats::{CategoricalObservation, EnumerableCategory};
use std::hash::Hash;
use crate::stats::Observation;

pub use action::Action;
pub use chi::ChiSquareEngine;

/// Helper trait, since these requirements are often used by
/// our implementation of `ContingencyTables`.
trait HashableCategory: EnumerableCategory + Hash + Eq {}
impl<T: EnumerableCategory + Hash + Eq> HashableCategory for T {}

/// The decision engine receives observations from the monitor
/// and determines whether the canary should be promoted, yanked,
/// or scaled up or down.
pub trait DecisionEngine<T: HashableCategory> {
pub trait DecisionEngine<T: Observation> {
/// [add_observation] provides a new observation that the engine
/// should take under advisement before making a decision.
fn add_observation(&mut self, observation: CategoricalObservation<T>);
fn add_observation(&mut self, observation: T);

/// [compute] will ask the engine to run over all known observations.
/// The engine isn't required to output an [Action]. It might determine
Expand All @@ -33,8 +26,8 @@ mod controller;
pub struct AlwaysPromote;

#[cfg(test)]
impl<T: HashableCategory> DecisionEngine<T> for AlwaysPromote {
fn add_observation(&mut self, _: CategoricalObservation<T>) {}
impl<T: Observation> DecisionEngine<T> for AlwaysPromote {
fn add_observation(&mut self, _: T) {}

fn compute(&mut self) -> Option<Action> {
// true to its name, it will always promote the canary.
Expand All @@ -45,9 +38,11 @@ impl<T: HashableCategory> DecisionEngine<T> for AlwaysPromote {
#[cfg(test)]
mod tests {
use super::{AlwaysPromote, DecisionEngine};
use crate::{adapters::Action, metrics::ResponseStatusCode};
use crate::{adapters::Action, metrics::ResponseStatusCode, stats::CategoricalObservation};
use static_assertions::assert_obj_safe;

type StatusCode = CategoricalObservation<5, ResponseStatusCode>;

// We expect the DesignEngine to be boxed, and we expect
// it to use response codes as input.
assert_obj_safe!(DecisionEngine<ResponseStatusCode>);
Expand All @@ -57,7 +52,7 @@ mod tests {
/// it with the rest of the system.
#[test]
fn mock_decision_engine() {
let mut engine: Box<dyn DecisionEngine<ResponseStatusCode>> = Box::new(AlwaysPromote);
let mut engine: Box<dyn DecisionEngine<StatusCode>> = Box::new(AlwaysPromote);
assert_eq!(Some(Action::Promote), engine.compute());
}
}
82 changes: 0 additions & 82 deletions src/adapters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,7 @@
use futures_core::stream::Stream;
use tokio::sync::mpsc::Sender;

use crate::{metrics::ResponseStatusCode, stats::CategoricalObservation};

pub use engines::*;
pub use ingresses::*;
pub use monitors::*;

pub struct CloudwatchLogs {
/// The AWS client for querying Cloudwatch Logs.
client: Box<dyn ObservationEmitter>,
outbox: Sender<CategoricalObservation<ResponseStatusCode>>,
}

// TODO: This must be a Boxed Async function since it needs
// to perform nonblocking network IO.
/// An ObservationEmitter returns the next set of observations when queried.
/// The list of Observations may be empty if no observations occurred in the window.
pub trait ObservationEmitter: Send + Sync {
fn emit_next(&mut self) -> Vec<CategoricalObservation<ResponseStatusCode>>;
}

impl CloudwatchLogs {
/// Create a new [CloudwatchLogsAdapter] using a provided AWS client.
pub fn new(
client: impl ObservationEmitter + 'static,
) -> impl Stream<Item = CategoricalObservation<ResponseStatusCode>> {
let (outbox, mut inbox) = tokio::sync::mpsc::channel(1024);
let adapter = Self {
client: Box::new(client),
outbox,
};
tokio::spawn(async move {
adapter.run().await;
});
async_stream::stream! {
while let Some(item) = inbox.recv().await {
yield item;
}
}
}

async fn run(mut self) {
for item in self.client.emit_next() {
self.outbox.send(item).await.unwrap();
}
}
}

/// Contains the trait definition and decision engine implementations.
/// DecisionEngines are responsible for determining
/// how much traffic is sent to deployments and when deployments should be yanked or promoted.
Expand All @@ -56,39 +10,3 @@ mod engines;
/// for actuating changes to traffic.
mod ingresses;
mod monitors;

#[cfg(test)]
mod tests {
use crate::adapters::CategoricalObservation;
use crate::metrics::ResponseStatusCode;
use crate::stats::Group;

use super::{CloudwatchLogs, ObservationEmitter};

use futures_util::pin_mut;
use futures_util::StreamExt;

struct FakeObservationEmitter;
impl ObservationEmitter for FakeObservationEmitter {
fn emit_next(&mut self) -> Vec<CategoricalObservation<ResponseStatusCode>> {
vec![CategoricalObservation {
group: Group::Control,
outcome: ResponseStatusCode::_2XX,
}]
}
}

#[tokio::test]
async fn smoke_adapter_works() {
let event_stream = CloudwatchLogs::new(FakeObservationEmitter);
pin_mut!(event_stream);
let mut count = 0;
while let Some(_) = event_stream.next().await {
println!("Yay!");
count += 1;
if count == 5 {
break;
}
}
}
}
8 changes: 1 addition & 7 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stats::{Categorical, EnumerableCategory};
use crate::stats::Categorical;
use std::fmt;

/// [ResponseStatusCode] groups HTTP response status codes according
Expand Down Expand Up @@ -42,12 +42,6 @@ impl fmt::Display for ResponseStatusCode {
}
}

impl EnumerableCategory for ResponseStatusCode {
fn groups() -> Box<dyn Iterator<Item = Self>> {
Box::new([Self::_1XX, Self::_2XX, Self::_3XX, Self::_4XX, Self::_5XX].into_iter())
}
}

#[cfg(test)]
mod tests {
use super::ResponseStatusCode;
Expand Down
15 changes: 9 additions & 6 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@ use crate::{
use bon::bon;
use miette::Result;

/// A local-only shorthand to make the type declaration more readable.
type StatusCode = CategoricalObservation<5, ResponseStatusCode>;

/// Pipeline captures the core logic of canary.
pub struct Pipeline {
engine: Box<dyn DecisionEngine<ResponseStatusCode>>,
engine: Box<dyn DecisionEngine<StatusCode>>,
ingress: Box<dyn Ingress>,
monitor: Box<dyn Monitor<Item = CategoricalObservation<ResponseStatusCode>>>,
monitor: Box<dyn Monitor<Item = StatusCode>>,
}

#[bon]
impl Pipeline {
#[builder]
fn new(
monitor: impl Monitor<Item = CategoricalObservation<ResponseStatusCode>> + 'static,
monitor: impl Monitor<Item = CategoricalObservation<5, ResponseStatusCode>> + 'static,
ingress: impl Ingress + 'static,
engine: impl DecisionEngine<ResponseStatusCode> + 'static,
engine: impl DecisionEngine<CategoricalObservation<5, ResponseStatusCode>> + 'static,
) -> Self {
Self {
engine: Box::new(engine),
Expand All @@ -44,10 +47,10 @@ pub async fn setup_pipeline() {
// • First, we create a monitor based on the configuration we've been given.
// It must use dynamic dispatch because we're not sure what kind of
// monitor it is.
let _monitor: Option<Box<dyn Monitor<Item = ResponseStatusCode>>> = None;
let _monitor: Option<Box<dyn Monitor<Item = StatusCode>>> = None;
// • Repeat for the Ingress and the Engine.
let _ingress: Box<dyn Ingress> = Box::new(MockIngress);
let _engine: Box<dyn DecisionEngine<ResponseStatusCode>> = Box::new(AlwaysPromote);
let _engine: Box<dyn DecisionEngine<StatusCode>> = Box::new(AlwaysPromote);

// TODO:
// Define the APIs that each of these things use.
Expand Down
Loading

0 comments on commit fa0e46d

Please sign in to comment.