Skip to content

Commit

Permalink
Merge pull request #18 from SiriDB/worker_split
Browse files Browse the repository at this point in the history
Worker split
  • Loading branch information
timoj authored Jul 2, 2021
2 parents cdeef25 + f12eeab commit 33be7e6
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 203 deletions.
50 changes: 19 additions & 31 deletions lib/analyser/model.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
from lib.util.util import load_disk_data
import os

from enodo import EnodoModel
from lib.config import Config
from lib.util import safe_json_dumps
from lib.util import load_disk_data, save_disk_data
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD, SUBSCRIPTION_CHANGE_TYPE_DELETE


Expand All @@ -16,52 +17,41 @@ async def async_setup(cls, update_cb):
cls._update_cb = update_cb

@classmethod
async def get_model(cls, model_name):
async def get_model(cls, name):
for m in cls.models:
if m.model_name == model_name:
if m.name == name:
return m
return None

@classmethod
async def add_model(cls, model_name, model_arguments):
if await cls.get_model(model_name) is None:
model = EnodoModel(model_name, model_arguments)
async def add_model(cls, name, model_arguments):
if await cls.get_model(name) is None:
model = EnodoModel(name, model_arguments)
cls.models.append(model)
await cls._update_cb(SUBSCRIPTION_CHANGE_TYPE_ADD, EnodoModel.to_dict(model))

@classmethod
async def add_model_from_dict(cls, dict_data):
try:
model = EnodoModel.from_dict(dict_data)
except Exception as e:
logging.error(f"Something went wrong while adding EnodoModel")
logging.debug(f"Corresponding error: {e}")
return False
else:
if await cls.get_model(model.model_name) is None:
cls.models.append(model)
await cls._update_cb(SUBSCRIPTION_CHANGE_TYPE_ADD, EnodoModel.to_dict(model))
return True
return False
async def add_enodo_model(cls, model):
if await cls.get_model(model.name) is None:
cls.models.append(model)
await cls._update_cb(SUBSCRIPTION_CHANGE_TYPE_ADD, EnodoModel.to_dict(model))
return True
return False

@classmethod
async def remove_model(cls, model_name):
model = await cls.get_model(model_name)
async def remove_model(cls, name):
model = await cls.get_model(name)
cls.models.remove(model)
await cls._update_cb(SUBSCRIPTION_CHANGE_TYPE_DELETE, model_name)
await cls._update_cb(SUBSCRIPTION_CHANGE_TYPE_DELETE, name)

@classmethod
async def load_from_disk(cls):
try:
if not os.path.exists(Config.model_save_path):
raise Exception()
f = open(Config.model_save_path, "r")
data = f.read()
f.close()
data = load_disk_data(Config.model_save_path)
except Exception as e:
data = "{}"

data = json.loads(data)
data = {}

if isinstance(data, list):
for model_data in data:
Expand All @@ -77,9 +67,7 @@ async def save_to_disk(cls):
model_list.append(EnodoModel.to_dict(model))

try:
f = open(Config.model_save_path, "w")
f.write(json.dumps(model_list, default=safe_json_dumps))
f.close()
save_disk_data(Config.model_save_path, model_list)
except Exception as e:
logging.error(f"Something went wrong when writing enodo models to disk")
logging.debug(f"Corresponding error: {e}")
22 changes: 7 additions & 15 deletions lib/enodojobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .socket import ClientManager
from .socket.package import create_header, WORKER_JOB, WORKER_JOB_CANCEL
from .socketio import SUBSCRIPTION_CHANGE_TYPE_UPDATE
from .util import safe_json_dumps
from lib.util import load_disk_data, save_disk_data
from .socketio import SUBSCRIPTION_CHANGE_TYPE_DELETE, SUBSCRIPTION_CHANGE_TYPE_ADD


Expand Down Expand Up @@ -345,7 +345,8 @@ async def receive_job_result(cls, writer, packet_type, packet_id, data, client_i
elif job_type == JOB_TYPE_BASE_SERIES_ANALYSIS:
try:
series = await SeriesManager.get_series(data.get('name'))
series.series_characteristics = data.get('characteristics')
series.series_characteristics = data.get('data').get('characteristics')
series.health = data.get('data').get('health')
await series.set_job_status(JOB_TYPE_BASE_SERIES_ANALYSIS, JOB_STATUS_DONE)
await SeriesManager.series_changed(SUBSCRIPTION_CHANGE_TYPE_UPDATE, data.get('name'))
except Exception as e:
Expand Down Expand Up @@ -427,35 +428,26 @@ async def save_to_disk(cls):
# 'open_jobs': [EnodoJob.to_dict(job) for job in cls._open_jobs],
'failed_jobs': [EnodoJob.to_dict(job) for job in cls._failed_jobs],
}
f = open(Config.jobs_save_path, "w")
f.write(json.dumps(job_data, default=safe_json_dumps))
f.close()
save_disk_data(Config.jobs_save_path, job_data)
except Exception as e:
logging.error(f"Something went wrong when saving jobmanager data to disk")
logging.debug(f"Corresponding error: {e}")
cls._unlock()

@classmethod
async def load_from_disk(cls):
loaded_open_jobs = 0
loaded_failed_jobs = 0
await cls._lock()
try:
if not os.path.exists(Config.jobs_save_path):
raise Exception()
f = open(Config.jobs_save_path, "r")
data = f.read()
f.close()
data = load_disk_data(Config.jobs_save_path)
except Exception as e:
data = "{}"

data = json.loads(data)
data = {}

if isinstance(data, dict):
if 'next_job_id' in data:
cls._next_job_id = int(data.get('next_job_id'))
# if 'open_jobs' in data:
# loaded_open_jobs += len(data.get('open_jobs'))
# cls._open_jobs = [EnodoJob.from_dict(job_data) for job_data in data.get('open_jobs')]
if 'failed_jobs' in data:
loaded_failed_jobs += len(data.get('failed_jobs'))
cls._failed_jobs = [EnodoJob.from_dict(job_data) for job_data in data.get('failed_jobs')]
Expand Down
17 changes: 7 additions & 10 deletions lib/events/enodoeventmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import aiohttp
import json
from jinja2 import Environment, PackageLoader
from jinja2 import Environment

from lib.config import Config
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD, SUBSCRIPTION_CHANGE_TYPE_UPDATE, SUBSCRIPTION_CHANGE_TYPE_DELETE
from lib.serverstate import ServerState
from lib.util import save_disk_data, load_disk_data

ENODO_EVENT_ANOMALY_DETECTED = "event_anomaly_detected"
ENODO_EVENT_JOB_QUEUE_TOO_LONG = "job_queue_too_long"
Expand Down Expand Up @@ -251,16 +252,14 @@ async def load_from_disk(cls):
try:
if not os.path.exists(Config.event_outputs_save_path):
raise Exception()
f = open(Config.event_outputs_save_path, "r")
data = f.read()
f.close()
data = load_disk_data(Config.event_outputs_save_path)
except Exception as _:
data = "{}"
data = {}

if data == "" or data is None:
data = "{}"
data = {}

output_data = json.loads(data)
output_data = data
if 'next_output_id' in output_data:
cls._next_output_id = output_data.get('next_output_id')
if 'outputs' in output_data:
Expand All @@ -279,9 +278,7 @@ async def save_to_disk(cls):
'next_output_id': cls._next_output_id,
'outputs': serialized_outputs
}
f = open(Config.event_outputs_save_path, "w")
f.write(json.dumps(output_data))
f.close()
save_disk_data(Config.event_outputs_save_path, output_data)
except Exception as e:
logging.error(f"Something went wrong when writing eventmanager data to disk")
logging.debug(f"Corresponding error: {e}")
Expand Down
2 changes: 1 addition & 1 deletion lib/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, name, config, datapoint_count, job_statuses=None, series_char

self._datapoint_count = datapoint_count
self._datapoint_count_lock = False
self.health = 100
self.health = None

async def set_datapoints_counter_lock(self, is_locked):
"""
Expand Down
49 changes: 30 additions & 19 deletions lib/series/seriesmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@

from lib.config import Config
from lib.events import EnodoEvent
from lib.events.enodoeventmanager import ENODO_EVENT_ANOMALY_DETECTED, EnodoEventManager
from lib.events.enodoeventmanager import ENODO_EVENT_ANOMALY_DETECTED,\
EnodoEventManager
from lib.serverstate import ServerState
from lib.siridb.siridb import query_series_datapoint_count, drop_series, \
insert_points, query_series_data, does_series_exist
from lib.siridb.siridb import query_series_datapoint_count,\
drop_series, insert_points, query_series_data, does_series_exist,\
query_group_expression_by_name
from lib.socket import ClientManager
from lib.socket.package import create_header, UPDATE_SERIES
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD, SUBSCRIPTION_CHANGE_TYPE_DELETE
from lib.util import safe_json_dumps
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD,\
SUBSCRIPTION_CHANGE_TYPE_DELETE
from lib.util import load_disk_data, save_disk_data

from enodo.jobs import JOB_STATUS_DONE, JOB_TYPE_BASE_SERIES_ANALYSIS, JOB_TYPE_DETECT_ANOMALIES_FOR_SERIES, JOB_TYPE_FORECAST_SERIES, JOB_TYPE_DETECT_ANOMALIES_FOR_SERIES_REALTIME
from enodo.jobs import JOB_STATUS_DONE, JOB_TYPE_BASE_SERIES_ANALYSIS,\
JOB_TYPE_DETECT_ANOMALIES_FOR_SERIES, JOB_TYPE_FORECAST_SERIES,\
JOB_TYPE_DETECT_ANOMALIES_FOR_SERIES_REALTIME


class SeriesManager:
Expand Down Expand Up @@ -68,7 +73,13 @@ def get_all_series(cls):

@classmethod
def get_listener_series_info(cls):
return [{"name": series_name, "realtime": series.series_config.realtime} for series_name, series in cls._series.items()]
series = [{"name": series_name, "realtime": series.series_config.realtime} for series_name, series in cls._series.items()]
labels = [{"name": label.get('selector'), "realtime": label.get('series_config').get('realtime'), "isGroup": label.get('type') == "group"} for label in cls._labels.values()]
return series + labels

@classmethod
def _update_listeners(cls):
ClientManager.update_listeners(cls.get_listener_series_info())

@classmethod
def get_labels_data(cls):
Expand All @@ -78,13 +89,18 @@ def get_labels_data(cls):
}

@classmethod
def add_label(cls, name, grouptag, series_config):
cls._labels[grouptag] = {"name": name, "grouptag": grouptag, "series_config": series_config}
async def add_label(cls, description, name, series_config):
if name not in cls._labels:
# TODO: Change auto type == "group" to a input value when tags are added
group_expression = await query_group_expression_by_name(ServerState.get_siridb_data_conn(), name)
cls._labels[name] = {"description": description, "name": name, "series_config": series_config, "type": "group", "selector": group_expression}
cls._update_listeners()

@classmethod
def remove_label(cls, grouptag):
if grouptag in cls._labels:
del cls._labels[grouptag]
def remove_label(cls, name):
if name in cls._labels:
del cls._labels[name]
cls._update_listeners()
return True
return False

Expand Down Expand Up @@ -169,10 +185,7 @@ async def read_from_disk(cls):
if not os.path.exists(Config.series_save_path):
pass
else:
f = open(Config.series_save_path, "r")
data = f.read()
f.close()
data = json.loads(data)
data = load_disk_data(Config.series_save_path)
series_data = data.get('series')
if series_data is not None:
for s in series_data:
Expand All @@ -194,9 +207,7 @@ async def save_to_disk(cls):
"series": serialized_series,
"labels": serialized_labels
}
f = open(Config.series_save_path, "w")
f.write(json.dumps(serialized_data, default=safe_json_dumps))
f.close()
save_disk_data(Config.series_save_path, serialized_data)
except Exception as e:
logging.error(f"Something went wrong when writing seriesmanager data to disk")
logging.debug(f"Corresponding error: {e}")
Expand Down
13 changes: 13 additions & 0 deletions lib/siridb/siridb.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,16 @@ async def insert_points(siridb_client, series_name, points):
print(e)
pass
return result

async def query_group_expression_by_name(siridb_client, group_name):
result = None
try:
result = await siridb_client.query(f'list groups where name == "{group_name}"')
except (QueryError, InsertError, ServerError, PoolError, AuthenticationError, UserAuthError) as e:
print("Connection problem with SiriDB server")
pass
groups = result.get('groups')
if groups is None or len(groups) < 1:
return None

return groups[0][0]
Loading

0 comments on commit 33be7e6

Please sign in to comment.