Changeset View
Changeset View
Standalone View
Standalone View
docs/content/concepts/assets/asset-materializations.mdx
Show First 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | |||||
into something like this: | into something like this: | ||||
```python file=/concepts/assets/materialization_solids.py startafter=start_materialization_solids_marker_1 endbefore=end_materialization_solids_marker_1 | ```python file=/concepts/assets/materialization_solids.py startafter=start_materialization_solids_marker_1 endbefore=end_materialization_solids_marker_1 | ||||
@solid | @solid | ||||
def my_materialization_solid(context): | def my_materialization_solid(context): | ||||
df = read_df() | df = read_df() | ||||
remote_storage_path = persist_to_storage(df) | remote_storage_path = persist_to_storage(df) | ||||
yield AssetMaterialization(asset_key="my_dataset", description="Persisted result to storage") | yield AssetMaterialization( | ||||
asset_key="my_dataset", description="Persisted result to storage" | |||||
) | |||||
yield Output(remote_storage_path) | yield Output(remote_storage_path) | ||||
``` | ``` | ||||
Note: Our materialization solid must now explicitly yield an <PyObject module="dagster" object="Output" /> event instead of relying on the implicit conversion of the return value into an <PyObject module="dagster" object="Output" /> event. | Note: Our materialization solid must now explicitly yield an <PyObject module="dagster" object="Output" /> event instead of relying on the implicit conversion of the return value into an <PyObject module="dagster" object="Output" /> event. | ||||
We should now see a materialization event in the event log when we execute this solid. | We should now see a materialization event in the event log when we execute this solid. | ||||
<!-- This was generated with: | <!-- This was generated with: | ||||
Show All 9 Lines | |||||
### Yielding an AssetMaterialization from an IOManager | ### Yielding an AssetMaterialization from an IOManager | ||||
To record that an <PyObject object ="IOManager"/> has mutated or created an asset, we can yield an <PyObject module="dagster" object="AssetMaterialization" /> event from its `handle_output` method. | To record that an <PyObject object ="IOManager"/> has mutated or created an asset, we can yield an <PyObject module="dagster" object="AssetMaterialization" /> event from its `handle_output` method. | ||||
```python file=/concepts/assets/materialization_io_managers.py startafter=start_marker_0 endbefore=end_marker_0 | ```python file=/concepts/assets/materialization_io_managers.py startafter=start_marker_0 endbefore=end_marker_0 | ||||
class PandasCsvIOManager(IOManager): | class PandasCsvIOManager(IOManager): | ||||
def load_input(self, context): | def load_input(self, context): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
return read_csv(file_path) | return read_csv(file_path) | ||||
def handle_output(self, context, obj): | def handle_output(self, context, obj): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
obj.to_csv(file_path) | obj.to_csv(file_path) | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key=AssetKey(file_path), description="Persisted result to storage." | asset_key=AssetKey(file_path), | ||||
description="Persisted result to storage.", | |||||
) | ) | ||||
``` | ``` | ||||
### Attaching Metadata to an AssetMaterialization | ### Attaching Metadata to an AssetMaterialization | ||||
There are a variety of types of metadata that can be associated with a materialization event, all through the <PyObject module="dagster" object="EventMetadataEntry" /> class. Each materialization event optionally takes a list of metadata entries that are then displayed in the event log and the [Asset Catalog](/concepts/dagit/dagit#assets). | There are a variety of types of metadata that can be associated with a materialization event, all through the <PyObject module="dagster" object="EventMetadataEntry" /> class. Each materialization event optionally takes a list of metadata entries that are then displayed in the event log and the [Asset Catalog](/concepts/dagit/dagit#assets). | ||||
#### Example: Solid body | #### Example: Solid body | ||||
```python file=concepts/assets/materialization_solids.py startafter=start_materialization_solids_marker_2 endbefore=end_materialization_solids_marker_2 | ```python file=concepts/assets/materialization_solids.py startafter=start_materialization_solids_marker_2 endbefore=end_materialization_solids_marker_2 | ||||
@solid | @solid | ||||
def my_metadata_materialization_solid(context): | def my_metadata_materialization_solid(context): | ||||
df = read_df() | df = read_df() | ||||
remote_storage_path = persist_to_storage(df) | remote_storage_path = persist_to_storage(df) | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="my_dataset", | asset_key="my_dataset", | ||||
description="Persisted result to storage", | description="Persisted result to storage", | ||||
metadata={ | metadata={ | ||||
"text_metadata": "Text-based metadata for this event", | "text_metadata": "Text-based metadata for this event", | ||||
"path": EventMetadata.path(remote_storage_path), | "path": EventMetadata.path(remote_storage_path), | ||||
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"), | "dashboard_url": EventMetadata.url( | ||||
"http://mycoolsite.com/url_for_my_data" | |||||
), | |||||
"size (bytes)": calculate_bytes(df), | "size (bytes)": calculate_bytes(df), | ||||
}, | }, | ||||
) | ) | ||||
yield Output(remote_storage_path) | yield Output(remote_storage_path) | ||||
``` | ``` | ||||
#### Example: IOManager | #### Example: IOManager | ||||
```python file=concepts/assets/materialization_io_managers.py startafter=start_marker_1 endbefore=end_marker_1 | ```python file=concepts/assets/materialization_io_managers.py startafter=start_marker_1 endbefore=end_marker_1 | ||||
class PandasCsvIOManagerWithAsset(IOManager): | class PandasCsvIOManagerWithAsset(IOManager): | ||||
def load_input(self, context): | def load_input(self, context): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
return read_csv(file_path) | return read_csv(file_path) | ||||
def handle_output(self, context, obj): | def handle_output(self, context, obj): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
obj.to_csv(file_path) | obj.to_csv(file_path) | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key=AssetKey(file_path), | asset_key=AssetKey(file_path), | ||||
description="Persisted result to storage.", | description="Persisted result to storage.", | ||||
metadata={ | metadata={ | ||||
"number of rows": obj.shape[0], | "number of rows": obj.shape[0], | ||||
Show All 9 Lines | |||||
If you are materializing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object. | If you are materializing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object. | ||||
```python file=/concepts/assets/materialization_solids.py startafter=start_partitioned_asset_materialization endbefore=end_partitioned_asset_materialization | ```python file=/concepts/assets/materialization_solids.py startafter=start_partitioned_asset_materialization endbefore=end_partitioned_asset_materialization | ||||
@solid(config_schema={"date": str}) | @solid(config_schema={"date": str}) | ||||
def my_partitioned_asset_solid(context): | def my_partitioned_asset_solid(context): | ||||
partition_date = context.solid_config["date"] | partition_date = context.solid_config["date"] | ||||
df = read_df_for_date(partition_date) | df = read_df_for_date(partition_date) | ||||
remote_storage_path = persist_to_storage(df) | remote_storage_path = persist_to_storage(df) | ||||
yield AssetMaterialization(asset_key="my_dataset", partition=partition_date) | yield AssetMaterialization( | ||||
asset_key="my_dataset", partition=partition_date | |||||
) | |||||
yield Output(remote_storage_path) | yield Output(remote_storage_path) | ||||
``` | ``` | ||||
## Linking Solid Outputs to Assets <Experimental /> | ## Linking Solid Outputs to Assets <Experimental /> | ||||
It is fairly common for an asset to correspond to a solid output. In the following simplified example, our solid produces a dataframe, persists it to storage, and then passes the dataframe along as an output: | It is fairly common for an asset to correspond to a solid output. In the following simplified example, our solid produces a dataframe, persists it to storage, and then passes the dataframe along as an output: | ||||
```python file=concepts/assets/materialization_solids.py startafter=start_simple_asset_solid endbefore=end_simple_asset_solid | ```python file=concepts/assets/materialization_solids.py startafter=start_simple_asset_solid endbefore=end_simple_asset_solid | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | |||||
If you've defined a custom <PyObject object="IOManager"/> to handle storing your solid's outputs, the <PyObject object="IOManager"/> will likely be the most natural place to define which asset a particular output will be written to. To do this, you can implement the `get_output_asset_key` function on your <PyObject object="IOManager"/>. | If you've defined a custom <PyObject object="IOManager"/> to handle storing your solid's outputs, the <PyObject object="IOManager"/> will likely be the most natural place to define which asset a particular output will be written to. To do this, you can implement the `get_output_asset_key` function on your <PyObject object="IOManager"/>. | ||||
Similar to the above interface, this function takes an <PyObject object="OutputContext"/> and returns an <PyObject object="AssetKey"/>. The following example functions nearly identically to `PandasCsvIOManagerWithAsset` from the [runtime example](/concepts/assets/asset-materializations#example-iomanager) above. | Similar to the above interface, this function takes an <PyObject object="OutputContext"/> and returns an <PyObject object="AssetKey"/>. The following example functions nearly identically to `PandasCsvIOManagerWithAsset` from the [runtime example](/concepts/assets/asset-materializations#example-iomanager) above. | ||||
```python file=/concepts/assets/materialization_io_managers.py startafter=start_asset_def endbefore=end_asset_def | ```python file=/concepts/assets/materialization_io_managers.py startafter=start_asset_def endbefore=end_asset_def | ||||
class PandasCsvIOManagerWithOutputAsset(IOManager): | class PandasCsvIOManagerWithOutputAsset(IOManager): | ||||
def load_input(self, context): | def load_input(self, context): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
return read_csv(file_path) | return read_csv(file_path) | ||||
def handle_output(self, context, obj): | def handle_output(self, context, obj): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
obj.to_csv(file_path) | obj.to_csv(file_path) | ||||
yield EventMetadataEntry.int(obj.shape[0], label="number of rows") | yield EventMetadataEntry.int(obj.shape[0], label="number of rows") | ||||
yield EventMetadataEntry.float(obj["some_column"].mean(), "some_column mean") | yield EventMetadataEntry.float( | ||||
obj["some_column"].mean(), "some_column mean" | |||||
) | |||||
def get_output_asset_key(self, context): | def get_output_asset_key(self, context): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
return AssetKey(file_path) | return AssetKey(file_path) | ||||
``` | ``` | ||||
When an output is linked to an asset in this way, the generated <PyObject object="AssetMaterialization" /> event will contain any <PyObject object="EventMetadataEntry" /> information yielded from the `handle_output` function (in addition to all of the `metadata` specified on the corresponding <PyObject object="Output" /> event). | When an output is linked to an asset in this way, the generated <PyObject object="AssetMaterialization" /> event will contain any <PyObject object="EventMetadataEntry" /> information yielded from the `handle_output` function (in addition to all of the `metadata` specified on the corresponding <PyObject object="Output" /> event). | ||||
See the [IOManager docs](/concepts/io-management/io-managers#yielding-metadata-from-an-iomanager) for more information on yielding these entries from an IOManager. | See the [IOManager docs](/concepts/io-management/io-managers#yielding-metadata-from-an-iomanager) for more information on yielding these entries from an IOManager. | ||||
#### Specifying partitions for an output-linked asset | #### Specifying partitions for an output-linked asset | ||||
If you are already specifying a `get_output_asset_key` function on your <PyObject object="IOManager" />, you can optionally specify a set of partitions that this manager will be updating or creating by also defining a `get_output_asset_partitions` function. If you do this, an <PyObject object="AssetMaterialization" /> will be created for each of the specified partitions. One useful pattern for passing this partition information (which will likely vary each run) to the manager is to specify the set of partitions on the configuration of the output. You can do this by providing [per-output configuration](/concepts/io-management/io-managers#providing-per-output-config-to-an-io-manager) on the IOManager. | If you are already specifying a `get_output_asset_key` function on your <PyObject object="IOManager" />, you can optionally specify a set of partitions that this manager will be updating or creating by also defining a `get_output_asset_partitions` function. If you do this, an <PyObject object="AssetMaterialization" /> will be created for each of the specified partitions. One useful pattern for passing this partition information (which will likely vary each run) to the manager is to specify the set of partitions on the configuration of the output. You can do this by providing [per-output configuration](/concepts/io-management/io-managers#providing-per-output-config-to-an-io-manager) on the IOManager. | ||||
Then, you can calculate the asset partitions that a particular output will correspond to by reading this output configuration in `get_output_asset_partitions`: | Then, you can calculate the asset partitions that a particular output will correspond to by reading this output configuration in `get_output_asset_partitions`: | ||||
```python file=/concepts/assets/materialization_io_managers.py startafter=start_partitioned_asset_def endbefore=end_partitioned_asset_def | ```python file=/concepts/assets/materialization_io_managers.py startafter=start_partitioned_asset_def endbefore=end_partitioned_asset_def | ||||
class PandasCsvIOManagerWithOutputAssetPartitions(IOManager): | class PandasCsvIOManagerWithOutputAssetPartitions(IOManager): | ||||
def load_input(self, context): | def load_input(self, context): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
return read_csv(file_path) | return read_csv(file_path) | ||||
def handle_output(self, context, obj): | def handle_output(self, context, obj): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
obj.to_csv(file_path) | obj.to_csv(file_path) | ||||
yield EventMetadataEntry.int(obj.shape[0], label="number of rows") | yield EventMetadataEntry.int(obj.shape[0], label="number of rows") | ||||
yield EventMetadataEntry.float(obj["some_column"].mean(), "some_column mean") | yield EventMetadataEntry.float( | ||||
obj["some_column"].mean(), "some_column mean" | |||||
) | |||||
def get_output_asset_key(self, context): | def get_output_asset_key(self, context): | ||||
file_path = os.path.join("my_base_dir", context.step_key, context.name) | file_path = os.path.join( | ||||
"my_base_dir", context.step_key, context.name | |||||
) | |||||
return AssetKey(file_path) | return AssetKey(file_path) | ||||
def get_output_asset_partitions(self, context): | def get_output_asset_partitions(self, context): | ||||
return set(context.config["partitions"]) | return set(context.config["partitions"]) | ||||
``` | ``` | ||||
## Asset Lineage <Experimental /> | ## Asset Lineage <Experimental /> | ||||
When a solid output is linked to an <PyObject object="AssetKey"/>, Dagster can automatically generate lineage information that describes how this asset relates to other output-linked assets. | When a solid output is linked to an <PyObject object="AssetKey"/>, Dagster can automatically generate lineage information that describes how this asset relates to other output-linked assets. | ||||
As a simplified example, imagine a two-solid pipeline that first scrapes some user data from an API, storing it to a table, then trains an ML model on that data, persisting it to a model store: | As a simplified example, imagine a two-solid pipeline that first scrapes some user data from an API, storing it to a table, then trains an ML model on that data, persisting it to a model store: | ||||
```python file=/concepts/assets/materialization_pipelines.py startafter=start_pipeline_0 endbefore=end_pipeline_0 | ```python file=/concepts/assets/materialization_pipelines.py startafter=start_pipeline_0 endbefore=end_pipeline_0 | ||||
from dagster import solid, pipeline, OutputDefinition, AssetKey | from dagster import solid, pipeline, OutputDefinition, AssetKey | ||||
@solid(output_defs=[OutputDefinition(asset_key=AssetKey("my_db.users"))]) | @solid(output_defs=[OutputDefinition(asset_key=AssetKey("my_db.users"))]) | ||||
def scrape_users(): | def scrape_users(): | ||||
users_df = some_api_call() | users_df = some_api_call() | ||||
persist_to_db(users_df) | persist_to_db(users_df) | ||||
return users_df | return users_df | ||||
@solid(output_defs=[OutputDefinition(asset_key=AssetKey("ml_models.user_prediction"))]) | @solid( | ||||
output_defs=[ | |||||
OutputDefinition(asset_key=AssetKey("ml_models.user_prediction")) | |||||
] | |||||
) | |||||
def get_prediction_model(users_df): | def get_prediction_model(users_df): | ||||
my_ml_model = train_prediction_model(users_df) | my_ml_model = train_prediction_model(users_df) | ||||
persist_to_model_store(my_ml_model) | persist_to_model_store(my_ml_model) | ||||
return my_ml_model | return my_ml_model | ||||
@pipeline | @pipeline | ||||
def my_user_model_pipeline(): | def my_user_model_pipeline(): | ||||
Show All 19 Lines |