Skip to content

Commit

Permalink
Make run_after optional as part of API
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Feb 21, 2025
1 parent 2343f38 commit 4d42d9a
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
3 changes: 2 additions & 1 deletion airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _trigger_dag(
dag_bag: DagBag,
*,
triggered_by: DagRunTriggeredByType,
run_after: datetime,
run_after: datetime | None = None,
run_id: str | None = None,
conf: dict | str | None = None,
logical_date: datetime | None = None,
Expand All @@ -67,6 +67,7 @@ def _trigger_dag(
if dag is None or dag_id not in dag_bag.dags:
raise DagNotFound(f"Dag id {dag_id} not found")

run_after = run_after or timezone.coerce_datetime(timezone.utcnow())
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError("The logical date should be localized")
Expand Down
8 changes: 4 additions & 4 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
data_interval_start: AwareDatetime | None = None
data_interval_end: AwareDatetime | None = None
logical_date: AwareDatetime | None
run_after: datetime = Field(default_factory=timezone.utcnow)
run_after: datetime | None = Field(default_factory=timezone.utcnow)

conf: dict = Field(default_factory=dict)
note: str | None = None
Expand All @@ -106,7 +106,7 @@ def check_data_intervals(cls, values):

def validate_context(self, dag: DAG) -> dict:
coerced_logical_date = timezone.coerce_datetime(self.logical_date)
run_after = self.run_after
run_after = self.run_after or timezone.utcnow()
data_interval = None
if coerced_logical_date:
if self.data_interval_start and self.data_interval_end:
Expand All @@ -116,14 +116,14 @@ def validate_context(self, dag: DAG) -> dict:
)
else:
data_interval = dag.timetable.infer_manual_data_interval(
run_after=coerced_logical_date or timezone.coerce_datetime(self.run_after)
run_after=coerced_logical_date or timezone.coerce_datetime(run_after)
)
run_after = data_interval.end

run_id = self.dag_run_id or DagRun.generate_run_id(
run_type=DagRunType.SCHEDULED,
logical_date=coerced_logical_date,
run_after=self.run_after,
run_after=run_after,
)
return {
"run_id": run_id,
Expand Down
6 changes: 4 additions & 2 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10942,8 +10942,10 @@ components:
- type: 'null'
title: Logical Date
run_after:
type: string
format: date-time
anyOf:
- type: string
format: date-time
- type: 'null'
title: Run After
conf:
type: object
Expand Down
11 changes: 9 additions & 2 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6054,8 +6054,15 @@ export const $TriggerDAGRunPostBody = {
title: "Logical Date",
},
run_after: {
type: "string",
format: "date-time",
anyOf: [
{
type: "string",
format: "date-time",
},
{
type: "null",
},
],
title: "Run After",
},
conf: {
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ export type TriggerDAGRunPostBody = {
data_interval_start?: string | null;
data_interval_end?: string | null;
logical_date: string | null;
run_after?: string;
run_after?: string | null;
conf?: {
[key: string]: unknown;
};
Expand Down

0 comments on commit 4d42d9a

Please sign in to comment.