Skip to content

Commit

Permalink
Fix: Make sure that physical tables exist for promoted snapshots (#3798)
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman committed Feb 7, 2025
1 parent 679c0fd commit 3b97096
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
22 changes: 17 additions & 5 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,23 @@ def _push(
plan: The plan to source snapshots from.
deployability_index: Indicates which snapshots are deployable in the context of this creation.
"""
snapshots_to_create = [
s
for s in snapshots.values()
if s.is_model and not s.is_symbolic and plan.is_selected_for_backfill(s.name)
]
promoted_snapshot_ids = (
set(plan.environment.promoted_snapshot_ids)
if plan.environment.promoted_snapshot_ids is not None
else None
)

def _should_create(s: Snapshot) -> bool:
if not s.is_model or s.is_symbolic:
return False
# Only create tables for snapshots that we're planning to promote or that were selected for backfill
return (
plan.is_selected_for_backfill(s.name)
or promoted_snapshot_ids is None
or s.snapshot_id in promoted_snapshot_ids
)

snapshots_to_create = [s for s in snapshots.values() if _should_create(s)]

completed = False
try:
Expand Down
29 changes: 29 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,35 @@ def test_prod_restatement_plan_missing_model_in_dev(
)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_plan_snapshot_table_exists_for_promoted_snapshot(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
context.apply(plan)

model = context.get_model("sushi.waiter_revenue_by_day")
context.upsert_model(add_projection_to_model(t.cast(SqlModel, model)))

context.plan("dev", auto_apply=True, no_prompts=True, skip_tests=True)

# Drop the views and make sure SQLMesh recreates them later
top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True)
context.engine_adapter.drop_view(top_waiters_snapshot.table_name())
context.engine_adapter.drop_view(top_waiters_snapshot.table_name(False))

# Make the environment unfinalized to force recreation of all views in the virtual layer
context.state_sync.state_sync.engine_adapter.execute(
"UPDATE sqlmesh._environments SET finalized_ts = NULL WHERE name = 'dev'"
)

model = context.get_model("sushi.customers")
context.upsert_model(add_projection_to_model(t.cast(SqlModel, model)))

context.plan(
"dev", select_models=["sushi.customers"], auto_apply=True, no_prompts=True, skip_tests=True
)
assert context.engine_adapter.table_exists(top_waiters_snapshot.table_name())


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down

0 comments on commit 3b97096

Please sign in to comment.