Skip to content

Commit dd3cf26

Browse files
authored
asset partitions numFailed (#12570)
Adds a GraphQL field for counting failed partitions. Update: so that numMaterialized and numFailed can never sum to more than numPartitions, failed partitions are now filtered out of the materialized count.
1 parent dffc844 commit dd3cf26

File tree

9 files changed

+12116
-9894
lines changed

9 files changed

+12116
-9894
lines changed

python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from dagster._core.instance import DynamicPartitionsStore
4646
from dagster._core.storage.partition_status_cache import (
4747
CACHEABLE_PARTITION_TYPES,
48+
filter_incomplete_materialized_runs_to_failed,
4849
get_and_update_asset_status_cache_value,
4950
get_materialized_multipartitions,
5051
get_validated_partition_keys,
@@ -321,19 +322,18 @@ def get_unique_asset_id(
321322
)
322323

323324

324-
def get_materialized_partitions_subset(
325+
def get_materialized_and_failed_partition_subsets(
325326
instance: DagsterInstance,
326327
asset_key: AssetKey,
327328
dynamic_partitions_loader: DynamicPartitionsStore,
328329
partitions_def: Optional[PartitionsDefinition] = None,
329-
) -> Optional[PartitionsSubset]:
330+
) -> Tuple[Optional[PartitionsSubset], Optional[PartitionsSubset]]:
330331
"""
331-
Returns the materialization status for each partition key. The materialization status
332-
is a boolean indicating whether the partition has been materialized: True if materialized,
333-
False if not.
332+
Returns a tuple of two PartitionSubset objects: the first is the materialized partitions,
333+
the second is the failed partitions.
334334
"""
335335
if not partitions_def:
336-
return None
336+
return None, None
337337

338338
if instance.can_cache_asset_status_data() and isinstance(
339339
partitions_def, CACHEABLE_PARTITION_TYPES
@@ -348,8 +348,13 @@ def get_materialized_partitions_subset(
348348
if updated_cache_value
349349
else partitions_def.empty_subset()
350350
)
351+
failed_subset = (
352+
updated_cache_value.deserialize_failed_partition_subsets(partitions_def)
353+
if updated_cache_value
354+
else partitions_def.empty_subset()
355+
)
351356

352-
return materialized_subset
357+
return materialized_subset, failed_subset
353358

354359
else:
355360
# If the partition status can't be cached, fetch partition status from storage
@@ -370,12 +375,36 @@ def get_materialized_partitions_subset(
370375
dynamic_partitions_loader, partitions_def, set(materialized_keys)
371376
)
372377

373-
return (
378+
materialized_subset = (
374379
partitions_def.empty_subset().with_partition_keys(validated_keys)
375380
if validated_keys
376381
else partitions_def.empty_subset()
377382
)
378383

384+
# failed
385+
incomplete_materialization_runs = instance.event_log_storage.get_latest_asset_partition_materialization_attempts_without_materializations(
386+
asset_key
387+
)
388+
389+
if not incomplete_materialization_runs:
390+
failed_subset = partitions_def.empty_subset()
391+
else:
392+
validated_keys = get_validated_partition_keys(
393+
dynamic_partitions_loader,
394+
partitions_def,
395+
filter_incomplete_materialized_runs_to_failed(
396+
instance, incomplete_materialization_runs
397+
),
398+
)
399+
400+
failed_subset = (
401+
partitions_def.empty_subset().with_partition_keys(validated_keys)
402+
if validated_keys
403+
else partitions_def.empty_subset()
404+
)
405+
406+
return materialized_subset, failed_subset
407+
379408

380409
def build_materialized_partitions(
381410
dynamic_partitions_store: DynamicPartitionsStore,

python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
from ..implementation.fetch_assets import (
4747
build_materialized_partitions,
4848
get_freshness_info,
49-
get_materialized_partitions_subset,
49+
get_materialized_and_failed_partition_subsets,
5050
)
5151
from ..implementation.loader import (
5252
BatchMaterializationLoader,
@@ -748,14 +748,14 @@ def resolve_materializedPartitions(
748748
if not self._dynamic_partitions_loader:
749749
check.failed("dynamic_partitions_loader must be provided to get partition keys")
750750

751-
materialized_partition_subset = get_materialized_partitions_subset(
751+
materialized_partition_subset = get_materialized_and_failed_partition_subsets(
752752
graphene_info.context.instance,
753753
asset_key,
754754
self._dynamic_partitions_loader,
755755
self._external_asset_node.partitions_def_data.get_partitions_definition()
756756
if self._external_asset_node.partitions_def_data
757757
else None,
758-
)
758+
)[0]
759759

760760
return build_materialized_partitions(
761761
self._dynamic_partitions_loader,
@@ -770,7 +770,10 @@ def resolve_partitionStats(self, graphene_info) -> Optional[GraphenePartitionSta
770770
if not self._dynamic_partitions_loader:
771771
check.failed("dynamic_partitions_loader must be provided to get partition keys")
772772

773-
materialized_partition_subset = get_materialized_partitions_subset(
773+
(
774+
materialized_partition_subset,
775+
failed_partition_subset,
776+
) = get_materialized_and_failed_partition_subsets(
774777
graphene_info.context.instance,
775778
asset_key,
776779
self._dynamic_partitions_loader,
@@ -779,14 +782,24 @@ def resolve_partitionStats(self, graphene_info) -> Optional[GraphenePartitionSta
779782
else None,
780783
)
781784

782-
if materialized_partition_subset is None:
785+
if materialized_partition_subset is None or failed_partition_subset is None:
783786
check.failed("Expected partitions subset for a partitioned asset")
784787

788+
num_materialized = len(materialized_partition_subset)
789+
num_materialized_and_not_failed = num_materialized - len(
790+
[
791+
k
792+
for k in failed_partition_subset.get_partition_keys()
793+
if k in materialized_partition_subset
794+
]
795+
)
796+
785797
return GraphenePartitionStats(
786-
numMaterialized=len(materialized_partition_subset),
798+
numMaterialized=num_materialized_and_not_failed,
787799
numPartitions=partitions_def_data.get_partitions_definition().get_num_partitions(
788800
dynamic_partitions_store=self._dynamic_partitions_loader
789801
),
802+
numFailed=len(failed_partition_subset),
790803
)
791804
else:
792805
return None

python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ class Meta:
148148
class GraphenePartitionStats(graphene.ObjectType):
149149
numMaterialized = graphene.NonNull(graphene.Int)
150150
numPartitions = graphene.NonNull(graphene.Int)
151+
numFailed = graphene.NonNull(graphene.Int)
151152

152153
class Meta:
153154
name = "PartitionStats"

python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,6 +1651,20 @@ def yield_partition_materialization():
16511651
)
16521652

16531653

1654+
@asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c", "d"]))
1655+
def fail_partition_materialization(context):
1656+
if context.run.tags.get("fail") == "true":
1657+
raise Exception("fail_partition_materialization")
1658+
yield Output(5)
1659+
1660+
1661+
fail_partition_materialization_job = build_assets_job(
1662+
"fail_partition_materialization_job",
1663+
assets=[fail_partition_materialization],
1664+
executor_def=in_process_executor,
1665+
)
1666+
1667+
16541668
@asset
16551669
def asset_yields_observation():
16561670
yield AssetObservation(asset_key=AssetKey("asset_yields_observation"), metadata={"text": "FOO"})
@@ -1926,6 +1940,7 @@ def define_pipelines():
19261940
dynamic_partitioned_assets_job,
19271941
time_partitioned_assets_job,
19281942
partition_materialization_job,
1943+
fail_partition_materialization_job,
19291944
observation_job,
19301945
failure_assets_job,
19311946
asset_group_job,

0 commit comments

Comments
 (0)