Adds base interpretation layer and links it to workflow actions #42

Merged 1 commit on Apr 25, 2024
111 changes: 1 addition & 110 deletions executor/metric_task_executor/playbook_metric_task_executor_facade.py
@@ -1,122 +1,13 @@
import random
from typing import Dict

import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio

from executor.metric_task_executor.cloudwatch_task_executor import CloudwatchMetricTaskExecutor
from executor.metric_task_executor.datadog_task_executor import DatadogMetricTaskExecutor
from executor.metric_task_executor.newrelic_task_executor import NewRelicMetricTaskExecutor
from executor.metric_task_executor.playbook_metric_task_executor import PlaybookMetricTaskExecutor
from executor.metric_task_executor.grafana_executor import GrafanaMetricTaskExecutor
from executor.metric_task_executor.grafana_vpc_executor import GrafanaVpcMetricTaskExecutor

from protos.playbooks.playbook_pb2 import PlaybookMetricTaskDefinition as PlaybookMetricTaskDefinitionProto, \
PlaybookMetricTaskExecutionResult

metric_source_displace_name_map = {
PlaybookMetricTaskDefinitionProto.Source.CLOUDWATCH: 'Cloudwatch',
PlaybookMetricTaskDefinitionProto.Source.GRAFANA: 'Grafana',
PlaybookMetricTaskDefinitionProto.Source.GRAFANA_VPC: 'Grafana',
PlaybookMetricTaskDefinitionProto.Source.NEW_RELIC: 'New Relic',
PlaybookMetricTaskDefinitionProto.Source.DATADOG: 'Datadog'
}


def generate_color_map(labels):
color_map = {}
for label in labels:
color_map[label] = f'rgb({random.randint(0, 255)},{random.randint(0, 255)},{random.randint(0, 255)})'
return color_map


def timeseries_to_df(proto_data: PlaybookMetricTaskExecutionResult.Result.Timeseries):
data = []
for timeseries in proto_data.labeled_metric_timeseries:
legend_label = ''
for label in timeseries.metric_label_values:
legend_label += f'{label.name.value}={label.value.value}__'
legend_label = legend_label[:-2]
for datapoint in timeseries.datapoints:
data.append({
'Timestamp': pd.Timestamp(datapoint.timestamp, unit='ms').strftime('%H:%M'),
'Value': datapoint.value.value,
'Label': legend_label,
'Unit': timeseries.unit.value
})

return pd.DataFrame(data)


def table_result_to_df(table_result):
# Extracting column names from the first TableRow
column_names = [col.name.value for col in table_result.rows[0].columns]

# Initialize an empty DataFrame with column names
df = pd.DataFrame(columns=column_names)

# Extracting data from rows
for row in table_result.rows:
row_data = [col.value.value for col in row.columns]
df = df.append(pd.Series(row_data, index=column_names), ignore_index=True)

# Keep only the first 5 rows if there are more
if len(df) > 5:
df = df.head(5)

return df


def publish_metric_task_execution_result(file_key, result):
result_type = result.type
if result_type == PlaybookMetricTaskExecutionResult.Result.Type.TIMESERIES:
timeseries = result.timeseries
df = timeseries_to_df(timeseries)
unique_labels = df['Label'].unique()
color_map = generate_color_map(unique_labels)
fig = go.Figure()
unit = df['Unit'].iloc[0] if 'Unit' in df and df['Unit'].iloc[0] else ''
for label, data in df.groupby('Label'):
fig.add_trace(go.Scatter(x=data['Timestamp'], y=data['Value'], mode='lines', name=label,
line=dict(color=color_map[label])))
if unique_labels is None or (len(unique_labels) == 1 and '' in unique_labels):
fig.update_layout(
xaxis_title='Timestamp',
yaxis_title='Values' if not unit else unit,
title_x=0.5,
title_y=0.9
)
else:
fig.update_layout(
xaxis_title='Timestamp',
yaxis_title='Values' if not unit else unit,
legend_title_text='Labels',
title_x=0.5,
title_y=0.9
)

pio.write_image(fig, file_key)
return True
elif result_type == PlaybookMetricTaskExecutionResult.Result.Type.TABLE_RESULT:
# table_result = result.table_result
# df = table_result_to_df(table_result)
# fig = go.Figure(data=[go.Table(
# header=dict(values=df.columns,
# fill_color='paleturquoise',
# align='left'),
# cells=dict(values=[df[col] for col in df.columns],
# fill_color='lavender',
# align='left'))
# ])
#
# # Update layout for better visualization
# fig.update_layout(title='Table Result' if not expression else expression, title_x=0.5, title_y=0.9)
#
# # Exporting the plot to an image using plotly
# pio.write_image(fig, file_key)
return False
return False
from protos.playbooks.playbook_pb2 import PlaybookMetricTaskDefinition as PlaybookMetricTaskDefinitionProto


class PlaybookMetricTaskExecutorFacade:
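The chart-rendering helpers removed above are superseded by the new intelligence_layer app: as the imports in executor/workflows/tasks.py below show, playbook execution logs are now interpreted into Interpretation protos via playbook_execution_result_interpret. As a minimal sketch of what producing one such proto could look like, assuming only the field access visible in the SlackNotifier diff below (build_image_interpretation is a hypothetical helper, not part of this PR):

# Hypothetical helper, not part of this PR. The field names it sets
# (type, title.value, image_url.value) match the fields that
# SlackNotifier.notify() reads in the diff further down.
from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto


def build_image_interpretation(title: str, image_url: str) -> InterpretationProto:
    interpretation = InterpretationProto()
    interpretation.type = InterpretationProto.Type.IMAGE
    interpretation.title.value = title          # becomes the step heading in Slack
    interpretation.image_url.value = image_url  # becomes a Slack image block
    return interpretation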
3 changes: 2 additions & 1 deletion executor/workflows/action/action_executor.py
@@ -2,12 +2,13 @@

from accounts.models import Account
from executor.workflows.action.notify_action_executor.notify_facade import notifier_facade
from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto
from protos.playbooks.workflow_pb2 import WorkflowAction as WorkflowActionProto

logger = logging.getLogger(__name__)


def action_executor(account: Account, action: WorkflowActionProto, execution_output):
def action_executor(account: Account, action: WorkflowActionProto, execution_output: [InterpretationProto]):
if action.type == WorkflowActionProto.Type.NOTIFY:
notifier_facade.notify(account, action.notification_config, execution_output)
else:
executor/workflows/action/notify_action_executor/notifier.py
@@ -3,7 +3,7 @@


class Notifier:
type: WorkflowActionNotificationConfigProto.type = WorkflowActionNotificationConfigProto.Type.UNKNOWN
type: WorkflowActionNotificationConfigProto.Type = WorkflowActionNotificationConfigProto.Type.UNKNOWN
account: Account = None

def notify(self, config: WorkflowActionNotificationConfigProto, execution_output) -> bool:
executor/workflows/action/notify_action_executor/notify_facade.py
@@ -2,7 +2,8 @@

from accounts.models import Account
from executor.workflows.action.notify_action_executor.notifier import Notifier
from executor.workflows.entry_point.alert_entry_point.slack_channel_alert_entry_point import SlackChannelAlertEntryPoint
from executor.workflows.action.notify_action_executor.slack_notifier import SlackNotifier
from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto
from protos.playbooks.workflow_pb2 import WorkflowActionNotificationConfig as WorkflowActionNotificationConfigProto

logger = logging.getLogger(__name__)
@@ -16,14 +17,15 @@ def __init__(self):
def register(self, notification_type: WorkflowActionNotificationConfigProto.Type, notifier: Notifier.__class__):
self._map[notification_type] = notifier

def notify(self, account: Account, config: WorkflowActionNotificationConfigProto, execution_output) -> bool:
def notify(self, account: Account, config: WorkflowActionNotificationConfigProto,
execution_output: [InterpretationProto]) -> bool:
if not account or not config or not execution_output:
return False
if config.type not in self._map:
raise ValueError(f'Notification type {config.type} is not supported')
notifier = self._map.get(config.type)(account)
notifier = self._map[config.type](account)
return notifier.notify(config, execution_output)


notifier_facade = NotifierFacade()
notifier_facade.register(WorkflowActionNotificationConfigProto.Type.SLACK, SlackChannelAlertEntryPoint)
notifier_facade.register(WorkflowActionNotificationConfigProto.Type.SLACK, SlackNotifier)
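The facade above is a small registry: register() maps a notification type to a notifier class, and notify() instantiates the registered class per call and delegates to it. Supporting another channel would be a Notifier subclass plus one registration call; a sketch, under the assumption that a WEBHOOK value exists on the config proto's Type enum (no such value appears in this PR):

# Hypothetical extension, not in this PR; assumes it lives alongside the
# facade above so Notifier, Account and the proto imports are in scope.
class WebhookNotifier(Notifier):
    type = WorkflowActionNotificationConfigProto.Type.WEBHOOK  # assumed enum value

    def __init__(self, account: Account):
        self.account = account

    def notify(self, config: WorkflowActionNotificationConfigProto,
               execution_output: [InterpretationProto]) -> bool:
        # POST the interpretations to the configured webhook URL here.
        return True


notifier_facade.register(WorkflowActionNotificationConfigProto.Type.WEBHOOK, WebhookNotifier)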
28 changes: 26 additions & 2 deletions executor/workflows/action/notify_action_executor/slack_notifier.py
@@ -5,6 +5,7 @@
from executor.workflows.action.notify_action_executor.notifier import Notifier
from integrations_api_processors.slack_api_processor import SlackApiProcessor
from protos.connectors.connector_pb2 import ConnectorType, ConnectorKey
from protos.playbooks.intelligence_layer.interpreter_pb2 import Interpretation as InterpretationProto
from protos.playbooks.workflow_pb2 import WorkflowActionNotificationConfig as WorkflowActionNotificationConfigProto, \
WorkflowActionSlackNotificationConfig as WorkflowActionSlackNotificationConfigProto

@@ -29,13 +30,36 @@ def __init__(self, account: Account):
slack_bot_auth_token = slack_bot_auth_token_keys.first()
self.slack_api_processor = SlackApiProcessor(slack_bot_auth_token.value)

def notify(self, config: WorkflowActionNotificationConfigProto, execution_output):
def notify(self, config: WorkflowActionNotificationConfigProto, execution_output: [InterpretationProto]):
slack_config: WorkflowActionSlackNotificationConfigProto = config.slack_config
channel_id = slack_config.slack_channel_id.value
if not channel_id:
raise ValueError('Slack channel id is not configured in the notification config')
logger.info(f"Sending slack message to channel {channel_id} for account {self.account.id}")
message_params = {'text_message': execution_output, 'channel_id': channel_id}
blocks = []
for i, interpretation in enumerate(execution_output):
if interpretation.type == InterpretationProto.Type.SUMMARY:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f'Step {i + 1}: {interpretation.title.value}'
}
})
elif interpretation.type == InterpretationProto.Type.IMAGE:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f'Step {i + 1}: {interpretation.title.value}'
}
})
blocks.append({
"type": "image",
"image_url": interpretation.image_url.value,
"alt_text": 'metric evaluation'
})
message_params = {'blocks': blocks, 'channel_id': channel_id}
if slack_config.message_type == WorkflowActionSlackNotificationConfigProto.MessageType.THREAD_REPLY:
message_params['reply_to'] = slack_config.thread_ts.value
try:
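For reference, an execution_output holding one SUMMARY and one IMAGE interpretation would yield a Block Kit payload along these lines (a sketch; titles and URL are example values, not taken from the PR):

# Example blocks payload built by notify() for a two-step execution output.
blocks = [
    {"type": "section",
     "text": {"type": "mrkdwn", "text": "Step 1: Playbook summary"}},
    {"type": "section",
     "text": {"type": "mrkdwn", "text": "Step 2: Cloudwatch metric evaluation"}},
    {"type": "image",
     "image_url": "https://example.com/metric-chart.png",  # illustrative URL
     "alt_text": "metric evaluation"},
]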
58 changes: 56 additions & 2 deletions executor/workflows/tasks.py
@@ -4,17 +4,24 @@
from celery import shared_task
from django.conf import settings

from executor.crud.playbook_execution_crud import create_playbook_execution
from accounts.models import Account
from executor.crud.playbook_execution_crud import create_playbook_execution, get_db_playbook_execution
from executor.tasks import execute_playbook
from executor.workflows.action.action_executor import action_executor
from executor.workflows.crud.workflow_execution_crud import update_db_account_workflow_execution_status, \
get_db_workflow_execution_logs, get_workflow_executions, create_workflow_execution_log, \
update_db_account_workflow_execution_count_increment
from executor.workflows.crud.workflows_crud import get_db_workflows
from intelligence_layer.task_result_interpreters.task_result_interpreter_facade import \
playbook_execution_result_interpret
from management.crud.task_crud import get_or_create_task
from management.models import TaskRun, PeriodicTaskStatus
from management.utils.celery_task_signal_utils import publish_pre_run_task, publish_task_failure, publish_post_run_task
from playbooks.utils.utils import current_datetime
from protos.base_pb2 import TimeRange
from protos.playbooks.workflow_pb2 import WorkflowExecutionStatusType
from protos.playbooks.intelligence_layer.interpreter_pb2 import InterpreterType, Interpretation as InterpretationProto
from protos.playbooks.playbook_pb2 import PlaybookExecution as PlaybookExecutionProto
from protos.playbooks.workflow_pb2 import WorkflowExecutionStatusType, Workflow as WorkflowProto
from utils.proto_utils import dict_to_proto

logger = logging.getLogger(__name__)
@@ -99,16 +106,63 @@ def workflow_scheduler():

@shared_task(max_retries=3, default_retry_delay=10)
def workflow_executor(account_id, workflow_id, workflow_execution_id, playbook_id, playbook_execution_id, time_range):
current_time = current_datetime().timestamp()
logger.info(f"Running workflow execution:: account_id: {account_id}, workflow_execution_id: "
f"{workflow_execution_id}, playbook_execution_id: {playbook_execution_id}")
try:
create_workflow_execution_log(account_id, workflow_id, workflow_execution_id, playbook_execution_id)
execute_playbook(account_id, playbook_id, playbook_execution_id, time_range)
try:
saved_task = get_or_create_task(workflow_action_execution.__name__, account_id, workflow_id,
workflow_execution_id, playbook_execution_id)
if not saved_task:
logger.error(f"Failed to create workflow action execution task for account: {account_id}, workflow_id: "
f"{workflow_id}, workflow_execution_id: {workflow_execution_id}, playbook_id: "
f"{playbook_id}")
return
task = workflow_action_execution.delay(account_id, workflow_id, workflow_execution_id,
playbook_execution_id)
task_run = TaskRun.objects.create(task=saved_task, task_uuid=task.id,
status=PeriodicTaskStatus.SCHEDULED,
account_id=account_id,
scheduled_at=datetime.fromtimestamp(float(current_time)))
except Exception as e:
logger.error(
f"Failed to create workflow action execution:: workflow_id: {workflow_id}, workflow_execution_id: "
f"{workflow_execution_id} playbook_id: {playbook_id}, error: {e}")
except Exception as exc:
logger.error(f"Error occurred while running workflow execution: {exc}")
raise exc


@shared_task(max_retries=3, default_retry_delay=10)
def workflow_action_execution(account_id, workflow_id, workflow_execution_id, playbook_execution_id):
logger.info(f"Running workflow action execution:: account_id: {account_id}, workflow_execution_id: "
f"{workflow_execution_id}, playbook_execution_id: {playbook_execution_id}")
account = Account.objects.get(id=account_id)
try:
playbook_executions = get_db_playbook_execution(account, playbook_execution_id=playbook_execution_id)
workflows = get_db_workflows(account, workflow_id=workflow_id)
if not playbook_executions:
logger.error(f"Aborting workflow action execution as playbook execution not found for "
f"account_id: {account_id}, playbook_execution_id: {playbook_execution_id}")
if not workflows:
logger.error(f"Aborting workflow action execution as workflow not found for "
f"account_id: {account_id}, workflow_id: {workflow_id}")
playbook_execution = playbook_executions.first()
pe_proto: PlaybookExecutionProto = playbook_execution.proto
pe_logs = pe_proto.logs
execution_output: [InterpretationProto] = playbook_execution_result_interpret(InterpreterType.BASIC_I, pe_logs)
workflow = workflows.first()
w_proto: WorkflowProto = workflow.proto
w_actions = w_proto.actions
for w_action in w_actions:
action_executor(account, w_action, execution_output)
except Exception as exc:
logger.error(f"Error occurred while running workflow action execution: {exc}")
raise exc


workflow_executor_prerun_notifier = publish_pre_run_task(workflow_executor)
workflow_executor_failure_notifier = publish_task_failure(workflow_executor)
workflow_executor_postrun_notifier = publish_post_run_task(workflow_executor)
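Taken together, the two tasks form a pipeline: workflow_executor runs the playbook and schedules workflow_action_execution as a follow-up Celery task; that task interprets the playbook execution logs with the BASIC_I interpreter and fans the resulting interpretations out to each workflow action. A condensed view of the control flow (illustrative; CRUD lookups, task bookkeeping and logging elided):

# Condensed control flow of the tasks added above (sketch, not runnable as-is):
#
# workflow_executor(account_id, workflow_id, workflow_execution_id,
#                   playbook_id, playbook_execution_id, time_range)
#   -> execute_playbook(account_id, playbook_id, playbook_execution_id, time_range)
#   -> workflow_action_execution.delay(account_id, workflow_id,
#                                      workflow_execution_id, playbook_execution_id)
#
# workflow_action_execution(account_id, workflow_id,
#                           workflow_execution_id, playbook_execution_id)
#   -> pe_logs = playbook_execution.proto.logs
#   -> execution_output = playbook_execution_result_interpret(
#          InterpreterType.BASIC_I, pe_logs)
#   -> for w_action in workflow.proto.actions:
#          action_executor(account, w_action, execution_output)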
Empty file added intelligence_layer/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions intelligence_layer/admin.py
@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.
6 changes: 6 additions & 0 deletions intelligence_layer/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class IntelligenceLayerConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'intelligence_layer'
Empty file.
3 changes: 3 additions & 0 deletions intelligence_layer/models.py
@@ -0,0 +1,3 @@
from django.db import models

# Create your models here.
Empty file.