@@ -400,7 +400,7 @@ class EngineContext(RunContext):
400400
401401 # Tracking information needed to track asset lineage between
402402 # tasks and materialization
403- task_run_assets : dict [UUID , list [Asset ]] = Field (default_factory = dict )
403+ task_run_assets : dict [UUID , set [Asset ]] = Field (default_factory = dict )
404404
405405 # Events worker to emit events
406406 events : Optional [EventsWorker ] = None
@@ -486,9 +486,9 @@ class AssetContext(ContextModel):
486486 materialization_metadata: Metadata for materialized assets
487487 """
488488
489- direct_asset_dependencies : list [Asset ] = Field (default_factory = list )
490- downstream_assets : list [Asset ] = Field (default_factory = list )
491- upstream_assets : list [Asset ] = Field (default_factory = list )
489+ direct_asset_dependencies : set [Asset ] = Field (default_factory = set )
490+ downstream_assets : set [Asset ] = Field (default_factory = set )
491+ upstream_assets : set [Asset ] = Field (default_factory = set )
492492 materialized_by : Optional [str ] = None
493493 task_run_id : Optional [UUID ] = None
494494 materialization_metadata : dict [str , dict [str , Any ]] = Field (default_factory = dict )
@@ -516,7 +516,7 @@ def from_task_and_inputs(
516516 from prefect .client .schemas import TaskRunResult
517517 from prefect .tasks import MaterializingTask
518518
519- upstream_assets : list [Asset ] = []
519+ upstream_assets : set [Asset ] = set ()
520520
521521 # Get upstream assets from engine context instead of TaskRunResult.assets
522522 flow_ctx = FlowRunContext .get ()
@@ -527,13 +527,15 @@ def from_task_and_inputs(
527527 # Look up assets in the engine context
528528 task_assets = flow_ctx .task_run_assets .get (task_input .id )
529529 if task_assets :
530- upstream_assets .extend (task_assets )
530+ upstream_assets .update (task_assets )
531531
532532 ctx = cls (
533- direct_asset_dependencies = task .asset_deps [:] if task .asset_deps else [],
534- downstream_assets = task .assets [:]
533+ direct_asset_dependencies = set (task .asset_deps )
534+ if task .asset_deps
535+ else set (),
536+ downstream_assets = set (task .assets )
535537 if isinstance (task , MaterializingTask ) and task .assets
536- else [] ,
538+ else set () ,
537539 upstream_assets = upstream_assets ,
538540 materialized_by = task .materialized_by
539541 if isinstance (task , MaterializingTask )
@@ -551,7 +553,15 @@ def add_asset_metadata(self, asset_key: str, metadata: dict[str, Any]) -> None:
551553 Args:
552554 asset_key: The asset key
553555 metadata: Metadata dictionary to add
556+
557+ Raises:
558+ ValueError: If asset_key is not in downstream_assets
554559 """
560+ downstream_keys = {asset .key for asset in self .downstream_assets }
561+ if asset_key not in downstream_keys :
562+ raise ValueError (
563+ "Can only add metadata to assets that are arguments to @materialize"
564+ )
555565
556566 existing = self .materialization_metadata .get (asset_key , {})
557567 self .materialization_metadata [asset_key ] = existing | metadata
@@ -596,46 +606,46 @@ def related_materialized_by(by: str) -> dict[str, str]:
596606
597607 def emit_events (self , state : State ) -> None :
598608 """
599- Emit asset reference and materialization events based on task completion.
609+ Emit asset events
600610 """
601611
602612 from prefect .events import emit_event
603613
604614 if state .name == "Cached" :
605615 return
606- if state .is_failed ():
616+ elif state .is_failed ():
607617 event_status = "failed"
608618 elif state .is_completed ():
609619 event_status = "succeeded"
610620 else :
611621 return
612622
613- asset_deps_related : list [Asset ] = []
623+ # If we have no downstream assets, this is not a materialization
624+ if not self .downstream_assets :
625+ return
614626
615- # Emit reference events for direct asset dependencies
616- for asset in self .direct_asset_dependencies :
627+ # Emit reference events for all upstream assets (direct + inherited)
628+ all_upstream_assets = self .upstream_assets | self .direct_asset_dependencies
629+ for asset in all_upstream_assets :
617630 emit_event (
618- event = f "prefect.asset.reference. { event_status } " ,
631+ event = "prefect.asset.referenced " ,
619632 resource = self .asset_as_resource (asset ),
620633 related = [],
621634 )
622- asset_deps_related .append (self .asset_as_related (asset ))
623635
624636 # Emit materialization events for downstream assets
625- if self .downstream_assets :
626- upstream_related = [self .asset_as_related (a ) for a in self .upstream_assets ]
627- all_related = upstream_related + asset_deps_related
628-
629- if self .materialized_by :
630- all_related .append (self .related_materialized_by (self .materialized_by ))
631-
632- for asset in self .downstream_assets :
633- emit_event (
634- event = f"prefect.asset.materialization.{ event_status } " ,
635- resource = self .asset_as_resource (asset ),
636- related = all_related ,
637- payload = self .materialization_metadata .get (asset .key ),
638- )
637+ upstream_related = [self .asset_as_related (a ) for a in all_upstream_assets ]
638+
639+ if self .materialized_by :
640+ upstream_related .append (self .related_materialized_by (self .materialized_by ))
641+
642+ for asset in self .downstream_assets :
643+ emit_event (
644+ event = f"prefect.asset.materialization.{ event_status } " ,
645+ resource = self .asset_as_resource (asset ),
646+ related = upstream_related ,
647+ payload = self .materialization_metadata .get (asset .key ),
648+ )
639649
640650 def update_tracked_assets (self ) -> None :
641651 """
@@ -649,18 +659,21 @@ def update_tracked_assets(self) -> None:
649659
650660 if self .downstream_assets :
651661 # MaterializingTask: propagate the downstream assets (what we create)
652- assets_for_downstream = self .downstream_assets [:]
662+ assets_for_downstream = set ( self .downstream_assets )
653663 else :
654664 # Regular task: propagate upstream assets + direct dependencies
655- assets_for_downstream = (
656- list ( self .upstream_assets ) + self .direct_asset_dependencies
665+ assets_for_downstream = set (
666+ self .upstream_assets | self .direct_asset_dependencies
657667 )
658668
659669 flow_run_context .task_run_assets [self .task_run_id ] = assets_for_downstream
660670
661671 def serialize (self : Self , include_secrets : bool = True ) -> dict [str , Any ]:
662672 """Serialize the AssetContext for distributed execution."""
663673 return self .model_dump (
674+ # use json serialization so fields that are
675+ # sets of pydantic models are serialized
676+ mode = "json" ,
664677 exclude_unset = True ,
665678 serialize_as_any = True ,
666679 context = {"include_secrets" : include_secrets },
0 commit comments