Changeset View
Changeset View
Standalone View
Standalone View
docs/content/concepts/io-management/unconnected-inputs.mdx
Show All 40 Lines | def my_pipeline(): | ||||
my_solid() | my_solid() | ||||
``` | ``` | ||||
The <PyObject module="dagster" object="String" /> dagster type has a dagster type loader that allows it to load inputs from config: | The <PyObject module="dagster" object="String" /> dagster type has a dagster type loader that allows it to load inputs from config: | ||||
```python file=/concepts/io_management/load_from_config.py startafter=execute_start_marker endbefore=execute_end_marker | ```python file=/concepts/io_management/load_from_config.py startafter=execute_start_marker endbefore=execute_end_marker | ||||
execute_pipeline( | execute_pipeline( | ||||
my_pipeline, | my_pipeline, | ||||
run_config={"solids": {"my_solid": {"inputs": {"input_string": {"value": "marmot"}}}}}, | run_config={ | ||||
"solids": { | |||||
"my_solid": {"inputs": {"input_string": {"value": "marmot"}}} | |||||
} | |||||
}, | |||||
) | ) | ||||
``` | ``` | ||||
### Loading a custom dagster type from config | ### Loading a custom dagster type from config | ||||
When you have a solid at the beginning of your pipeline that operates on a dagster type that you've defined, you can write your own <PyObject module="dagster" object="DagsterTypeLoader" /> to define how to load that input via run config. | When you have a solid at the beginning of your pipeline that operates on a dagster type that you've defined, you can write your own <PyObject module="dagster" object="DagsterTypeLoader" /> to define how to load that input via run config. | ||||
```python file=/concepts/io_management/load_custom_type_from_config.py startafter=def_start_marker endbefore=def_end_marker | ```python file=/concepts/io_management/load_custom_type_from_config.py startafter=def_start_marker endbefore=def_end_marker | ||||
@dagster_type_loader(config_schema={"diameter": float, "juiciness": float, "cultivar": str}) | @dagster_type_loader( | ||||
config_schema={"diameter": float, "juiciness": float, "cultivar": str} | |||||
) | |||||
def apple_loader(_context, config): | def apple_loader(_context, config): | ||||
return Apple( | return Apple( | ||||
diameter=config["diameter"], juiciness=config["juiciness"], cultivar=config["cultivar"] | diameter=config["diameter"], | ||||
juiciness=config["juiciness"], | |||||
cultivar=config["cultivar"], | |||||
) | ) | ||||
@usable_as_dagster_type(loader=apple_loader) | @usable_as_dagster_type(loader=apple_loader) | ||||
class Apple: | class Apple: | ||||
def __init__(self, diameter, juiciness, cultivar): | def __init__(self, diameter, juiciness, cultivar): | ||||
self.diameter = diameter | self.diameter = diameter | ||||
self.juiciness = juiciness | self.juiciness = juiciness | ||||
Show All 14 Lines | |||||
```python file=/concepts/io_management/load_custom_type_from_config.py startafter=execute_start_marker endbefore=execute_end_marker | ```python file=/concepts/io_management/load_custom_type_from_config.py startafter=execute_start_marker endbefore=execute_end_marker | ||||
execute_pipeline( | execute_pipeline( | ||||
my_pipeline, | my_pipeline, | ||||
run_config={ | run_config={ | ||||
"solids": { | "solids": { | ||||
"my_solid": { | "my_solid": { | ||||
"inputs": { | "inputs": { | ||||
"input_apple": {"diameter": 2.4, "juiciness": 6.0, "cultivar": "honeycrisp"} | "input_apple": { | ||||
"diameter": 2.4, | |||||
"juiciness": 6.0, | |||||
"cultivar": "honeycrisp", | |||||
} | |||||
} | } | ||||
} | } | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
``` | ``` | ||||
### Providing an input manager for a root input <Experimental /> | ### Providing an input manager for a root input <Experimental /> | ||||
When you have a solid at the beginning of a pipeline that operates on data from external source, you might wish to separate that I/O from your solid's business logic, in the same way you would with an IO manager if the solid were loading from an upstream output. | When you have a solid at the beginning of a pipeline that operates on data from external source, you might wish to separate that I/O from your solid's business logic, in the same way you would with an IO manager if the solid were loading from an upstream output. | ||||
To accomplish this, you can define an <PyObject module="dagster" object="RootInputManager" />. | To accomplish this, you can define an <PyObject module="dagster" object="RootInputManager" />. | ||||
```python file=/concepts/io_management/root_input_manager.py startafter=start_marker endbefore=end_marker | ```python file=/concepts/io_management/root_input_manager.py startafter=start_marker endbefore=end_marker | ||||
@solid(input_defs=[InputDefinition("dataframe", root_manager_key="my_root_manager")]) | @solid( | ||||
input_defs=[ | |||||
InputDefinition("dataframe", root_manager_key="my_root_manager") | |||||
] | |||||
) | |||||
def my_solid(dataframe): | def my_solid(dataframe): | ||||
"""Do some stuff""" | """Do some stuff""" | ||||
@root_input_manager | @root_input_manager | ||||
def table1_loader(_): | def table1_loader(_): | ||||
return read_dataframe_from_table(name="table1") | return read_dataframe_from_table(name="table1") | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_root_manager": table1_loader})]) | @pipeline( | ||||
mode_defs=[ | |||||
ModeDefinition(resource_defs={"my_root_manager": table1_loader}) | |||||
] | |||||
) | |||||
def my_pipeline(): | def my_pipeline(): | ||||
my_solid() | my_solid() | ||||
``` | ``` | ||||
Setting the `root_manager_key` on an `InputDefinition` controls how that input is loaded in pipelines where it has no upstream output. | Setting the `root_manager_key` on an `InputDefinition` controls how that input is loaded in pipelines where it has no upstream output. | ||||
The <PyObject module="dagster" object="root_input_manager" /> decorator behaves nearly identically to the <PyObject module="dagster" object="resource" /> decorator. It yields an <PyObject module="dagster" object="RootInputManagerDefinition" />, which is a <PyObject module="dagster" object="ResourceDefinition" /> that will produce an <PyObject module="dagster" object="RootInputManager" />. | The <PyObject module="dagster" object="root_input_manager" /> decorator behaves nearly identically to the <PyObject module="dagster" object="resource" /> decorator. It yields an <PyObject module="dagster" object="RootInputManagerDefinition" />, which is a <PyObject module="dagster" object="ResourceDefinition" /> that will produce an <PyObject module="dagster" object="RootInputManager" />. | ||||
### Providing per-input config to a root input manager <Experimental /> | ### Providing per-input config to a root input manager <Experimental /> | ||||
When launching a run, you might want to parameterize how particular root inputs are loaded. | When launching a run, you might want to parameterize how particular root inputs are loaded. | ||||
To accomplish this, you can define an `input_config_schema` on the input manager definition. The load function can access this config when storing or loading data, via the <PyObject module="dagster" object="InputContext" />. | To accomplish this, you can define an `input_config_schema` on the input manager definition. The load function can access this config when storing or loading data, via the <PyObject module="dagster" object="InputContext" />. | ||||
```python file=/concepts/io_management/config_input_manager.py startafter=def_start_marker endbefore=def_end_marker | ```python file=/concepts/io_management/config_input_manager.py startafter=def_start_marker endbefore=def_end_marker | ||||
@root_input_manager(input_config_schema={"table_name": str}) | @root_input_manager(input_config_schema={"table_name": str}) | ||||
def table_loader(context): | def table_loader(context): | ||||
return read_dataframe_from_table(name=context.config["table_name"]) | return read_dataframe_from_table(name=context.config["table_name"]) | ||||
``` | ``` | ||||
Then, when executing a pipeline, you can pass in this per-input config. | Then, when executing a pipeline, you can pass in this per-input config. | ||||
```python file=/concepts/io_management/config_input_manager.py startafter=execute_start_marker endbefore=execute_end_marker | ```python file=/concepts/io_management/config_input_manager.py startafter=execute_start_marker endbefore=execute_end_marker | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_root_manager": table_loader})]) | @pipeline( | ||||
mode_defs=[ | |||||
ModeDefinition(resource_defs={"my_root_manager": table_loader}) | |||||
] | |||||
) | |||||
def my_pipeline(): | def my_pipeline(): | ||||
my_solid() | my_solid() | ||||
execute_pipeline( | execute_pipeline( | ||||
my_pipeline, | my_pipeline, | ||||
run_config={"solids": {"my_solid": {"inputs": {"dataframe": {"table_name": "table1"}}}}}, | run_config={ | ||||
"solids": { | |||||
"my_solid": { | |||||
"inputs": {"dataframe": {"table_name": "table1"}} | |||||
} | |||||
} | |||||
}, | |||||
) | ) | ||||
``` | ``` | ||||
### Using a root input manager with subselection <Experimental /> | ### Using a root input manager with subselection <Experimental /> | ||||
You might want to execute a subset of solids in your pipeline and control how the inputs of those solids are loaded. Root input managers also help in these situations, because the inputs at the beginning of the subset become the new "roots". | You might want to execute a subset of solids in your pipeline and control how the inputs of those solids are loaded. Root input managers also help in these situations, because the inputs at the beginning of the subset become the new "roots". | ||||
For example, you might have `solid1` that normally produces a table that `solid2` consumes. To debug `solid2`, you might want to run it on a different table than the one normally produced by `solid1`. | For example, you might have `solid1` that normally produces a table that `solid2` consumes. To debug `solid2`, you might want to run it on a different table than the one normally produced by `solid1`. | ||||
Show All 20 Lines | def my_io_manager(_): | ||||
return MyIOManager() | return MyIOManager() | ||||
@solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager")]) | @solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager")]) | ||||
def solid1(): | def solid1(): | ||||
"""Do stuff""" | """Do stuff""" | ||||
@solid(input_defs=[InputDefinition("dataframe", root_manager_key="my_root_input_manager")]) | @solid( | ||||
input_defs=[ | |||||
InputDefinition("dataframe", root_manager_key="my_root_input_manager") | |||||
] | |||||
) | |||||
def solid2(dataframe): | def solid2(dataframe): | ||||
"""Do stuff""" | """Do stuff""" | ||||
@pipeline( | @pipeline( | ||||
mode_defs=[ | mode_defs=[ | ||||
ModeDefinition( | ModeDefinition( | ||||
resource_defs={ | resource_defs={ | ||||
"my_io_manager": my_io_manager, | "my_io_manager": my_io_manager, | ||||
"my_root_input_manager": my_root_input_manager, | "my_root_input_manager": my_root_input_manager, | ||||
} | } | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
def my_pipeline(): | def my_pipeline(): | ||||
solid2(solid1()) | solid2(solid1()) | ||||
``` | ``` | ||||
When running the full pipeline, `solid2`'s input will be loaded using the IO manager on the output of `solid1`. When running the pipeline subset, `solid2`'s input has no upstream output, so the input manager corresponding to its `root_manager_key` is used. | When running the full pipeline, `solid2`'s input will be loaded using the IO manager on the output of `solid1`. When running the pipeline subset, `solid2`'s input has no upstream output, so the input manager corresponding to its `root_manager_key` is used. | ||||
```python file=/concepts/io_management/subselection.py startafter=start_execute_subselection endbefore=end_execute_subselection | ```python file=/concepts/io_management/subselection.py startafter=start_execute_subselection endbefore=end_execute_subselection | ||||
execute_pipeline( | execute_pipeline( | ||||
my_pipeline, | my_pipeline, | ||||
solid_selection=["solid2"], | solid_selection=["solid2"], | ||||
run_config={"solids": {"solid2": {"inputs": {"dataframe": {"table_name": "tableX"}}}}}, | run_config={ | ||||
"solids": { | |||||
"solid2": {"inputs": {"dataframe": {"table_name": "tableX"}}} | |||||
} | |||||
}, | |||||
) | ) | ||||
``` | ``` |