Commit 85e4261: Added start and stop logic for servers

HttpRafa committed Jun 8, 2024
1 parent 1eeab84 commit 85e4261

Showing 11 changed files with 229 additions and 96 deletions.
40 changes: 21 additions & 19 deletions controller/application/src/controller.rs
@@ -2,7 +2,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use anyhow::Error;
use log::{debug, warn};
use log::warn;
use server::Servers;
use tokio::sync::{Mutex, MutexGuard};
use tokio::time;

@@ -11,7 +12,6 @@ use crate::network::start_controller_server;
use crate::controller::driver::Drivers;
use crate::controller::group::Groups;
use crate::controller::node::Nodes;
use crate::controller::server::Servers;

pub mod node;
pub mod group;
@@ -32,10 +32,12 @@ pub struct Controller {
/* Runtime State */
running: AtomicBool,

/* Mutable | This can be changed by the user at runtime */
/* Accessed rarely */
nodes: Mutex<Nodes>,
groups: Mutex<Groups>,
servers: Mutex<Servers>,

/* Accessed frequently */
servers: Servers,
}

impl Controller {
@@ -44,14 +46,16 @@ impl Controller {
let nodes = Nodes::load_all(&drivers).await;
let groups = Groups::load_all(&nodes).await;
let servers = Servers::new();
Arc::new_cyclic(move |handle| Self {
handle: handle.clone(),
configuration,
drivers,
running: AtomicBool::new(true),
nodes: Mutex::new(nodes),
groups: Mutex::new(groups),
servers: Mutex::new(servers),
Arc::new_cyclic(move |handle| {
Self {
handle: handle.clone(),
configuration,
drivers,
running: AtomicBool::new(true),
nodes: Mutex::new(nodes),
groups: Mutex::new(groups),
servers,
}
})
}
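The constructor above leans on `Arc::new_cyclic`, which hands the closure a `Weak<Self>` before the `Arc` exists; that is how the controller can store a handle to itself without creating a reference cycle. A condensed, self-contained illustration of the idiom (the struct here is pared down for the sketch):

```rust
use std::sync::{Arc, Weak};

struct Controller {
    handle: Weak<Controller>,
}

fn main() {
    let controller = Arc::new_cyclic(|handle: &Weak<Controller>| Controller {
        // `handle` points at the Arc currently being constructed;
        // upgrading it only succeeds once construction has finished.
        handle: handle.clone(),
    });

    // After construction, the self-handle upgrades back to the same Arc.
    let upgraded = controller.handle.upgrade().expect("controller is alive");
    assert!(Arc::ptr_eq(&controller, &upgraded));
}
```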

@@ -86,21 +90,19 @@ impl Controller {
self.groups.lock().await
}

pub async fn request_servers(&self) -> MutexGuard<Servers> {
self.servers.lock().await
pub fn request_servers(&self) -> &Servers {
&self.servers
}

async fn tick(&self) {
// NOTE: We must be careful not to hold a lock that one of the tick methods also acquires, to avoid deadlocks

let mut servers = self.request_servers().await;
let servers = self.request_servers();
// Check if all groups have started their servers, etc.
self.request_groups().await.tick(&mut servers).await;
self.request_groups().await.tick(servers).await;

// Check if all servers have sent their heartbeats and start requested servers if we can
servers.tick().await;

debug!("Controller ticked");
servers.tick(&self.handle).await;
}
}
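Note that `servers` is no longer wrapped in a `Mutex` at the controller level: `request_servers` now returns `&Servers`, so `Servers` must synchronize its own state internally. A minimal sketch of that interior-mutability pattern, assuming a `tokio::sync::Mutex` inside `Servers` (the `Server` record and its field are illustrative, not taken from this commit):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

// Hypothetical server record; the real commit defines its own type.
struct Server {
    name: String,
}

pub struct Servers {
    // The lock lives inside, so callers can share a plain &Servers.
    servers: Mutex<Vec<Arc<Server>>>,
}

impl Servers {
    pub fn new() -> Self {
        Self { servers: Mutex::new(Vec::new()) }
    }

    // &self suffices because the inner Mutex guards all mutation.
    pub async fn tick(&self) {
        let servers = self.servers.lock().await;
        for server in servers.iter() {
            // Heartbeat checks and queued starts would go here.
            let _ = &server.name;
        }
    }
}
```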

8 changes: 6 additions & 2 deletions controller/application/src/controller/driver.rs
@@ -5,6 +5,7 @@ use log::info;
use tonic::async_trait;

use crate::controller::node::Node;
use crate::controller::server::ServerHandle;

#[cfg(feature = "wasm-drivers")]
use crate::controller::driver::wasm::WasmDriver;
@@ -26,13 +27,16 @@ pub trait GenericDriver: Send + Sync {
fn name(&self) -> &String;
async fn init(&self) -> Result<Information>;
async fn init_node(&self, node: &Node) -> Result<DriverNodeHandle>;
async fn stop_server(&self, server: &str) -> Result<()>;
async fn start_server(&self, server: &str) -> Result<()>;
}

#[async_trait]
pub trait GenericNode: Send + Sync {
/* Prepare */
async fn allocate_addresses(&self, amount: u32) -> Result<Vec<SocketAddr>>;
async fn deallocate_addresses(&self, addresses: Vec<SocketAddr>) -> Result<()>;

/* Servers */
async fn start_server(&self, server: &ServerHandle) -> Result<()>;
}

pub type DriverHandle = Arc<dyn GenericDriver>;
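Server start and stop have moved off `GenericDriver` (the `&str`-based methods are deleted from wasm.rs below) and onto the per-node `GenericNode` trait, alongside the new `deallocate_addresses`. A minimal in-memory stub of the new node surface, with the trait restated locally so the sketch compiles on its own (`LocalNode` and its port scheme are invented for illustration):

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;

use anyhow::Result;
use tonic::async_trait;

// Local stand-in so this compiles outside the crate.
type ServerHandle = Arc<String>;

#[async_trait]
trait GenericNode: Send + Sync {
    async fn allocate_addresses(&self, amount: u32) -> Result<Vec<SocketAddr>>;
    async fn deallocate_addresses(&self, addresses: Vec<SocketAddr>) -> Result<()>;
    async fn start_server(&self, server: &ServerHandle) -> Result<()>;
}

struct LocalNode {
    base_port: u16,
}

#[async_trait]
impl GenericNode for LocalNode {
    async fn allocate_addresses(&self, amount: u32) -> Result<Vec<SocketAddr>> {
        // Hand out sequential loopback ports; a real node would track reuse.
        Ok((0..amount as u16)
            .map(|offset| {
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), self.base_port + offset)
            })
            .collect())
    }

    async fn deallocate_addresses(&self, _addresses: Vec<SocketAddr>) -> Result<()> {
        Ok(()) // Nothing to release for loopback ports.
    }

    async fn start_server(&self, server: &ServerHandle) -> Result<()> {
        println!("starting server {server}");
        Ok(())
    }
}
```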
18 changes: 10 additions & 8 deletions controller/application/src/controller/driver/wasm.rs
@@ -1,4 +1,5 @@
use std::fs;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::{Arc, Weak};

@@ -152,14 +153,6 @@ impl GenericDriver for WasmDriver {
Err(error) => Err(anyhow!(error)),
}
}

async fn stop_server(&self, _server: &str) -> Result<()> {
todo!()
}

async fn start_server(&self, _server: &str) -> Result<()> {
todo!()
}
}

impl WasmDriver {
@@ -328,4 +321,13 @@ impl From<&Capability> for bridge::Capability {
Capability::SubNode(node) => bridge::Capability::SubNode(node.to_owned()),
}
}
}

impl From<&SocketAddr> for bridge::Address {
fn from(val: &SocketAddr) -> Self {
bridge::Address {
ip: val.ip().to_string(),
port: val.port(),
}
}
}
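The new `From<&SocketAddr>` impl exists so callers can convert borrowed addresses with `.into()` when crossing into the WASM bridge types, as `deallocate_addresses` in node_impl.rs below does. Sketched in isolation, with a local stand-in for the bindgen-generated `bridge` module:

```rust
use std::net::SocketAddr;

// Stand-in for the bindgen-generated bridge types.
mod bridge {
    pub struct Address {
        pub ip: String,
        pub port: u16,
    }
}

impl From<&SocketAddr> for bridge::Address {
    fn from(val: &SocketAddr) -> Self {
        bridge::Address {
            ip: val.ip().to_string(),
            port: val.port(),
        }
    }
}

// The conversion the impl enables: one map/collect over borrowed addresses.
fn to_bridge(addresses: &[SocketAddr]) -> Vec<bridge::Address> {
    addresses.iter().map(|address| address.into()).collect()
}
```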
40 changes: 27 additions & 13 deletions controller/application/src/controller/driver/wasm/node_impl.rs
@@ -4,9 +4,9 @@ use anyhow::{anyhow, Result};
use tonic::async_trait;
use wasmtime::component::ResourceAny;

use crate::controller::driver::GenericNode;
use crate::controller::{driver::GenericNode, server::ServerHandle};

use super::WasmDriver;
use super::{exports::node::driver::bridge::Address, WasmDriver};

pub struct WasmNode {
pub handle: Weak<WasmDriver>,
@@ -19,21 +19,35 @@ impl GenericNode for WasmNode {
if let Some(driver) = self.handle.upgrade() {
let mut handle = driver.handle.lock().await;
let (_, store) = handle.get();
return match driver.bindings.node_driver_bridge().generic_node().call_allocate_addresses(store, self.resource, amount).await {
Ok(addresses) => match addresses {
Ok(addresses) => {
let mut result = Vec::with_capacity(addresses.len());
for address in addresses {
match driver.bindings.node_driver_bridge().generic_node().call_allocate_addresses(store, self.resource, amount).await {
Ok(Ok(addresses)) => {
addresses
.into_iter()
.map(|address| {
let ip = IpAddr::from_str(&address.ip)?;
result.push(SocketAddr::new(ip, address.port));
}
Ok(result)
},
Err(error) => Err(anyhow!(error)),
Ok(SocketAddr::new(ip, address.port))
})
.collect::<Result<Vec<SocketAddr>>>()
},
Ok(Err(error)) => Err(anyhow!(error)),
Err(error) => Err(error),
}
} else {
Err(anyhow!("Failed to get handle to wasm driver"))
}
Err(anyhow!("Failed to get handle to wasm driver"))
}

async fn deallocate_addresses(&self, addresses: Vec<SocketAddr>) -> Result<()> {
if let Some(driver) = self.handle.upgrade() {
let mut handle = driver.handle.lock().await;
let (_, store) = handle.get();
driver.bindings.node_driver_bridge().generic_node().call_deallocate_addresses(store, self.resource, &addresses.iter().map(|address| address.into()).collect::<Vec<Address>>()).await
} else {
Err(anyhow!("Failed to get handle to wasm driver"))
}
}

async fn start_server(&self, _server: &ServerHandle) -> Result<()> {
Err(anyhow!("Not implemented yet"))
}
}
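Two idioms carry the rewritten `allocate_addresses`: matching `Ok(Ok(..))` to flatten the nested `Result` coming back from the bridge call, and collecting an iterator of `Result<SocketAddr>` into `Result<Vec<SocketAddr>>` so the first bad IP string aborts the whole batch. A standalone illustration of the collect half (`RawAddress` is a stand-in for the bridge's address record):

```rust
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;

use anyhow::Result;

// Stand-in for the bridge's address record.
struct RawAddress {
    ip: String,
    port: u16,
}

fn parse_all(raw: Vec<RawAddress>) -> Result<Vec<SocketAddr>> {
    raw.into_iter()
        .map(|address| {
            // `?` surfaces the first invalid IP as the whole function's error...
            let ip = IpAddr::from_str(&address.ip)?;
            Ok(SocketAddr::new(ip, address.port))
        })
        // ...because collect() on Results stops at the first Err.
        .collect::<Result<Vec<SocketAddr>>>()
}

fn main() -> Result<()> {
    let parsed = parse_all(vec![RawAddress { ip: "127.0.0.1".into(), port: 25565 }])?;
    assert_eq!(parsed[0].port(), 25565);
    Ok(())
}
```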
30 changes: 19 additions & 11 deletions controller/application/src/controller/group.rs
@@ -9,7 +9,7 @@ use tokio::sync::Mutex;

use crate::config::{LoadFromTomlFile, SaveToTomlFile};

use super::{node::{Nodes, WeakNodeHandle}, server::{DeploySetting, Resources, Servers, StartRequest, WeakServerHandle}, CreationResult};
use super::{node::{Nodes, WeakNodeHandle}, server::{DeploySetting, Resources, ServerHandle, Servers, StartRequest}, CreationResult};

const GROUPS_DIRECTORY: &str = "groups";

@@ -82,7 +82,7 @@ impl Groups {
groups
}

pub async fn tick(&self, servers: &mut Servers) {
pub async fn tick(&self, servers: &Servers) {
for group in &self.groups {
let mut group = group.lock().await;
group.tick(servers).await;
@@ -123,8 +123,8 @@ impl Groups {

#[derive(Serialize, Deserialize, Clone, Copy, Default)]
pub struct ScalingPolicy {
pub min: u32,
pub max: u32,
pub minimum: u32,
pub maximum: u32,
pub priority: i32,
}

@@ -138,7 +138,7 @@ pub struct Group {
deployment: Vec<DeploySetting>,

/* Servers that the group has started */
servers: Vec<WeakServerHandle>,
servers: Vec<ServerHandle>,
}

impl Group {
@@ -149,7 +149,7 @@ impl Group {
name: name.to_string(),
nodes,
scaling: stored_group.scaling,
resources: stored_group.resources,
resources: stored_group.resources.clone(),
deployment: stored_group.deployment.clone(),
servers: Vec::new(),
})
@@ -165,11 +165,11 @@ impl Group {
Some(Self::from(name, stored_group, node_handles))
}

async fn tick(&mut self, servers: &mut Servers) {
async fn tick(&mut self, servers: &Servers) {
// Request as many servers as needed to reach the minimum value
for requested in 0..(self.scaling.min as usize).saturating_sub(self.servers.len()) {
for requested in 0..(self.scaling.minimum as usize).saturating_sub(self.servers.len()) {
// Check if we have reached the maximum value
if (self.servers.len() + requested) >= self.scaling.max as usize {
if (self.servers.len() + requested) >= self.scaling.maximum as usize {
break;
}

@@ -178,12 +178,20 @@ impl Group {
name: format!("{}-{}", self.name, (self.servers.len() + requested)),
nodes: self.nodes.clone(),
group: self.handle.clone(),
resources: self.resources,
resources: self.resources.clone(),
deployment: self.deployment.clone(),
priority: self.scaling.priority,
});
}).await;
}
}

pub fn add_server(&mut self, server: ServerHandle) {
self.servers.push(server);
}

pub fn remove_server(&mut self, server: &ServerHandle) {
self.servers.retain(|handle| !Arc::ptr_eq(handle, server));
}
}

mod shared {
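The scaling loop above is easy to misread: it iterates `minimum - running` times (saturating at zero) but bails out once running plus already-requested servers reaches `maximum`. A worked check of that arithmetic, reusing the renamed `ScalingPolicy` fields:

```rust
#[derive(Clone, Copy)]
struct ScalingPolicy {
    minimum: u32,
    maximum: u32,
}

// Mirrors Group::tick: how many start requests one tick issues
// for a group that currently has `running` servers.
fn requests_per_tick(scaling: ScalingPolicy, running: usize) -> usize {
    let mut requested = 0;
    for _ in 0..(scaling.minimum as usize).saturating_sub(running) {
        if running + requested >= scaling.maximum as usize {
            break;
        }
        requested += 1;
    }
    requested
}

fn main() {
    let policy = ScalingPolicy { minimum: 3, maximum: 2 };
    // The minimum asks for three servers, but the maximum caps the group at two.
    assert_eq!(requests_per_tick(policy, 0), 2);
    // At or above the minimum, the tick requests nothing.
    assert_eq!(requests_per_tick(policy, 3), 0);
}
```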
44 changes: 32 additions & 12 deletions controller/application/src/controller/node.rs
@@ -137,22 +137,29 @@ impl Nodes {
}

pub type AllocationHandle = Arc<Allocation>;
pub type WeakAllocationHandle = Weak<Allocation>;

pub struct Allocation {
pub addresses: Vec<SocketAddr>,
pub resources: Resources,
pub deployment: Vec<DeploySetting>,
}

impl Allocation {
pub fn primary_address(&self) -> &SocketAddr {
&self.addresses[0]
}
}

pub struct Node {
pub name: String,
pub capabilities: Vec<Capability>,

/* Driver handles */
pub driver: DriverHandle,
pub node: Option<DriverNodeHandle>,
inner: Option<DriverNodeHandle>,

/* Allocations made on this node */
pub allocations: Vec<WeakAllocationHandle>,
pub allocations: Vec<AllocationHandle>,
}

impl Node {
Expand All @@ -161,7 +168,7 @@ impl Node {
name: name.to_string(),
capabilities: stored_node.capabilities.clone(),
driver,
node: None,
inner: None,
allocations: Vec::new(),
}
}
@@ -181,18 +188,18 @@ impl Node {
pub async fn init(&mut self) -> Result<()> {
match self.driver.init_node(self).await {
Ok(value) => {
self.node = Some(value);
self.inner = Some(value);
Ok(())
},
Err(error) => Err(error),
}
}

pub async fn allocate(&mut self, resources: Resources, deployment: Vec<DeploySetting>) -> Result<AllocationHandle> {
pub async fn allocate(&mut self, resources: &Resources, deployment: &[DeploySetting]) -> Result<AllocationHandle> {
for capability in &self.capabilities {
match capability {
Capability::LimitedMemory(value) => {
let used_memory: u32 = self.allocations.iter().map(|ele| ele.upgrade().unwrap().resources.memory).sum();
let used_memory: u32 = self.allocations.iter().map(|allocation| allocation.resources.memory).sum();
if used_memory > *value {
return Err(anyhow!("Node has reached the memory limit"));
}
Expand All @@ -206,16 +213,29 @@ impl Node {
}
}

let addresses = self.node.as_ref().unwrap().allocate_addresses(resources.addresses).await?;
let addresses = self.inner.as_ref().unwrap().allocate_addresses(resources.addresses).await?;
if addresses.len() < resources.addresses as usize {
return Err(anyhow!("Node did not allocate the required amount of addresses"));
}

Ok(Arc::new(Allocation {
let allocation = Arc::new(Allocation {
addresses,
resources,
deployment,
}))
resources: resources.clone(),
deployment: deployment.to_owned(),
});
self.allocations.push(allocation.clone());
Ok(allocation)
}

pub async fn deallocate(&mut self, allocation: &AllocationHandle) {
self.inner.as_ref().unwrap().deallocate_addresses(allocation.addresses.clone()).await.unwrap_or_else(|error| {
error!("{} to deallocate addresses: {}", "Failed".red(), &error);
});
self.allocations.retain(|alloc| !Arc::ptr_eq(alloc, allocation));
}

pub fn get_inner(&self) -> &DriverNodeHandle {
self.inner.as_ref().unwrap()
}
}
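Both `Node::deallocate` and `Group::remove_server` drop handles by pointer identity via `Arc::ptr_eq`, not by comparing contents, so two allocations with identical fields remain distinct. A small illustration:

```rust
use std::sync::Arc;

struct Allocation {
    memory: u32,
}

fn main() {
    let first = Arc::new(Allocation { memory: 512 });
    let second = Arc::new(Allocation { memory: 512 }); // same contents, different allocation
    let mut allocations = vec![first.clone(), second.clone()];

    // retain + Arc::ptr_eq removes exactly the handle passed in,
    // even though both elements carry identical field values.
    allocations.retain(|allocation| !Arc::ptr_eq(allocation, &first));

    assert_eq!(allocations.len(), 1);
    assert!(Arc::ptr_eq(&allocations[0], &second));
    assert_eq!(allocations[0].memory, 512);
}
```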
