Changeset View
Changeset View
Standalone View
Standalone View
docs/content/concepts/io-management/io-managers.mdx
Show First 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | def solid1(): | ||||
return 1 | return 1 | ||||
@solid | @solid | ||||
def solid2(a): | def solid2(a): | ||||
return a + 1 | return a + 1 | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})]) | @pipeline( | ||||
mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})] | |||||
) | |||||
def my_pipeline(): | def my_pipeline(): | ||||
solid2(solid1()) | solid2(solid1()) | ||||
``` | ``` | ||||
### Per-output IO manager | ### Per-output IO manager | ||||
Not all the outputs in a pipeline should necessarily be stored the same way. Maybe some of the outputs should live on the filesystem so they can be inspected, and others can be transiently stored in memory. | Not all the outputs in a pipeline should necessarily be stored the same way. Maybe some of the outputs should live on the filesystem so they can be inspected, and others can be transiently stored in memory. | ||||
To select the IOManager for a particular output, you can set an `io_manager_key` on the <PyObject module="dagster" object="OutputDefinition" />, and then refer to that `io_manager_key` when setting IO managers in your <PyObject module="dagster" object="ModeDefinition" />. In this example, the output of solid1 will go to `fs_io_manager` and the output of solid2 will go to `mem_io_manager`. | To select the IOManager for a particular output, you can set an `io_manager_key` on the <PyObject module="dagster" object="OutputDefinition" />, and then refer to that `io_manager_key` when setting IO managers in your <PyObject module="dagster" object="ModeDefinition" />. In this example, the output of solid1 will go to `fs_io_manager` and the output of solid2 will go to `mem_io_manager`. | ||||
```python file=/concepts/io_management/io_manager_per_output.py startafter=start_marker endbefore=end_marker | ```python file=/concepts/io_management/io_manager_per_output.py startafter=start_marker endbefore=end_marker | ||||
from dagster import ModeDefinition, OutputDefinition, fs_io_manager, mem_io_manager, pipeline, solid | from dagster import ( | ||||
ModeDefinition, | |||||
OutputDefinition, | |||||
fs_io_manager, | |||||
mem_io_manager, | |||||
pipeline, | |||||
solid, | |||||
) | |||||
@solid(output_defs=[OutputDefinition(io_manager_key="fs")]) | @solid(output_defs=[OutputDefinition(io_manager_key="fs")]) | ||||
def solid1(): | def solid1(): | ||||
return 1 | return 1 | ||||
@solid(output_defs=[OutputDefinition(io_manager_key="mem")]) | @solid(output_defs=[OutputDefinition(io_manager_key="mem")]) | ||||
def solid2(a): | def solid2(a): | ||||
return a + 1 | return a + 1 | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"fs": fs_io_manager, "mem": mem_io_manager})]) | @pipeline( | ||||
mode_defs=[ | |||||
ModeDefinition( | |||||
resource_defs={"fs": fs_io_manager, "mem": mem_io_manager} | |||||
) | |||||
] | |||||
) | |||||
def my_pipeline(): | def my_pipeline(): | ||||
solid2(solid1()) | solid2(solid1()) | ||||
``` | ``` | ||||
## Defining an IO manager | ## Defining an IO manager | ||||
If you have specific requirements for where and how your outputs should be stored and retrieved, you can define your own IOManager. This boils down to implementing two functions: one that stores outputs and one that loads inputs. | If you have specific requirements for where and how your outputs should be stored and retrieved, you can define your own IOManager. This boils down to implementing two functions: one that stores outputs and one that loads inputs. | ||||
Show All 38 Lines | def load_input(self, context): | ||||
return read_dataframe_from_table(name=table_name) | return read_dataframe_from_table(name=table_name) | ||||
@io_manager | @io_manager | ||||
def df_table_io_manager(_): | def df_table_io_manager(_): | ||||
return DataframeTableIOManager() | return DataframeTableIOManager() | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": df_table_io_manager})]) | @pipeline( | ||||
mode_defs=[ | |||||
ModeDefinition(resource_defs={"io_manager": df_table_io_manager}) | |||||
] | |||||
) | |||||
def my_pipeline(): | def my_pipeline(): | ||||
solid2(solid1()) | solid2(solid1()) | ||||
``` | ``` | ||||
### Providing per-output config to an IO manager | ### Providing per-output config to an IO manager | ||||
When launching a run, you might want to parameterize how particular outputs are stored. | When launching a run, you might want to parameterize how particular outputs are stored. | ||||
Show All 15 Lines | |||||
@io_manager(output_config_schema={"table": str}) | @io_manager(output_config_schema={"table": str}) | ||||
def my_io_manager(_): | def my_io_manager(_): | ||||
return MyIOManager() | return MyIOManager() | ||||
``` | ``` | ||||
Then, when executing a pipeline, you can pass in this per-output config. | Then, when executing a pipeline, you can pass in this per-output config. | ||||
```python file=/concepts/io_management/output_config.py startafter=execute_start_marker endbefore=execute_end_marker | ```python file=/concepts/io_management/output_config.py startafter=execute_start_marker endbefore=execute_end_marker | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": my_io_manager})]) | @pipeline( | ||||
mode_defs=[ | |||||
ModeDefinition(resource_defs={"io_manager": my_io_manager}) | |||||
] | |||||
) | |||||
def my_pipeline(): | def my_pipeline(): | ||||
solid2(solid1()) | solid2(solid1()) | ||||
execute_pipeline( | execute_pipeline( | ||||
my_pipeline, | my_pipeline, | ||||
run_config={ | run_config={ | ||||
"solids": { | "solids": { | ||||
"solid1": {"outputs": {"result": {"table": "table1"}}}, | "solid1": {"outputs": {"result": {"table": "table1"}}}, | ||||
"solid2": {"outputs": {"result": {"table": "table2"}}}, | "solid2": {"outputs": {"result": {"table": "table2"}}}, | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
``` | ``` | ||||
### Providing per-output metadata to an IO manager | ### Providing per-output metadata to an IO manager | ||||
You might want to provide static metadata that controls how particular outputs are stored. You don't plan to change the metadata at runtime, so it makes more sense to attach it to a definition rather than expose it as a configuration option. | You might want to provide static metadata that controls how particular outputs are stored. You don't plan to change the metadata at runtime, so it makes more sense to attach it to a definition rather than expose it as a configuration option. | ||||
For example, if your pipeline produces DataFrames to populate tables in a data warehouse, you might want to specify that each output always goes to a particular table. To accomplish this, you can define `metadata` on each <PyObject module="dagster" object="OutputDefinition" />: | For example, if your pipeline produces DataFrames to populate tables in a data warehouse, you might want to specify that each output always goes to a particular table. To accomplish this, you can define `metadata` on each <PyObject module="dagster" object="OutputDefinition" />: | ||||
```python file=/concepts/io_management/metadata.py startafter=solids_start_marker endbefore=solids_end_marker | ```python file=/concepts/io_management/metadata.py startafter=solids_start_marker endbefore=solids_end_marker | ||||
@solid(output_defs=[OutputDefinition(metadata={"schema": "some_schema", "table": "some_table"})]) | @solid( | ||||
output_defs=[ | |||||
OutputDefinition( | |||||
metadata={"schema": "some_schema", "table": "some_table"} | |||||
) | |||||
] | |||||
) | |||||
def solid1(): | def solid1(): | ||||
"""Return a Pandas DataFrame""" | """Return a Pandas DataFrame""" | ||||
@solid(output_defs=[OutputDefinition(metadata={"schema": "other_schema", "table": "other_table"})]) | @solid( | ||||
output_defs=[ | |||||
OutputDefinition( | |||||
metadata={"schema": "other_schema", "table": "other_table"} | |||||
) | |||||
] | |||||
) | |||||
def solid2(_input_dataframe): | def solid2(_input_dataframe): | ||||
"""Return a Pandas DataFrame""" | """Return a Pandas DataFrame""" | ||||
``` | ``` | ||||
The IOManager can then access this metadata when storing or retrieving data, via the <PyObject module="dagster" object="OutputContext" />. | The IOManager can then access this metadata when storing or retrieving data, via the <PyObject module="dagster" object="OutputContext" />. | ||||
In this case, the table names are encoded in the pipeline definition. If, instead, you want to be able to set them at run time, the next section describes how. | In this case, the table names are encoded in the pipeline definition. If, instead, you want to be able to set them at run time, the next section describes how. | ||||
```python file=/concepts/io_management/metadata.py startafter=io_manager_start_marker endbefore=io_manager_end_marker | ```python file=/concepts/io_management/metadata.py startafter=io_manager_start_marker endbefore=io_manager_end_marker | ||||
class MyIOManager(IOManager): | class MyIOManager(IOManager): | ||||
def handle_output(self, context, obj): | def handle_output(self, context, obj): | ||||
table_name = context.metadata["table"] | table_name = context.metadata["table"] | ||||
schema = context.metadata["schema"] | schema = context.metadata["schema"] | ||||
write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj) | write_dataframe_to_table( | ||||
name=table_name, schema=schema, dataframe=obj | |||||
) | |||||
def load_input(self, context): | def load_input(self, context): | ||||
table_name = context.upstream_output.metadata["table"] | table_name = context.upstream_output.metadata["table"] | ||||
schema = context.upstream_output.metadata["schema"] | schema = context.upstream_output.metadata["schema"] | ||||
return read_dataframe_from_table(name=table_name, schema=schema) | return read_dataframe_from_table(name=table_name, schema=schema) | ||||
@io_manager | @io_manager | ||||
def my_io_manager(_): | def my_io_manager(_): | ||||
return MyIOManager() | return MyIOManager() | ||||
``` | ``` | ||||
## Testing an IO manager | ## Testing an IO manager | ||||
The easiest way to test an IO manager is to construct an <PyObject module="dagster" object="OutputContext" /> or <PyObject module="dagster" object="InputContext" /> and pass it to the `handle_output` or `load_input` method of the IO manager. The <PyObject object="build_output_context" /> and <PyObject object="build_input_context" /> functions allow for easy construction of these contexts. | The easiest way to test an IO manager is to construct an <PyObject module="dagster" object="OutputContext" /> or <PyObject module="dagster" object="InputContext" /> and pass it to the `handle_output` or `load_input` method of the IO manager. The <PyObject object="build_output_context" /> and <PyObject object="build_input_context" /> functions allow for easy construction of these contexts. | ||||
Here's an example for a simple IO manager that stores outputs in an in-memory dictionary that's keyed on the step and name of the output. | Here's an example for a simple IO manager that stores outputs in an in-memory dictionary that's keyed on the step and name of the output. | ||||
```python file=/concepts/io_management/test_io_manager.py | ```python file=/concepts/io_management/test_io_manager.py | ||||
from dagster import IOManager, build_input_context, build_output_context, io_manager | from dagster import ( | ||||
IOManager, | |||||
build_input_context, | |||||
build_output_context, | |||||
io_manager, | |||||
) | |||||
class MyIOManager(IOManager): | class MyIOManager(IOManager): | ||||
def __init__(self): | def __init__(self): | ||||
self.storage_dict = {} | self.storage_dict = {} | ||||
def handle_output(self, context, obj): | def handle_output(self, context, obj): | ||||
self.storage_dict[(context.step_key, context.name)] = obj | self.storage_dict[(context.step_key, context.name)] = obj | ||||
def load_input(self, context): | def load_input(self, context): | ||||
return self.storage_dict[(context.upstream_output.step_key, context.upstream_output.name)] | return self.storage_dict[ | ||||
(context.upstream_output.step_key, context.upstream_output.name) | |||||
] | |||||
@io_manager | @io_manager | ||||
def my_io_manager(_): | def my_io_manager(_): | ||||
return MyIOManager() | return MyIOManager() | ||||
def test_my_io_manager_handle_output(): | def test_my_io_manager_handle_output(): | ||||
manager = my_io_manager(None) | manager = my_io_manager(None) | ||||
context = build_output_context(name="abc", step_key="123") | context = build_output_context(name="abc", step_key="123") | ||||
manager.handle_output(context, 5) | manager.handle_output(context, 5) | ||||
assert manager.storage_dict[("123", "abc")] == 5 | assert manager.storage_dict[("123", "abc")] == 5 | ||||
def test_my_io_manager_load_input(): | def test_my_io_manager_load_input(): | ||||
manager = my_io_manager(None) | manager = my_io_manager(None) | ||||
manager.storage_dict[("123", "abc")] = 5 | manager.storage_dict[("123", "abc")] = 5 | ||||
context = build_input_context(upstream_output=build_output_context(name="abc", step_key="123")) | context = build_input_context( | ||||
upstream_output=build_output_context(name="abc", step_key="123") | |||||
) | |||||
assert manager.load_input(context) == 5 | assert manager.load_input(context) == 5 | ||||
``` | ``` | ||||
## Yielding metadata from an IOManager <Experimental /> | ## Yielding metadata from an IOManager <Experimental /> | ||||
Sometimes, you may want to record some metadata while handling an output in an IOManager. To do this, you can optionally yield <PyObject object="EventMetadataEntry"/> objects from within the body of the `handle_output` function. Using this, we can modify one of the [above examples](/concepts/io-management/io-managers#a-custom-io-manager-that-stores-pandas-dataframes-in-tables) to now include some helpful metadata in the log: | Sometimes, you may want to record some metadata while handling an output in an IOManager. To do this, you can optionally yield <PyObject object="EventMetadataEntry"/> objects from within the body of the `handle_output` function. Using this, we can modify one of the [above examples](/concepts/io-management/io-managers#a-custom-io-manager-that-stores-pandas-dataframes-in-tables) to now include some helpful metadata in the log: | ||||
```python file=/concepts/io_management/custom_io_manager.py startafter=start_metadata_marker endbefore=end_metadata_marker | ```python file=/concepts/io_management/custom_io_manager.py startafter=start_metadata_marker endbefore=end_metadata_marker | ||||
Show All 17 Lines |