Skip to content

Commit

Permalink
Restructure app to use actor architecture (#10)
Browse files Browse the repository at this point in the history
Restructure app to use actor architecture

This commit defines the core "actors" in Canary as the "monitor",
"the ingress", and the "decision engine", and structures the app
around the adapters facilitating those three operations.

--------

Add Methods on DecisionEngine and Create Metrics Module

I extracted the "observation type" type definitions into a module
called "metrics" to more closely reflect that the observations
could be other types of metrics, like CPU and RAM usage, in addition
to response codes.

I also added a little more detail to the trait definition of the
decision engine.

-------

Add an EngineController type that converts the Engine into a stream.

This commit introduces an EngineController, which is a wrapper
around the DecisionEngine interface that controls when the Engine
should be called. Its intended to be where configuration around
number of samples and timeouts are implemented.
  • Loading branch information
RobbieMcKinstry authored Oct 28, 2024
1 parent bc79d20 commit 367c263
Show file tree
Hide file tree
Showing 13 changed files with 361 additions and 1,134 deletions.
1,040 changes: 17 additions & 1,023 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ path = "src/bin/main.rs"
[dependencies]
async-stream = "0.3.6"
async-trait = "0.1.83"
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-cloudwatchlogs = "1.52.0"
# aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
# aws-sdk-cloudwatchlogs = "1.52.0"
chrono = "0.4.38"
clap = { version = "4.3", features = ["derive"] }
futures-core = "0.3.31"
Expand All @@ -42,4 +42,12 @@ static_assertions = "1.1"
[profile.dist]
inherits = "release"
lto = "thin"
# This enables Clippy lints for the entire crate.
# Lints that are configurable have their config set in .clippy.toml (currently none)
# Note that Rustdoc lints can only be configured in source code (see lib.rs)
# [lints.clippy]
# pedantic = "deny"

[lints.rust]
dead_code = "allow"
unsafe_code = "forbid"
17 changes: 17 additions & 0 deletions src/adapters/engines/action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/// An [Action] describes an effectful operation affecting the deployments.
/// Actions describe decisions made by the [DecisionEngine].
pub enum Action {
/// Ramp the canary to 100% traffic and decommission the control deployment.
Promote,
/// Ramp the control to 100% traffic and decommission the canary deployment.
Yank,
/// RampUp indicates the amount of traffic provided to the canary should increase
/// by one unit.
RampUp,
/// RampDown indicates the amount of traffic provided to the canary should decrease.
RampDown,
// NB: We don't have a no-op action type, which might be something the DecisionEngine
// provides, except that I'm picturing this Action type as part of the interface
// into the Ingress, so the Ingress just won't hear back anything from the engine
// if that's the case.
}
88 changes: 88 additions & 0 deletions src/adapters/engines/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use async_stream::stream;
use futures_core::Stream;
use tokio::{pin, select, time::interval};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

use super::{Action, DecisionEngine, HashableCategory};

/// An [EngineController] is a wrapper around a DecisionEngine that
/// controls how and when its called. It essentially converts the
/// [DecisionEngine] into an async stream that only emits [Action]s
/// when there's an action to take.
pub struct EngineController {
// TODO: Implement these fields.
// Only run engine if this many samples
// has been received.
// minimum_samples: u64,
// Always run the engine if this many samples has been received.
// maximum_samples: u64,
// If this amount of time has elapsed, and the minimum number of samples
// has been received, then run the engine.
// minimum_duration: tokio::time::Duration,
// If this amount of time has elapsed, run the engine even if the
// minimum number of samples has not yet been reached.
maximum_duration: tokio::time::Duration,
// receive a shutdown signal.
// shutdown: Receiver<()>,
}

impl EngineController {
/// Convert this controller into a stream that emits [Action]s from the Engine.
pub fn run<T: HashableCategory>(
self,
mut engine: impl DecisionEngine<T>,
observations: impl Stream<Item = Vec<T>>,
) -> impl Stream<Item = Action> {
stream! {
// TODO: Implement the stream controls.
let timer = IntervalStream::new(interval(self.maximum_duration));
// Pin our streams to the stack for iteration.
pin!(timer);
pin!(observations);

/*
TODO: it looks like yield cannot be used from within a closure. Consider
// verifying and filing a bug if that's the case.
// A helper with yield syntax. This is how we run the engine, dumping
// an item to the stream only if its actionable.
let compute_next = || {
if let Some(action) = engine.compute() {
yield action;
}
};
*/

// • Check to see if we can read a new observation.
loop {
select! {
_ = timer.next() => {
// • Timer has ticked! Run the engine and check for the results.
// compute_next();
if let Some(action) = engine.compute() {
yield action;
}
}
observation = observations.next() => {
match observation {
Some(obs) => {
for observ in obs {
engine.add_observation(observ);
}
},
// Nothing left for us to compute.
// Run the engine one last time and exit.
None => {
// compute_next();
if let Some(action) = engine.compute() {
yield action;
}
break;
},
}
}
}
}
}
}
}
54 changes: 54 additions & 0 deletions src/adapters/engines/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::stats::EnumerableCategory;
use std::hash::Hash;

pub use action::Action;

/// Helper trait, since these requirements are often used by
/// our implementation of `ContingencyTables`.
trait HashableCategory: EnumerableCategory + Hash + Eq {}
impl<T: EnumerableCategory + Hash + Eq> HashableCategory for T {}

/// The decision engine receives observations from the monitor
/// and determines whether the canary should be promoted, yanked,
/// or scaled up or down.
pub trait DecisionEngine<T: HashableCategory> {
/// [add_observation] provides a new observation that the engine
/// should take under advisement before making a decision.
fn add_observation(&mut self, observation: T);

/// [compute] will ask the engine to run over all known observations.
/// The engine isn't required to output an [Action]. It might determine
/// there isn't enough data to make an affirmative decision.
fn compute(&mut self) -> Option<Action>;
}

pub type BoxEngine<T> = Box<dyn DecisionEngine<T>>;

mod action;
mod controller;

/// The AlwaysPromote decision engine will always return the Promote
/// action when prompted. It discards all observations.
#[cfg(test)]
pub struct AlwaysPromote;

#[cfg(test)]
impl<T: HashableCategory> DecisionEngine<T> for AlwaysPromote {
fn add_observation(&mut self, _: T) {}

fn compute(&mut self) -> Option<Action> {
// true to its name, it will always promote the canary.
Some(Action::Promote)
}
}

#[cfg(test)]
mod tests {
use super::DecisionEngine;
use crate::metrics::ResponseStatusCode;
use static_assertions::assert_obj_safe;

// We expect the DesignEngine to be boxed, and we expect
// it to use response codes as input.
assert_obj_safe!(DecisionEngine<ResponseStatusCode>);
}
14 changes: 14 additions & 0 deletions src/adapters/ingresses/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pub trait Ingress {}

pub struct MockIngress;
impl Ingress for MockIngress {}

impl From<MockIngress> for BoxIngress {
fn from(value: MockIngress) -> Self {
Box::new(value)
}
}

/// Convenience alias since this type is often dynamically
/// dispatched.
pub type BoxIngress = Box<dyn Ingress>;
29 changes: 21 additions & 8 deletions src/adapter/mod.rs → src/adapters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use aws_sdk_cloudwatchlogs as cloudwatchlogs;
use futures_core::stream::Stream;
use tokio::sync::mpsc::Sender;

use crate::stats::Observation;

pub struct CloudwatchLogsAdapter {
pub use engines::*;
pub use ingresses::*;
pub use monitors::*;

pub struct CloudwatchLogs {
/// The AWS client for querying Cloudwatch Logs.
client: Box<dyn ObservationEmitter>,
outbox: Sender<Observation>,
Expand All @@ -18,7 +21,7 @@ pub trait ObservationEmitter: Send + Sync {
fn emit_next(&mut self) -> Vec<Observation>;
}

impl CloudwatchLogsAdapter {
impl CloudwatchLogs {
/// Create a new [CloudwatchLogsAdapter] using a provided AWS client.
pub fn new(client: impl ObservationEmitter + 'static) -> impl Stream<Item = Observation> {
let (outbox, mut inbox) = tokio::sync::mpsc::channel(1024);
Expand All @@ -43,12 +46,22 @@ impl CloudwatchLogsAdapter {
}
}

/// Contains the trait definition and decision engine implementations.
/// DecisionEngines are responsible for determining
/// how much traffic is sent to deployments and when deployments should be yanked or promoted.
mod engines;
/// Contains the trait definition and ingress implementations. Ingresses are responsible
/// for actuating changes to traffic.
mod ingresses;
mod monitors;

#[cfg(test)]
mod tests {
use crate::adapter::Observation;
use crate::stats::{Group, StatusCategory};
use crate::adapters::Observation;
use crate::metrics::ResponseStatusCode;
use crate::stats::Group;

use super::{CloudwatchLogsAdapter, ObservationEmitter};
use super::{CloudwatchLogs, ObservationEmitter};

use futures_util::pin_mut;
use futures_util::StreamExt;
Expand All @@ -58,14 +71,14 @@ mod tests {
fn emit_next(&mut self) -> Vec<super::Observation> {
vec![Observation {
group: Group::Control,
outcome: StatusCategory::_2XX,
outcome: ResponseStatusCode::_2XX,
}]
}
}

#[tokio::test]
async fn smoke_adapter_works() {
let event_stream = CloudwatchLogsAdapter::new(FakeObservationEmitter);
let event_stream = CloudwatchLogs::new(FakeObservationEmitter);
pin_mut!(event_stream);
let mut count = 0;
while let Some(_) = event_stream.next().await {
Expand Down
72 changes: 72 additions & 0 deletions src/adapters/monitors/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use async_trait::async_trait;
use tokio::{pin, time::interval};
use tokio_stream::{wrappers::IntervalStream, StreamExt};

/// The maximum number of observations that can be recevied before we
/// recompute statistical significance.
/// If this number is too low, we'll be performing compute-intensive
/// statical tests very often. If this number is too high, we could
/// be waiting too long before computing, which could permit us to promote more eagerly.
const DEFAULT_BATCH_SIZE: usize = 512;

/// An [Observer] watches a particular external system (like AWS CloudWatch Logs)
/// and converts them into observations before emitting them as a stream.
#[async_trait]
pub trait Monitor {
/// The kind of object emitted by the Observer.
type Item;

/// The [query] method will query the observable external system on demand
/// and produce a collection of observations. This collection of observations
/// is supposed to represent the set that occurred since the last time this
/// function was called.
// TODO: This should return a result which we should account for in error handling.
async fn query(&mut self) -> Vec<Self::Item>;
}

// TODO: Add a call to chunk_timeout to ensure that items are arriving after a particular
// amount of time.
/// [repeat_query] runs the query on an interval and returns a stream of items.
/// This function runs indefinitely.
pub fn repeat_query<T: Monitor>(
mut observer: T,
duration: tokio::time::Duration,
) -> impl tokio_stream::Stream<Item = T::Item> {
// • Everything happens in this stream closure, which desugars
// into a background thread and a channel write at yield points.
async_stream::stream! {
// • Initialize a timer that fires every interval.
let timer = IntervalStream::new(interval(duration));
// • The timer must be pinned to use in an iterator
// because we must promise that its address must not
// be moved between iterations.
pin!(timer);
// Each iteration of the loop represents one unit of tiem.
while timer.next().await.is_some() {
// • We perform the query then dump the results into the stream.
let items = observer.query().await;
for item in items {
yield item;
}
}
}
}

// TODO: Honestly, this function can be inlined where used.
/// Batch observations together into maximally sized chunks, and dump
/// them to a stream every so often.
pub fn batch_observations<T: Monitor>(
obs: impl tokio_stream::Stream<Item = T::Item>,
duration: tokio::time::Duration,
) -> impl tokio_stream::Stream<Item = Vec<T::Item>> {
obs.chunks_timeout(DEFAULT_BATCH_SIZE, duration)
}

#[cfg(test)]
mod tests {
use static_assertions::assert_obj_safe;

use super::Monitor;

assert_obj_safe!(Monitor<Item = ()>);
}
2 changes: 1 addition & 1 deletion src/cmd/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct Version;

impl Version {
pub fn new() -> Self {
Self::default()
Self
}

/// Print the version and exit.
Expand Down
11 changes: 5 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
#![allow(dead_code)]

pub use config::Flags;

/// An adapter connects to some observable resource (like CloudWatch) and
/// An adapter connects to some observable resource (like `CloudWatch`) and
/// emits events, like failed and succeeded requests.
mod adapter;
mod adapters;
/// Contains the dispatch logic for running individual CLI subcommands.
/// The CLI's main function calls into these entrypoints for each subcommand.
mod cmd;
/// configuration of the CLI, either from the environment of flags.
mod config;
/// This is the data pipeline responsible for the control flow
/// of data from observers into number crunchers.
/// Contains the definitions of metrics that are valuable to detecting
/// canary health. Currently, ResponseStatusCode is the only metric of note.
pub mod metrics;
mod pipeline;
/// Our statistics library.
pub mod stats;
24 changes: 24 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use crate::stats::EnumerableCategory;

/// [ResponseStatusCode] groups HTTP response status codes according
/// to five general categories. This type is used as the dependent
/// variable in statical observations.
#[derive(Hash, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum ResponseStatusCode {
// Information responses
_1XX,
// Successful responses
_2XX,
// Redirection messages
_3XX,
// Client error responses
_4XX,
// Server error responses
_5XX,
}

impl EnumerableCategory for ResponseStatusCode {
fn groups() -> Box<dyn Iterator<Item = Self>> {
Box::new([Self::_1XX, Self::_2XX, Self::_3XX, Self::_4XX, Self::_5XX].into_iter())
}
}
Loading

0 comments on commit 367c263

Please sign in to comment.