Skip to content

Commit

Permalink
feat[transfer]: add logic for handling transfers and resolving them
Browse files Browse the repository at this point in the history
  • Loading branch information
HttpRafa committed Aug 25, 2024
1 parent 1738d96 commit 2125bd6
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 60 deletions.
2 changes: 1 addition & 1 deletion controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl Controller {
let nodes = Nodes::load_all(&drivers);
let groups = Groups::load_all(&nodes);
let servers = Servers::new(handle.clone());
let users = Users::new(/*handle.clone()*/);
let users = Users::new(handle.clone());
let event_bus = EventBus::new(/*handle.clone()*/);
Self {
handle: handle.clone(),
Expand Down
43 changes: 13 additions & 30 deletions controller/src/controller/event.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use channel::ChannelMessageSended;
use colored::Colorize;
use log::debug;
use uuid::Uuid;

use super::server::{ServerHandle, WeakServerHandle};
use crate::network::server::proto::ChannelMessage;

use std::{
any::{Any, TypeId},
Expand All @@ -14,10 +13,12 @@ use std::{
};

pub mod channel;
pub mod transfer;

#[derive(Eq, PartialEq)]
pub enum EventKey {
Channel(String),
Transfer(Uuid),
Custom(TypeId),
}

Expand Down Expand Up @@ -54,7 +55,7 @@ impl EventBus {
.push(registered_listener);
}

pub fn register_listener_with_server<E: Event>(
pub fn register_listener_under_server<E: Event>(
&self,
key: EventKey,
server: WeakServerHandle,
Expand Down Expand Up @@ -108,12 +109,12 @@ impl EventBus {
}
}

pub fn dispatch<E: Event>(&self, event: &E) -> u32 {
pub fn dispatch<E: Event>(&self, key: &EventKey, event: &E) -> u32 {
debug!("[{}] Dispatching event: {:?}", "EVENTS".blue(), event);

let mut count = 0;
let listeners = self.listeners.lock().unwrap();
if let Some(registered_listeners) = listeners.get(&EventKey::Custom(TypeId::of::<E>())) {
if let Some(registered_listeners) = listeners.get(key) {
for registered_listener in registered_listeners {
if let Some(listener) = registered_listener
.listener
Expand All @@ -127,30 +128,8 @@ impl EventBus {
count
}

pub fn dispatch_channel_message(&self, message: ChannelMessage) -> u32 {
debug!(
"[{}] Dispatching channel message: {:?}",
"EVENTS".blue(),
message
);

let mut count = 0;
let listeners = self.listeners.lock().unwrap();
if let Some(registered_listeners) =
listeners.get(&EventKey::Channel(message.channel.clone()))
{
let event = ChannelMessageSended { message };
for registered_listener in registered_listeners {
if let Some(listener) = registered_listener
.listener
.downcast_ref::<EventListener<ChannelMessageSended>>()
{
listener(&event);
count += 1;
}
}
}
count
pub fn dispatch_custom<E: Event>(&self, event: &E) -> u32 {
self.dispatch(&EventKey::Custom(TypeId::of::<E>()), event)
}
}

Expand All @@ -161,8 +140,12 @@ impl Hash for EventKey {
state.write_u8(0);
channel.hash(state);
}
EventKey::Custom(type_id) => {
EventKey::Transfer(server) => {
state.write_u8(1);
server.hash(state);
}
EventKey::Custom(type_id) => {
state.write_u8(2);
type_id.hash(state);
}
}
Expand Down
10 changes: 10 additions & 0 deletions controller/src/controller/event/transfer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use crate::controller::user::transfer::Transfer;

use super::Event;

#[derive(Debug)]
pub struct UserTransferRequested {
pub transfer: Transfer,
}

impl Event for UserTransferRequested {}
10 changes: 10 additions & 0 deletions controller/src/controller/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ impl Group {
true
});
}

pub fn get_free_server(&self) -> Option<ServerHandle> {
let servers = self.servers.lock().expect("Failed to lock servers");
for server in servers.iter() {
if let GroupedServer::Active(server) = server {
return Some(server.clone());
}
}
None
}
}

mod shared {
Expand Down
20 changes: 12 additions & 8 deletions controller/src/controller/user.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::HashMap,
ops::Deref,
sync::{Arc, Mutex, Weak},
};

Expand All @@ -8,7 +9,10 @@ use log::{debug, info};
use transfer::Transfer;
use uuid::Uuid;

use super::server::{ServerHandle, WeakServerHandle};
use super::{
server::{ServerHandle, WeakServerHandle},
WeakControllerHandle,
};

pub mod transfer;

Expand All @@ -18,16 +22,16 @@ pub type WeakUserHandle = Weak<User>;
type UsersMap = HashMap<Uuid, UserHandle>;

pub struct Users {
//controller: WeakControllerHandle,
controller: WeakControllerHandle,

/* Users that joined some started server */
users: Mutex<UsersMap>,
}

impl Users {
pub fn new(/*controller: WeakControllerHandle*/) -> Self {
pub fn new(controller: WeakControllerHandle) -> Self {
Self {
//controller,
controller,
users: Mutex::new(HashMap::new()),
}
}
Expand All @@ -53,7 +57,7 @@ impl Users {
pub fn handle_user_disconnected(&self, server: ServerHandle, uuid: Uuid) {
let mut users = self.users.lock().unwrap();
if let Some(user) = users.get(&uuid).cloned() {
if let CurrentServer::Connected(weak_server) = &user.server {
if let CurrentServer::Connected(weak_server) = user.server.lock().unwrap().deref() {
if let Some(strong_server) = weak_server.upgrade() {
// Verify if the user is connected to the server that is saying he is disconnecting
if Arc::ptr_eq(&strong_server, &server) {
Expand All @@ -74,7 +78,7 @@ impl Users {
pub fn cleanup_users(&self, dead_server: &ServerHandle) -> u32 {
let mut amount = 0;
self.users.lock().unwrap().retain(|_, user| {
if let CurrentServer::Connected(weak_server) = &user.server {
if let CurrentServer::Connected(weak_server) = user.server.lock().unwrap().deref() {
if let Some(server) = weak_server.upgrade() {
if Arc::ptr_eq(&server, dead_server) {
info!(
Expand Down Expand Up @@ -106,7 +110,7 @@ impl Users {
Arc::new(User {
name,
uuid,
server: CurrentServer::Connected(Arc::downgrade(server)),
server: Mutex::new(CurrentServer::Connected(Arc::downgrade(server))),
})
}
}
Expand All @@ -119,5 +123,5 @@ pub enum CurrentServer {
pub struct User {
pub name: String,
pub uuid: Uuid,
pub server: CurrentServer,
pub server: Mutex<CurrentServer>,
}
83 changes: 80 additions & 3 deletions controller/src/controller/user/transfer.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,81 @@
use std::{ops::Deref, sync::Arc};

use log::error;
use log::info;
use log::warn;

use crate::controller::{
event::{transfer::UserTransferRequested, EventKey},
group::WeakGroupHandle,
server::{ServerHandle, WeakServerHandle},
};

use super::{UserHandle, Users, WeakUserHandle};
use super::{CurrentServer, UserHandle, Users, WeakUserHandle};

impl Users {
pub fn transfer_all_users(&self, _server: &ServerHandle) -> u32 {
// TODO: Move all players from server to another server
0
}

pub fn transfer_user(&self, _user: &UserHandle, _target: &TransferTarget) -> bool {
// TODO: Move all players from server to another server
pub fn resolve_transfer(&self, user: &UserHandle, target: &TransferTarget) -> Option<Transfer> {
if let CurrentServer::Connected(from) = user.server.lock().unwrap().deref() {
match target {
TransferTarget::Server(to) => {
return Some(Transfer::new(
Arc::downgrade(user),
from.clone(),
to.clone(),
));
}
TransferTarget::Group(group) => {
if let Some(group) = group.upgrade() {
if let Some(to) = group.get_free_server() {
return Some(Transfer::new(
Arc::downgrade(user),
from.clone(),
Arc::downgrade(&to),
));
} else {
warn!("Failed to find free server in group {} while resolving transfer of user {}", group.name, user.name);
}
} else {
error!(
"Failed to upgrade group handle while resolving transfer of user {}",
user.name
);
}
}
}
}

None
}

pub fn transfer_user(&self, transfer: Transfer) -> bool {
if let Some((user, from, to)) = transfer.get_strong() {
info!(
"Transfering user {} from {} to server {}",
user.name, from.name, to.name
);

let controller = self
.controller
.upgrade()
.expect("Failed to upgrade controller");
controller.get_event_bus().dispatch(
&EventKey::Transfer(from.uuid),
&UserTransferRequested {
transfer: transfer.clone(),
},
);

*user.server.lock().unwrap() = CurrentServer::Transfering(transfer);
return true;
} else {
error!("Failed to transfer user because some required information is missing");
}

false
}
}
Expand All @@ -22,8 +85,22 @@ pub enum TransferTarget {
Group(WeakGroupHandle),
}

#[derive(Clone, Debug)]
pub struct Transfer {
pub user: WeakUserHandle,
pub from: WeakServerHandle,
pub to: WeakServerHandle,
}

impl Transfer {
pub fn new(user: WeakUserHandle, from: WeakServerHandle, to: WeakServerHandle) -> Self {
Self { user, from, to }
}

pub fn get_strong(&self) -> Option<(UserHandle, ServerHandle, ServerHandle)> {
let user = self.user.upgrade()?;
let from = self.from.upgrade()?;
let to = self.to.upgrade()?;
Some((user, from, to))
}
}
Loading

0 comments on commit 2125bd6

Please sign in to comment.