diff --git a/crates/kernel/src/builtins/bf_server.rs b/crates/kernel/src/builtins/bf_server.rs index 28a9b0f2..4c9ca386 100644 --- a/crates/kernel/src/builtins/bf_server.rs +++ b/crates/kernel/src/builtins/bf_server.rs @@ -71,7 +71,6 @@ fn bf_notify(bf_args: &mut BfCallState<'_>) -> Result { .map_err(world_state_bf_err)?; let event = NarrativeEvent::notify_text(bf_args.exec_state.caller(), msg.to_string()); - bf_args .scheduler_sender .send(( @@ -81,7 +80,7 @@ fn bf_notify(bf_args: &mut BfCallState<'_>) -> Result { event, }, )) - .expect("scheduler is not listening"); + .ok(); // MOO docs say this should return none, but in reality it returns 1? Ok(Ret(v_int(1))) @@ -444,7 +443,7 @@ fn bf_queued_tasks(bf_args: &mut BfCallState<'_>) -> Result { .scheduler_sender .send(( bf_args.exec_state.task_id, - SchedulerControlMsg::DescribeOtherTasks(send), + SchedulerControlMsg::RequestQueuedTasks(send), )) .expect("scheduler is not listening"); let tasks = receive.recv().expect("scheduler is not listening"); diff --git a/crates/kernel/src/tasks/mod.rs b/crates/kernel/src/tasks/mod.rs index b9309ea6..6fb2dfd3 100644 --- a/crates/kernel/src/tasks/mod.rs +++ b/crates/kernel/src/tasks/mod.rs @@ -14,9 +14,6 @@ use crate::tasks::scheduler::TaskResult; use moor_values::var::{List, Objid}; -use std::cell::Cell; -use std::marker::PhantomData; -use std::sync::MutexGuard; use std::time::SystemTime; pub mod command_parse; @@ -42,9 +39,6 @@ impl TaskHandle { } } -pub(crate) type PhantomUnsync = PhantomData>; -pub(crate) type PhantomUnsend = PhantomData>; - /// The minimum set of information needed to make a *resolution* call for a verb. #[derive(Debug, Clone, Eq, PartialEq)] pub struct VerbCall { diff --git a/crates/kernel/src/tasks/scheduler.rs b/crates/kernel/src/tasks/scheduler.rs index bba18e04..d1800778 100644 --- a/crates/kernel/src/tasks/scheduler.rs +++ b/crates/kernel/src/tasks/scheduler.rs @@ -15,17 +15,16 @@ use std::collections::HashMap; use std::fs::File; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; -use std::time::{Duration, SystemTime}; +use std::sync::Arc; +use std::time::{Duration, Instant, SystemTime}; use bincode::{Decode, Encode}; use crossbeam_channel::Sender; use thiserror::Error; -use tracing::{error, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use uuid::Uuid; -use crossbeam_channel::internal::SelectHandle; use crossbeam_channel::Receiver; use std::sync::Mutex; use std::thread::yield_now; @@ -33,16 +32,16 @@ use std::thread::yield_now; use moor_compiler::compile; use moor_compiler::CompileError; use moor_db::Database; +use moor_values::model::VerbProgramError; use moor_values::model::{BinaryType, CommandError, HasUuid, VerbAttrs}; use moor_values::model::{CommitResult, Perms}; -use moor_values::model::{VerbProgramError, WorldStateSource}; use moor_values::var::Error::{E_INVARG, E_PERM}; -use moor_values::var::{v_err, v_int, v_none, v_string, List, Var}; +use moor_values::var::{v_err, v_int, v_string, List, Var}; use moor_values::var::{Objid, Variant}; use moor_values::{AsByteBuffer, SYSTEM_OBJECT}; use SchedulerError::{ - CommandExecutionError, CompilationError, CouldNotStartTask, InputRequestNotFound, - TaskAbortedCancelled, TaskAbortedError, TaskAbortedException, TaskAbortedLimit, + CommandExecutionError, CompilationError, InputRequestNotFound, TaskAbortedCancelled, + TaskAbortedError, TaskAbortedException, TaskAbortedLimit, }; use crate::config::Config; @@ -52,7 +51,7 @@ use 
crate::tasks::command_parse::ParseMatcher; use crate::tasks::scheduler::SchedulerError::{TaskNotFound, VerbProgramFailed}; use crate::tasks::sessions::Session; use crate::tasks::task::Task; -use crate::tasks::task_messages::{SchedulerControlMsg, TaskControlMsg, TaskStart}; +use crate::tasks::task_messages::{SchedulerControlMsg, TaskStart}; use crate::tasks::{TaskDescription, TaskHandle, TaskId}; use crate::textdump::{make_textdump, TextdumpWriter}; use crate::vm::Fork; @@ -73,21 +72,63 @@ pub struct Scheduler { running: Arc, database: Arc, next_task_id: AtomicUsize, - inner: RwLock, + + /// The internal task queue which holds our suspended tasks, and control records for actively + /// running tasks. + task_q: Arc>, +} + +/// Scheduler-side per-task record. Lives in the scheduler thread and owned by the scheduler and +/// not shared elsewhere. +/// The actual `Task` is owned by the task thread until it is suspended or completed. +/// (When suspended it is moved into a `SuspendedTask` in the `.suspended` list) +struct RunningTaskControl { + /// For which player this task is running on behalf of. + player: Objid, + /// A kill switch to signal the task to stop. True means the VM execution thread should stop + /// as soon as it can. + kill_switch: Arc, + /// The connection-session for this task. + session: Arc, + /// A mailbox to deliver the result of the task to a waiting party with a subscription, if any. + result_sender: Option>, +} + +/// State a suspended task sits in inside the `suspended` side of the task queue. +/// When tasks are not running they are moved into these. +struct SuspendedTask { + wake_condition: WakeCondition, + task: Task, + session: Arc, + result_sender: Option>, +} + +/// Possible conditions in which a suspended task can wake from suspension. +enum WakeCondition { + /// This task will never wake up on its own, and must be manually woken with `bf_resume` + Never, + /// This task will wake up when the given time is reached. + Time(Instant), + /// This task will wake up when the given input request is fulfilled. + Input(Uuid), } -struct Inner { - input_requests: HashMap, - tasks: HashMap, +/// The internal state of the task queue. +struct TaskQ { + tasks: HashMap, + suspended: HashMap, } +/// Reasons a task might be aborted for a 'limit' #[derive(Clone, Copy, Debug, Eq, PartialEq, Decode, Encode)] pub enum AbortLimitReason { + /// This task hit its allotted tick limit. Ticks(usize), + /// This task hit its allotted time limit. Time(Duration), } -/// Results returned to waiters on tasks during subscription. +/// Possible results returned to waiters on tasks to which they've subscribed. #[derive(Clone, Debug)] pub enum TaskResult { Success(Var), @@ -119,66 +160,19 @@ pub enum SchedulerError { VerbProgramFailed(VerbProgramError), } -/// Scheduler-side per-task record. Lives in the scheduler thread and owned by the scheduler and -/// not shared elsewhere. -struct TaskControl { - task_id: TaskId, - player: Objid, - /// Outbound mailbox for messages from the scheduler to the task. - task_control_sender: Sender, - state_source: Arc, - session: Arc, - suspended: bool, - waiting_input: Option, - resume_time: Option, - // TODO: find a way for this not to be in a mutex. - result_sender: Mutex>>, -} - -/// The set of actions that the scheduler needs to take in response to a task control message. -enum TaskHandleResult { - /// The final result of a task, to be sent back to the task's subscriber if there is one. - /// (Otherwise the result is thrown away). 
- Result(TaskId, TaskResult), - /// A request to fork a new task. - Fork { - fork_request: Fork, - reply: oneshot::Sender, - session: Arc, - }, - /// A request for a description of another task. - Describe(TaskId, oneshot::Sender>), - Kill { - requesting_task_id: TaskId, - victim_task_id: TaskId, - sender_permissions: Perms, - result_sender: oneshot::Sender, - }, - Resume { - requesting_task_id: TaskId, - queued_task_id: TaskId, - sender_permissions: Perms, - return_value: Var, - result_sender: oneshot::Sender, - }, - Disconnect(TaskId, Objid), - Retry(TaskId), -} - -/// Public facing interface for the scheduler. impl Scheduler { pub fn new(database: Arc, config: Config) -> Self { let config = Arc::new(config); let (control_sender, control_receiver) = crossbeam_channel::unbounded(); - let inner = Inner { - input_requests: Default::default(), + let inner = TaskQ { tasks: Default::default(), + suspended: Default::default(), }; Self { running: Arc::new(AtomicBool::new(false)), database, next_task_id: Default::default(), - inner: RwLock::new(inner), + task_q: Arc::new(Mutex::new(inner)), config, control_sender, control_receiver, @@ -186,9 +180,50 @@ impl Scheduler { } /// Execute the scheduler loop, run from the server process. + #[instrument(skip(self))] pub fn run(self: Arc) { self.running.store(true, Ordering::SeqCst); - self.clone().do_process(); + info!("Starting scheduler loop"); + loop { + let is_running = self.running.load(Ordering::SeqCst); + if !is_running { + warn!("Scheduler stopping"); + break; + } + { + // Look for tasks that need to be woken (have hit their wakeup-time), and wake them. + let mut inner = self.task_q.lock().unwrap(); + let now = Instant::now(); + + // We need to take the tasks that need waking out of the suspended list, and then + // rehydrate them. + let to_wake = inner + .suspended + .iter() + .filter_map(|(task_id, sr)| match &sr.wake_condition { + WakeCondition::Time(t) => (*t <= now).then_some(*task_id), + _ => None, + }) + .collect::>(); + + for task_id in to_wake { + let sr = inner.suspended.remove(&task_id).unwrap(); + if let Err(e) = inner.resume_task_thread( + sr.task, + v_int(0), + sr.session, + sr.result_sender, + &self.control_sender, + self.database.clone(), + ) { + error!(?task_id, ?e, "Error resuming task"); + } + } + } + if let Ok((task_id, msg)) = self.control_receiver.recv_timeout(SCHEDULER_TICK_TIME) { + self.handle_task_control_msg(task_id, msg); + } + } info!("Scheduler done."); } @@ -202,20 +237,12 @@ impl Scheduler { ) -> Result { trace!(?player, ?command, "Command submitting"); - let task_start = TaskStart::StartCommandVerb { + let task_start = Arc::new(TaskStart::StartCommandVerb { player, command: command.to_string(), - }; + }); - self.new_task( - task_start, - player, - session, - None, - self.control_sender.clone(), - player, - false, - ) + self.new_task(task_start, player, session, None, player, false) } /// Receive input that the (suspended) task previously requested, using the given @@ -232,18 +259,26 @@ impl Scheduler { // the given input, clearing the input request out. 
trace!(?input_request_id, ?input, "Received input for task"); - let mut inner = self.inner.write().unwrap(); - let Some(task_id) = inner.input_requests.get(&input_request_id) else { + let mut inner = self.task_q.lock().unwrap(); + + // Find the task that requested this input, if any + let Some((task_id, perms)) = inner.suspended.iter().find_map(|(task_id, sr)| { + if let WakeCondition::Input(request_id) = &sr.wake_condition { + if *request_id == input_request_id { + Some((*task_id, sr.task.perms)) + } else { + None + } + } else { + None + } + }) else { + warn!(?input_request_id, "Input request not found"); return Err(InputRequestNotFound(input_request_id.as_u128())); }; - let task_id = *task_id; - let Some(task) = inner.tasks.get_mut(&task_id) else { - warn!(?task_id, ?input_request_id, "Input received for dead task"); - return Err(TaskNotFound(task_id)); - }; // If the player doesn't match, we'll pretend we didn't even see it. - if task.player != player { + if perms != player { warn!( ?task_id, ?input_request_id, @@ -253,17 +288,17 @@ impl Scheduler { return Err(TaskNotFound(task_id)); } - // Now we can resume the task with the given input - let tcs = task.task_control_sender.clone(); - task.waiting_input = None; - tcs.send(TaskControlMsg::ResumeReceiveInput( - task.state_source.clone(), - input, - )) - .map_err(|_| CouldNotStartTask)?; - inner.input_requests.remove(&input_request_id); + let sr = inner.suspended.remove(&task_id).expect("Corrupt task list"); - Ok(()) + // Wake and bake. + inner.resume_task_thread( + sr.task, + v_string(input), + sr.session, + sr.result_sender, + &self.control_sender, + self.database.clone(), + ) } /// Submit a verb task to the scheduler for execution. @@ -282,23 +317,15 @@ impl Scheduler { perms: Objid, session: Arc, ) -> Result { - let task_start = TaskStart::StartVerb { + let task_start = Arc::new(TaskStart::StartVerb { player, vloc, verb, args: List::from_slice(&args), argstr, - }; + }); - self.new_task( - task_start, - player, - session, - None, - self.control_sender.clone(), - perms, - false, - ) + self.new_task(task_start, player, session, None, perms, false) } #[instrument(skip(self, session))] @@ -310,23 +337,15 @@ impl Scheduler { session: Arc, ) -> Result { let args = command.into_iter().map(v_string).collect::>(); - let task_start = TaskStart::StartVerb { + let task_start = Arc::new(TaskStart::StartVerb { player, vloc: SYSTEM_OBJECT, verb: "do_out_of_band_command".to_string(), args: List::from_slice(&args), argstr, - }; + }); - self.new_task( - task_start, - player, - session, - None, - self.control_sender.clone(), - player, - false, - ) + self.new_task(task_start, player, session, None, player, false) } /// Submit an eval task to the scheduler for execution. @@ -344,24 +363,31 @@ impl Scheduler { Err(e) => return Err(CompilationError(e)), }; - let task_start = TaskStart::StartEval { + let task_start = Arc::new(TaskStart::StartEval { player, program: binary, - }; + }); - self.new_task( - task_start, - player, - sessions, - None, - self.control_sender.clone(), - perms, - false, - ) + self.new_task(task_start, player, sessions, None, perms, false) + } + + #[instrument(skip(self))] + pub fn submit_shutdown( + &self, + task: TaskId, + reason: Option, + ) -> Result<(), SchedulerError> { + // If we can't deliver a shutdown message, that's really a cause for panic! 
+ self.control_sender + .send((task, SchedulerControlMsg::Shutdown(reason))) + .expect("could not send clean shutdown message"); + Ok(()) } /// Start a transaction, match the object name and verb name, and if it exists and the /// permissions are correct, program the verb with the given code. + // TODO: this probably doesn't belong on scheduler + #[instrument(skip(self))] pub fn program_verb( &self, player: Objid, @@ -427,61 +453,31 @@ impl Scheduler { Err(VerbProgramFailed(VerbProgramError::DatabaseError)) } - pub fn submit_shutdown( - &self, - task: TaskId, - reason: Option, - ) -> Result<(), SchedulerError> { - // If we can't deliver a shutdown message, that's really a cause for panic! - self.control_sender - .send((task, SchedulerControlMsg::Shutdown(reason))) - .expect("could not send clean shutdown message"); - Ok(()) - } - - pub fn abort_player_tasks(&self, player: Objid) -> Result<(), SchedulerError> { - let mut to_abort = Vec::new(); - let mut inner = self.inner.write().unwrap(); - for (task_id, task_ref) in inner.tasks.iter() { - if task_ref.player == player { - to_abort.push(*task_id); - } - } - for task_id in to_abort { - let task = inner.tasks.get_mut(&task_id).expect("Corrupt task list"); - let tcs = task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Abort) { - warn!(task_id, error = ?e, "Could not send abort for task. Dead?"); - continue; - } - } - - Ok(()) - } - - /// Request information on all tasks known to the scheduler. - pub fn tasks(&self) -> Result, SchedulerError> { - let inner = self.inner.read().unwrap(); + /// Request information on all (suspended) tasks known to the scheduler. + pub fn tasks(&self) -> Vec { + let inner = self.task_q.lock().unwrap(); let mut tasks = Vec::new(); - for (task_id, task) in inner.tasks.iter() { - trace!(task_id, "Requesting task description"); - let (t_send, t_reply) = oneshot::channel(); - let tcs = task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Describe(t_send)) { - warn!(task_id, error = ?e, "Could not request task description for task. Dead?"); - continue; - } - let Ok(task_desc) = t_reply.recv() else { - warn!( - task_id, - "Could not request task description for task. Dead?" - ); - continue; + + // Suspended tasks. + for (_, sr) in inner.suspended.iter() { + let start_time = match sr.wake_condition { + WakeCondition::Time(t) => { + let distance_from_now = t.duration_since(Instant::now()); + Some(SystemTime::now() + distance_from_now) + } + _ => None, }; - trace!(task_id, "Got task description"); - tasks.push(task_desc); + tasks.push(TaskDescription { + task_id: sr.task.task_id, + start_time, + permissions: sr.task.perms, + verb_name: sr.task.vm_host.verb_name().clone(), + verb_definer: sr.task.vm_host.verb_definer(), + line_number: sr.task.vm_host.line_number(), + this: sr.task.vm_host.this(), + }); } - Ok(tasks) + tasks } /// Stop the scheduler run loop. @@ -489,13 +485,9 @@ impl Scheduler { warn!("Issuing clean shutdown..."); { // Send shut down to all the tasks. - let inner = self.inner.read().unwrap(); - for task in inner.tasks.values() { - let tcs = task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Abort) { - warn!(task_id = task.task_id, error = ?e, "Could not send abort for task. 
Already dead?"); - continue; - } + let mut inner = self.task_q.lock().unwrap(); + for (_, task) in inner.tasks.drain() { + task.kill_switch.store(true, Ordering::SeqCst); } } warn!("Waiting for tasks to finish..."); @@ -503,7 +495,7 @@ impl Scheduler { // Then spin until they're all done. loop { { - let inner = self.inner.read().unwrap(); + let inner = self.task_q.lock().unwrap(); if inner.tasks.is_empty() { break; } @@ -516,136 +508,60 @@ impl Scheduler { Ok(()) } - - pub fn abort_task(&self, id: TaskId) -> Result<(), SchedulerError> { - let mut inner = self.inner.write().unwrap(); - let task = inner.tasks.get_mut(&id).ok_or(TaskNotFound(id))?; - let tcs = task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Abort) { - error!(error = ?e, "Could not send abort message to task on its channel. Already dead?"); - } - Ok(()) - } } impl Scheduler { - fn do_process(&self) { - // TODO: Improve scheduler "tick" and "prune" logic. It's a bit of a mess. - // we might be able to use a vector of delay-futures for this instead, and just poll - // those using some futures_util magic. - info!("Starting scheduler loop"); - loop { - let is_running = self.running.load(Ordering::SeqCst); - if !is_running { - warn!("Scheduler stopping"); - break; - } - - // Look for tasks that need to be woken (have hit their wakeup-time), and wake them. - // Or tasks that need pruning. - let mut to_wake = Vec::new(); - let mut to_prune = Vec::new(); - { - let inner = self.inner.read().unwrap(); - for (task_id, task) in inner.tasks.iter() { - if !task.task_control_sender.is_ready() { - warn!( - task_id, - "Task is present but its channel is invalid. Pruning." - ); - to_prune.push(*task_id); - continue; - } - - if !task.suspended { - continue; - } - let Some(delay) = task.resume_time else { - continue; - }; - if delay <= SystemTime::now() { - to_wake.push(*task_id); - } - } - } - if !to_wake.is_empty() { - self.process_wake_ups(&to_wake); - } - if !to_prune.is_empty() { - self.process_task_removals(&to_prune); - } - if let Ok(msg) = self.control_receiver.recv_timeout(SCHEDULER_TICK_TIME) { - let (task_id, msg) = msg; - if let Some(action) = self.handle_task_control_msg(task_id, msg) { - self.process_task_action(action); - } - } - } - info!("Done."); - } - /// Handle scheduler control messages inbound from tasks. /// Note: this function should never be allowed to panic, as it is called from the scheduler main loop. - fn handle_task_control_msg( - &self, - task_id: TaskId, - msg: SchedulerControlMsg, - ) -> Option { + #[instrument(skip(self))] + fn handle_task_control_msg(&self, task_id: TaskId, msg: SchedulerControlMsg) { match msg { SchedulerControlMsg::TaskSuccess(value) => { // Commit the session. 
- let mut inner = self.inner.write().unwrap(); + let mut inner = self.task_q.lock().unwrap(); let Some(task) = inner.tasks.get_mut(&task_id) else { warn!(task_id, "Task not found for success"); - return None; + return; }; let Ok(()) = task.session.commit() else { warn!("Could not commit session; aborting task"); - return Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )); + return inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); }; trace!(?task_id, result = ?value, "Task succeeded"); - Some(TaskHandleResult::Result( - task_id, - TaskResult::Success(value), - )) + return inner.send_task_result(task_id, TaskResult::Success(value)); } - SchedulerControlMsg::TaskConflictRetry => { + SchedulerControlMsg::TaskConflictRetry(task) => { trace!(?task_id, "Task retrying due to conflict"); // Ask the task to restart itself, using its stashed original start info, but with // a brand new transaction. - Some(TaskHandleResult::Retry(task_id)) + let mut inner = self.task_q.lock().unwrap(); + inner.retry_task(task, &self.control_sender, self.database.clone()); } SchedulerControlMsg::TaskVerbNotFound(this, verb) => { // I'd make this 'warn' but `do_command` gets invoked for every command and // many cores don't have it at all. So it would just be way too spammy. trace!(this = ?this, verb, ?task_id, "Verb not found, task cancelled"); - - Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )) + let mut inner = self.task_q.lock().unwrap(); + inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); } SchedulerControlMsg::TaskCommandError(parse_command_error) => { // This is a common occurrence, so we don't want to log it at warn level. trace!(?task_id, error = ?parse_command_error, "command parse error"); - - Some(TaskHandleResult::Result( + let mut inner = self.task_q.lock().unwrap(); + inner.send_task_result( task_id, TaskResult::Error(CommandExecutionError(parse_command_error)), - )) + ); } SchedulerControlMsg::TaskAbortCancelled => { warn!(?task_id, "Task cancelled"); // Rollback the session. 
- let mut inner = self.inner.write().unwrap(); + let mut inner = self.task_q.lock().unwrap(); let Some(task) = inner.tasks.get_mut(&task_id) else { warn!(task_id, "Task not found for abort"); - return None; + return; }; if let Err(send_error) = task .session @@ -656,15 +572,9 @@ impl Scheduler { let Ok(()) = task.session.rollback() else { warn!("Could not rollback session; aborting task"); - return Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )); + return inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); }; - Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedCancelled), - )) + inner.send_task_result(task_id, TaskResult::Error(TaskAbortedCancelled)); } SchedulerControlMsg::TaskAbortLimitsReached(limit_reason) => { let abort_reason_text = match limit_reason { @@ -679,10 +589,10 @@ impl Scheduler { }; // Commit the session - let mut inner = self.inner.write().unwrap(); + let mut inner = self.task_q.lock().unwrap(); let Some(task) = inner.tasks.get_mut(&task_id) else { warn!(task_id, "Task not found for abort"); - return None; + return; }; task.session @@ -691,18 +601,15 @@ impl Scheduler { let _ = task.session.commit(); - Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedLimit(limit_reason)), - )) + inner.send_task_result(task_id, TaskResult::Error(TaskAbortedLimit(limit_reason))); } SchedulerControlMsg::TaskException(exception) => { warn!(?task_id, finally_reason = ?exception, "Task threw exception"); - let mut inner = self.inner.write().unwrap(); + let mut inner = self.task_q.lock().unwrap(); let Some(task) = inner.tasks.get_mut(&task_id) else { warn!(task_id, "Task not found for abort"); - return None; + return; }; // Compose a string out of the backtrace @@ -722,10 +629,7 @@ impl Scheduler { let _ = task.session.commit(); - Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedException(exception)), - )) + inner.send_task_result(task_id, TaskResult::Error(TaskAbortedException(exception))); } SchedulerControlMsg::TaskRequestFork(fork_request, reply) => { trace!(?task_id, delay=?fork_request.delay, "Task requesting fork"); @@ -733,80 +637,94 @@ impl Scheduler { // Task has requested a fork. Dispatch it and reply with the new task id. // Gotta dump this out til we exit the loop tho, since self.tasks is already // borrowed here. - let mut inner = self.inner.write().unwrap(); - let Some(task) = inner.tasks.get_mut(&task_id) else { - warn!(task_id, "Task not found for fork request"); - return None; + let new_session = { + let mut inner = self.task_q.lock().unwrap(); + + let Some(task) = inner.tasks.get_mut(&task_id) else { + warn!(task_id, "Task not found for fork request"); + return; + }; + task.session.clone() }; - Some(TaskHandleResult::Fork { - fork_request, - reply, - session: task.session.clone(), - }) + self.process_fork_request(fork_request, reply, new_session); } - SchedulerControlMsg::TaskSuspend(resume_time) => { - trace!(task_id, "Handling task suspension until {:?}", resume_time); + SchedulerControlMsg::TaskSuspend(resume_time, task) => { + debug!(task_id, "Handling task suspension until {:?}", resume_time); // Task is suspended. The resume time (if any) is the system time at which // the scheduler should try to wake us up. - let mut inner = self.inner.write().unwrap(); - let Some(task) = inner.tasks.get_mut(&task_id) else { + let mut inner = self.task_q.lock().unwrap(); + + // Remove from the local task control... 
+ let Some(tc) = inner.tasks.remove(&task_id) else { warn!(task_id, "Task not found for suspend request"); - return None; + return; }; // Commit the session. - let Ok(()) = task.session.commit() else { + let Ok(()) = tc.session.commit() else { warn!("Could not commit session; aborting task"); - return Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )); + return inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); }; - task.suspended = true; - task.resume_time = resume_time; - trace!(task_id, resume_time = ?task.resume_time, "Task suspended"); - None + // And insert into the suspended list. + let wake_condition = match resume_time { + Some(t) => WakeCondition::Time(t), + None => WakeCondition::Never, + }; + + inner.suspended.insert( + task_id, + SuspendedTask { + wake_condition, + task, + session: tc.session, + result_sender: tc.result_sender, + }, + ); + + debug!(task_id, "Task suspended"); } - SchedulerControlMsg::TaskRequestInput => { + SchedulerControlMsg::TaskRequestInput(task) => { // Task has gone into suspension waiting for input from the client. // Create a unique ID for this request, and we'll wake the task when the // session receives input. let input_request_id = Uuid::new_v4(); - { - let mut inner = self.inner.write().unwrap(); - let Some(task) = inner.tasks.get_mut(&task_id) else { - warn!(task_id, "Task not found for input request"); - return None; - }; - // Commit the session (not DB transaction) to make sure current output is - // flushed up to the prompt point. - let Ok(()) = task.session.commit() else { - warn!("Could not commit session; aborting task"); - return Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )); - }; + let mut inner = self.task_q.lock().unwrap(); + let Some(tc) = inner.tasks.remove(&task_id) else { + warn!(task_id, "Task not found for input request"); + return; + }; + // Commit the session (not DB transaction) to make sure current output is + // flushed up to the prompt point. + let Ok(()) = tc.session.commit() else { + warn!("Could not commit session; aborting task"); + return inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); + }; + + let Ok(()) = tc.session.request_input(tc.player, input_request_id) else { + warn!("Could not request input from session; aborting task"); + return inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); + }; + inner.suspended.insert( + task_id, + SuspendedTask { + wake_condition: WakeCondition::Input(input_request_id), + task, + session: tc.session, + result_sender: tc.result_sender, + }, + ); - let Ok(()) = task.session.request_input(task.player, input_request_id) else { - warn!("Could not request input from session; aborting task"); - return Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )); - }; - task.waiting_input = Some(input_request_id); - inner.input_requests.insert(input_request_id, task_id); - } trace!(?task_id, "Task suspended waiting for input"); - None } - SchedulerControlMsg::DescribeOtherTasks(reply) => { + SchedulerControlMsg::RequestQueuedTasks(reply) => { // Task is asking for a description of all other tasks. 
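// --- Illustrative sketch: suspension as ownership transfer, as in the `TaskSuspend` /
// --- `TaskRequestInput` arms above. The record moves between two maps, so exactly one side
// --- owns it at a time and no `suspended` flag is needed. `Job` is a stand-in, not a crate type.
use std::collections::HashMap;

struct Job {
    program_counter: usize,
}

fn suspend(running: &mut HashMap<usize, Job>, suspended: &mut HashMap<usize, Job>, id: usize) {
    if let Some(job) = running.remove(&id) {
        suspended.insert(id, job); // the Job value itself moves; nothing is cloned
    }
}

fn resume(suspended: &mut HashMap<usize, Job>, running: &mut HashMap<usize, Job>, id: usize) {
    if let Some(job) = suspended.remove(&id) {
        running.insert(id, job);
    }
}

fn main() {
    let mut running = HashMap::new();
    let mut suspended = HashMap::new();
    running.insert(7, Job { program_counter: 42 });
    suspend(&mut running, &mut suspended, 7);
    assert!(running.is_empty());
    resume(&mut suspended, &mut running, 7);
    assert_eq!(running[&7].program_counter, 42);
}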
- Some(TaskHandleResult::Describe(task_id, reply)) + if let Err(e) = reply.send(self.tasks()) { + error!(?e, "Could not send task description to requester"); + // TODO: murder this errant task + } } SchedulerControlMsg::KillTask { victim_task_id, @@ -814,47 +732,45 @@ impl Scheduler { result_sender, } => { // Task is asking to kill another task. - Some(TaskHandleResult::Kill { - requesting_task_id: task_id, - victim_task_id, - sender_permissions, - result_sender, - }) + let mut inner = self.task_q.lock().unwrap(); + inner.kill_task(victim_task_id, sender_permissions, result_sender); } SchedulerControlMsg::ResumeTask { queued_task_id, sender_permissions, return_value, result_sender, - } => Some(TaskHandleResult::Resume { - requesting_task_id: task_id, - queued_task_id, - sender_permissions, - return_value, - result_sender, - }), + } => { + let mut inner = self.task_q.lock().unwrap(); + inner.resume_task( + task_id, + queued_task_id, + sender_permissions, + return_value, + result_sender, + &self.control_sender, + self.database.clone(), + ); + } SchedulerControlMsg::BootPlayer { player, sender_permissions: _, } => { // Task is asking to boot a player. - Some(TaskHandleResult::Disconnect(task_id, player)) + let mut inner = self.task_q.lock().unwrap(); + inner.disconnect_task(task_id, player); } SchedulerControlMsg::Notify { player, event } => { - // Task is asking to notify a player. - let mut inner = self.inner.write().unwrap(); + // Task is asking to notify a player of an event. + let mut inner = self.task_q.lock().unwrap(); let Some(task) = inner.tasks.get_mut(&task_id) else { warn!(task_id, "Task not found for notify request"); - return None; + return; }; let Ok(()) = task.session.send_event(player, event) else { warn!("Could not notify player; aborting task"); - return Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )); + return inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); }; - None } SchedulerControlMsg::Shutdown(msg) => { info!("Shutting down scheduler. 
Reason: {msg:?}"); @@ -862,29 +778,23 @@ impl Scheduler { Ok(_) => v_string("Scheduler stopping.".to_string()), Err(e) => v_string(format!("Shutdown failed: {e}")), }; - let mut inner = self.inner.write().unwrap(); + let mut inner = self.task_q.lock().unwrap(); let Some(task) = inner.tasks.get_mut(&task_id) else { warn!(task_id, "Task not found for notify request"); - return None; + return; }; match task.session.shutdown(msg) { - Ok(_) => Some(TaskHandleResult::Result( - task_id, - TaskResult::Success(result_mst), - )), + Ok(_) => inner.send_task_result(task_id, TaskResult::Success(result_mst)), Err(e) => { warn!(?e, "Could not notify player; aborting task"); - Some(TaskHandleResult::Result( - task_id, - TaskResult::Error(TaskAbortedError), - )) + inner.send_task_result(task_id, TaskResult::Error(TaskAbortedError)); } } } SchedulerControlMsg::Checkpoint => { let Some(textdump_path) = self.config.textdump_output.clone() else { error!("Cannot textdump as textdump_file not configured"); - return None; + return; }; let db = self.database.clone(); @@ -925,113 +835,250 @@ impl Scheduler { if let Err(e) = tr { error!(?e, "Could not start textdump thread"); } - None } } } - fn submit_fork_task( + #[instrument(skip(self, session))] + fn process_fork_request( &self, - fork: Fork, + fork_request: Fork, + reply: oneshot::Sender, session: Arc, - ) -> Result { - let suspended = fork.delay.is_some(); - - let player = fork.player; - let delay = fork.delay; - let progr = fork.progr; - let task_handle = self.new_task( - TaskStart::StartFork { - fork_request: fork, + ) { + let mut to_remove = vec![]; + + // Fork the session. + let forked_session = session.clone(); + + let suspended = fork_request.delay.is_some(); + let player = fork_request.player; + let delay = fork_request.delay; + let progr = fork_request.progr; + + let task_handle = match self.new_task( + Arc::new(TaskStart::StartFork { + fork_request, suspended, - }, + }), player, - session, + forked_session, delay, - self.control_sender.clone(), progr, - false, - )?; + true, + ) { + Ok(th) => th, + Err(e) => { + error!(?e, "Could not fork task"); + return; + } + }; let task_id = task_handle.task_id(); - let mut inner = self.inner.write().unwrap(); - let Some(task_ref) = inner.tasks.get_mut(&task_id) else { - return Err(TaskNotFound(task_id)); - }; - // If there's a delay on the fork, we will mark it in suspended state and put in the - // delay time. - if let Some(delay) = delay { - task_ref.suspended = true; - task_ref.resume_time = Some(SystemTime::now() + delay); + let reply = reply; + if let Err(e) = reply.send(task_id) { + error!(task = task_id, error = ?e, "Could not send fork reply. Parent task gone? Remove."); + to_remove.push(task_id); } + } - Ok(task_id) + #[instrument(skip(self, session))] + #[allow(clippy::too_many_arguments)] + fn new_task( + &self, + task_start: Arc, + player: Objid, + session: Arc, + delay_start: Option, + perms: Objid, + is_background: bool, + ) -> Result { + // TODO: support a queue-size on concurrent executing tasks and allow them to sit in an + // initially suspended state without spawning a worker thread, until the queue has space. 
+ let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst); + let mut inner = self.task_q.lock().unwrap(); + inner.start_task_thread( + task_id, + task_start, + player, + session, + delay_start, + perms, + is_background, + &self.control_sender, + self.database.clone(), + ) } +} - fn process_task_action(&self, task_action: TaskHandleResult) { - let mut to_remove = vec![]; - match task_action { - TaskHandleResult::Result(task_id, result) => self.process_notification(task_id, result), - TaskHandleResult::Fork { - fork_request, - reply, - session, - } => { - self.process_fork_request(fork_request, reply, session); - } - TaskHandleResult::Describe(task_id, reply) => { - to_remove.extend(self.process_describe_request(task_id, reply)) - } - TaskHandleResult::Kill { - requesting_task_id, - victim_task_id, - sender_permissions, - result_sender, - } => { - to_remove.extend(self.process_kill_request( - requesting_task_id, - victim_task_id, - sender_permissions, - result_sender, - )); - } - TaskHandleResult::Resume { - requesting_task_id, - queued_task_id, - sender_permissions, - return_value, - result_sender, - } => { - to_remove.extend(self.process_resume_request( - requesting_task_id, - queued_task_id, - sender_permissions, - return_value, - result_sender, - )); - } - TaskHandleResult::Disconnect(task_id, player) => { - self.process_disconnect(task_id, player); - } - TaskHandleResult::Retry(task_id) => { - to_remove.extend(self.process_retry_request(task_id)); +impl TaskQ { + #[allow(clippy::too_many_arguments)] + #[instrument(skip(self, control_sender, database, session))] + fn start_task_thread( + &mut self, + task_id: TaskId, + task_start: Arc, + player: Objid, + session: Arc, + delay_start: Option, + perms: Objid, + is_background: bool, + control_sender: &Sender<(TaskId, SchedulerControlMsg)>, + database: Arc, + ) -> Result { + let state_source = database + .world_state_source() + .expect("Unable to instantiate database"); + + let (sender, receiver) = oneshot::channel(); + + let kill_switch = Arc::new(AtomicBool::new(false)); + let mut task = Task::new( + task_id, + player, + task_start, + perms, + is_background, + session.clone(), + control_sender, + kill_switch.clone(), + ); + + // If this task is delayed, stick it into suspension state immediately. + if let Some(delay) = delay_start { + // However we'll need the task to be in a resumable state, which means executing + // setup_task_start in a transaction. + let mut world_state = match state_source.new_world_state() { + Ok(ws) => ws, + Err(e) => { + error!(error = ?e, "Could not start transaction for delayed task"); + return Err(SchedulerError::CouldNotStartTask); + } + }; + + if !task.setup_task_start(control_sender, world_state.as_mut()) { + error!(task_id, "Could not setup task start"); + return Err(SchedulerError::CouldNotStartTask); } + + let wake_condition = WakeCondition::Time(Instant::now() + delay); + self.suspended.insert( + task_id, + SuspendedTask { + // (suspend_until, task, session) + wake_condition, + task, + session, + result_sender: Some(sender), + }, + ); + return Ok(TaskHandle(task_id, receiver)); } - self.process_task_removals(&to_remove); + + // Otherwise, we create a task control record and fire up a thread. + let task_control = RunningTaskControl { + player, + kill_switch, + session, + result_sender: Some(sender), + }; + + // Footgun warning: ALWAYS `self.tasks.insert` before spawning the task thread! 
+ self.tasks.insert(task_id, task_control); + + let thread_name = format!("moor-task-{}-player-{}", task_id, player); + let control_sender = control_sender.clone(); + std::thread::Builder::new() + .name(thread_name) + .spawn(move || { + trace!(?task_id, "Starting up task"); + // Start the db transaction, which will initially be used to resolve the verb before the task + // starts executing. + let mut world_state = match state_source.new_world_state() { + Ok(ws) => ws, + Err(e) => { + error!(error = ?e, "Could not start transaction for task"); + return; + } + }; + + if !task.setup_task_start(&control_sender, world_state.as_mut()) { + error!(task_id, "Could not setup task start"); + return; + } + + Task::run_task_loop(task, control_sender, world_state); + trace!(?task_id, "Completed task"); + }) + .expect("Could not spawn task thread"); + + Ok(TaskHandle(task_id, receiver)) } - fn process_notification(&self, task_id: TaskId, result: TaskResult) { - let mut inner = self.inner.write().unwrap(); - let Some(task_control) = inner.tasks.remove(&task_id) else { - // Missing task, must have ended already. This is odd though? So we'll warn. + #[allow(clippy::too_many_arguments)] + #[instrument(skip(self, result_sender, control_sender, database, session))] + fn resume_task_thread( + &mut self, + mut task: Task, + resume_val: Var, + session: Arc, + result_sender: Option>, + control_sender: &Sender<(TaskId, SchedulerControlMsg)>, + database: Arc, + ) -> Result<(), SchedulerError> { + // Take a task out of a suspended state and start running it again. + // Means: + // Start a new transaction + // Create a new control record + // Push resume-value into the task + + let state_source = database + .world_state_source() + .expect("Unable to instantiate database"); + + let task_id = task.task_id; + let player = task.perms; + let kill_switch = task.kill_switch.clone(); + let task_control = RunningTaskControl { + player, + kill_switch, + session, + result_sender, + }; + + self.tasks.insert(task_id, task_control); + task.vm_host.resume_execution(resume_val); + let thread_name = format!("moor-task-{}-player-{}", task_id, player); + let control_sender = control_sender.clone(); + std::thread::Builder::new() + .name(thread_name) + .spawn(move || { + // Start its new transaction... + let world_state = match state_source.new_world_state() { + Ok(ws) => ws, + Err(e) => { + error!(error = ?e, "Could not start transaction for task resumption"); + return; + } + }; + + Task::run_task_loop(task, control_sender, world_state); + trace!(?task_id, "Completed task"); + }) + .expect("Could not spawn task thread"); + + Ok(()) + } + + fn send_task_result(&mut self, task_id: TaskId, result: TaskResult) { + let Some(mut task_control) = self.tasks.remove(&task_id) else { + // Missing task, must have ended already or gone into suspension? + // This is odd though? So we'll warn. 
warn!(task_id, "Task not found for notification, ignoring"); return; }; - let result_sender = { - let mut result_sender_lock = task_control.result_sender.lock().unwrap(); - result_sender_lock.take() - }; + let result_sender = task_control.result_sender.take(); let Some(result_sender) = result_sender else { return; }; @@ -1045,142 +1092,63 @@ impl Scheduler { } } - fn process_wake_ups(&self, to_wake: &[TaskId]) -> Vec { - let mut to_remove = vec![]; - - trace!(?to_wake, "Waking up tasks..."); - - let mut inner = self.inner.write().unwrap(); - for task_id in to_wake { - let task = inner.tasks.get_mut(task_id).unwrap(); - task.suspended = false; - - let world_state_source = self - .database - .clone() - .world_state_source() - .expect("Could not get world state source"); - - let tcs = task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Resume(world_state_source, v_int(0))) { - error!(?task_id, error = ?e, "Could not send message resume task. Task being removed."); - to_remove.push(task.task_id); - } - } - to_remove - } - - fn process_fork_request( - &self, - fork_request: Fork, - reply: oneshot::Sender, - session: Arc, - ) -> Vec { - let mut to_remove = vec![]; - // Fork the session. - let forked_session = session.clone(); - let task_id = self - .submit_fork_task(fork_request, forked_session) - .unwrap_or_else(|e| panic!("Could not fork task: {:?}", e)); - - let reply = reply; - if let Err(e) = reply.send(task_id) { - error!(task = task_id, error = ?e, "Could not send fork reply. Parent task gone? Remove."); - to_remove.push(task_id); - } - to_remove - } - - fn process_describe_request( - &self, - requesting_task_id: TaskId, - reply: oneshot::Sender>, - ) -> Vec { - let mut to_remove = vec![]; - - let reply = reply; - - // Note these could be done in parallel and joined instead of single file, to avoid blocking - // the loop on one uncooperative thread, and could be done in a separate thread as well? - // The challenge being the borrow semantics of the 'tasks' list. - // And we should have a timeout here to boot. - // For now, just iterate blocking. - let mut tasks = Vec::new(); - trace!( - task = requesting_task_id, - "Task requesting task descriptions" - ); - let inner = self.inner.read().unwrap(); - for (task_id, task) in inner.tasks.iter() { - // Tasks not in suspended state shouldn't be added. - if !task.suspended { - continue; - } - if *task_id != requesting_task_id { - trace!( - requesting_task_id = requesting_task_id, - other_task = task_id, - "Requesting task description" - ); - let (t_send, t_reply) = oneshot::channel(); - let tcs = task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Describe(t_send)) { - error!(?task_id, error = ?e, - "Could not send describe request to task. Task being removed."); - to_remove.push(task.task_id); - continue; - } - let Ok(task_desc) = t_reply.recv() else { - error!(?task_id, "Could not get task description"); - to_remove.push(task.task_id); - continue; - }; - trace!( - requesting_task_id = requesting_task_id, - other_task = task_id, - "Got task description" - ); - tasks.push(task_desc); - } + #[instrument(skip(self, control_sender, database))] + fn retry_task( + &mut self, + task: Task, + control_sender: &Sender<(TaskId, SchedulerControlMsg)>, + database: Arc, + ) { + // Make sure the old thread is dead. + task.kill_switch.store(false, Ordering::SeqCst); + + // Remove this from the running tasks. 
+ // By definition we can't respond to a retry for a suspended task, so if it's not in the + // running tasks there's something very wrong. + let old_tc = self + .tasks + .remove(&task.task_id) + .expect("Task not found for retry"); + + // Grab the "task start" record from the (now dead) task, and submit this again with the same + // task_id. + let task_start = task.task_start.clone(); + if let Err(e) = self.start_task_thread( + task.task_id, + task_start, + old_tc.player, + old_tc.session, + None, + task.perms, + false, + control_sender, + database, + ) { + error!(?e, "Could not restart task"); } - trace!( - task = requesting_task_id, - "Sending task descriptions back..." - ); - reply.send(tasks).expect("Could not send task description"); - trace!(task = requesting_task_id, "Sent task descriptions back"); - to_remove } - fn process_kill_request( - &self, - requesting_task_id: TaskId, + #[instrument(skip(self, result_sender))] + fn kill_task( + &mut self, victim_task_id: TaskId, sender_permissions: Perms, result_sender: oneshot::Sender, - ) -> Vec { - let mut to_remove = vec![]; - - // If the task somehow is requesting a kill on itself, that would lead to deadlock, - // because we could never send the result back. So we reject that outright. bf_kill_task - // should be handling this upfront. - if requesting_task_id == victim_task_id { - error!( - task = requesting_task_id, - "Task requested to kill itself. Ignoring" - ); - return vec![]; - } - - let inner = self.inner.read().unwrap(); - let victim_task = match inner.tasks.get(&victim_task_id) { - Some(victim_task) => victim_task, - None => { - result_sender - .send(v_err(E_INVARG)) - .expect("Could not send kill result"); - return vec![]; - } + ) { + // We need to do perms check first, which means checking both running and suspended tasks, + // and getting their permissions. And may as well remember whether it was in suspended or + // active at the same time. + let (perms, is_suspended) = match self.suspended.get(&victim_task_id) { + Some(sr) => (sr.task.perms, true), + None => match self.tasks.get(&victim_task_id) { + Some(task) => (task.player, false), + None => { + result_sender + .send(v_err(E_INVARG)) + .expect("Could not send kill result"); + return; + } + }, }; // We reject this outright if the sender permissions are not sufficient: @@ -1193,34 +1161,51 @@ impl Scheduler { if !sender_permissions .check_is_wizard() .expect("Could not check wizard status for kill request") - && sender_permissions.who != victim_task.player + && sender_permissions.who != perms { result_sender .send(v_err(E_PERM)) .expect("Could not send kill result"); - return vec![]; + return; } - let tcs = victim_task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Abort) { - error!(task = victim_task_id, error = ?e, "Could not send kill request to task. Task being removed."); - to_remove.push(victim_task_id); + // If suspended we can just remove completely and move on. + if is_suspended { + if self.suspended.remove(&victim_task_id).is_none() { + error!( + task = victim_task_id, + "Task not found in suspended list for kill request" + ); + } + return; } - if let Err(e) = result_sender.send(v_none()) { - error!(task = requesting_task_id, error = ?e, "Could not send kill result to requesting task. Requesting task being removed."); - } - to_remove + // Otherwise we have to check if the task is running, remove its control record, and flip + // its kill switch. 
+ let victim_task = match self.tasks.remove(&victim_task_id) { + Some(victim_task) => victim_task, + None => { + result_sender + .send(v_err(E_INVARG)) + .expect("Could not send kill result"); + return; + } + }; + victim_task.kill_switch.store(true, Ordering::SeqCst); } - fn process_resume_request( - &self, + #[allow(clippy::too_many_arguments)] + #[instrument(skip(self, result_sender, control_sender, database))] + fn resume_task( + &mut self, requesting_task_id: TaskId, queued_task_id: TaskId, sender_permissions: Perms, return_value: Var, result_sender: oneshot::Sender, - ) -> Option { + control_sender: &Sender<(TaskId, SchedulerControlMsg)>, + database: Arc, + ) { // Task can't resume itself, it couldn't be queued. Builtin should not have sent this // request. if requesting_task_id == queued_task_id { @@ -1228,93 +1213,59 @@ impl Scheduler { task = requesting_task_id, "Task requested to resume itself. Ignoring" ); - return None; + result_sender.send(v_err(E_INVARG)).ok(); + return; } - // Task does not exist. - let mut inner = self.inner.write().unwrap(); - let queued_task = match inner.tasks.get_mut(&queued_task_id) { - Some(queued_task) => queued_task, - None => { - result_sender - .send(v_err(E_INVARG)) - .expect("Could not send resume result"); - return None; + let perms = match self.suspended.get(&queued_task_id) { + Some(SuspendedTask { + wake_condition: WakeCondition::Never, + task, + .. + }) => task.perms, + Some(SuspendedTask { + wake_condition: WakeCondition::Time(_), + task, + .. + }) => task.perms, + _ => { + result_sender.send(v_err(E_INVARG)).ok(); + return; } }; - // No permissions. if !sender_permissions .check_is_wizard() .expect("Could not check wizard status for resume request") - && sender_permissions.who != queued_task.player + && sender_permissions.who != perms { result_sender .send(v_err(E_PERM)) .expect("Could not send resume result"); - return None; - } - // Task is not suspended. - if !queued_task.suspended { - result_sender - .send(v_err(E_INVARG)) - .expect("Could not send resume result"); - return None; - } - - // Follow the usual task resume logic. - let state_source = self - .database - .clone() - .world_state_source() - .expect("Unable to create world state source from database"); - - queued_task.suspended = false; - - let tcs = queued_task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Resume(state_source, return_value)) { - error!(task = queued_task_id, error = ?e, - "Could not send resume request to task. Task being removed."); - return Some(queued_task_id); - } - - if let Err(e) = result_sender.send(v_none()) { - error!(task = requesting_task_id, error = ?e, - "Could not send resume result to requesting task. Requesting task being removed."); - return Some(requesting_task_id); + return; } - None - } - - fn process_retry_request(&self, task_id: TaskId) -> Option { - let mut inner = self.inner.write().unwrap(); - let Some(task) = inner.tasks.get_mut(&task_id) else { - warn!(task = task_id, "Retrying task not found"); - return None; - }; - - // Create a new transaction. - let state_source = self - .database - .clone() - .world_state_source() - .expect("Unable to get world source from database"); - - task.suspended = false; + let sr = self.suspended.remove(&queued_task_id).unwrap(); - let tcs = task.task_control_sender.clone(); - if let Err(e) = tcs.send(TaskControlMsg::Restart(state_source)) { - error!(task = task_id, error = ?e, - "Could not send resume request to task. 
Task being removed."); - return Some(task_id); + if self + .resume_task_thread( + sr.task, + return_value, + sr.session, + sr.result_sender, + control_sender, + database, + ) + .is_err() + { + error!(task = queued_task_id, "Could not resume task"); + result_sender.send(v_err(E_INVARG)).ok(); } - None } - fn process_disconnect(&self, disconnect_task_id: TaskId, player: Objid) { - let mut inner = self.inner.write().unwrap(); - let Some(task) = inner.tasks.get_mut(&disconnect_task_id) else { + #[instrument(skip(self))] + fn disconnect_task(&mut self, disconnect_task_id: TaskId, player: Objid) { + let Some(task) = self.tasks.get_mut(&disconnect_task_id) else { warn!(task = disconnect_task_id, "Disconnecting task not found"); return; }; @@ -1327,98 +1278,32 @@ impl Scheduler { // Then abort all of their still-living forked tasks (that weren't the disconnect // task, we need to let that run to completion for sanity's sake.) - for (task_id, task) in inner.tasks.iter() { + for (task_id, tc) in self.tasks.iter() { if *task_id == disconnect_task_id { continue; } - if task.player != player { + if tc.player != player { continue; } warn!( ?player, task_id, "Aborting task from disconnected player..." ); - // This is fire and forget, we cannot assume that the task is still alive. - let tcs = task.task_control_sender.clone(); - let Ok(_) = tcs.send(TaskControlMsg::Abort) else { - trace!(?player, task_id, "Task already dead"); - continue; - }; + tc.kill_switch.store(true, Ordering::SeqCst); } - } - - fn process_task_removals(&self, to_remove: &[TaskId]) { - let mut inner = self.inner.write().unwrap(); + // Likewise, suspended tasks. + let to_remove = self + .suspended + .iter() + .filter_map(|(task_id, sr)| { + if sr.task.player != player { + return None; + } + Some(*task_id) + }) + .collect::>(); for task_id in to_remove { - trace!(task = task_id, "Task removed"); - inner.tasks.remove(task_id); + self.suspended.remove(&task_id); } } - - // Yes yes I know it's a lot of arguments, but wrapper object here is redundant. - #[allow(clippy::too_many_arguments)] - fn new_task( - &self, - task_start: TaskStart, - player: Objid, - session: Arc, - delay_start: Option, - control_sender: Sender<(TaskId, SchedulerControlMsg)>, - perms: Objid, - is_background: bool, - ) -> Result { - let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst); - let (task_control_sender, task_control_receiver) = crossbeam_channel::unbounded(); - - let state_source = self - .database - .clone() - .world_state_source() - .expect("Unable to instantiate database"); - - // TODO: support a queue-size on concurrent executing tasks and allow them to sit in an - // initially suspended state without spawning a worker thread, until the queue has space. - // Spawn the task's thread. - let task_state_source = state_source.clone(); - let task_session = session.clone(); - - let (sender, receiver) = oneshot::channel(); - let name = format!("moor-task-{}-player-{}", task_id, player); - let task_control = TaskControl { - task_id, - player, - task_control_sender, - state_source, - session, - suspended: false, - waiting_input: None, - resume_time: None, - result_sender: Mutex::new(Some(sender)), - }; - let mut inner = self.inner.write().unwrap(); - inner.tasks.insert(task_id, task_control); - - // Footgun warning: ALWAYS `self.tasks.insert` before spawning the task thread! 
- - std::thread::Builder::new() - .name(name) - .spawn(move || { - trace!(?task_id, ?task_start, "Starting up task"); - Task::run( - task_id, - task_start, - perms, - delay_start, - task_state_source, - is_background, - task_session, - task_control_receiver, - control_sender, - ); - trace!(?task_id, "Completed task"); - }) - .expect("Could not spawn task thread"); - - Ok(TaskHandle(task_id, receiver)) - } } diff --git a/crates/kernel/src/tasks/task.rs b/crates/kernel/src/tasks/task.rs index 7a2cb9f8..d188ec27 100644 --- a/crates/kernel/src/tasks/task.rs +++ b/crates/kernel/src/tasks/task.rs @@ -12,19 +12,31 @@ // this program. If not, see . // -use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, TryRecvError}; - +//! A task is a concurrent, transactionally isolated, thread of execution. It starts with the +//! execution of a 'verb' (or 'command verb' or 'eval' etc) and runs through to completion or +//! suspension or abort. +//! Within the task many verbs may be executed as subroutine calls from the root verb/command +//! Each task has its own VM host which is responsible for executing the program. +//! Each task has its own isolated transactional world state. +//! Each task is given a semi-isolated "session" object through which I/O is performed. +//! When a task fails, both the world state and I/O should be rolled back. +//! A task is generally tied 1:1 with a player connection, and usually come from one command, but +//! they can also be 'forked' from other tasks. +//! +use crossbeam_channel::Sender; + +use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use tracing::{debug, error, trace, warn}; use moor_values::model::CommandError::PermissionDenied; use moor_values::model::VerbInfo; +use moor_values::model::WorldState; use moor_values::model::{CommandError, CommitResult, WorldStateError}; -use moor_values::model::{WorldState, WorldStateSource}; use moor_values::util::parse_into_words; -use moor_values::var::{v_int, v_string}; +use moor_values::var::v_int; use moor_values::var::{List, Objid}; use moor_values::NOTHING; @@ -33,45 +45,29 @@ use crate::matching::ws_match_env::WsMatchEnv; use crate::tasks::command_parse::{parse_command, ParseCommandError, ParsedCommand}; use crate::tasks::sessions::Session; -use crate::tasks::task_messages::{SchedulerControlMsg, TaskControlMsg, TaskStart}; +use crate::tasks::task_messages::{SchedulerControlMsg, TaskStart}; use crate::tasks::vm_host::{VMHostResponse, VmHost}; -use crate::tasks::{PhantomUnsend, PhantomUnsync, TaskDescription, TaskId, VerbCall}; - -/// A task is a concurrent, transactionally isolated, thread of execution. It starts with the -/// execution of a 'verb' (or 'command verb' or 'eval' etc) and runs through to completion or -/// suspension or abort. -/// Within the task many verbs may be executed as subroutine calls from the root verb/command -/// Each task has its own VM host which is responsible for executing the program. -/// Each task has its own isolated transactional world state. -/// Each task is given a semi-isolated "session" object through which I/O is performed. -/// When a task fails, both the world state and I/O should be rolled back. -/// A task is generally tied 1:1 with a player connection, and usually come from one command, but -/// they can also be 'forked' from other tasks. -pub(crate) struct Task { +use crate::tasks::{TaskId, VerbCall}; + +#[derive(Debug)] +pub struct Task { /// My unique task id. 
pub(crate) task_id: TaskId, /// What I was asked to do. - pub(crate) task_start: TaskStart, + pub(crate) task_start: Arc, /// When this task will begin execution. /// For currently execution tasks this is when the task actually began running. /// For tasks in suspension, this is when they will wake up. /// If the task is in indefinite suspension, this is None. pub(crate) scheduled_start_time: Option, - /// The channel to send control messages to the scheduler. - /// This sender is unique for our task, but is passed around all over the place down into the - /// VM host and into the VM itself. - pub(crate) scheduler_control_sender: Sender<(TaskId, SchedulerControlMsg)>, - /// The transactionaly isolated world state for this task. - pub(crate) world_state: Box, + /// The player on behalf of whom this task is running. Who owns this task. + pub(crate) player: Objid, /// The permissions of the task -- the object on behalf of which all permissions are evaluated. pub(crate) perms: Objid, /// The actual VM host which is managing the execution of this task. pub(crate) vm_host: VmHost, - /// Should I die? - pub(crate) done: bool, - - unsend: PhantomUnsend, - unsync: PhantomUnsync, + /// True if the task should die. + pub(crate) kill_switch: Arc, } // TODO Propagate default ticks, seconds values from global config / args properly. @@ -82,7 +78,7 @@ const DEFAULT_FG_SECONDS: u64 = 5; const DEFAULT_BG_SECONDS: u64 = 3; const DEFAULT_MAX_STACK_DEPTH: usize = 50; -fn max_vm_values(_ws: &mut dyn WorldState, is_background: bool) -> (usize, u64, usize) { +fn max_vm_values(is_background: bool) -> (usize, u64, usize) { let (max_ticks, max_seconds, max_stack_depth) = if is_background { ( DEFAULT_BG_TICKS, @@ -138,32 +134,19 @@ fn max_vm_values(_ws: &mut dyn WorldState, is_background: bool) -> (usize, u64, impl Task { // Yes yes I know it's a lot of arguments, but wrapper object here is redundant. #[allow(clippy::too_many_arguments)] - pub fn run( + pub fn new( task_id: TaskId, - task_start: TaskStart, + player: Objid, + task_start: Arc, perms: Objid, - delay_start: Option, - state_source: Arc, is_background: bool, session: Arc, - task_control_receiver: Receiver, - control_sender: Sender<(TaskId, SchedulerControlMsg)>, - ) { - // TODO: Defer task delay to the scheduler, and let it handle the delay? - // Instead of performing it in the task startup. - if let Some(delay) = delay_start { - std::thread::sleep(delay); - } - - // Start the transaction. - let mut world_state = state_source - .new_world_state() - .expect("Could not start transaction for new task"); - + control_sender: &Sender<(TaskId, SchedulerControlMsg)>, + kill_switch: Arc, + ) -> Self { // Find out max ticks, etc. for this task. These are either pulled from server constants in // the DB or from default constants. 
- let (max_ticks, max_seconds, max_stack_depth) = - max_vm_values(world_state.as_mut(), is_background); + let (max_ticks, max_seconds, max_stack_depth) = max_vm_values(is_background); let scheduler_control_sender = control_sender.clone(); let vm_host = VmHost::new( @@ -174,90 +157,56 @@ impl Task { session.clone(), scheduler_control_sender.clone(), ); - let mut task = Task { + + Task { task_id, + player, task_start, scheduled_start_time: None, - scheduler_control_sender: scheduler_control_sender.clone(), vm_host, - world_state, perms, - done: false, - unsend: Default::default(), - unsync: Default::default(), - }; - - let start = task.task_start.clone(); - if !task.setup_task_start(start) { - task.done = true; - return; + kill_switch, } + } - trace!(task_id = ?task.task_id, "Task started"); - while !task.done { - if task.vm_host.is_running() { - let vm_continuation = task.vm_dispatch(); - if let Some(scheduler_msg) = vm_continuation { - scheduler_control_sender - .send((task.task_id, scheduler_msg)) - .expect("Could not send scheduler_msg"); - } + pub fn run_task_loop( + mut task: Task, + control_sender: Sender<(TaskId, SchedulerControlMsg)>, + mut world_state: Box, + ) { + let task_id = task.task_id; + debug!(task_id, "Task started"); + while task.vm_host.is_running() { + // Check kill switch. + if task.kill_switch.load(std::sync::atomic::Ordering::Relaxed) { + trace!(task_id = ?task.task_id, "Task killed"); + control_sender + .send((task.task_id, SchedulerControlMsg::TaskAbortCancelled)) + .expect("Could not send kill message"); + break; } - - // Receive control messages from the scheduler, if there are any. - - // If the VM is not running (e.g. we're suspended), we'll do a blocking receive to - // avoid spinning. - let control_msg = if task.vm_host.is_running() { - match task_control_receiver.try_recv() { - Ok(msg) => msg, - Err(TryRecvError::Disconnected) => { - warn!(task_id = ?task.task_id, "Channel closed"); - task.done = true; - - break; - } - Err(TryRecvError::Empty) => { - // No message, just keep going. - continue; - } - } + if let Some(continuation_task) = task.vm_dispatch(&control_sender, world_state.as_mut()) + { + task = continuation_task; } else { - match task_control_receiver.recv_timeout(Duration::from_millis(50)) { - Ok(msg) => msg, - Err(RecvTimeoutError::Timeout) => { - // No message, just keep going. - if task.vm_host.is_running() { - warn!(task_id = ?task.task_id, "Task not running, but in blocking receive. Why?"); - task.done = true; - } - continue; - } - Err(RecvTimeoutError::Disconnected) => { - trace!(task_id = ?task.task_id, "Channel disconnected"); - task.done = true; - - break; - } - } - }; - - if let Some(response) = task.handle_control_message(control_msg) { - scheduler_control_sender - .send((task.task_id, response)) - .expect("Could not send response"); + break; } } + debug!(task_id, "Task finished"); } /// Set the task up to start executing, based on the task start configuration. - fn setup_task_start(&mut self, task_start: TaskStart) -> bool { - match task_start { + pub(crate) fn setup_task_start( + &mut self, + control_sender: &Sender<(TaskId, SchedulerControlMsg)>, + world_state: &mut dyn WorldState, + ) -> bool { + match self.task_start.clone().as_ref() { // We've been asked to start a command. // We need to set up the VM and then execute it. 
TaskStart::StartCommandVerb { player, command } => { - if let Some(msg) = self.start_command(player, command.as_str()) { - self.scheduler_control_sender + if let Some(msg) = self.start_command(*player, command.as_str(), world_state) { + control_sender .send((self.task_id, msg)) .expect("Could not send start response"); }; @@ -273,16 +222,16 @@ impl Task { trace!(?verb, ?player, ?vloc, ?args, "Starting verb"); let verb_call = VerbCall { - verb_name: verb, - location: vloc, - this: vloc, - player, - args, - argstr, + verb_name: verb.clone(), + location: *vloc, + this: *vloc, + player: *player, + args: args.clone(), + argstr: argstr.clone(), caller: NOTHING, }; // Find the callable verb ... - match self.world_state.find_method_verb_on( + match world_state.find_method_verb_on( self.perms, verb_call.this, verb_call.verb_name.as_str(), @@ -290,7 +239,7 @@ impl Task { Err(WorldStateError::VerbNotFound(_, _)) => { debug!(task_id = ?self.task_id, this = ?verb_call.this, verb = verb_call.verb_name, "Verb not found"); - self.scheduler_control_sender + control_sender .send(( self.task_id, SchedulerControlMsg::TaskVerbNotFound( @@ -299,7 +248,6 @@ impl Task { ), )) .expect("Could not send start response"); - self.done = true; return false; } Err(e) => { @@ -324,12 +272,12 @@ impl Task { } => { trace!(task_id = ?self.task_id, suspended, "Setting up fork"); self.vm_host - .start_fork(self.task_id, fork_request, suspended); + .start_fork(self.task_id, fork_request, *suspended); } TaskStart::StartEval { player, program } => { self.scheduled_start_time = None; self.vm_host - .start_eval(self.task_id, player, program, self.world_state.as_ref()); + .start_eval(self.task_id, *player, program.clone(), world_state); } }; true @@ -337,14 +285,19 @@ impl Task { /// Call out to the vm_host and ask it to execute the next instructions, and it will return /// back telling us next steps. - /// Returns a tuple of (VmContinue, Option), where VmContinue indicates - /// whether the VM should continue running, and the SchedulerControlMsg is a message to send - /// back to the scheduler, if any. - fn vm_dispatch(&mut self) -> Option { + /// Results of VM execution are looked at, and if they involve a scheduler action, we will + /// send a message back to the scheduler to handle it. + /// If the scheduler action is some kind of suspension, we move ourselves into the message + /// itself. + /// If we are to be consumed (because ownership transferred back to the scheduler), we will + /// return None, otherwise we will return ourselves. + fn vm_dispatch( + mut self, + control_sender: &Sender<(TaskId, SchedulerControlMsg)>, + world_state: &mut dyn WorldState, + ) -> Option { // Call the VM - let vm_exec_result = self - .vm_host - .exec_interpreter(self.task_id, self.world_state.as_mut()); + let vm_exec_result = self.vm_host.exec_interpreter(self.task_id, world_state); // Having done that, what should we now do? match vm_exec_result { @@ -356,7 +309,7 @@ impl Task { // We will then take the new task id and send it back to the caller. 
let (send, reply) = oneshot::channel(); let task_id_var = fork_request.task_id; - self.scheduler_control_sender + control_sender .send(( self.task_id, SchedulerControlMsg::TaskRequestFork(fork_request, send), @@ -367,19 +320,21 @@ impl Task { self.vm_host .set_variable(&task_id_var, v_int(task_id as i64)); } - None + Some(self) } VMHostResponse::Suspend(delay) => { trace!(task_id = self.task_id, delay = ?delay, "Task suspend"); // VMHost is now suspended for execution, and we'll be waiting for a Resume - let commit_result = self - .world_state + let commit_result = world_state .commit() .expect("Could not commit world state before suspend"); if let CommitResult::ConflictRetry = commit_result { warn!("Conflict during commit before suspend"); - return Some(SchedulerControlMsg::TaskConflictRetry); + control_sender + .send((self.task_id, SchedulerControlMsg::TaskConflictRetry(self))) + .expect("Could not send conflict retry"); + return None; } trace!(task_id = self.task_id, "Task suspended"); @@ -391,8 +346,15 @@ impl Task { // In both cases we'll rely on the scheduler to wake us up in its processing loop // rather than sleep here, which would make this thread unresponsive to other // messages. - let resume_time = delay.map(|delay| SystemTime::now() + delay); - Some(SchedulerControlMsg::TaskSuspend(resume_time)) + let resume_time = delay.map(|delay| Instant::now() + delay); + + control_sender + .send(( + self.task_id, + SchedulerControlMsg::TaskSuspend(resume_time, self), + )) + .expect("Could not send suspend message"); + None } VMHostResponse::SuspendNeedInput => { trace!(task_id = self.task_id, "Task suspend need input"); @@ -400,50 +362,59 @@ impl Task { // VMHost is now suspended for input, and we'll be waiting for a ResumeReceiveInput // Attempt commit... See comments/notes on Suspend above. - let commit_result = self - .world_state + let commit_result = world_state .commit() .expect("Could not commit world state before suspend"); if let CommitResult::ConflictRetry = commit_result { warn!("Conflict during commit before suspend"); - return Some(SchedulerControlMsg::TaskConflictRetry); + control_sender + .send((self.task_id, SchedulerControlMsg::TaskConflictRetry(self))) + .expect("Could not send conflict retry"); + return None; } trace!(task_id = self.task_id, "Task suspended for input"); self.vm_host.stop(); - Some(SchedulerControlMsg::TaskRequestInput) - } - VMHostResponse::ContinueOk => { - self.done = false; + // Consume us, passing back to the scheduler that we're waiting for input. 
+ control_sender + .send((self.task_id, SchedulerControlMsg::TaskRequestInput(self))) + .expect("Could not send suspend message"); None } + VMHostResponse::ContinueOk => Some(self), VMHostResponse::CompleteSuccess(result) => { trace!(task_id = self.task_id, result = ?result, "Task complete, success"); - let CommitResult::Success = - self.world_state.commit().expect("Could not attempt commit") + let CommitResult::Success = world_state.commit().expect("Could not attempt commit") else { warn!("Conflict during commit before complete, asking scheduler to retry task"); - return Some(SchedulerControlMsg::TaskConflictRetry); + control_sender + .send((self.task_id, SchedulerControlMsg::TaskConflictRetry(self))) + .expect("Could not send conflict retry"); + return None; }; - self.done = true; self.vm_host.stop(); - Some(SchedulerControlMsg::TaskSuccess(result)) + control_sender + .send((self.task_id, SchedulerControlMsg::TaskSuccess(result))) + .expect("Could not send success message"); + Some(self) } VMHostResponse::CompleteAbort => { error!(task_id = self.task_id, "Task aborted"); - self.world_state + world_state .rollback() .expect("Could not rollback world state transaction"); self.vm_host.stop(); - self.done = true; - Some(SchedulerControlMsg::TaskAbortCancelled) + control_sender + .send((self.task_id, SchedulerControlMsg::TaskAbortCancelled)) + .expect("Could not send abort message"); + Some(self) } VMHostResponse::CompleteException(exception) => { // Commands that end in exceptions are still expected to be committed, to @@ -456,116 +427,52 @@ impl Task { // evaluate this behaviour generally. warn!(task_id = self.task_id, "Task exception"); - self.done = true; self.vm_host.stop(); - Some(SchedulerControlMsg::TaskException(exception)) + control_sender + .send(( + self.task_id, + SchedulerControlMsg::TaskException(exception.clone()), + )) + .expect("Could not send exception message"); + Some(self) } VMHostResponse::AbortLimit(reason) => { warn!(task_id = self.task_id, "Task abort limit reached"); self.vm_host.stop(); - self.done = true; - - self.world_state + world_state .rollback() .expect("Could not rollback world state"); - - Some(SchedulerControlMsg::TaskAbortLimitsReached(reason)) + control_sender + .send(( + self.task_id, + SchedulerControlMsg::TaskAbortLimitsReached(reason), + )) + .expect("Could not send abort limit message"); + Some(self) } VMHostResponse::RollbackRetry => { warn!(task_id = self.task_id, "Task rollback requested, retrying"); self.vm_host.stop(); - self.done = true; - - self.world_state + world_state .rollback() .expect("Could not rollback world state"); - - Some(SchedulerControlMsg::TaskConflictRetry) - } - } - } - - /// Handle an inbound control message from the scheduler, and return a response message to send - /// back (if any) as well as a flag to indicate if the task loop should continue running. - fn handle_control_message(&mut self, msg: TaskControlMsg) -> Option { - match msg { - TaskControlMsg::Resume(state_source, value) => { - // We're back. - debug!( - task_id = self.task_id, - "Resuming task, with new transaction" - ); - self.world_state = state_source - .new_world_state() - .expect("Unable to start new transaction"); - self.scheduled_start_time = None; - self.vm_host.resume_execution(value); - None - } - TaskControlMsg::Restart(state_source) => { - // Try. Again. 
- debug!( - task_id = self.task_id, - "Restarting task, with new transaction" - ); - self.world_state = state_source - .new_world_state() - .expect("Unable to start new transaction"); - self.scheduled_start_time = None; - self.setup_task_start(self.task_start.clone()); - None - } - TaskControlMsg::ResumeReceiveInput(state_source, input) => { - // We're back. - trace!(task_id = self.task_id, ?input, "Resuming task, with input"); - assert!(!self.vm_host.is_running()); - self.world_state = state_source - .new_world_state() - .expect("Unable to start new transaction"); - self.scheduled_start_time = None; - self.vm_host.resume_execution(v_string(input)); - None - } - TaskControlMsg::Abort => { - // We've been asked to die. Go tell the VM host to abort, and roll back the - // transaction. - - trace!(task_id = self.task_id, "Aborting task"); - self.vm_host.stop(); - self.done = true; - - // Failure to rollback is a panic, something is fundamentally wrong, and we're best - // to just restart. - self.world_state - .rollback() - .expect("Could not rollback transaction. Panic."); - - // And now tell the scheduler we're done, as we exit. - - Some(SchedulerControlMsg::TaskAbortCancelled) - } - TaskControlMsg::Describe(reply_sender) => { - let description = TaskDescription { - task_id: self.task_id, - start_time: self.scheduled_start_time, - permissions: self.vm_host.permissions(), - verb_name: self.vm_host.verb_name(), - verb_definer: self.vm_host.verb_definer(), - line_number: self.vm_host.line_number(), - this: self.vm_host.this(), - }; - reply_sender - .send(description) - .expect("Could not send task description"); + control_sender + .send((self.task_id, SchedulerControlMsg::TaskConflictRetry(self))) + .expect("Could not send rollback retry message"); None } } } - fn start_command(&mut self, player: Objid, command: &str) -> Option { + fn start_command( + &mut self, + player: Objid, + command: &str, + world_state: &mut dyn WorldState, + ) -> Option { // Command execution is a multi-phase process: // 1. Lookup $do_command. If we have the verb, execute it. // 2. If it returns a boolean `true`, we're done, let scheduler know, otherwise: @@ -587,17 +494,14 @@ impl Task { // Next, try parsing the command. // We need the player's location, and we'll just die if we can't get it. - let player_location = match self.world_state.location_of(player, player) { + let player_location = match world_state.location_of(player, player) { Ok(loc) => loc, Err(WorldStateError::VerbPermissionDenied) | Err(WorldStateError::ObjectPermissionDenied) | Err(WorldStateError::PropertyPermissionDenied) => { - self.done = true; - return Some(SchedulerControlMsg::TaskCommandError(PermissionDenied)); } Err(wse) => { - self.done = true; return Some(SchedulerControlMsg::TaskCommandError( CommandError::DatabaseError(wse), )); @@ -606,20 +510,16 @@ impl Task { // Parse the command in the current environment. let me = WsMatchEnv { - ws: self.world_state.as_mut(), + ws: world_state, perms: player, }; let matcher = MatchEnvironmentParseMatcher { env: me, player }; let parsed_command = match parse_command(command, matcher) { Ok(pc) => pc, Err(ParseCommandError::PermissionDenied) => { - self.done = true; - return Some(SchedulerControlMsg::TaskCommandError(PermissionDenied)); } Err(_) => { - self.done = true; - return Some(SchedulerControlMsg::TaskCommandError( CommandError::CouldNotParseCommand, )); @@ -627,19 +527,13 @@ impl Task { }; // Look for the verb... 
- let parse_results = match find_verb_for_command( - player, - player_location, - &parsed_command, - self.world_state.as_mut(), - ) { - Ok(results) => results, - Err(e) => { - self.done = true; - - return Some(SchedulerControlMsg::TaskCommandError(e)); - } - }; + let parse_results = + match find_verb_for_command(player, player_location, &parsed_command, world_state) { + Ok(results) => results, + Err(e) => { + return Some(SchedulerControlMsg::TaskCommandError(e)); + } + }; let (verb_info, target) = match parse_results { // If we have a successful match, that's what we'll call into Some((verb_info, target)) => { @@ -655,8 +549,6 @@ impl Task { // Otherwise, we want to try to call :huh, if it exists. None => { if player_location == NOTHING { - self.done = true; - return Some(SchedulerControlMsg::TaskCommandError( CommandError::NoCommandMatch, )); @@ -665,11 +557,8 @@ impl Task { // Try to find :huh. If it exists, we'll dispatch to that, instead. // If we don't find it, that's the end of the line. let Ok(verb_info) = - self.world_state - .find_method_verb_on(self.perms, player_location, "huh") + world_state.find_method_verb_on(self.perms, player_location, "huh") else { - self.done = true; - return Some(SchedulerControlMsg::TaskCommandError( CommandError::NoCommandMatch, )); @@ -740,4 +629,5 @@ fn find_verb_for_command( Ok(None) } -// TODO: Unit tests for scheduler and tasks. +// TODO: a battery of unit tests here. Which will likely involve setting up a standalone VM running +// a simple program. diff --git a/crates/kernel/src/tasks/task_messages.rs b/crates/kernel/src/tasks/task_messages.rs index 34cc1d89..d859145f 100644 --- a/crates/kernel/src/tasks/task_messages.rs +++ b/crates/kernel/src/tasks/task_messages.rs @@ -16,15 +16,15 @@ use crate::tasks::scheduler::AbortLimitReason; use crate::tasks::{TaskDescription, TaskId}; use crate::vm::vm_unwind::UncaughtException; use crate::vm::Fork; -use std::sync::Arc; use moor_compiler::Program; +use crate::tasks::task::Task; +use moor_values::model::Perms; use moor_values::model::{CommandError, NarrativeEvent}; -use moor_values::model::{Perms, WorldStateSource}; use moor_values::var::Var; use moor_values::var::{List, Objid}; -use std::time::SystemTime; +use std::time::Instant; #[derive(Debug, Clone)] pub enum TaskStart { @@ -52,26 +52,6 @@ pub enum TaskStart { StartEval { player: Objid, program: Program }, } -/// Messages sent to tasks from the scheduler to tell the task to do things. -pub enum TaskControlMsg { - /// The scheduler is telling the task to restart itself in a new transaction. - Restart(Arc), - /// The scheduler is telling the task to resume execution. Use the given world state - /// (transaction) and permissions when doing so. - Resume(Arc, Var), - /// The scheduler is giving the task the input it requested from the client, and is asking it - /// to resume execution, using the given world state (transaction) to do so. - ResumeReceiveInput(Arc, String), - /// The scheduler is asking the task to describe itself. - /// TODO: Rethink task 'description' mechanism. - /// Causes deadlock if the task _requesting_ the description is the task being - /// described, so I need to rethink this. Right now this is prevented by the - /// runtime, but it's not a good design. - Describe(oneshot::Sender), - /// The scheduler is telling the task to abort itself. - Abort, -} - /// The ad-hoc messages that can be sent from tasks (or VM) up to the scheduler. 
#[derive(Debug)] pub enum SchedulerControlMsg { @@ -79,7 +59,7 @@ pub enum SchedulerControlMsg { TaskSuccess(Var), /// The task hit an unresolvable transaction serialization conflict, and needs to be restarted /// in a new transaction. - TaskConflictRetry, + TaskConflictRetry(Task), /// A 'StartCommandVerb' type task failed to parse or match the command. TaskCommandError(CommandError), /// The verb to be executed was not found. @@ -93,11 +73,11 @@ pub enum SchedulerControlMsg { /// The task is letting us know that it has reached its abort limits. TaskAbortLimitsReached(AbortLimitReason), /// Tell the scheduler that the task in a suspended state, with a time to resume (if any) - TaskSuspend(Option), + TaskSuspend(Option, Task), /// Tell the scheduler we're suspending until we get input from the client. - TaskRequestInput, + TaskRequestInput(Task), /// Task is requesting a list of all other tasks known to the scheduler. - DescribeOtherTasks(oneshot::Sender>), + RequestQueuedTasks(oneshot::Sender>), /// Task is requesting that the scheduler abort another task. KillTask { victim_task_id: TaskId, diff --git a/crates/kernel/src/tasks/vm_host.rs b/crates/kernel/src/tasks/vm_host.rs index 337c55ae..1e913ad0 100644 --- a/crates/kernel/src/tasks/vm_host.rs +++ b/crates/kernel/src/tasks/vm_host.rs @@ -17,12 +17,13 @@ use crate::tasks::scheduler::AbortLimitReason; use crate::tasks::sessions::Session; use crate::tasks::task_messages::SchedulerControlMsg; use crate::tasks::vm_host::VMHostResponse::{AbortLimit, ContinueOk, DispatchFork, Suspend}; -use crate::tasks::{PhantomUnsend, PhantomUnsync, TaskId, VerbCall}; +use crate::tasks::{TaskId, VerbCall}; use crate::vm::{ExecutionResult, Fork, VerbExecutionRequest, VM}; use crate::vm::{FinallyReason, VMExecState}; use crate::vm::{UncaughtException, VmExecParams}; use bytes::Bytes; use crossbeam_channel::Sender; +use daumtils::PhantomUnsync; use moor_compiler::Program; use moor_compiler::{compile, Name}; use moor_values::model::VerbInfo; @@ -31,6 +32,7 @@ use moor_values::model::{BinaryType, ObjFlag}; use moor_values::var::Var; use moor_values::var::{List, Objid}; use moor_values::AsByteBuffer; +use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::{Duration, SystemTime}; use tracing::{debug, error, trace, warn}; @@ -73,10 +75,20 @@ pub struct VmHost { scheduler_control_sender: Sender<(TaskId, SchedulerControlMsg)>, running: bool, - unsend: PhantomUnsend, unsync: PhantomUnsync, } +impl Debug for VmHost { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VmHost") + .field("task_id", &self.vm_exec_state.task_id) + .field("running", &self.running) + .field("max_stack_depth", &self.max_stack_depth) + .field("max_ticks", &self.max_ticks) + .field("max_time", &self.max_time) + .finish() + } +} impl VmHost { pub fn new( task_id: TaskId, @@ -99,7 +111,6 @@ impl VmHost { sessions, scheduler_control_sender, running: false, - unsend: Default::default(), unsync: Default::default(), } } @@ -149,13 +160,13 @@ impl VmHost { } /// Start execution of a fork request in the hosted VM. 
- pub fn start_fork(&mut self, task_id: TaskId, fork_request: Fork, suspended: bool) {
+ pub fn start_fork(&mut self, task_id: TaskId, fork_request: &Fork, suspended: bool) {
self.vm_exec_state.start_time = Some(SystemTime::now());
self.vm_exec_state.maximum_time = Some(self.max_time);
self.vm_exec_state.tick_count = 0;
self.vm_exec_state.task_id = task_id;
self.vm
- .exec_fork_vector(&mut self.vm_exec_state, fork_request);
+ .exec_fork_vector(&mut self.vm_exec_state, fork_request.clone());
self.running = !suspended;
}
@@ -357,12 +368,17 @@ impl VmHost {
/// Resume what you were doing after suspension.
pub fn resume_execution(&mut self, value: Var) {
- // coming back from any suspend, we need a return value to feed back to `bf_suspend` or
- // `bf_read()`
- self.vm_exec_state.top_mut().frame.push(value);
self.vm_exec_state.start_time = Some(SystemTime::now());
self.vm_exec_state.tick_count = 0;
self.running = true;
+
+ // If there are no activations at all, that means we're a Fork, not returning to something.
+ if !self.vm_exec_state.stack.is_empty() {
+ // coming back from any suspend, we need a return value to feed back to `bf_suspend` or
+ // `bf_read()`
+ self.vm_exec_state.top_mut().frame.push(value);
+ }
+
debug!(task_id = self.vm_exec_state.task_id, "Resuming VMHost");
}
diff --git a/crates/kernel/src/vm/exec_state.rs b/crates/kernel/src/vm/exec_state.rs
index 144a80fb..dba1477a 100644
--- a/crates/kernel/src/vm/exec_state.rs
+++ b/crates/kernel/src/vm/exec_state.rs
@@ -12,8 +12,9 @@
// this program. If not, see .
//
-use crate::tasks::{PhantomUnsend, PhantomUnsync, TaskId};
+use crate::tasks::TaskId;
use crate::vm::activation::{Activation, Caller};
+use daumtils::PhantomUnsync;
use moor_values::var::Objid;
use moor_values::var::Var;
use moor_values::NOTHING;
@@ -41,7 +42,6 @@ pub struct VMExecState {
/// The amount of time the task is allowed to run.
pub(crate) maximum_time: Option,
- unsend: PhantomUnsend,
unsync: PhantomUnsync,
}
@@ -55,7 +55,6 @@ impl VMExecState {
max_ticks,
tick_slice: 0,
maximum_time: None,
- unsend: Default::default(),
unsync: Default::default(),
}
}
diff --git a/crates/kernel/src/vm/vm_call.rs b/crates/kernel/src/vm/vm_call.rs
index f28f6cbf..d085977c 100644
--- a/crates/kernel/src/vm/vm_call.rs
+++ b/crates/kernel/src/vm/vm_call.rs
@@ -13,7 +13,7 @@
//
use std::sync::Arc;
-use tracing::{debug, trace};
+use tracing::trace;
use moor_values::model::WorldState;
use moor_values::model::WorldStateError;
@@ -247,7 +247,7 @@ impl VM {
}
let bf = self.builtins[bf_func_num].clone();
- debug!(
+ trace!(
"Calling builtin: {}({}) caller_perms: {}",
BUILTIN_DESCRIPTORS[bf_func_num].name,
args_literal(&args),