Skip to content

Commit df37c8c

Browse files
authored
Emit asset.referenced events on materialization only (PrefectHQ#18252)
1 parent bd30871 commit df37c8c

File tree

4 files changed

+499
-196
lines changed

4 files changed

+499
-196
lines changed

src/prefect/assets/core.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ class Asset(PrefectBaseModel):
4747
def __repr__(self) -> str:
4848
return f"Asset(key={self.key!r})"
4949

50+
def __hash__(self) -> int:
51+
return hash(self.key)
52+
5053
def add_metadata(self, metadata: dict[str, Any]) -> None:
5154
from prefect.context import AssetContext
5255

@@ -59,7 +62,7 @@ def add_metadata(self, metadata: dict[str, Any]) -> None:
5962
asset_ctx.add_asset_metadata(self.key, metadata)
6063

6164

62-
def add_asset_metadata(asset_key: str, metadata: dict[str, Any]) -> None:
65+
def add_asset_metadata(asset: str | Asset, metadata: dict[str, Any]) -> None:
6366
from prefect.context import AssetContext
6467

6568
asset_ctx = AssetContext.get()
@@ -68,4 +71,5 @@ def add_asset_metadata(asset_key: str, metadata: dict[str, Any]) -> None:
6871
"Unable to call `add_asset_metadata` when not inside of an AssetContext"
6972
)
7073

74+
asset_key = asset if isinstance(asset, str) else asset.key
7175
asset_ctx.add_asset_metadata(asset_key, metadata)

src/prefect/context.py

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ class EngineContext(RunContext):
400400

401401
# Tracking information needed to track asset lineage between
402402
# tasks and materialization
403-
task_run_assets: dict[UUID, list[Asset]] = Field(default_factory=dict)
403+
task_run_assets: dict[UUID, set[Asset]] = Field(default_factory=dict)
404404

405405
# Events worker to emit events
406406
events: Optional[EventsWorker] = None
@@ -486,9 +486,9 @@ class AssetContext(ContextModel):
486486
materialization_metadata: Metadata for materialized assets
487487
"""
488488

489-
direct_asset_dependencies: list[Asset] = Field(default_factory=list)
490-
downstream_assets: list[Asset] = Field(default_factory=list)
491-
upstream_assets: list[Asset] = Field(default_factory=list)
489+
direct_asset_dependencies: set[Asset] = Field(default_factory=set)
490+
downstream_assets: set[Asset] = Field(default_factory=set)
491+
upstream_assets: set[Asset] = Field(default_factory=set)
492492
materialized_by: Optional[str] = None
493493
task_run_id: Optional[UUID] = None
494494
materialization_metadata: dict[str, dict[str, Any]] = Field(default_factory=dict)
@@ -516,7 +516,7 @@ def from_task_and_inputs(
516516
from prefect.client.schemas import TaskRunResult
517517
from prefect.tasks import MaterializingTask
518518

519-
upstream_assets: list[Asset] = []
519+
upstream_assets: set[Asset] = set()
520520

521521
# Get upstream assets from engine context instead of TaskRunResult.assets
522522
flow_ctx = FlowRunContext.get()
@@ -527,13 +527,15 @@ def from_task_and_inputs(
527527
# Look up assets in the engine context
528528
task_assets = flow_ctx.task_run_assets.get(task_input.id)
529529
if task_assets:
530-
upstream_assets.extend(task_assets)
530+
upstream_assets.update(task_assets)
531531

532532
ctx = cls(
533-
direct_asset_dependencies=task.asset_deps[:] if task.asset_deps else [],
534-
downstream_assets=task.assets[:]
533+
direct_asset_dependencies=set(task.asset_deps)
534+
if task.asset_deps
535+
else set(),
536+
downstream_assets=set(task.assets)
535537
if isinstance(task, MaterializingTask) and task.assets
536-
else [],
538+
else set(),
537539
upstream_assets=upstream_assets,
538540
materialized_by=task.materialized_by
539541
if isinstance(task, MaterializingTask)
@@ -551,7 +553,15 @@ def add_asset_metadata(self, asset_key: str, metadata: dict[str, Any]) -> None:
551553
Args:
552554
asset_key: The asset key
553555
metadata: Metadata dictionary to add
556+
557+
Raises:
558+
ValueError: If asset_key is not in downstream_assets
554559
"""
560+
downstream_keys = {asset.key for asset in self.downstream_assets}
561+
if asset_key not in downstream_keys:
562+
raise ValueError(
563+
"Can only add metadata to assets that are arguments to @materialize"
564+
)
555565

556566
existing = self.materialization_metadata.get(asset_key, {})
557567
self.materialization_metadata[asset_key] = existing | metadata
@@ -596,46 +606,46 @@ def related_materialized_by(by: str) -> dict[str, str]:
596606

597607
def emit_events(self, state: State) -> None:
598608
"""
599-
Emit asset reference and materialization events based on task completion.
609+
Emit asset events
600610
"""
601611

602612
from prefect.events import emit_event
603613

604614
if state.name == "Cached":
605615
return
606-
if state.is_failed():
616+
elif state.is_failed():
607617
event_status = "failed"
608618
elif state.is_completed():
609619
event_status = "succeeded"
610620
else:
611621
return
612622

613-
asset_deps_related: list[Asset] = []
623+
# If we have no downstream assets, this is not a materialization
624+
if not self.downstream_assets:
625+
return
614626

615-
# Emit reference events for direct asset dependencies
616-
for asset in self.direct_asset_dependencies:
627+
# Emit reference events for all upstream assets (direct + inherited)
628+
all_upstream_assets = self.upstream_assets | self.direct_asset_dependencies
629+
for asset in all_upstream_assets:
617630
emit_event(
618-
event=f"prefect.asset.reference.{event_status}",
631+
event="prefect.asset.referenced",
619632
resource=self.asset_as_resource(asset),
620633
related=[],
621634
)
622-
asset_deps_related.append(self.asset_as_related(asset))
623635

624636
# Emit materialization events for downstream assets
625-
if self.downstream_assets:
626-
upstream_related = [self.asset_as_related(a) for a in self.upstream_assets]
627-
all_related = upstream_related + asset_deps_related
628-
629-
if self.materialized_by:
630-
all_related.append(self.related_materialized_by(self.materialized_by))
631-
632-
for asset in self.downstream_assets:
633-
emit_event(
634-
event=f"prefect.asset.materialization.{event_status}",
635-
resource=self.asset_as_resource(asset),
636-
related=all_related,
637-
payload=self.materialization_metadata.get(asset.key),
638-
)
637+
upstream_related = [self.asset_as_related(a) for a in all_upstream_assets]
638+
639+
if self.materialized_by:
640+
upstream_related.append(self.related_materialized_by(self.materialized_by))
641+
642+
for asset in self.downstream_assets:
643+
emit_event(
644+
event=f"prefect.asset.materialization.{event_status}",
645+
resource=self.asset_as_resource(asset),
646+
related=upstream_related,
647+
payload=self.materialization_metadata.get(asset.key),
648+
)
639649

640650
def update_tracked_assets(self) -> None:
641651
"""
@@ -649,18 +659,21 @@ def update_tracked_assets(self) -> None:
649659

650660
if self.downstream_assets:
651661
# MaterializingTask: propagate the downstream assets (what we create)
652-
assets_for_downstream = self.downstream_assets[:]
662+
assets_for_downstream = set(self.downstream_assets)
653663
else:
654664
# Regular task: propagate upstream assets + direct dependencies
655-
assets_for_downstream = (
656-
list(self.upstream_assets) + self.direct_asset_dependencies
665+
assets_for_downstream = set(
666+
self.upstream_assets | self.direct_asset_dependencies
657667
)
658668

659669
flow_run_context.task_run_assets[self.task_run_id] = assets_for_downstream
660670

661671
def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
662672
"""Serialize the AssetContext for distributed execution."""
663673
return self.model_dump(
674+
# use json serialization so fields that are
675+
# sets of pydantic models are serialized
676+
mode="json",
664677
exclude_unset=True,
665678
serialize_as_any=True,
666679
context={"include_secrets": include_secrets},

0 commit comments

Comments
 (0)