Skip to content

Commit

Permalink
Task refactor WIP -- compiles, but does not link
Browse files Browse the repository at this point in the history
  • Loading branch information
LeStarch committed Apr 4, 2024
1 parent 3166546 commit 3a38f0c
Show file tree
Hide file tree
Showing 10 changed files with 739 additions and 271 deletions.
14 changes: 7 additions & 7 deletions Drv/Ip/SocketReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ SocketReadTask::SocketReadTask() : m_reconnect(false), m_stop(false) {}

SocketReadTask::~SocketReadTask() {}

void SocketReadTask::startSocketTask(const Fw::StringBase &name,
void SocketReadTask::start(const Fw::StringBase &name,
const bool reconnect,
const Os::Task::ParamType priority,
const Os::Task::ParamType stack,
const Os::Task::ParamType cpuAffinity) {
FW_ASSERT(not m_task.isStarted()); // It is a coding error to start this task multiple times
FW_ASSERT(m_task.getState() == Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times
FW_ASSERT(not this->m_stop); // It is a coding error to stop the thread before it is started
m_reconnect = reconnect;
// Note: the first step is for the IP socket to open the port
Os::Task::TaskStatus stat = m_task.start(name, SocketReadTask::readTask, this, priority, stack, cpuAffinity);
FW_ASSERT(Os::Task::TASK_OK == stat, static_cast<NATIVE_INT_TYPE>(stat));
Os::Task::Status stat = m_task.start(name, SocketReadTask::readTask, this, priority, stack, cpuAffinity);
FW_ASSERT(Os::Task::OP_OK == stat, static_cast<NATIVE_INT_TYPE>(stat));
}

SocketIpStatus SocketReadTask::startup() {
Expand All @@ -57,11 +57,11 @@ void SocketReadTask::close() {
this->getSocketHandler().close();
}

Os::Task::TaskStatus SocketReadTask::joinSocketTask(void** value_ptr) {
return m_task.join(value_ptr);
Os::Task::Status SocketReadTask::join() {
return m_task.join();
}

void SocketReadTask::stopSocketTask() {
void SocketReadTask::stop() {
this->m_stop = true;
this->getSocketHandler().shutdown(); // Break out of any receives and fully shutdown
}
Expand Down
8 changes: 4 additions & 4 deletions Drv/Ip/SocketReadTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SocketReadTask {
* \param stack: stack size provided to the task. See: Os::Task::start. Default: TASK_DEFAULT, posix threads default
* \param cpuAffinity: cpu affinity provided to task. See: Os::Task::start. Default: TASK_DEFAULT, don't care
*/
void startSocketTask(const Fw::StringBase &name,
void start(const Fw::StringBase &name,
const bool reconnect = true,
const Os::Task::ParamType priority = Os::Task::TASK_DEFAULT,
const Os::Task::ParamType stack = Os::Task::TASK_DEFAULT,
Expand Down Expand Up @@ -106,17 +106,17 @@ class SocketReadTask {
* Called to stop the socket read task. It is an error to call this before the thread has been started using the
* startSocketTask call. This will stop the read task and close the client socket.
*/
void stopSocketTask();
void stop();

/**
* \brief joins to the stopping read task to wait for it to close
*
* Called to join with the read socket task. This will block and return after the task has been stopped with a call
* to the stopSocketTask method.
* \param value_ptr: a pointer to fill with data. Passed to the Os::Task::join call. NULL to ignore.
* \return: Os::Task::TaskStatus passed back from the Os::Task::join call.
* \return: Os::Task::Status passed back from the Os::Task::join call.
*/
Os::Task::TaskStatus joinSocketTask(void** value_ptr);
Os::Task::Status join();


PROTECTED:
Expand Down
106 changes: 48 additions & 58 deletions Fw/Comp/ActiveComponentBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,11 @@ namespace Fw {
(void)snprintf(taskNameChar,sizeof(taskNameChar),"ActComp_%d",Os::Task::getNumTasks());
taskName = taskNameChar;
#endif
// If running with the baremetal scheduler, use a variant of the task-loop that
// does not loop internal, but waits for an external iteration call.
#if FW_BAREMETAL_SCHEDULER == 1
Os::Task::taskRoutine routine = this->s_baseBareTask;
#else
Os::Task::taskRoutine routine = this->s_baseTask;
#endif
Os::Task::TaskStatus status = this->m_task.start(taskName, routine, this, priority, stackSize, cpuAffinity, identifier);
FW_ASSERT(status == Os::Task::TASK_OK,static_cast<NATIVE_INT_TYPE>(status));
// Cooperative threads tasks externalize the task loop, and as such use the state machine as their task function
// Standard multithreading tasks use the task loop to respectively call the state machine
Os::Task::taskRoutine routine = (m_task.isCooperative()) ? this->s_taskStateMachine : this->s_taskLoop;
Os::Task::Status status = this->m_task.start(taskName, routine, this, priority, stackSize, cpuAffinity, identifier);
FW_ASSERT(status == Os::Task::Status::OP_OK,static_cast<NATIVE_INT_TYPE>(status));
}

void ActiveComponentBase::exit() {
Expand All @@ -82,66 +78,60 @@ namespace Fw {
DEBUG_PRINT("exit %s\n", this->getObjName());
}

Os::Task::TaskStatus ActiveComponentBase::join(void **value_ptr) {
Os::Task::Status ActiveComponentBase::join() {
DEBUG_PRINT("join %s\n", this->getObjName());
return this->m_task.join(value_ptr);
return this->m_task.join();
}

void ActiveComponentBase::s_baseBareTask(void* ptr) {
FW_ASSERT(ptr != nullptr);
ActiveComponentBase* comp = reinterpret_cast<ActiveComponentBase*>(ptr);
//Start if not started
if (!comp->m_task.isStarted()) {
comp->m_task.setStarted(true);
comp->preamble();
}
//Bare components cannot block, so return to the scheduler
if (comp->m_queue.getNumMsgs() == 0) {
return;
}
ActiveComponentBase::MsgDispatchStatus loopStatus = comp->doDispatch();
switch (loopStatus) {
case ActiveComponentBase::MSG_DISPATCH_OK: // if normal message processing, continue
void ActiveComponentBase::s_taskStateMachine(void* component_pointer) {
FW_ASSERT(component_pointer != nullptr);
// cast void* back to active component
ActiveComponentBase* component = static_cast<ActiveComponentBase*>(component_pointer);

// Each invocation of this function runs a single stage of the thread lifecycle.
switch (component->m_stage) {
// The first stage the active component triggers the "preamble" call before moving into the dispatching
// stage of the component thread.
case Lifecycle::CREATED:
component->preamble();
component->m_stage = Lifecycle::DISPATCHING;
break;
// The second stage of the active component triggers the dispatching loop dispatching messages until an
// exit message is received.
case Lifecycle::DISPATCHING:
if (component->dispatch() == MsgDispatchStatus::MSG_DISPATCH_EXIT) {
component->m_stage = Lifecycle::FINALIZING;
}
break;
// The second-to-last stage is where the finalizer is called. This will transition to the final stage
// automatically after the finalizer is called
case Lifecycle::FINALIZING:
component->finalizer();
component->m_stage = Lifecycle::DONE;
break;
case ActiveComponentBase::MSG_DISPATCH_EXIT:
comp->finalizer();
comp->m_task.setStarted(false);
// The last stage does nothing, cooperative tasks live here forever, threaded tasks exit on this condition
case Lifecycle::DONE:
break;
default:
FW_ASSERT(0,static_cast<NATIVE_INT_TYPE>(loopStatus));
FW_ASSERT(0);
break;
}
}
void ActiveComponentBase::s_baseTask(void* ptr) {
// cast void* back to active component
ActiveComponentBase* comp = static_cast<ActiveComponentBase*> (ptr);
// indicated that task is started
comp->m_task.setStarted(true);
// print out message when task is started
// printf("Active Component %s task started.\n",comp->getObjName());
// call preamble
comp->preamble();
// call main task loop until exit or error
comp->loop();
// if main loop exits, call finalizer
comp->finalizer();
}

void ActiveComponentBase::loop() {

bool quitLoop = false;
while (!quitLoop) {
MsgDispatchStatus loopStatus = this->doDispatch();
switch (loopStatus) {
case MSG_DISPATCH_OK: // if normal message processing, continue
break;
case MSG_DISPATCH_EXIT:
quitLoop = true;
break;
default:
FW_ASSERT(0,static_cast<NATIVE_INT_TYPE>(loopStatus));
}
void ActiveComponentBase::s_taskLoop(void* component_pointer) {
FW_ASSERT(component_pointer != nullptr);
ActiveComponentBase* component = static_cast<ActiveComponentBase*>(component_pointer);
while (component->m_stage != ActiveComponentBase::Lifecycle::DONE) {
ActiveComponentBase::s_taskStateMachine(component);
}
}

ActiveComponentBase::MsgDispatchStatus ActiveComponentBase::dispatch() {
// Cooperative tasks should return rather than block when no messages are available
if (this->m_task.isCooperative() and m_queue.getNumMsgs() == 0) {
return MsgDispatchStatus::MSG_DISPATCH_EMPTY;
}
return this->doDispatch();
}

void ActiveComponentBase::preamble() {
Expand Down
21 changes: 16 additions & 5 deletions Fw/Comp/ActiveComponentBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,39 @@ class ActiveComponentBase : public QueuedComponentBase {
Os::Task::ParamType stackSize = Os::Task::TASK_DEFAULT,
Os::Task::ParamType cpuAffinity = Os::Task::TASK_DEFAULT,
Os::Task::ParamType identifier =
Os::Task::TASK_DEFAULT); //!< called by instantiator when task is to be started
Os::Task::TASK_DEFAULT); //!< called by instantiator when task is to be started
void exit(); //!< exit task in active component
Os::Task::TaskStatus join(void** value_ptr); //!< provide return value of thread if value_ptr is not NULL
Os::Task::Status join(); //!< Join the thread
DEPRECATED(Os::Task::Status join(void** value_ptr), "Switch to .join()"); //!< Join to thread with discarded value_ptr

enum {
ACTIVE_COMPONENT_EXIT //!< message to exit active component task
};

PROTECTED:
//! Tracks the lifecycle of the component
enum Lifecycle {
CREATED, //!< Initial stage, call preamble
DISPATCHING, //!< Component is dispatching messages
FINALIZING, //!< Penultimate stage, call finalizer
DONE, //!< Done, doing nothing
};

explicit ActiveComponentBase(const char* name); //!< Constructor
virtual ~ActiveComponentBase(); //!< Destructor
void init(NATIVE_INT_TYPE instance); //!< initialization code
virtual void preamble(); //!< A function that will be called before the event loop is entered
virtual void loop(); //!< The function that will loop dispatching messages
MsgDispatchStatus dispatch(); //!< The function that will dispatching messages
virtual void finalizer(); //!< A function that will be called after exiting the loop
Os::Task m_task; //!< task object for active component

#if FW_OBJECT_TO_STRING == 1
virtual void toString(char* str, NATIVE_INT_TYPE size); //!< create string description of component
#endif
PRIVATE:
static void s_baseTask(void*); //!< function provided to task class for new thread.
static void s_baseBareTask(void*); //!< function provided to task class for new thread.
Lifecycle m_stage; //!< Lifecycle stage of the component
static void s_taskStateMachine(void*); //!< Task lifecycle state machine
static void s_taskLoop(void*); //!< Standard multi-threading task loop
};

} // namespace Fw
Expand Down
5 changes: 3 additions & 2 deletions Os/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ set(MOD_DEPS
set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/IntervalTimerCommon.cpp"
"${CMAKE_CURRENT_LIST_DIR}/TaskString.cpp"
"${CMAKE_CURRENT_LIST_DIR}/TaskCommon.cpp"
# "${CMAKE_CURRENT_LIST_DIR}/TaskCommon.cpp"
"${CMAKE_CURRENT_LIST_DIR}/QueueCommon.cpp"
"${CMAKE_CURRENT_LIST_DIR}/QueueString.cpp"
"${CMAKE_CURRENT_LIST_DIR}/IPCQueueCommon.cpp"
Expand All @@ -38,6 +38,7 @@ set(SOURCE_FILES

# Refactored common files
"${CMAKE_CURRENT_LIST_DIR}/File.cpp"
"${CMAKE_CURRENT_LIST_DIR}/Task.cpp"
)
# Check for default logger
if (NOT FPRIME_DISABLE_DEFAULT_LOGGER)
Expand All @@ -53,7 +54,7 @@ if (FPRIME_USE_POSIX)
"${CMAKE_CURRENT_LIST_DIR}/Pthreads/BufferQueueCommon.cpp"
"${CMAKE_CURRENT_LIST_DIR}/Pthreads/PriorityBufferQueue.cpp"
"${CMAKE_CURRENT_LIST_DIR}/Pthreads/MaxHeap/MaxHeap.cpp"
"${CMAKE_CURRENT_LIST_DIR}/Posix/Task.cpp"
# "${CMAKE_CURRENT_LIST_DIR}/Posix/Task.cpp"
"${CMAKE_CURRENT_LIST_DIR}/Linux/InterruptLock.cpp"
"${CMAKE_CURRENT_LIST_DIR}/Linux/WatchdogTimer.cpp"
"${CMAKE_CURRENT_LIST_DIR}/Posix/IntervalTimer.cpp"
Expand Down
Loading

0 comments on commit 3a38f0c

Please sign in to comment.