Skip to content

Commit

Permalink
adds cron rule parser and scheduling logic for workflow executions
Browse files Browse the repository at this point in the history
  • Loading branch information
droid-mohit committed Apr 24, 2024
1 parent 1b2f6a7 commit 71f1cf5
Show file tree
Hide file tree
Showing 18 changed files with 307 additions and 122 deletions.
15 changes: 10 additions & 5 deletions executor/workflows/crud/workflow_execution_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ def get_db_workflow_executions(account: Account, workflow_execution_id=None, wor
if status:
filters['status'] = status
try:
return account.workflowexecution_set.filter(**filters)
db_we = account.workflowexecution_set.all()
db_we = db_we.order_by('workflow_run_id', 'scheduled_at')
if filters:
db_we = db_we.filter(**filters)
return db_we
except Exception as e:
logger.error(f"Failed to get workflow execution for account_id: {account.id}, with error: {str(e)}")
return None
Expand Down Expand Up @@ -52,6 +56,7 @@ def get_workflow_executions(account_id=None, workflow_execution_id=None, workflo
all_we = all_we.select_related('account')
if filters:
all_we = all_we.filter(**filters)
all_we = all_we.order_by('scheduled_at')
return all_we
except Exception as e:
logger.error(f"Failed to get workflow execution with error: {str(e)}")
Expand Down Expand Up @@ -89,10 +94,10 @@ def create_workflow_execution(account: Account, time_range: TimeRange, workflow_
raise e


def update_db_account_workflow_execution_status(account: Account, workflow_run_id: int,
def update_db_account_workflow_execution_status(account: Account, workflow_execution_id: int, scheduled_at,
status: WorkflowExecutionStatusType):
try:
workflow_execution = account.workflowexecution_set.get(id=workflow_run_id)
workflow_execution = account.workflowexecution_set.get(id=workflow_execution_id, scheduled_at=scheduled_at)
workflow_execution.status = status
update_fields = ['status']
if status == WorkflowExecutionStatusType.WORKFLOW_RUNNING:
Expand All @@ -106,10 +111,10 @@ def update_db_account_workflow_execution_status(account: Account, workflow_run_i
return True
except WorkflowExecution.DoesNotExist:
logger.error(f"Failed to get workflow execution for account_id: {account.id}, "
f"workflow_run_id: {workflow_run_id}")
f"workflow_run_id: {workflow_execution_id}")
except Exception as e:
logger.error(f"Failed to update workflow execution status for account_id: {account.id}, "
f"workflow_run_id: {workflow_run_id}, error: {e}")
f"workflow_run_id: {workflow_execution_id}, error: {e}")
return False


Expand Down
16 changes: 8 additions & 8 deletions executor/workflows/crud/workflows_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ def create_db_workflow(account: Account, created_by, workflow_proto: WorkflowPro
for ap in wf_action_protos:
ap_type = ap.type
action = proto_to_dict(ap)
saved_ep, _ = WorkflowAction.objects.get_or_create(account=account,
type=ap_type,
action=action,
created_by=created_by,
defaults={
'is_active': True,
})
WorkflowActionMapping.objects.create(account=account, workflow=db_workflow, action=saved_ep,
saved_a, _ = WorkflowAction.objects.get_or_create(account=account,
type=ap_type,
action=action,
created_by=created_by,
defaults={
'is_active': True,
})
WorkflowActionMapping.objects.create(account=account, workflow=db_workflow, action=saved_a,
is_active=True)

for pb in db_playbooks:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.1.4 on 2024-04-24 07:59

from django.db import migrations


# Replaces the uniqueness constraint on the ``workflowexecution`` model with
# (account, workflow_run_id, scheduled_at), so rows may share a
# workflow_run_id within an account as long as their scheduled_at differs.
class Migration(migrations.Migration):

    # Migrations that must be applied before this one.
    dependencies = [
        ('accounts', '0002_userinvitation'),
        ('workflows', '0003_workflowexecution_total_executions'),
    ]

    operations = [
        # Only the unique_together set is altered; no columns are added or removed here.
        migrations.AlterUniqueTogether(
            name='workflowexecution',
            unique_together={('account', 'workflow_run_id', 'scheduled_at')},
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.1.4 on 2024-04-24 08:41

from django.db import migrations, models


# Redefines ``WorkflowExecution.interval`` as a nullable, blank-able, indexed
# IntegerField (no default is declared in the new field definition).
class Migration(migrations.Migration):

    # Must run after the unique_together change in the same app.
    dependencies = [
        ('workflows', '0004_alter_workflowexecution_unique_together'),
    ]

    operations = [
        migrations.AlterField(
            model_name='workflowexecution',
            name='interval',
            # null=True allows rows with no interval at all.
            field=models.IntegerField(blank=True, db_index=True, null=True),
        ),
    ]
10 changes: 5 additions & 5 deletions executor/workflows/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class Meta:
unique_together = [['account', 'type', 'action_md5', 'created_by']]

def save(self, **kwargs):
if self.action_md5:
self.action_md5_md5 = md5(str(self.action).encode('utf-8')).hexdigest()
if self.action:
self.action_md5 = md5(str(self.action).encode('utf-8')).hexdigest()
super().save(**kwargs)

@property
Expand Down Expand Up @@ -197,7 +197,7 @@ class WorkflowExecution(models.Model):

scheduled_at = models.DateTimeField(db_index=True)
expiry_at = models.DateTimeField(blank=True, null=True, db_index=True)
interval = models.IntegerField(db_index=True, default=60)
interval = models.IntegerField(null=True, blank=True, db_index=True)
total_executions = models.IntegerField(default=0)

created_at = models.DateTimeField(auto_now_add=True, db_index=True)
Expand All @@ -208,7 +208,7 @@ class WorkflowExecution(models.Model):
created_by = models.TextField(null=True, blank=True)

class Meta:
unique_together = [['account', 'workflow_run_id']]
unique_together = [['account', 'workflow_run_id', 'scheduled_at']]

@property
def proto(self) -> WorkflowExecutionProto:
Expand Down Expand Up @@ -239,7 +239,7 @@ def proto_partial(self) -> WorkflowExecutionProto:
status=self.status,
scheduled_at=int(self.scheduled_at.replace(tzinfo=timezone.utc).timestamp()),
expiry_at=int(self.expiry_at.replace(tzinfo=timezone.utc).timestamp()) if self.expiry_at else 0,
interval=UInt64Value(value=self.interval),
interval=UInt64Value(value=self.interval) if self.interval else None,
total_executions=UInt64Value(value=self.total_executions),
created_at=int(self.created_at.replace(tzinfo=timezone.utc).timestamp()),
started_at=int(self.started_at.replace(tzinfo=timezone.utc).timestamp()) if self.started_at else 0,
Expand Down
25 changes: 16 additions & 9 deletions executor/workflows/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import timedelta, datetime

from celery import shared_task
from django.conf import settings

from executor.crud.playbook_execution_crud import create_playbook_execution
from executor.tasks import execute_playbook
Expand Down Expand Up @@ -31,31 +32,31 @@ def workflow_scheduler():
if wf_execution.status == WorkflowExecutionStatusType.WORKFLOW_CANCELLED:
logger.info(
f"Workflow execution cancelled for workflow_execution_id: {wf_execution.id} at {current_time}")
return True
continue

scheduled_at = wf_execution.scheduled_at
if current_time_utc < scheduled_at:
logger.info(f"Workflow execution not scheduled yet for workflow_execution_id: {wf_execution.id}")
return True
continue

expiry_at = wf_execution.expiry_at
if current_time_utc >= expiry_at:
interval = wf_execution.interval
if current_time_utc > expiry_at + timedelta(seconds=int(settings.WORKFLOW_SCHEDULER_INTERVAL)):
logger.info(f"Workflow execution expired for workflow_execution_id: {wf_execution.id}")
update_db_account_workflow_execution_status(account, wf_execution.id,
update_db_account_workflow_execution_status(account, wf_execution.id, scheduled_at,
WorkflowExecutionStatusType.WORKFLOW_FINISHED)
return True
continue
wf_execution_logs = get_db_workflow_execution_logs(account, wf_execution.id)
if wf_execution_logs:
interval = wf_execution.interval
if interval and wf_execution_logs:
latest_wf_execution_log = wf_execution_logs.first()
next_schedule = latest_wf_execution_log.created_at + timedelta(seconds=interval)
if current_time_utc < next_schedule:
logger.info(f"Workflow execution already scheduled for workflow_execution_id: {wf_execution.id}")
return True
continue

update_db_account_workflow_execution_count_increment(account, wf_execution.id)
if wf_execution.status == WorkflowExecutionStatusType.WORKFLOW_SCHEDULED:
update_db_account_workflow_execution_status(account, wf_execution.id,
update_db_account_workflow_execution_status(account, wf_execution.id, scheduled_at,
WorkflowExecutionStatusType.WORKFLOW_RUNNING)
time_range = wf_execution.time_range
all_pbs = wf_execution.workflow.playbooks.filter(is_active=True)
Expand Down Expand Up @@ -83,6 +84,12 @@ def workflow_scheduler():
f"Failed to create workflow execution:: workflow_id: {workflow_id}, workflow_execution_id: "
f"{wf_execution.id} playbook_id: {pb_id}, error: {e}")
continue
if not interval:
logger.info(
f"Workflow execution interval not set for workflow_execution_id, marking complete: {wf_execution.id}")
update_db_account_workflow_execution_status(account, wf_execution.id, scheduled_at,
WorkflowExecutionStatusType.WORKFLOW_FINISHED)
continue


workflow_scheduler_prerun_notifier = publish_pre_run_task(workflow_scheduler)
Expand Down
56 changes: 43 additions & 13 deletions executor/workflows/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import timedelta
from typing import Union

from django.conf import settings
from django.db.models import QuerySet
from django.http import HttpResponse

Expand All @@ -14,8 +15,8 @@
from playbooks.utils.decorators import web_api
from playbooks.utils.meta import get_meta
from playbooks.utils.queryset import filter_page
from playbooks.utils.utils import current_datetime
from protos.base_pb2 import Meta, Message, Page, TimeRange
from playbooks.utils.utils import current_datetime, calculate_cron_times
from protos.base_pb2 import Meta, Message, Page, TimeRange, TaskCronRule
from protos.playbooks.api_pb2 import GetWorkflowsRequest, GetWorkflowsResponse, CreateWorkflowRequest, \
CreateWorkflowResponse, UpdateWorkflowRequest, UpdateWorkflowResponse, ExecuteWorkflowRequest, \
ExecuteWorkflowResponse, ExecutionWorkflowGetRequest, ExecutionWorkflowGetResponse, ExecutionsWorkflowsListResponse, \
Expand Down Expand Up @@ -113,18 +114,47 @@ def workflows_execute(request_message: ExecuteWorkflowRequest) -> Union[ExecuteW
try:
schedule_type = account_workflow.schedule_type
schedule: WorkflowScheduleProto = dict_to_proto(account_workflow.schedule, WorkflowScheduleProto)
workflow_run_uuid = f'{account.id}_{workflow_id}_{str(int(current_time_utc.timestamp()))}_wf_run'
if schedule_type == WorkflowScheduleProto.Type.PERIODIC:
periodic_schedule: WorkflowPeriodicScheduleProto = schedule.periodic
interval = periodic_schedule.interval.value
duration_in_seconds = periodic_schedule.duration_in_seconds.value
scheduled_at = current_time_utc
expiry_at = scheduled_at + timedelta(seconds=duration_in_seconds)
time_range = TimeRange(time_geq=int(scheduled_at.timestamp()) - 3600, time_lt=int(scheduled_at.timestamp()))
workflow_run_uuid = f'{account.id}_{workflow_id}_{str(int(current_time_utc.timestamp()))}_wf_run'
create_workflow_execution(account, time_range, workflow_id, workflow_run_uuid, scheduled_at, expiry_at,
interval, user.email)
return ExecuteWorkflowResponse(success=BoolValue(value=True),
workflow_run_id=StringValue(value=workflow_run_uuid))
expiry_at = current_time_utc + timedelta(seconds=duration_in_seconds)
if periodic_schedule.type == WorkflowPeriodicScheduleProto.Type.INTERVAL:
scheduled_at = current_time_utc
interval = periodic_schedule.task_interval.interval_in_seconds.value
if interval < 60:
return ExecuteWorkflowResponse(success=BoolValue(value=False),
message=Message(title="Error", description="Invalid Interval"))
time_range = TimeRange(time_geq=int(scheduled_at.timestamp()) - 3600,
time_lt=int(scheduled_at.timestamp()))
create_workflow_execution(account, time_range, workflow_id, workflow_run_uuid, scheduled_at, expiry_at,
interval, user.email)
elif periodic_schedule.type == WorkflowPeriodicScheduleProto.Type.CRON:
cron_rule: TaskCronRule = periodic_schedule.cron_rule.rule.value
cron_schedules = calculate_cron_times(cron_rule, current_time_utc, expiry_at)
if len(cron_schedules) == 0:
return ExecuteWorkflowResponse(success=BoolValue(value=False),
message=Message(title="Error",
description=f"No Schedules Found with Cron Rule: {cron_rule}"))
for scheduled_at in cron_schedules:
if scheduled_at > expiry_at:
break
time_range = TimeRange(time_geq=int(scheduled_at.timestamp()) - 3600,
time_lt=int(scheduled_at.timestamp()))
create_workflow_execution(account, time_range, workflow_id, workflow_run_uuid, scheduled_at,
scheduled_at, None, user.email)
elif schedule_type == WorkflowScheduleProto.Type.ONE_OFF:
scheduled_at = current_time_utc + timedelta(seconds=int(settings.WORKFLOW_SCHEDULER_INTERVAL))
time_range = TimeRange(time_geq=int(scheduled_at.timestamp()) - 3600,
time_lt=int(scheduled_at.timestamp()))
create_workflow_execution(account, time_range, workflow_id, workflow_run_uuid, scheduled_at, scheduled_at,
None, user.email)
else:
return ExecuteWorkflowResponse(success=BoolValue(value=False),
message=Message(title="Error", description="Invalid Schedule Type"))

return ExecuteWorkflowResponse(success=BoolValue(value=True),
workflow_run_id=StringValue(value=workflow_run_uuid))
except Exception as e:
logger.error(f"Error updating playbook: {e}")
return ExecuteWorkflowResponse(success=BoolValue(value=False),
Expand All @@ -148,8 +178,8 @@ def workflows_execution_get(request_message: ExecutionWorkflowGetRequest) -> \
return ExecutionWorkflowGetResponse(success=BoolValue(value=False),
message=Message(title="Error", description=str(e)))

workflow_execution = workflow_execution.first()
return ExecutionWorkflowGetResponse(success=BoolValue(value=True), workflow_execution=workflow_execution.proto)
workflow_execution_protos = [we.proto for we in workflow_execution]
return ExecutionWorkflowGetResponse(success=BoolValue(value=True), workflow_executions=workflow_execution_protos)


@web_api(ExecutionsWorkflowsListRequest)
Expand Down
2 changes: 2 additions & 0 deletions playbooks/base_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,5 @@ def get_cache_backend(alias='default'):
'cache_key': env.str("ACCOUNT_PASSWORD_CONTEXT_CACHE_KEY", default='default'),
'enabled': env.bool("GLOBAL_ACCOUNT_PASSWORD_CONTEXT_CACHE", default=True),
}

# Integer setting looked up under the key "WORKFLOW_SCHEDULER_INTERVAL", falling back to 10.
# Consumers wrap it in timedelta(seconds=...): the scheduler task uses it as a
# grace window after expiry_at, and the execute-workflow view uses it as the
# delay before a one-off execution is scheduled.
# NOTE(review): presumably this should match the period at which the scheduler task runs - confirm.
WORKFLOW_SCHEDULER_INTERVAL = env.int("WORKFLOW_SCHEDULER_INTERVAL", default=10)
55 changes: 54 additions & 1 deletion playbooks/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import time
import pytz
from datetime import datetime
from datetime import datetime, timedelta


def current_milli_time():
Expand All @@ -13,3 +13,56 @@ def current_epoch_timestamp():

def current_datetime(timezone=pytz.utc):
return datetime.now(timezone)


def parse_cron_rule(rule):
    """Expand a five-field cron rule into the values each field matches.

    The rule is split on whitespace and must contain exactly five fields,
    in the order minute, hour, day of month, month, weekday. Each field is
    expanded by ``parse_part`` with the bounds 0-59, 0-23, 1-31, 1-12 and
    0-6 respectively.

    Returns:
        A 5-tuple ``(minute, hour, day, month, weekday)`` of integer lists.

    Raises:
        ValueError: if the rule does not have exactly five fields.
    """
    fields = rule.split()
    if len(fields) != 5:
        raise ValueError("Invalid cron rule format")

    # (lowest, highest) allowed value for each field, in field order.
    bounds = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6))
    return tuple(
        parse_part(text, lowest, highest)
        for text, (lowest, highest) in zip(fields, bounds)
    )


def parse_part(part, min_val, max_val):
    """Expand a single cron field into the sorted list of integers it matches.

    Supported forms:
        ``*``        every value from ``min_val`` to ``max_val``
        ``N``        the single value N
        ``A-B``      every value from A to B inclusive
        ``*/S``      every S-th value starting at ``min_val``
        ``N/S``      every S-th value from N up to ``max_val``
        ``A-B/S``    every S-th value from A up to B
        ``x,y,...``  the union of any of the forms above

    Args:
        part: text of one cron field.
        min_val: smallest value allowed for this field.
        max_val: largest value allowed for this field.

    Returns:
        Sorted list of unique integers within ``[min_val, max_val]``.

    Raises:
        ValueError: if the field is malformed, has a non-positive step, a
            reversed range, or any value outside ``[min_val, max_val]``.
    """
    if ',' in part:
        values = set()
        for item in part.split(','):
            values.update(parse_part(item, min_val, max_val))
        return sorted(values)

    # Peel off the optional "/step" suffix first so that "*/5" and "10-20/5"
    # are handled the same way as "*" and "10-20".
    base, has_step, step_text = part.partition('/')
    step = 1
    if has_step:
        step = int(step_text)
        if step <= 0:
            raise ValueError(f"Invalid step in cron field: {part!r}")

    if base == '*':
        start, end = min_val, max_val
    elif '-' in base:
        start_text, _, end_text = base.partition('-')
        start, end = int(start_text), int(end_text)
    else:
        start = int(base)
        # "N/S" runs from N to the top of the field; a bare "N" is just N.
        end = max_val if has_step else start

    # Reject values that could never match instead of silently returning them.
    if not min_val <= start <= end <= max_val:
        raise ValueError(
            f"Cron field {part!r} is outside the allowed range {min_val}-{max_val}")

    return list(range(start, end + 1, step))


def calculate_cron_times(rule, start_time=None, end_time=None):
    """List every minute in ``[start_time, end_time)`` matched by a cron rule.

    Args:
        rule: five-field cron rule, parsed by ``parse_cron_rule``.
        start_time: beginning of the window; defaults to ``datetime.now()``.
        end_time: exclusive end of the window; defaults to one day after
            ``start_time``.

    Returns:
        Ascending list of datetimes, each on a whole-minute boundary, at which
        the rule fires. The datetimes keep the tzinfo of ``start_time``.

    Raises:
        ValueError: propagated from ``parse_cron_rule`` for an invalid rule.
    """
    minute, hour, day, month, weekday = parse_cron_rule(rule)
    # Sets give constant-time membership tests inside the per-minute loop.
    minutes, hours, days, months, weekdays = (
        set(values) for values in (minute, hour, day, month, weekday))

    if start_time is None:
        start_time = datetime.now()
    if end_time is None:
        end_time = start_time + timedelta(days=1)

    # Cron fires on minute boundaries: begin at the first whole minute that is
    # not before start_time, so results never carry stray seconds/microseconds.
    one_minute = timedelta(minutes=1)
    current_time = start_time.replace(second=0, microsecond=0)
    if current_time < start_time:
        current_time += one_minute

    cron_times = []
    while current_time < end_time:
        # Cron numbers weekdays with 0 = Sunday, whereas datetime.weekday()
        # uses 0 = Monday; isoweekday() % 7 maps Sunday to 0 and Monday to 1.
        if (current_time.minute in minutes and
                current_time.hour in hours and
                current_time.day in days and
                current_time.month in months and
                current_time.isoweekday() % 7 in weekdays):
            cron_times.append(current_time)
        current_time += one_minute

    return cron_times
4 changes: 4 additions & 0 deletions protos/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ message TaskCronSchedule {
google.protobuf.StringValue timezone = 6;
}

// Fixed-interval schedule for a task.
message TaskInterval {
  // Time between consecutive runs, in seconds.
  google.protobuf.UInt64Value interval_in_seconds = 1;
}

message TaskCronRule {
google.protobuf.StringValue rule = 1;
google.protobuf.StringValue timezone = 2;
Expand Down
Loading

0 comments on commit 71f1cf5

Please sign in to comment.