Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update microbatch end_time to the batch_size ceiling #10883

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def __init__(

def build_end_time(self):
    """Return the end time for the run, rounded up to the batch_size ceiling.

    The base value is `event_time_end` when it is truthy; otherwise it falls
    back to `default_end_time` (per the original docstring, the current time
    in UTC). That value is then passed through
    `MicrobatchBuilder.ceiling_timestamp` with the model's configured
    `batch_size`, so a timestamp already on a batch boundary is returned
    unchanged.
    """
    # The ceiling must be applied to whichever end time was selected; returning
    # the raw value here would leave the ceiling logic unreachable.
    end_time = self.event_time_end or self.default_end_time
    return MicrobatchBuilder.ceiling_timestamp(end_time, self.model.config.batch_size)

def build_start_time(self, checkpoint: Optional[datetime]):
"""Create a start time based off the passed in checkpoint.
Expand Down Expand Up @@ -161,7 +162,7 @@ def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) ->
return offset_timestamp

@staticmethod
def truncate_timestamp(timestamp: datetime, batch_size: BatchSize):
def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime:
"""Truncates the passed in timestamp based on the batch_size.

2024-09-17 16:06:00 + Batchsize.hour -> 2024-09-17 16:00:00
Expand Down Expand Up @@ -201,3 +202,23 @@ def format_batch_start(
return str(
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
)

@staticmethod
def ceiling_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime:
    """Round the given timestamp up to the next boundary of the batch size.

    A timestamp that already sits exactly on a batch boundary is returned
    as-is rather than being pushed to the following batch.

    2024-09-17 16:06:00 + BatchSize.hour -> 2024-09-17 17:00:00
    2024-09-17 16:00:00 + BatchSize.hour -> 2024-09-17 16:00:00
    2024-09-17 16:06:00 + BatchSize.day -> 2024-09-18 00:00:00
    2024-09-17 00:00:00 + BatchSize.day -> 2024-09-17 00:00:00
    2024-09-17 16:06:00 + BatchSize.month -> 2024-10-01 00:00:00
    2024-09-01 00:00:00 + BatchSize.month -> 2024-09-01 00:00:00
    2024-09-17 16:06:00 + BatchSize.year -> 2025-01-01 00:00:00
    2024-01-01 00:00:00 + BatchSize.year -> 2024-01-01 00:00:00
    """
    floor = MicrobatchBuilder.truncate_timestamp(timestamp, batch_size)

    # Truncation changed nothing, so the input is already on a boundary.
    if floor == timestamp:
        return floor

    # Otherwise the ceiling is one full batch past the truncated value.
    return MicrobatchBuilder.offset_timestamp(floor, batch_size, 1)
56 changes: 54 additions & 2 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def microbatch_model(self):
model.config.materialized = "incremental"
model.config.incremental_strategy = "microbatch"
model.config.begin = MODEL_CONFIG_BEGIN
model.config.batch_size = BatchSize.day

return model

Expand All @@ -30,12 +31,12 @@ def microbatch_model(self):
(
False,
None,
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
),
(
False,
Expand Down Expand Up @@ -616,3 +617,54 @@ def test_format_batch_start(self, batch_size, batch_start, expected_formatted_ba
MicrobatchBuilder.format_batch_start(batch_start, batch_size)
== expected_formatted_batch_start
)

@pytest.mark.parametrize(
    "timestamp,batch_size,expected_datetime",
    [
        # hour: mid-hour rounds up, on-the-hour is unchanged
        (
            datetime(2024, 9, 17, 16, 6, tzinfo=pytz.UTC),
            BatchSize.hour,
            datetime(2024, 9, 17, 17, tzinfo=pytz.UTC),
        ),
        (
            datetime(2024, 9, 17, 16, tzinfo=pytz.UTC),
            BatchSize.hour,
            datetime(2024, 9, 17, 16, tzinfo=pytz.UTC),
        ),
        # day: mid-day rounds up to next midnight, midnight is unchanged
        (
            datetime(2024, 9, 17, 16, 6, tzinfo=pytz.UTC),
            BatchSize.day,
            datetime(2024, 9, 18, tzinfo=pytz.UTC),
        ),
        (
            datetime(2024, 9, 17, tzinfo=pytz.UTC),
            BatchSize.day,
            datetime(2024, 9, 17, tzinfo=pytz.UTC),
        ),
        # month: mid-month rounds up to the first of next month
        (
            datetime(2024, 9, 17, 16, 6, tzinfo=pytz.UTC),
            BatchSize.month,
            datetime(2024, 10, 1, tzinfo=pytz.UTC),
        ),
        (
            datetime(2024, 9, 1, tzinfo=pytz.UTC),
            BatchSize.month,
            datetime(2024, 9, 1, tzinfo=pytz.UTC),
        ),
        # year: mid-year rounds up to January 1st of next year
        (
            datetime(2024, 9, 17, 16, 6, tzinfo=pytz.UTC),
            BatchSize.year,
            datetime(2025, 1, 1, tzinfo=pytz.UTC),
        ),
        (
            datetime(2024, 1, 1, tzinfo=pytz.UTC),
            BatchSize.year,
            datetime(2024, 1, 1, tzinfo=pytz.UTC),
        ),
    ],
)
def test_ceiling_timestamp(
    self, timestamp: datetime, batch_size: BatchSize, expected_datetime: datetime
) -> None:
    """Check that ceiling_timestamp rounds up, or leaves boundary values alone."""
    assert MicrobatchBuilder.ceiling_timestamp(timestamp, batch_size) == expected_datetime
Loading