Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflows Improvements #43

Merged
merged 2 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docker/playbooks.docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,28 @@ services:
networks:
- mynetwork

workflow_action_execution_celery_worker:
build:
context: ..
dockerfile: Dockerfile
image: playbooks
command: ./start-celery-worker.sh
environment:
- "DJANGO_DEBUG=True"
- "POSTGRES_HOST=db"
- "CELERY_BROKER_URL=redis://redis:6379/0"
- "CELERY_RESULT_BACKEND=redis://redis:6379/0"
- "REDIS_URL=redis://redis:6379/0"
- "POSTGRES_USER=user"
- "POSTGRES_PASSWORD=pass"
- "CELERY_QUEUE=workflow_action_execution"
depends_on:
- db
- redis
- setup_db
networks:
- mynetwork

celery_beat:
build:
context: ..
Expand Down
4 changes: 3 additions & 1 deletion executor/crud/playbooks_update_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ def update_playbook(elem: PlayBook, update_op: UpdatePlaybookOp.UpdatePlaybook)
for task in tasks:
task.is_active = False
task.save(update_fields=['is_active'])
elem.is_active = False
elem.save(update_fields=['is_active'])
updated_playbook = update_op.playbook
updated_elem, err = create_db_playbook(elem.account, elem.created_by, updated_playbook)
if err:
raise Exception(err)
return updated_elem
except Exception as ex:
logger.exception(f"Error occurred updating playbook for {elem.name}")
raise Exception(f"Error occurred updating playbook status for {elem.name}")
return elem

@staticmethod
def update_playbook_alert_ops_trigger_status(elem: PlayBook,
Expand Down
8 changes: 5 additions & 3 deletions executor/workflows/crud/workflows_update_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ def update_workflow(elem: Workflow, update_op: UpdateWorkflowOp.UpdateWorkflow)
for action_mapping in all_workflow_actions_mapping:
action_mapping.is_active = False
action_mapping.save(update_fields=['is_active'])
elem.is_active = False
elem.save(update_fields=['is_active'])
updated_workflow = update_op.workflow
updated_elem, err = create_db_workflow(elem.account, elem.created_by, updated_workflow)
if err:
raise Exception(err)
return updated_elem
except Exception as ex:
logger.exception(f"Error occurred updating workflow for {elem.name}")
raise Exception(f"Error occurred updating workflow status for {elem.name}")
return elem
logger.exception(f"Error occurred updating workflow for {elem.name}, {str(ex)}")
raise Exception(f"Error occurred updating workflow status for {elem.name}, {str(ex)}")

@staticmethod
def update_workflow_entry_point_status(elem: Workflow,
Expand Down
11 changes: 8 additions & 3 deletions executor/workflows/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ def workflow_executor(account_id, workflow_id, workflow_execution_id, playbook_i
raise exc


workflow_executor_prerun_notifier = publish_pre_run_task(workflow_executor)
workflow_executor_failure_notifier = publish_task_failure(workflow_executor)
workflow_executor_postrun_notifier = publish_post_run_task(workflow_executor)


@shared_task(max_retries=3, default_retry_delay=10)
def workflow_action_execution(account_id, workflow_id, workflow_execution_id, playbook_execution_id):
logger.info(f"Running workflow action execution:: account_id: {account_id}, workflow_execution_id: "
Expand Down Expand Up @@ -163,6 +168,6 @@ def workflow_action_execution(account_id, workflow_id, workflow_execution_id, pl
raise exc


workflow_executor_prerun_notifier = publish_pre_run_task(workflow_executor)
workflow_executor_failure_notifier = publish_task_failure(workflow_executor)
workflow_executor_postrun_notifier = publish_post_run_task(workflow_executor)
workflow_action_execution_prerun_notifier = publish_pre_run_task(workflow_action_execution)
workflow_action_execution_failure_notifier = publish_task_failure(workflow_action_execution)
workflow_action_execution_postrun_notifier = publish_post_run_task(workflow_action_execution)
1 change: 1 addition & 0 deletions playbooks/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
app.conf.update(task_routes={
'executor.workflows.tasks.workflow_scheduler': {'queue': 'workflow_scheduler'},
'executor.workflows.tasks.workflow_executor': {'queue': 'workflow_executor'},
'executor.workflows.tasks.workflow_action_execution': {'queue': 'workflow_action_execution'},
})

app.conf.beat_schedule = {
Expand Down
Loading