-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Description
What's the issue?
- I would like to use pure python class
MyClass
insideMyIOManager
class (see snippet in "How to reproduce") MyClass
may require many init params, which I don't want to pollute the configuration with. SoMyResource
defines only subset.- I am using pattern described in: https://docs.dagster.io/guides/build/external-resources/configuring-resources#resources-that-depend-on-other-resources.
- The configuration is complete, so I can run the asset with default scaffolded config.
- But I would like to be able to change the config in UI Launchpad (as is possible with
MyIOManager.my_value
)
I tried to use .configure_at_launch(kwargs) as described in: #18181, but I was unable to change the scaffolded values (only class defaults were templated, kwargs params were not propagated to UI config).
What did you expect to happen?
User can override config of nested configurable resource in Launchpad.
How to reproduce?
Minimal non-working example:
from dataclasses import dataclass
from dagster import (
ConfigurableResource,
Definitions,
asset,
ResourceDependency,
ConfigurableIOManager,
InputContext,
define_asset_job,
InitResourceContext,
)
# Pure python class, which I want to use in Dagster
@dataclass
class MyClass:
my_string: str
my_integer: int
def hello_world(self):
return f"Hello, {self.my_string}! Your number is {self.my_integer}."
# ConfigurableResource class wrapping MyClass
class MyResource(ConfigurableResource):
# I want to allow user to configure only this value
my_string: str = "default"
# Creating instance of pure python class
def create_resource(self, context: InitResourceContext) -> MyClass:
return MyClass(self.my_string, my_integer=42)
class MyIOManager(ConfigurableIOManager):
my_value: str = "default"
# This gets resolved by Dagster, but I am unable to change its configuration in UI
my_class: ResourceDependency[MyClass]
def load_input(self, context: InputContext) -> tuple[str, int]:
raise NotImplementedError()
def handle_output(self, context: InputContext, obj: str) -> None:
context.log.info(f"{self.my_class.my_string=}, {self.my_class.my_integer=}")
@asset(io_manager_key="my_io_manager")
def my_asset() -> str:
return "my_asset"
# "hello" is printed in handle_output, meaning this value gets correctly propagated.
my_resource = MyResource(my_string="hello")
# "override" is shown in Launchpad instead of "default".
my_io_manager = MyIOManager(my_value="override", my_class=my_resource)
defs = Definitions(
assets=[my_asset],
resources={
"my_resource": my_resource,
"my_io_manager": my_io_manager,
},
)
Dagster version
1.11.9
Deployment type
Dagster Helm chart
Deployment details
No response
Additional information
This is minimal reproducible example.
My real case is to have high-throughput azure async blob downloader as pure python class (MyClass), which could be re-used in multiple IO Managers.
I would like to have another pure python class wrapped in ConfigurableResource, that would discover new files on pre-defined storage accounts/containers/blob_path_prefixes. I would like to use this ConfigurableResource (same configuration) in sensor to trigger RunRequest, and in IOManager to get new blobs uris, which will be downloaded by async blob downloader mentioned above.
All of this works.
The only issue is that since the ConfigurableResources / ResourceDependency[PythonClass] are nested resources in InputManager, I have no way to optionally re-configure them in Launchpad (eg. to recalculate only subset of data).
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.