From 0481f9f37708269d9c3c02e03bfe8f64aa60509f Mon Sep 17 00:00:00 2001 From: Mohit Goyal Date: Thu, 25 Apr 2024 13:56:56 +0530 Subject: [PATCH] adds base interpretation layer and links that to workflow actions --- .../playbook_metric_task_executor_facade.py | 111 +----------------- executor/workflows/action/action_executor.py | 3 +- .../action/notify_action_executor/notifier.py | 2 +- .../notify_action_executor/notify_facade.py | 10 +- .../notify_action_executor/slack_notifier.py | 28 ++++- executor/workflows/tasks.py | 58 ++++++++- intelligence_layer/__init__.py | 0 intelligence_layer/admin.py | 3 + intelligence_layer/apps.py | 6 + intelligence_layer/migrations/__init__.py | 0 intelligence_layer/models.py | 3 + .../task_result_interpreters/__init__.py | 0 .../__init__.py | 0 .../basic_metric_task_interpreter.py | 109 +++++++++++++++++ .../task_result_interpreter_facade.py | 33 ++++++ intelligence_layer/tests.py | 3 + intelligence_layer/utils.py | 8 ++ intelligence_layer/views.py | 3 + media/utils.py | 18 +++ playbooks/base_settings.py | 4 +- .../intelligence_layer/interpreter.proto | 29 +++++ .../intelligence_layer/interpreter_pb2.py | 32 +++++ .../intelligence_layer/interpreter_pb2.pyi | 84 +++++++++++++ requirements.txt | 1 + 24 files changed, 427 insertions(+), 121 deletions(-) create mode 100644 intelligence_layer/__init__.py create mode 100644 intelligence_layer/admin.py create mode 100644 intelligence_layer/apps.py create mode 100644 intelligence_layer/migrations/__init__.py create mode 100644 intelligence_layer/models.py create mode 100644 intelligence_layer/task_result_interpreters/__init__.py create mode 100644 intelligence_layer/task_result_interpreters/metric_task_result_interpreters/__init__.py create mode 100644 intelligence_layer/task_result_interpreters/metric_task_result_interpreters/basic_metric_task_interpreter.py create mode 100644 intelligence_layer/task_result_interpreters/task_result_interpreter_facade.py create mode 100644 intelligence_layer/tests.py create mode 100644 intelligence_layer/utils.py create mode 100644 intelligence_layer/views.py create mode 100644 protos/playbooks/intelligence_layer/interpreter.proto create mode 100644 protos/playbooks/intelligence_layer/interpreter_pb2.py create mode 100644 protos/playbooks/intelligence_layer/interpreter_pb2.pyi diff --git a/executor/metric_task_executor/playbook_metric_task_executor_facade.py b/executor/metric_task_executor/playbook_metric_task_executor_facade.py index 74db7e7ac..316b44327 100644 --- a/executor/metric_task_executor/playbook_metric_task_executor_facade.py +++ b/executor/metric_task_executor/playbook_metric_task_executor_facade.py @@ -1,10 +1,5 @@ -import random from typing import Dict -import pandas as pd -import plotly.graph_objects as go -import plotly.io as pio - from executor.metric_task_executor.cloudwatch_task_executor import CloudwatchMetricTaskExecutor from executor.metric_task_executor.datadog_task_executor import DatadogMetricTaskExecutor from executor.metric_task_executor.newrelic_task_executor import NewRelicMetricTaskExecutor @@ -12,111 +7,7 @@ from executor.metric_task_executor.grafana_executor import GrafanaMetricTaskExecutor from executor.metric_task_executor.grafana_vpc_executor import GrafanaVpcMetricTaskExecutor -from protos.playbooks.playbook_pb2 import PlaybookMetricTaskDefinition as PlaybookMetricTaskDefinitionProto, \ - PlaybookMetricTaskExecutionResult - -metric_source_displace_name_map = { - PlaybookMetricTaskDefinitionProto.Source.CLOUDWATCH: 'Cloudwatch', - PlaybookMetricTaskDefinitionProto.Source.GRAFANA: 'Grafana', - PlaybookMetricTaskDefinitionProto.Source.GRAFANA_VPC: 'Grafana', - PlaybookMetricTaskDefinitionProto.Source.NEW_RELIC: 'New Relic', - PlaybookMetricTaskDefinitionProto.Source.DATADOG: 'Datadog' -} - - -def generate_color_map(labels): - color_map = {} - for label in labels: - color_map[label] = f'rgb({random.randint(0, 255)},{random.randint(0, 255)},{random.randint(0, 255)})' - return color_map - - -def timeseries_to_df(proto_data: PlaybookMetricTaskExecutionResult.Result.Timeseries): - data = [] - for timeseries in proto_data.labeled_metric_timeseries: - legend_label = '' - for label in timeseries.metric_label_values: - legend_label += f'{label.name.value}={label.value.value}__' - legend_label = legend_label[:-2] - for datapoint in timeseries.datapoints: - data.append({ - 'Timestamp': pd.Timestamp(datapoint.timestamp, unit='ms').strftime('%H:%M'), - 'Value': datapoint.value.value, - 'Label': legend_label, - 'Unit': timeseries.unit.value - }) - - return pd.DataFrame(data) - - -def table_result_to_df(table_result): - # Extracting column names from the first TableRow - column_names = [col.name.value for col in table_result.rows[0].columns] - - # Initialize an empty DataFrame with column names - df = pd.DataFrame(columns=column_names) - - # Extracting data from rows - for row in table_result.rows: - row_data = [col.value.value for col in row.columns] - df = df.append(pd.Series(row_data, index=column_names), ignore_index=True) - - # Keep only the first 5 rows if there are more - if len(df) > 5: - df = df.head(5) - - return df - - -def publish_metric_task_execution_result(file_key, result): - result_type = result.type - if result_type == PlaybookMetricTaskExecutionResult.Result.Type.TIMESERIES: - timeseries = result.timeseries - df = timeseries_to_df(timeseries) - unique_labels = df['Label'].unique() - color_map = generate_color_map(unique_labels) - fig = go.Figure() - unit = df['Unit'].iloc[0] if 'Unit' in df and df['Unit'].iloc[0] else '' - for label, data in df.groupby('Label'): - fig.add_trace(go.Scatter(x=data['Timestamp'], y=data['Value'], mode='lines', name=label, - line=dict(color=color_map[label]))) - if unique_labels is None or (len(unique_labels) == 1 and '' in unique_labels): - fig.update_layout( - xaxis_title='Timestamp', - yaxis_title='Values' if not unit else unit, - title_x=0.5, - title_y=0.9 - ) - else: - fig.update_layout( - xaxis_title='Timestamp', - yaxis_title='Values' if not unit else unit, - legend_title_text='Labels', - title_x=0.5, - title_y=0.9 - ) - - pio.write_image(fig, file_key) - return True - elif result_type == PlaybookMetricTaskExecutionResult.Result.Type.TABLE_RESULT: - # table_result = result.table_result - # df = table_result_to_df(table_result) - # fig = go.Figure(data=[go.Table( - # header=dict(values=df.columns, - # fill_color='paleturquoise', - # align='left'), - # cells=dict(values=[df[col] for col in df.columns], - # fill_color='lavender', - # align='left')) - # ]) - # - # # Update layout for better visualization - # fig.update_layout(title='Table Result' if not expression else expression, title_x=0.5, title_y=0.9) - # - # # Exporting the plot to an image using plotly - # pio.write_image(fig, file_key) - return False - return False +from protos.playbooks.playbook_pb2 import PlaybookMetricTaskDefinition as PlaybookMetricTaskDefinitionProto class PlaybookMetricTaskExecutorFacade: diff --git a/executor/workflows/action/action_executor.py b/executor/workflows/action/action_executor.py index b16a866fc..f231e86d7 100644 --- a/executor/workflows/action/action_executor.py +++ b/executor/workflows/action/action_executor.py @@ -2,12 +2,13 @@ from accounts.models import Account from executor.workflows.action.notify_action_executor.notify_facade import notifier_facade +from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto from protos.playbooks.workflow_pb2 import WorkflowAction as WorkflowActionProto logger = logging.getLogger(__name__) -def action_executor(account: Account, action: WorkflowActionProto, execution_output): +def action_executor(account: Account, action: WorkflowActionProto, execution_output: [InterpretationProto]): if action.type == WorkflowActionProto.Type.NOTIFY: notifier_facade.notify(account, action.notification_config, execution_output) else: diff --git a/executor/workflows/action/notify_action_executor/notifier.py b/executor/workflows/action/notify_action_executor/notifier.py index 7678c579c..845c55ae2 100644 --- a/executor/workflows/action/notify_action_executor/notifier.py +++ b/executor/workflows/action/notify_action_executor/notifier.py @@ -3,7 +3,7 @@ class Notifier: - type: WorkflowActionNotificationConfigProto.type = WorkflowActionNotificationConfigProto.Type.UNKNOWN + type: WorkflowActionNotificationConfigProto.Type = WorkflowActionNotificationConfigProto.Type.UNKNOWN account: Account = None def notify(self, config: WorkflowActionNotificationConfigProto, execution_output) -> bool: diff --git a/executor/workflows/action/notify_action_executor/notify_facade.py b/executor/workflows/action/notify_action_executor/notify_facade.py index d4111b63f..64a6fb1e8 100644 --- a/executor/workflows/action/notify_action_executor/notify_facade.py +++ b/executor/workflows/action/notify_action_executor/notify_facade.py @@ -2,7 +2,8 @@ from accounts.models import Account from executor.workflows.action.notify_action_executor.notifier import Notifier -from executor.workflows.entry_point.alert_entry_point.slack_channel_alert_entry_point import SlackChannelAlertEntryPoint +from executor.workflows.action.notify_action_executor.slack_notifier import SlackNotifier +from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto from protos.playbooks.workflow_pb2 import WorkflowActionNotificationConfig as WorkflowActionNotificationConfigProto logger = logging.getLogger(__name__) @@ -16,14 +17,15 @@ def __init__(self): def register(self, notification_type: WorkflowActionNotificationConfigProto.Type, notifier: Notifier.__class__): self._map[notification_type] = notifier - def notify(self, account: Account, config: WorkflowActionNotificationConfigProto, execution_output) -> bool: + def notify(self, account: Account, config: WorkflowActionNotificationConfigProto, + execution_output: [InterpretationProto]) -> bool: if not account or not config or not execution_output: return False if config.type not in self._map: raise ValueError(f'Notification type {config.type} is not supported') - notifier = self._map.get(config.type)(account) + notifier = self._map[config.type](account) return notifier.notify(config, execution_output) notifier_facade = NotifierFacade() -notifier_facade.register(WorkflowActionNotificationConfigProto.Type.SLACK, SlackChannelAlertEntryPoint) +notifier_facade.register(WorkflowActionNotificationConfigProto.Type.SLACK, SlackNotifier) diff --git a/executor/workflows/action/notify_action_executor/slack_notifier.py b/executor/workflows/action/notify_action_executor/slack_notifier.py index b135f6215..25d7a8b56 100644 --- a/executor/workflows/action/notify_action_executor/slack_notifier.py +++ b/executor/workflows/action/notify_action_executor/slack_notifier.py @@ -5,6 +5,7 @@ from executor.workflows.action.notify_action_executor.notifier import Notifier from integrations_api_processors.slack_api_processor import SlackApiProcessor from protos.connectors.connector_pb2 import ConnectorType, ConnectorKey +from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto from protos.playbooks.workflow_pb2 import WorkflowActionNotificationConfig as WorkflowActionNotificationConfigProto, \ WorkflowActionSlackNotificationConfig as WorkflowActionSlackNotificationConfigProto @@ -29,13 +30,36 @@ def __init__(self, account: Account): slack_bot_auth_token = slack_bot_auth_token_keys.first() self.slack_api_processor = SlackApiProcessor(slack_bot_auth_token.value) - def notify(self, config: WorkflowActionNotificationConfigProto, execution_output): + def notify(self, config: WorkflowActionNotificationConfigProto, execution_output: [InterpretationProto]): slack_config: WorkflowActionSlackNotificationConfigProto = config.slack_config channel_id = slack_config.slack_channel_id.value if not channel_id: raise ValueError('Slack channel id is not configured in the notification config') logger.info(f"Sending slack message to channel {channel_id} for account {self.account.id}") - message_params = {'text_message': execution_output, 'channel_id': channel_id} + blocks = [] + for i, interpretation in enumerate(execution_output): + if interpretation.type == InterpretationProto.Type.SUMMARY: + blocks.append({ + "type": "section", + "text": { + "type": "mrkdwn", + "text": f'Step {i + 1}: {interpretation.title.value}' + } + }) + elif interpretation.type == InterpretationProto.Type.IMAGE: + blocks.append({ + "type": "section", + "text": { + "type": "mrkdwn", + "text": f'Step {i + 1}: {interpretation.title.value}' + } + }) + blocks.append({ + "type": "image", + "image_url": interpretation.image_url.value, + "alt_text": 'metric evaluation' + }) + message_params = {'blocks': blocks, 'channel_id': channel_id} if slack_config.message_type == WorkflowActionSlackNotificationConfigProto.MessageType.THREAD_REPLY: message_params['reply_to'] = slack_config.thread_ts.value try: diff --git a/executor/workflows/tasks.py b/executor/workflows/tasks.py index 26b818d61..c83257b32 100644 --- a/executor/workflows/tasks.py +++ b/executor/workflows/tasks.py @@ -4,17 +4,24 @@ from celery import shared_task from django.conf import settings -from executor.crud.playbook_execution_crud import create_playbook_execution +from accounts.models import Account +from executor.crud.playbook_execution_crud import create_playbook_execution, get_db_playbook_execution from executor.tasks import execute_playbook +from executor.workflows.action.action_executor import action_executor from executor.workflows.crud.workflow_execution_crud import update_db_account_workflow_execution_status, \ get_db_workflow_execution_logs, get_workflow_executions, create_workflow_execution_log, \ update_db_account_workflow_execution_count_increment +from executor.workflows.crud.workflows_crud import get_db_workflows +from intelligence_layer.task_result_interpreters.task_result_interpreter_facade import \ + playbook_execution_result_interpret from management.crud.task_crud import get_or_create_task from management.models import TaskRun, PeriodicTaskStatus from management.utils.celery_task_signal_utils import publish_pre_run_task, publish_task_failure, publish_post_run_task from playbooks.utils.utils import current_datetime from protos.base_pb2 import TimeRange -from protos.playbooks.workflow_pb2 import WorkflowExecutionStatusType +from protos.playbooks.intelligence_layer.interpreter_pb2 import InterpreterType, Interpretation as InterpretationProto +from protos.playbooks.playbook_pb2 import PlaybookExecution as PlaybookExecutionProto +from protos.playbooks.workflow_pb2 import WorkflowExecutionStatusType, Workflow as WorkflowProto from utils.proto_utils import dict_to_proto logger = logging.getLogger(__name__) @@ -99,16 +106,63 @@ def workflow_scheduler(): @shared_task(max_retries=3, default_retry_delay=10) def workflow_executor(account_id, workflow_id, workflow_execution_id, playbook_id, playbook_execution_id, time_range): + current_time = current_datetime().timestamp() logger.info(f"Running workflow execution:: account_id: {account_id}, workflow_execution_id: " f"{workflow_execution_id}, playbook_execution_id: {playbook_execution_id}") try: create_workflow_execution_log(account_id, workflow_id, workflow_execution_id, playbook_execution_id) execute_playbook(account_id, playbook_id, playbook_execution_id, time_range) + try: + saved_task = get_or_create_task(workflow_action_execution.__name__, account_id, workflow_id, + workflow_execution_id, playbook_execution_id) + if not saved_task: + logger.error(f"Failed to create workflow action execution task for account: {account_id}, workflow_id: " + f"{workflow_id}, workflow_execution_id: {workflow_execution_id}, playbook_id: " + f"{playbook_id}") + return + task = workflow_action_execution.delay(account_id, workflow_id, workflow_execution_id, + playbook_execution_id) + task_run = TaskRun.objects.create(task=saved_task, task_uuid=task.id, + status=PeriodicTaskStatus.SCHEDULED, + account_id=account_id, + scheduled_at=datetime.fromtimestamp(float(current_time))) + except Exception as e: + logger.error( + f"Failed to create workflow action execution:: workflow_id: {workflow_id}, workflow_execution_id: " + f"{workflow_execution_id} playbook_id: {playbook_id}, error: {e}") except Exception as exc: logger.error(f"Error occurred while running workflow execution: {exc}") raise exc +@shared_task(max_retries=3, default_retry_delay=10) +def workflow_action_execution(account_id, workflow_id, workflow_execution_id, playbook_execution_id): + logger.info(f"Running workflow action execution:: account_id: {account_id}, workflow_execution_id: " + f"{workflow_execution_id}, playbook_execution_id: {playbook_execution_id}") + account = Account.objects.get(id=account_id) + try: + playbook_executions = get_db_playbook_execution(account, playbook_execution_id=playbook_execution_id) + workflows = get_db_workflows(account, workflow_id=workflow_id) + if not playbook_executions: + logger.error(f"Aborting workflow action execution as playbook execution not found for " + f"account_id: {account_id}, playbook_execution_id: {playbook_execution_id}") + if not workflows: + logger.error(f"Aborting workflow action execution as workflow not found for " + f"account_id: {account_id}, workflow_id: {workflow_id}") + playbook_execution = playbook_executions.first() + pe_proto: PlaybookExecutionProto = playbook_execution.proto + pe_logs = pe_proto.logs + execution_output: [InterpretationProto] = playbook_execution_result_interpret(InterpreterType.BASIC_I, pe_logs) + workflow = workflows.first() + w_proto: WorkflowProto = workflow.proto + w_actions = w_proto.actions + for w_action in w_actions: + action_executor(account, w_action, execution_output) + except Exception as exc: + logger.error(f"Error occurred while running workflow action execution: {exc}") + raise exc + + workflow_executor_prerun_notifier = publish_pre_run_task(workflow_executor) workflow_executor_failure_notifier = publish_task_failure(workflow_executor) workflow_executor_postrun_notifier = publish_post_run_task(workflow_executor) diff --git a/intelligence_layer/__init__.py b/intelligence_layer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/intelligence_layer/admin.py b/intelligence_layer/admin.py new file mode 100644 index 000000000..8c38f3f3d --- /dev/null +++ b/intelligence_layer/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/intelligence_layer/apps.py b/intelligence_layer/apps.py new file mode 100644 index 000000000..3e9d2d682 --- /dev/null +++ b/intelligence_layer/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class IntelligenceLayerConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'intelligence_layer' diff --git a/intelligence_layer/migrations/__init__.py b/intelligence_layer/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/intelligence_layer/models.py b/intelligence_layer/models.py new file mode 100644 index 000000000..71a836239 --- /dev/null +++ b/intelligence_layer/models.py @@ -0,0 +1,3 @@ +from django.db import models + +# Create your models here. diff --git a/intelligence_layer/task_result_interpreters/__init__.py b/intelligence_layer/task_result_interpreters/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/intelligence_layer/task_result_interpreters/metric_task_result_interpreters/__init__.py b/intelligence_layer/task_result_interpreters/metric_task_result_interpreters/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/intelligence_layer/task_result_interpreters/metric_task_result_interpreters/basic_metric_task_interpreter.py b/intelligence_layer/task_result_interpreters/metric_task_result_interpreters/basic_metric_task_interpreter.py new file mode 100644 index 000000000..7ece47d9f --- /dev/null +++ b/intelligence_layer/task_result_interpreters/metric_task_result_interpreters/basic_metric_task_interpreter.py @@ -0,0 +1,109 @@ +import logging + +import pandas as pd +import plotly.graph_objects as go +import plotly.io as pio +from google.protobuf.wrappers_pb2 import StringValue + +from intelligence_layer.utils import generate_color_map +from media.utils import save_image_to_db, generate_local_image_path +from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto +from protos.playbooks.playbook_pb2 import PlaybookMetricTaskExecutionResult as PlaybookMetricTaskExecutionResultProto, \ + PlaybookMetricTaskDefinition as PlaybookMetricTaskDefinitionProto, \ + PlaybookTaskDefinition as PlaybookTaskDefinitionProto + +logger = logging.getLogger(__name__) + +metric_source_displace_name_map = { + PlaybookMetricTaskDefinitionProto.Source.CLOUDWATCH: 'Cloudwatch', + PlaybookMetricTaskDefinitionProto.Source.GRAFANA: 'Grafana', + PlaybookMetricTaskDefinitionProto.Source.GRAFANA_VPC: 'Grafana', + PlaybookMetricTaskDefinitionProto.Source.NEW_RELIC: 'New Relic', + PlaybookMetricTaskDefinitionProto.Source.DATADOG: 'Datadog' +} + + +def metric_timeseries_result_to_df(result: PlaybookMetricTaskExecutionResultProto.Result.Timeseries): + data = [] + for timeseries in result.labeled_metric_timeseries: + legend_label = '' + for label in timeseries.metric_label_values: + legend_label += f'{label.name.value}={label.value.value}__' + if legend_label and legend_label[-2:] == '__': + legend_label = legend_label[:-2] + for datapoint in timeseries.datapoints: + data.append({ + 'Timestamp': pd.Timestamp(datapoint.timestamp, unit='ms').strftime('%H:%M'), + 'Value': datapoint.value.value, + 'Label': legend_label, + 'Unit': timeseries.unit.value + }) + + return pd.DataFrame(data) + + +def metric_table_result_to_df(table_result): + column_names = [col.name.value for col in table_result.rows[0].columns] + + df = pd.DataFrame(columns=column_names) + + for row in table_result.rows: + row_data = [col.value.value for col in row.columns] + df = df.append(pd.Series(row_data, index=column_names), ignore_index=True) + + if len(df) > 5: + df = df.head(5) + + return df + + +def basic_metric_task_result_interpreter(task: PlaybookTaskDefinitionProto, + task_result: PlaybookMetricTaskExecutionResultProto) -> InterpretationProto: + file_key = generate_local_image_path() + metric_expression = task_result.metric_expression.value + metric_expression = metric_expression.replace('`', '') + metric_name = task_result.metric_name.value + metric_source = metric_source_displace_name_map.get(task_result.metric_source) + result = task_result.result + result_type = result.type + if result_type == PlaybookMetricTaskExecutionResultProto.Result.Type.TIMESERIES: + timeseries = result.timeseries + df = metric_timeseries_result_to_df(timeseries) + unique_labels = df['Label'].unique() + color_map = generate_color_map(unique_labels) + fig = go.Figure() + unit = df['Unit'].iloc[0] if 'Unit' in df and df['Unit'].iloc[0] else '' + for label, data in df.groupby('Label'): + fig.add_trace(go.Scatter(x=data['Timestamp'], y=data['Value'], mode='lines', name=label, + line=dict(color=color_map[label]))) + if unique_labels is None or (len(unique_labels) == 1 and '' in unique_labels): + fig.update_layout( + xaxis_title='Timestamp', + yaxis_title='Values' if not unit else unit, + title_x=0.5, + title_y=0.9 + ) + else: + fig.update_layout( + xaxis_title='Timestamp', + yaxis_title='Values' if not unit else unit, + legend_title_text='Labels', + title_x=0.5, + title_y=0.9 + ) + try: + pio.write_image(fig, file_key) + object_url = save_image_to_db(file_key, task.name.value, remove_file_from_os=True) + if metric_name: + metric_name = metric_name.replace('`', '') + title = f'Fetched `{metric_expression}` for `{metric_name}` from `{metric_source}`' + else: + title = f'Fetched `{metric_expression}` from `{metric_source}`' + return InterpretationProto( + type=InterpretationProto.Type.IMAGE, + title=StringValue(value=title), + image_url=StringValue(value=object_url), + ) + except Exception as e: + logger.error(f'Error writing image: {e}') + raise e diff --git a/intelligence_layer/task_result_interpreters/task_result_interpreter_facade.py b/intelligence_layer/task_result_interpreters/task_result_interpreter_facade.py new file mode 100644 index 000000000..78a14fc7d --- /dev/null +++ b/intelligence_layer/task_result_interpreters/task_result_interpreter_facade.py @@ -0,0 +1,33 @@ +import logging + +from intelligence_layer.task_result_interpreters.metric_task_result_interpreters.basic_metric_task_interpreter import \ + basic_metric_task_result_interpreter +from protos.playbooks.intelligence_layer.interpreter_pb2 import InterpreterType, Interpretation as InterpretationProto +from protos.playbooks.playbook_pb2 import PlaybookMetricTaskExecutionResult as PlaybookMetricTaskExecutionResultProto, \ + PlaybookTaskDefinition as PlaybookTaskDefinitionProto, \ + PlaybookTaskExecutionResult as PlaybookTaskExecutionResultProto, PlaybookExecutionLog + +logger = logging.getLogger(__name__) + + +def task_result_interpret(interpreter_type: InterpreterType, task: PlaybookTaskDefinitionProto, + task_result: PlaybookTaskExecutionResultProto) -> InterpretationProto: + which_one_of = task_result.WhichOneof('result') + if which_one_of == 'metric_task_execution_result': + metric_task_result: PlaybookMetricTaskExecutionResultProto = task_result.metric_task_execution_result + if interpreter_type == InterpreterType.BASIC_I: + return basic_metric_task_result_interpreter(task, metric_task_result) + + +def playbook_execution_result_interpret(interpreter_type: InterpreterType, + playbook_execution_logs: [PlaybookExecutionLog]) -> [InterpretationProto]: + interpretations: [InterpretationProto] = [] + for log in playbook_execution_logs: + try: + interpretation_result = task_result_interpret(interpreter_type, log.task, log.task_execution_result) + if interpretation_result: + interpretations.append(interpretation_result) + except Exception as e: + logger.error(f"Failed to interpret playbook execution log with error: {e}") + continue + return interpretations diff --git a/intelligence_layer/tests.py b/intelligence_layer/tests.py new file mode 100644 index 000000000..7ce503c2d --- /dev/null +++ b/intelligence_layer/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/intelligence_layer/utils.py b/intelligence_layer/utils.py new file mode 100644 index 000000000..34ba47034 --- /dev/null +++ b/intelligence_layer/utils.py @@ -0,0 +1,8 @@ +import random + + +def generate_color_map(labels): + color_map = {} + for label in labels: + color_map[label] = f'rgb({random.randint(0, 255)},{random.randint(0, 255)},{random.randint(0, 255)})' + return color_map diff --git a/intelligence_layer/views.py b/intelligence_layer/views.py new file mode 100644 index 000000000..91ea44a21 --- /dev/null +++ b/intelligence_layer/views.py @@ -0,0 +1,3 @@ +from django.shortcuts import render + +# Create your views here. diff --git a/media/utils.py b/media/utils.py index 075c10a08..dc8cdb55e 100644 --- a/media/utils.py +++ b/media/utils.py @@ -1,16 +1,34 @@ import logging import os +import uuid from django.conf import settings +from django.core.files.base import ContentFile +from django.core.files.storage import default_storage from media.models import Image from PIL import Image as PILImage +from playbooks.utils.utils import current_milli_time from utils.uri_utils import build_absolute_uri logger = logging.getLogger(__name__) +def generate_local_image_path(image_data=None, image_name: str = None): + randon_name = str(uuid.uuid4()) + if not image_name: + image_name = f'{current_milli_time()}_{randon_name}' + '.png' + if not image_name.endswith('.png'): + image_name += '.png' + file_path = os.path.join('images', image_name) + if image_data: + file_content = ContentFile(image_data) + return default_storage.save(os.path.join(settings.MEDIA_ASSETS_ROOT, file_path), file_content) + else: + return os.path.join(settings.MEDIA_ASSETS_ROOT, file_path) + + def save_image_to_db(image_file_path, image_title: str = 'Untitled', image_description: str = None, image_metadata: dict = None, remove_file_from_os=False) -> str: try: diff --git a/playbooks/base_settings.py b/playbooks/base_settings.py index fa3bf3cd5..bb08885d4 100644 --- a/playbooks/base_settings.py +++ b/playbooks/base_settings.py @@ -44,7 +44,8 @@ 'management.apps.ManagementConfig', 'executor.apps.ExecutorConfig', 'executor.workflows.apps.WorkflowsConfig', - 'media.apps.MediaConfig' + 'media.apps.MediaConfig', + 'intelligence_layer.apps.IntelligenceLayerConfig', ] THIRD_PARTY_APPS = [ @@ -326,3 +327,4 @@ def get_cache_backend(alias='default'): MEDIA_STORAGE_LOCATION = env.str("MEDIA_STORAGE_LOCATION", default='/media/images') MEDIA_STORAGE_SITE_HTTP_PROTOCOL = env.str("MEDIA_STORAGE_SITE_HTTP_PROTOCOL", default='https') MEDIA_STORAGE_USE_SITE = env.bool("MEDIA_STORAGE_USE_SITE", default=True) +MEDIA_ASSETS_ROOT = os.path.join(BASE_DIR, 'media/assets') diff --git a/protos/playbooks/intelligence_layer/interpreter.proto b/protos/playbooks/intelligence_layer/interpreter.proto new file mode 100644 index 000000000..53b0d2232 --- /dev/null +++ b/protos/playbooks/intelligence_layer/interpreter.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; +package protos.playbooks; + +import "google/protobuf/wrappers.proto"; + +import "protos/base.proto"; +import "protos/playbooks/playbook.proto"; + +enum InterpreterType { + UNKNOWN_I = 0; + BASIC_I = 1; + STATISTICAL_I = 2; + LLM_CHAT_GPT_I = 3; +} + + + +message Interpretation { + enum Type { + UNKNOWN = 0; + IMAGE = 1; + SUMMARY = 2; + } + Type type = 1; + google.protobuf.StringValue title = 2; + google.protobuf.StringValue description = 3; + google.protobuf.StringValue summary = 4; + google.protobuf.StringValue image_url = 5; +} \ No newline at end of file diff --git a/protos/playbooks/intelligence_layer/interpreter_pb2.py b/protos/playbooks/intelligence_layer/interpreter_pb2.py new file mode 100644 index 000000000..f894e3b6a --- /dev/null +++ b/protos/playbooks/intelligence_layer/interpreter_pb2.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: protos/playbooks/intelligence_layer/interpreter.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 +from protos import base_pb2 as protos_dot_base__pb2 +from protos.playbooks import playbook_pb2 as protos_dot_playbooks_dot_playbook__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n5protos/playbooks/intelligence_layer/interpreter.proto\x12\x10protos.playbooks\x1a\x1egoogle/protobuf/wrappers.proto\x1a\x11protos/base.proto\x1a\x1fprotos/playbooks/playbook.proto\"\xb2\x02\n\x0eInterpretation\x12\x33\n\x04type\x18\x01 \x01(\x0e\x32%.protos.playbooks.Interpretation.Type\x12+\n\x05title\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12\x31\n\x0b\x64\x65scription\x18\x03 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12-\n\x07summary\x18\x04 \x01(\x0b\x32\x1c.google.protobuf.StringValue\x12/\n\timage_url\x18\x05 \x01(\x0b\x32\x1c.google.protobuf.StringValue\"+\n\x04Type\x12\x0b\n\x07UNKNOWN\x10\x00\x12\t\n\x05IMAGE\x10\x01\x12\x0b\n\x07SUMMARY\x10\x02*T\n\x0fInterpreterType\x12\r\n\tUNKNOWN_I\x10\x00\x12\x0b\n\x07\x42\x41SIC_I\x10\x01\x12\x11\n\rSTATISTICAL_I\x10\x02\x12\x12\n\x0eLLM_CHAT_GPT_I\x10\x03\x62\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protos.playbooks.intelligence_layer.interpreter_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _INTERPRETERTYPE._serialized_start=468 + _INTERPRETERTYPE._serialized_end=552 + _INTERPRETATION._serialized_start=160 + _INTERPRETATION._serialized_end=466 + _INTERPRETATION_TYPE._serialized_start=423 + _INTERPRETATION_TYPE._serialized_end=466 +# @@protoc_insertion_point(module_scope) diff --git a/protos/playbooks/intelligence_layer/interpreter_pb2.pyi b/protos/playbooks/intelligence_layer/interpreter_pb2.pyi new file mode 100644 index 000000000..6bdff64e9 --- /dev/null +++ b/protos/playbooks/intelligence_layer/interpreter_pb2.pyi @@ -0,0 +1,84 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" +import builtins +import google.protobuf.descriptor +import google.protobuf.internal.enum_type_wrapper +import google.protobuf.message +import google.protobuf.wrappers_pb2 +import sys +import typing + +if sys.version_info >= (3, 10): + import typing as typing_extensions +else: + import typing_extensions + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +class _InterpreterType: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + +class _InterpreterTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_InterpreterType.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + UNKNOWN_I: _InterpreterType.ValueType # 0 + BASIC_I: _InterpreterType.ValueType # 1 + STATISTICAL_I: _InterpreterType.ValueType # 2 + LLM_CHAT_GPT_I: _InterpreterType.ValueType # 3 + +class InterpreterType(_InterpreterType, metaclass=_InterpreterTypeEnumTypeWrapper): ... + +UNKNOWN_I: InterpreterType.ValueType # 0 +BASIC_I: InterpreterType.ValueType # 1 +STATISTICAL_I: InterpreterType.ValueType # 2 +LLM_CHAT_GPT_I: InterpreterType.ValueType # 3 +global___InterpreterType = InterpreterType + +@typing_extensions.final +class Interpretation(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class _Type: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _TypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Interpretation._Type.ValueType], builtins.type): # noqa: F821 + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + UNKNOWN: Interpretation._Type.ValueType # 0 + IMAGE: Interpretation._Type.ValueType # 1 + SUMMARY: Interpretation._Type.ValueType # 2 + + class Type(_Type, metaclass=_TypeEnumTypeWrapper): ... + UNKNOWN: Interpretation.Type.ValueType # 0 + IMAGE: Interpretation.Type.ValueType # 1 + SUMMARY: Interpretation.Type.ValueType # 2 + + TYPE_FIELD_NUMBER: builtins.int + TITLE_FIELD_NUMBER: builtins.int + DESCRIPTION_FIELD_NUMBER: builtins.int + SUMMARY_FIELD_NUMBER: builtins.int + IMAGE_URL_FIELD_NUMBER: builtins.int + type: global___Interpretation.Type.ValueType + @property + def title(self) -> google.protobuf.wrappers_pb2.StringValue: ... + @property + def description(self) -> google.protobuf.wrappers_pb2.StringValue: ... + @property + def summary(self) -> google.protobuf.wrappers_pb2.StringValue: ... + @property + def image_url(self) -> google.protobuf.wrappers_pb2.StringValue: ... + def __init__( + self, + *, + type: global___Interpretation.Type.ValueType = ..., + title: google.protobuf.wrappers_pb2.StringValue | None = ..., + description: google.protobuf.wrappers_pb2.StringValue | None = ..., + summary: google.protobuf.wrappers_pb2.StringValue | None = ..., + image_url: google.protobuf.wrappers_pb2.StringValue | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["description", b"description", "image_url", b"image_url", "summary", b"summary", "title", b"title"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "image_url", b"image_url", "summary", b"summary", "title", b"title", "type", b"type"]) -> None: ... + +global___Interpretation = Interpretation diff --git a/requirements.txt b/requirements.txt index 22f4a040d..91f001cb3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -49,6 +49,7 @@ grpcio-tools==1.51.1 gunicorn==22.0.0 idna==3.7 jmespath==1.0.1 +kaleido==0.2.1 kombu==5.3.7 kubernetes==29.0.0 lz4==4.3.3