Skip to content

Commit

Permalink
Merge pull request #19 from helyOSFramework/log_collector
Browse files Browse the repository at this point in the history
Log collector new feature: log folding
  • Loading branch information
cviolbarbosa authored Jun 3, 2024
2 parents 44fbe42 + 2d378fa commit 2cb1497
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,58 @@ const { inMemDB } = require('../../services/in_mem_database/mem_database_service
const { MISSION_STATUS, ASSIGNMENT_STATUS } = require('../../modules/data_models.js');


/**
* Evaluates the updates for an assignment's status based on its current status and the parent mission's status.
*
* @param {Object} currentAssm - The current state of the assignment.
* @param {Object} assmUpdate - The proposed updates to the assignment.
* @returns {Promise} A promise which resolves when the assignment update has been handled appropriately.
*/
const evaluateAssignmentUpdate = (currentAssm, assmUpdate, uuid) => {
// If assignment status changed
if (currentAssm.status !== assmUpdate.status) {
// Filtering out irrelevant state update
if(currentAssm.status === ASSIGNMENT_STATUS.COMPLETED && assmUpdate.status === ASSIGNMENT_STATUS.SUCCEEDED) {
return Promise.resolve(false);
}
// Filtering out and reportig invalid state flow
if ([ASSIGNMENT_STATUS.SUCCEEDED, ASSIGNMENT_STATUS.COMPLETED, ASSIGNMENT_STATUS.FAILED].includes(currentAssm.status)){
logData.addLog('agent', { 'uuid': uuid }, 'warning', `agent tried to change the status of an assignment that is already ${currentAssm.status}`);
return Promise.resolve(false);
}
// Unconditonaly ending an assignment
if ([ASSIGNMENT_STATUS.CANCELED, ASSIGNMENT_STATUS.ABORTED, ASSIGNMENT_STATUS.FAILED].includes(assmUpdate.status)) {
logData.addLog('agent', { 'uuid': uuid }, 'info', `agent has marked the assignment ${currentAssm.id} as ${assmUpdate.status}`);
return databaseServices.assignments.update_byId(currentAssm.id, assmUpdate).then(()=>true);
}
// Conditional assignment status change
return databaseServices.assignments.updateByConditions({
'assignments.id': currentAssm.id,
'work_processes.id': currentAssm.work_process_id,
'work_processes.status__in': [ MISSION_STATUS.EXECUTING,
MISSION_STATUS.DISPATCHED,
MISSION_STATUS.CALCULATING,
MISSION_STATUS.CANCELING,
MISSION_STATUS.FAILED]
}, assmUpdate).then(()=>true);
}

// If assignment status did not change, but assignment results did.
if (assmUpdate.status === ASSIGNMENT_STATUS.ACTIVE || assmUpdate.status === ASSIGNMENT_STATUS.EXECUTING) {
// Conditional assignment status change
return databaseServices.assignments.updateByConditions({
'assignments.id': currentAssm.id,
'work_processes.id': currentAssm.work_process_id,
'work_processes.status__in': [ MISSION_STATUS.EXECUTING,
MISSION_STATUS.DISPATCHED,
MISSION_STATUS.CALCULATING,
MISSION_STATUS.CANCELING,
MISSION_STATUS.FAILED]
}, assmUpdate).then(()=>true);
}

}

/* Update the assignmnet if it is not already marked as 'completed' or 'succeeded' */
/* The data is updated in the assignment table and in the agent table (under work process clearance) */
async function updateAgentMission(assignment, uuid = null) {
Expand All @@ -16,65 +68,30 @@ async function updateAgentMission(assignment, uuid = null) {
const assignmentResult = assignment_status_obj.result;

const assmUpdate = { 'id': assignmentId, 'status': assignmentStatus, 'result': assignmentResult };
const currentAssm = await databaseServices.assignments.get_byId(assignmentId, ['status', 'work_process_id']);

if (currentAssm && currentAssm.status !== assmUpdate.status) {
const currentAssm = await databaseServices.assignments.get_byId(assignmentId, ['id', 'status', 'work_process_id']);
if (!currentAssm) return;

// Filtering out irrelevant state update
if(currentAssm.status === ASSIGNMENT_STATUS.COMPLETED && assmUpdate.status === ASSIGNMENT_STATUS.SUCCEEDED) {
return;
}

// Filtering out and reportig invalid state flow
if ([ASSIGNMENT_STATUS.SUCCEEDED, ASSIGNMENT_STATUS.COMPLETED, ASSIGNMENT_STATUS.FAILED].includes(currentAssm.status)){
logData.addLog('agent', { 'uuid': uuid }, 'warning', `agent tried to change the status of an assignment that is already ${currentAssm.status}`);
return;
}

// Unconditonaly ending an assignment
if ([ASSIGNMENT_STATUS.CANCELED, ASSIGNMENT_STATUS.ABORTED, ASSIGNMENT_STATUS.FAILED].includes(assmUpdate.status)) {
logData.addLog('agent', { 'uuid': uuid }, 'info', `agent has marked the assignment ${assignmentId} as ${assmUpdate.status}`);
return await databaseServices.assignments.update_byId(assignmentId, assmUpdate);
if (await evaluateAssignmentUpdate(currentAssm, assmUpdate, uuid)) {
if (uuid) {
const updtedAssignment = {...currentAssm, ...assmUpdate};
await databaseServices.agents.updateByConditions({uuid}, {assignment:updtedAssignment});
}
}

// Conditional assignment status change
await databaseServices.assignments.updateByConditions({
'assignments.id': assignmentId,
'work_processes.id': currentAssm.work_process_id,
'work_processes.status__in': [ MISSION_STATUS.EXECUTING,
MISSION_STATUS.DISPATCHED,
MISSION_STATUS.CALCULATING,
MISSION_STATUS.CANCELING,
MISSION_STATUS.FAILED]
}, assmUpdate);

if (uuid) {
const agents = await databaseServices.agents.get('uuid', uuid, ['id', 'wp_clearance']);
if (agents.length > 0) {
const agent = agents[0];
if (agent.wp_clearance) agent.wp_clearance['assignment_status'] = assmUpdate; //backward compatibility
return await databaseServices.agents.update_byId(agent.id, { 'wp_clearance': agent.wp_clearance, 'assignment': assignment });
} else {
logData.addLog('agent', { uuid }, 'error', "agent does not exist");
}
}
}



async function updateState(objMsg, uuid, bufferPeriod=0) {
try {
const toolUpdate = {uuid, "status": objMsg.body.status, 'last_message_time': new Date() };

// Get the agent id only once and save in in-memory table.
if (!inMemDB.agents[uuid] || !inMemDB.agents[uuid].id ){
const toolIds = await databaseServices.agents.getIds([uuid]);
inMemDB.update('agents', 'uuid', {uuid, id:toolIds[0]}, toolUpdate['last_message_time']);
}


const toolUpdate = {uuid, "status": objMsg.body.status, 'last_message_time': new Date() };

if (objMsg.body.resources){
toolUpdate['resources'] = objMsg.body.resources;
}
Expand All @@ -95,5 +112,4 @@ async function updateState(objMsg, uuid, bufferPeriod=0) {
}


module.exports.updateState = updateState;
module.exports.updateAgentMission = updateAgentMission;
module.exports.updateState = updateState;
59 changes: 37 additions & 22 deletions helyos_server/src/modules/systemlog.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,52 @@
// This module is used to collect logs and save then in the database.
// This module is used to collect logs and save them in the database.

const databaseServices = require('../services/database/database_services.js')
const LOG_OUTPUT = process.env.LOG_OUTPUT || 'database';


/**
* Class that gather 10 log messsage and insert than with a single INSERT query to the database.
/**
* LogData gathers messages and inserts them with a single INSERT query to the database after a time period.
* @class
* @param {string} origin - The origin of the log message. It can be 'microservice', 'helyos_core' or 'agent'.
* @param {object} metadata - The metadata of the log message. It can be the request object, the agent object or the assignment object.
* @param {string} logType - The type of the log message. It can be 'info', 'warning' or 'error'.
*
* @param {string} log_msg - The log message.
* @param {string} eventType - The event type of the log message. It can be 'request', 'response', 'error', 'info', 'warning' or 'success'.
*
*
*/

*/
class LogData {
constructor(number_of_logs=10) {
this.number_of_logs = number_of_logs;
/**
* Creates a new LogData instance.
* @param {number} [bufferTime=1000] - The time interval (in milliseconds) for periodically saving logs to the database.
*/
constructor(bufferTime=1000) {
this.logs = [];
this._periodicallySaveLogs();

this.lastLogMsg = '';
this.repeatedLog = 1;
this.bufferTime = bufferTime;
}


isLogRepeating(lastLog, newLog) {
return lastLog.event == newLog.event &&
lastLog.log_type == newLog.log_type &&
lastLog.origin == newLog.origin &&
this.lastLogMsg == newLog.msg;
}
/**
* Adds a log entry to the internal log buffer.
* @param {string} origin - The origin of the log message (e.g., 'microservice', 'helyos_core', or 'agent').
* @param {object} metadata - The metadata associated with the log message (e.g., request object, agent object, or assignment object).
* @param {string} logType - The type of the log message ('info', 'warning', or 'error').
* @param {string} log_msg - The log message.
* @param {string} [eventType=''] - The event type of the log message ('request', 'response', 'error', 'info', 'warning', or 'success').
*/
addLog(origin, metadata, logType, log_msg, eventType='') {
let new_log_instance = parseLogData(origin, metadata, logType, log_msg, eventType);
if (LOG_OUTPUT === 'database') {
this.logs.push(new_log_instance);
if (this.logs.length >= this.number_of_logs) {
this.saveLogs();
}
const lastLog = this.logs[this.logs.length - 1];
if (lastLog && this.isLogRepeating(lastLog, new_log_instance)) {
this.repeatedLog++;
lastLog.msg = `${this.repeatedLog}X: ${log_msg}`;
} else {
this.lastLogMsg = log_msg;
this.repeatedLog = 1;
this.logs.push(new_log_instance);
}
} else {
console.log(new_log_instance);
}
Expand All @@ -48,7 +63,7 @@ class LogData {
_periodicallySaveLogs() {
setInterval(() => {
this.saveLogs(true);
}, 1000);
}, this.bufferTime);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packaging/build.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
docker build -f ./Dockerfile -t helyosframework/helyos_core:test ..
docker build --no-cache -f ./Dockerfile -t helyosframework/helyos_core:test ..

0 comments on commit 2cb1497

Please sign in to comment.