Skip to content

Commit dffc844

Browse files
authored
Modify get_first_partition_window to account for offset (#12504)
Causes a graphQL error reported by a user here: https://dagster.slack.com/archives/C01U954MEER/p1677064207587029 Previously, `get_first_partition_window` did not account for the end offset. This led to errors such as `get_first_partition_window` returning `None` when the start time window is the current time window and offset > 0.
1 parent 70004f5 commit dffc844

File tree

2 files changed

+158
-7
lines changed

2 files changed

+158
-7
lines changed

python_modules/dagster/dagster/_core/definitions/time_window_partitions.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -370,17 +370,42 @@ def get_prev_partition_window(self, start_dt: datetime) -> Optional[TimeWindow]:
370370
def get_first_partition_window(
371371
self, current_time: Optional[datetime] = None
372372
) -> Optional[TimeWindow]:
373-
current_timestamp = (
373+
current_time = cast(
374+
datetime,
374375
pendulum.instance(current_time, tz=self.timezone)
375376
if current_time
376-
else pendulum.now(self.timezone)
377-
).timestamp()
377+
else pendulum.now(self.timezone),
378+
)
379+
current_timestamp = current_time.timestamp()
378380

379381
time_window = next(iter(self._iterate_time_windows(self.start)))
380-
if time_window.end.timestamp() <= current_timestamp:
381-
return time_window
382+
383+
if self.end_offset == 0:
384+
return time_window if time_window.end.timestamp() <= current_timestamp else None
385+
elif self.end_offset > 0:
386+
iterator = iter(self._iterate_time_windows(current_time))
387+
# first returned time window is time window of current time
388+
curr_window_plus_offset = next(iterator)
389+
for _ in range(self.end_offset):
390+
curr_window_plus_offset = next(iterator)
391+
return (
392+
time_window
393+
if time_window.end.timestamp() <= curr_window_plus_offset.start.timestamp()
394+
else None
395+
)
382396
else:
383-
return None
397+
# end offset < 0
398+
end_window = None
399+
iterator = iter(self._reverse_iterate_time_windows(current_time))
400+
for _ in range(abs(self.end_offset)):
401+
end_window = next(iterator)
402+
403+
if end_window is None:
404+
check.failed("end_window should not be None")
405+
406+
return (
407+
time_window if time_window.end.timestamp() <= end_window.start.timestamp() else None
408+
)
384409

385410
def get_last_partition_window(
386411
self, current_time: Optional[datetime] = None
@@ -1234,7 +1259,7 @@ def _get_partition_time_windows_not_in_subset(
12341259
Returns a list of partition time windows that are not in the subset.
12351260
Each time window is a single partition.
12361261
"""
1237-
first_tw = self._partitions_def.get_first_partition_window()
1262+
first_tw = self._partitions_def.get_first_partition_window(current_time=current_time)
12381263
last_tw = self._partitions_def.get_last_partition_window(current_time=current_time)
12391264

12401265
if not first_tw or not last_tw:

python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,3 +652,129 @@ def my_partitioned_config_2(_start, _end):
652652
partitions_def.get_partition_keys_between_indexes(50, 53, current_time=current_time)
653653
== partitions_def.get_partition_keys(current_time=current_time)[50:53]
654654
)
655+
656+
657+
def test_get_first_partition_window():
658+
assert DailyPartitionsDefinition(
659+
start_date="2023-01-01"
660+
).get_first_partition_window() == time_window("2023-01-01", "2023-01-02")
661+
662+
assert DailyPartitionsDefinition(
663+
start_date="2023-01-01", end_offset=1
664+
).get_first_partition_window(
665+
current_time=datetime.strptime("2023-01-01", "%Y-%m-%d")
666+
) == time_window(
667+
"2023-01-01", "2023-01-02"
668+
)
669+
670+
assert (
671+
DailyPartitionsDefinition(start_date="2023-02-15", end_offset=1).get_first_partition_window(
672+
current_time=datetime.strptime("2023-02-14", "%Y-%m-%d")
673+
)
674+
is None
675+
)
676+
677+
assert DailyPartitionsDefinition(
678+
start_date="2023-01-01", end_offset=2
679+
).get_first_partition_window(
680+
current_time=datetime.strptime("2023-01-02", "%Y-%m-%d")
681+
) == time_window(
682+
"2023-01-01", "2023-01-02"
683+
)
684+
685+
assert MonthlyPartitionsDefinition(
686+
start_date="2023-01-01", end_offset=1
687+
).get_first_partition_window(
688+
current_time=datetime.strptime("2023-01-15", "%Y-%m-%d")
689+
) == time_window(
690+
"2023-01-01", "2023-02-01"
691+
)
692+
693+
assert (
694+
DailyPartitionsDefinition(
695+
start_date="2023-01-15", end_offset=-1
696+
).get_first_partition_window(current_time=datetime.strptime("2023-01-16", "%Y-%m-%d"))
697+
is None
698+
)
699+
700+
assert DailyPartitionsDefinition(
701+
start_date="2023-01-15", end_offset=-1
702+
).get_first_partition_window(
703+
current_time=datetime.strptime("2023-01-17", "%Y-%m-%d")
704+
) == time_window(
705+
"2023-01-15", "2023-01-16"
706+
)
707+
708+
assert (
709+
DailyPartitionsDefinition(
710+
start_date="2023-01-15", end_offset=-2
711+
).get_first_partition_window(current_time=datetime.strptime("2023-01-17", "%Y-%m-%d"))
712+
is None
713+
)
714+
715+
assert DailyPartitionsDefinition(
716+
start_date="2023-01-15", end_offset=-2
717+
).get_first_partition_window(
718+
current_time=datetime.strptime("2023-01-18", "%Y-%m-%d")
719+
) == time_window(
720+
"2023-01-15", "2023-01-16"
721+
)
722+
723+
assert (
724+
MonthlyPartitionsDefinition(
725+
start_date="2023-01-01", end_offset=-1
726+
).get_first_partition_window(current_time=datetime.strptime("2023-01-15", "%Y-%m-%d"))
727+
is None
728+
)
729+
730+
assert (
731+
DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window(
732+
current_time=datetime.strptime("2023-01-14", "%Y-%m-%d")
733+
)
734+
is None
735+
)
736+
737+
assert DailyPartitionsDefinition(
738+
start_date="2023-01-15", end_offset=1
739+
).get_first_partition_window(
740+
current_time=datetime(year=2023, month=1, day=15, hour=12, minute=0, second=0)
741+
) == time_window(
742+
"2023-01-15", "2023-01-16"
743+
)
744+
745+
assert DailyPartitionsDefinition(
746+
start_date="2023-01-15", end_offset=1
747+
).get_first_partition_window(
748+
current_time=datetime(year=2023, month=1, day=14, hour=12, minute=0, second=0)
749+
) == time_window(
750+
"2023-01-15", "2023-01-16"
751+
)
752+
753+
assert (
754+
DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window(
755+
current_time=datetime(year=2023, month=1, day=13, hour=12, minute=0, second=0)
756+
)
757+
is None
758+
)
759+
760+
assert (
761+
MonthlyPartitionsDefinition(
762+
start_date="2023-01-01", end_offset=-1
763+
).get_first_partition_window(current_time=datetime.strptime("2023-01-15", "%Y-%m-%d"))
764+
is None
765+
)
766+
767+
assert (
768+
MonthlyPartitionsDefinition(
769+
start_date="2023-01-01", end_offset=-1
770+
).get_first_partition_window(current_time=datetime.strptime("2023-02-01", "%Y-%m-%d"))
771+
is None
772+
)
773+
774+
assert MonthlyPartitionsDefinition(
775+
start_date="2023-01-01", end_offset=-1
776+
).get_first_partition_window(
777+
current_time=datetime.strptime("2023-03-01", "%Y-%m-%d")
778+
) == time_window(
779+
"2023-01-01", "2023-02-01"
780+
)

0 commit comments

Comments
 (0)