Merge pull request #42 from DrDroidLab/feature/add_pb_task_interpretation_layer

Adds base interpretation layer and links that to workflow actions
droid-mohit authored Apr 25, 2024
2 parents 532fe07 + 0481f9f commit 58851c4
Showing 24 changed files with 427 additions and 121 deletions.
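In outline, the change interprets a finished playbook execution into a list of Interpretation protos and fans those out to the workflow's actions. A compressed sketch, using only names that appear in the diffs below:

# Compressed sketch of the new flow (all names appear in the diffs below):
interpretations = playbook_execution_result_interpret(InterpreterType.BASIC_I, pe_proto.logs)
for w_action in w_proto.actions:
    action_executor(account, w_action, interpretations)  # NOTIFY -> notifier_facade -> SlackNotifier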
111 changes: 1 addition & 110 deletions executor/metric_task_executor/playbook_metric_task_executor_facade.py
@@ -1,122 +1,13 @@
import random
from typing import Dict

import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio

from executor.metric_task_executor.cloudwatch_task_executor import CloudwatchMetricTaskExecutor
from executor.metric_task_executor.datadog_task_executor import DatadogMetricTaskExecutor
from executor.metric_task_executor.newrelic_task_executor import NewRelicMetricTaskExecutor
from executor.metric_task_executor.playbook_metric_task_executor import PlaybookMetricTaskExecutor
from executor.metric_task_executor.grafana_executor import GrafanaMetricTaskExecutor
from executor.metric_task_executor.grafana_vpc_executor import GrafanaVpcMetricTaskExecutor

from protos.playbooks.playbook_pb2 import PlaybookMetricTaskDefinition as PlaybookMetricTaskDefinitionProto, \
    PlaybookMetricTaskExecutionResult

metric_source_displace_name_map = {
    PlaybookMetricTaskDefinitionProto.Source.CLOUDWATCH: 'Cloudwatch',
    PlaybookMetricTaskDefinitionProto.Source.GRAFANA: 'Grafana',
    PlaybookMetricTaskDefinitionProto.Source.GRAFANA_VPC: 'Grafana',
    PlaybookMetricTaskDefinitionProto.Source.NEW_RELIC: 'New Relic',
    PlaybookMetricTaskDefinitionProto.Source.DATADOG: 'Datadog'
}


def generate_color_map(labels):
    color_map = {}
    for label in labels:
        color_map[label] = f'rgb({random.randint(0, 255)},{random.randint(0, 255)},{random.randint(0, 255)})'
    return color_map


def timeseries_to_df(proto_data: PlaybookMetricTaskExecutionResult.Result.Timeseries):
    data = []
    for timeseries in proto_data.labeled_metric_timeseries:
        legend_label = ''
        for label in timeseries.metric_label_values:
            legend_label += f'{label.name.value}={label.value.value}__'
        legend_label = legend_label[:-2]
        for datapoint in timeseries.datapoints:
            data.append({
                'Timestamp': pd.Timestamp(datapoint.timestamp, unit='ms').strftime('%H:%M'),
                'Value': datapoint.value.value,
                'Label': legend_label,
                'Unit': timeseries.unit.value
            })

    return pd.DataFrame(data)


def table_result_to_df(table_result):
    # Extracting column names from the first TableRow
    column_names = [col.name.value for col in table_result.rows[0].columns]

    # Initialize an empty DataFrame with column names
    df = pd.DataFrame(columns=column_names)

    # Extracting data from rows
    for row in table_result.rows:
        row_data = [col.value.value for col in row.columns]
        df = df.append(pd.Series(row_data, index=column_names), ignore_index=True)

    # Keep only the first 5 rows if there are more
    if len(df) > 5:
        df = df.head(5)

    return df


def publish_metric_task_execution_result(file_key, result):
    result_type = result.type
    if result_type == PlaybookMetricTaskExecutionResult.Result.Type.TIMESERIES:
        timeseries = result.timeseries
        df = timeseries_to_df(timeseries)
        unique_labels = df['Label'].unique()
        color_map = generate_color_map(unique_labels)
        fig = go.Figure()
        unit = df['Unit'].iloc[0] if 'Unit' in df and df['Unit'].iloc[0] else ''
        for label, data in df.groupby('Label'):
            fig.add_trace(go.Scatter(x=data['Timestamp'], y=data['Value'], mode='lines', name=label,
                                     line=dict(color=color_map[label])))
        if unique_labels is None or (len(unique_labels) == 1 and '' in unique_labels):
            fig.update_layout(
                xaxis_title='Timestamp',
                yaxis_title='Values' if not unit else unit,
                title_x=0.5,
                title_y=0.9
            )
        else:
            fig.update_layout(
                xaxis_title='Timestamp',
                yaxis_title='Values' if not unit else unit,
                legend_title_text='Labels',
                title_x=0.5,
                title_y=0.9
            )

        pio.write_image(fig, file_key)
        return True
    elif result_type == PlaybookMetricTaskExecutionResult.Result.Type.TABLE_RESULT:
        # table_result = result.table_result
        # df = table_result_to_df(table_result)
        # fig = go.Figure(data=[go.Table(
        #     header=dict(values=df.columns,
        #                 fill_color='paleturquoise',
        #                 align='left'),
        #     cells=dict(values=[df[col] for col in df.columns],
        #                fill_color='lavender',
        #                align='left'))
        # ])
        #
        # # Update layout for better visualization
        # fig.update_layout(title='Table Result' if not expression else expression, title_x=0.5, title_y=0.9)
        #
        # # Exporting the plot to an image using plotly
        # pio.write_image(fig, file_key)
        return False
    return False
from protos.playbooks.playbook_pb2 import PlaybookMetricTaskDefinition as PlaybookMetricTaskDefinitionProto


class PlaybookMetricTaskExecutorFacade:
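The plotting helpers deleted above presumably move behind the new interpretation layer. For reference, a minimal self-contained sketch of the same charting approach (one trace per label, a random RGB color per label), using made-up data in the shape timeseries_to_df produced:

# Sketch of the charting approach removed above; data is invented, the frame
# shape mirrors what timeseries_to_df() returned.
import random

import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio

df = pd.DataFrame({
    'Timestamp': ['10:00', '10:01', '10:02'] * 2,
    'Value': [1.0, 2.0, 1.5, 3.0, 2.5, 4.0],
    'Label': ['host=a'] * 3 + ['host=b'] * 3,
})
color_map = {label: f'rgb({random.randint(0, 255)},{random.randint(0, 255)},{random.randint(0, 255)})'
             for label in df['Label'].unique()}
fig = go.Figure()
for label, data in df.groupby('Label'):
    fig.add_trace(go.Scatter(x=data['Timestamp'], y=data['Value'], mode='lines', name=label,
                             line=dict(color=color_map[label])))
fig.update_layout(xaxis_title='Timestamp', yaxis_title='Values', legend_title_text='Labels')
pio.write_image(fig, 'metric.png')  # requires the kaleido package, as the deleted facade code did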
3 changes: 2 additions & 1 deletion executor/workflows/action/action_executor.py
@@ -2,12 +2,13 @@

from accounts.models import Account
from executor.workflows.action.notify_action_executor.notify_facade import notifier_facade
from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto
from protos.playbooks.workflow_pb2 import WorkflowAction as WorkflowActionProto

logger = logging.getLogger(__name__)


def action_executor(account: Account, action: WorkflowActionProto, execution_output):
def action_executor(account: Account, action: WorkflowActionProto, execution_output: [InterpretationProto]):
    if action.type == WorkflowActionProto.Type.NOTIFY:
        notifier_facade.notify(account, action.notification_config, execution_output)
    else:
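One note on the new signature: `execution_output: [InterpretationProto]` is a literal list expression used as an annotation, which type checkers will not interpret. The conventional spelling (a suggestion, not part of this diff) would be:

from typing import List

def action_executor(account: Account, action: WorkflowActionProto,
                    execution_output: List[InterpretationProto]):
    ...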
@@ -3,7 +3,7 @@


class Notifier:
    type: WorkflowActionNotificationConfigProto.type = WorkflowActionNotificationConfigProto.Type.UNKNOWN
    type: WorkflowActionNotificationConfigProto.Type = WorkflowActionNotificationConfigProto.Type.UNKNOWN
    account: Account = None

    def notify(self, config: WorkflowActionNotificationConfigProto, execution_output) -> bool:
@@ -2,7 +2,8 @@

from accounts.models import Account
from executor.workflows.action.notify_action_executor.notifier import Notifier
from executor.workflows.entry_point.alert_entry_point.slack_channel_alert_entry_point import SlackChannelAlertEntryPoint
from executor.workflows.action.notify_action_executor.slack_notifier import SlackNotifier
from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto
from protos.playbooks.workflow_pb2 import WorkflowActionNotificationConfig as WorkflowActionNotificationConfigProto

logger = logging.getLogger(__name__)
@@ -16,14 +17,15 @@ def __init__(self):
    def register(self, notification_type: WorkflowActionNotificationConfigProto.Type, notifier: Notifier.__class__):
        self._map[notification_type] = notifier

    def notify(self, account: Account, config: WorkflowActionNotificationConfigProto, execution_output) -> bool:
    def notify(self, account: Account, config: WorkflowActionNotificationConfigProto,
               execution_output: [InterpretationProto]) -> bool:
        if not account or not config or not execution_output:
            return False
        if config.type not in self._map:
            raise ValueError(f'Notification type {config.type} is not supported')
        notifier = self._map.get(config.type)(account)
        notifier = self._map[config.type](account)
        return notifier.notify(config, execution_output)


notifier_facade = NotifierFacade()
notifier_facade.register(WorkflowActionNotificationConfigProto.Type.SLACK, SlackChannelAlertEntryPoint)
notifier_facade.register(WorkflowActionNotificationConfigProto.Type.SLACK, SlackNotifier)
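Dispatch through the facade resolves the notifier class from the registry and instantiates it per call. A usage sketch with illustrative values (the config object and variables here are hand-built, not from this diff):

# Sketch: notify() looks up SlackNotifier by config.type, instantiates it with
# the account, and delegates. `account` and `interpretations` are illustrative.
config = WorkflowActionNotificationConfigProto(type=WorkflowActionNotificationConfigProto.Type.SLACK)
sent = notifier_facade.notify(account, config, interpretations)  # -> SlackNotifier(account).notify(...)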
28 changes: 26 additions & 2 deletions executor/workflows/action/notify_action_executor/slack_notifier.py
@@ -5,6 +5,7 @@
from executor.workflows.action.notify_action_executor.notifier import Notifier
from integrations_api_processors.slack_api_processor import SlackApiProcessor
from protos.connectors.connector_pb2 import ConnectorType, ConnectorKey
from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto
from protos.playbooks.workflow_pb2 import WorkflowActionNotificationConfig as WorkflowActionNotificationConfigProto, \
WorkflowActionSlackNotificationConfig as WorkflowActionSlackNotificationConfigProto

@@ -29,13 +30,36 @@ def __init__(self, account: Account):
        slack_bot_auth_token = slack_bot_auth_token_keys.first()
        self.slack_api_processor = SlackApiProcessor(slack_bot_auth_token.value)

    def notify(self, config: WorkflowActionNotificationConfigProto, execution_output):
    def notify(self, config: WorkflowActionNotificationConfigProto, execution_output: [InterpretationProto]):
        slack_config: WorkflowActionSlackNotificationConfigProto = config.slack_config
        channel_id = slack_config.slack_channel_id.value
        if not channel_id:
            raise ValueError('Slack channel id is not configured in the notification config')
        logger.info(f"Sending slack message to channel {channel_id} for account {self.account.id}")
        message_params = {'text_message': execution_output, 'channel_id': channel_id}
        blocks = []
        for i, interpretation in enumerate(execution_output):
            if interpretation.type == InterpretationProto.Type.SUMMARY:
                blocks.append({
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f'Step {i + 1}: {interpretation.title.value}'
                    }
                })
            elif interpretation.type == InterpretationProto.Type.IMAGE:
                blocks.append({
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f'Step {i + 1}: {interpretation.title.value}'
                    }
                })
                blocks.append({
                    "type": "image",
                    "image_url": interpretation.image_url.value,
                    "alt_text": 'metric evaluation'
                })
        message_params = {'blocks': blocks, 'channel_id': channel_id}
        if slack_config.message_type == WorkflowActionSlackNotificationConfigProto.MessageType.THREAD_REPLY:
            message_params['reply_to'] = slack_config.thread_ts.value
        try:
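For two interpretations (one SUMMARY, one IMAGE), the loop above produces a Slack Block Kit payload along these lines (titles, URL, and channel id invented for illustration):

# Illustrative blocks payload for [SUMMARY, IMAGE] interpretations.
blocks = [
    {"type": "section", "text": {"type": "mrkdwn", "text": "Step 1: CPU utilization breached threshold"}},
    {"type": "section", "text": {"type": "mrkdwn", "text": "Step 2: Cloudwatch metric evaluation"}},
    {"type": "image", "image_url": "https://example.com/metric.png", "alt_text": "metric evaluation"},
]
message_params = {'blocks': blocks, 'channel_id': 'C0123456789'}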
58 changes: 56 additions & 2 deletions executor/workflows/tasks.py
@@ -4,17 +4,24 @@
from celery import shared_task
from django.conf import settings

from executor.crud.playbook_execution_crud import create_playbook_execution
from accounts.models import Account
from executor.crud.playbook_execution_crud import create_playbook_execution, get_db_playbook_execution
from executor.tasks import execute_playbook
from executor.workflows.action.action_executor import action_executor
from executor.workflows.crud.workflow_execution_crud import update_db_account_workflow_execution_status, \
    get_db_workflow_execution_logs, get_workflow_executions, create_workflow_execution_log, \
    update_db_account_workflow_execution_count_increment
from executor.workflows.crud.workflows_crud import get_db_workflows
from intelligence_layer.task_result_interpreters.task_result_interpreter_facade import \
    playbook_execution_result_interpret
from management.crud.task_crud import get_or_create_task
from management.models import TaskRun, PeriodicTaskStatus
from management.utils.celery_task_signal_utils import publish_pre_run_task, publish_task_failure, publish_post_run_task
from playbooks.utils.utils import current_datetime
from protos.base_pb2 import TimeRange
from protos.playbooks.workflow_pb2 import WorkflowExecutionStatusType
from protos.playbooks.intelligence_layer.interpreter_pb2 import InterpreterType, Interpretation as InterpretationProto
from protos.playbooks.playbook_pb2 import PlaybookExecution as PlaybookExecutionProto
from protos.playbooks.workflow_pb2 import WorkflowExecutionStatusType, Workflow as WorkflowProto
from utils.proto_utils import dict_to_proto

logger = logging.getLogger(__name__)
@@ -99,16 +106,63 @@ def workflow_scheduler():

@shared_task(max_retries=3, default_retry_delay=10)
def workflow_executor(account_id, workflow_id, workflow_execution_id, playbook_id, playbook_execution_id, time_range):
    current_time = current_datetime().timestamp()
    logger.info(f"Running workflow execution:: account_id: {account_id}, workflow_execution_id: "
                f"{workflow_execution_id}, playbook_execution_id: {playbook_execution_id}")
    try:
        create_workflow_execution_log(account_id, workflow_id, workflow_execution_id, playbook_execution_id)
        execute_playbook(account_id, playbook_id, playbook_execution_id, time_range)
        try:
            saved_task = get_or_create_task(workflow_action_execution.__name__, account_id, workflow_id,
                                            workflow_execution_id, playbook_execution_id)
            if not saved_task:
                logger.error(f"Failed to create workflow action execution task for account: {account_id}, workflow_id: "
                             f"{workflow_id}, workflow_execution_id: {workflow_execution_id}, playbook_id: "
                             f"{playbook_id}")
                return
            task = workflow_action_execution.delay(account_id, workflow_id, workflow_execution_id,
                                                   playbook_execution_id)
            task_run = TaskRun.objects.create(task=saved_task, task_uuid=task.id,
                                              status=PeriodicTaskStatus.SCHEDULED,
                                              account_id=account_id,
                                              scheduled_at=datetime.fromtimestamp(float(current_time)))
        except Exception as e:
            logger.error(
                f"Failed to create workflow action execution:: workflow_id: {workflow_id}, workflow_execution_id: "
                f"{workflow_execution_id} playbook_id: {playbook_id}, error: {e}")
    except Exception as exc:
        logger.error(f"Error occurred while running workflow execution: {exc}")
        raise exc


@shared_task(max_retries=3, default_retry_delay=10)
def workflow_action_execution(account_id, workflow_id, workflow_execution_id, playbook_execution_id):
    logger.info(f"Running workflow action execution:: account_id: {account_id}, workflow_execution_id: "
                f"{workflow_execution_id}, playbook_execution_id: {playbook_execution_id}")
    account = Account.objects.get(id=account_id)
    try:
        playbook_executions = get_db_playbook_execution(account, playbook_execution_id=playbook_execution_id)
        workflows = get_db_workflows(account, workflow_id=workflow_id)
        if not playbook_executions:
            logger.error(f"Aborting workflow action execution as playbook execution not found for "
                         f"account_id: {account_id}, playbook_execution_id: {playbook_execution_id}")
        if not workflows:
            logger.error(f"Aborting workflow action execution as workflow not found for "
                         f"account_id: {account_id}, workflow_id: {workflow_id}")
        playbook_execution = playbook_executions.first()
        pe_proto: PlaybookExecutionProto = playbook_execution.proto
        pe_logs = pe_proto.logs
        execution_output: [InterpretationProto] = playbook_execution_result_interpret(InterpreterType.BASIC_I, pe_logs)
        workflow = workflows.first()
        w_proto: WorkflowProto = workflow.proto
        w_actions = w_proto.actions
        for w_action in w_actions:
            action_executor(account, w_action, execution_output)
    except Exception as exc:
        logger.error(f"Error occurred while running workflow action execution: {exc}")
        raise exc


workflow_executor_prerun_notifier = publish_pre_run_task(workflow_executor)
workflow_executor_failure_notifier = publish_task_failure(workflow_executor)
workflow_executor_postrun_notifier = publish_post_run_task(workflow_executor)
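The net control flow after this change, in outline (names from the diff above; note that the not-found guards in workflow_action_execution log an error but fall through to .first() rather than returning):

# Outline of the chain this commit wires up (comments only, not runnable):
#   workflow_executor
#     -> execute_playbook(...)                                  # run the playbook itself
#     -> workflow_action_execution.delay(...)                   # schedule the follow-up celery task
#   workflow_action_execution
#     -> playbook_execution_result_interpret(InterpreterType.BASIC_I, pe_logs)
#     -> action_executor(account, w_action, execution_output)   # once per workflow action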
Empty file added intelligence_layer/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions intelligence_layer/admin.py
@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.
6 changes: 6 additions & 0 deletions intelligence_layer/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class IntelligenceLayerConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'intelligence_layer'
Empty file.
3 changes: 3 additions & 0 deletions intelligence_layer/models.py
@@ -0,0 +1,3 @@
from django.db import models

# Create your models here.
Empty file.
Empty file.