Differential D8684 Diff 40955 examples/docs_snippets/docs_snippets/concepts/assets/materialization_io_managers.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/concepts/assets/materialization_io_managers.py
import os | import os | ||||
import pandas as pd | import pandas as pd | ||||
from dagster import AssetKey, AssetMaterialization, EventMetadataEntry, IOManager | from dagster import ( | ||||
AssetKey, | |||||
AssetMaterialization, | |||||
EventMetadataEntry, | |||||
IOManager, | |||||
) | |||||
def read_csv(_path):
    """Stand-in CSV reader used by the IO manager examples in this file.

    The ``_path`` argument is accepted but ignored; an empty
    ``pandas.DataFrame`` is always returned.
    """
    empty_frame = pd.DataFrame()
    return empty_frame
# start_marker_0 | # start_marker_0 | ||||
class PandasCsvIOManager(IOManager):
    """IO manager that persists outputs as CSV files and reports an asset.

    Every output is written to ``my_base_dir/<step_key>/<output name>`` and
    an ``AssetMaterialization`` keyed by that path is yielded.
    """

    def _get_path(self, output_context):
        """Return the CSV path for the output described by ``output_context``."""
        return os.path.join(
            "my_base_dir", output_context.step_key, output_context.name
        )

    def load_input(self, context):
        """Read back the object stored for the upstream output.

        The path is derived from ``context.upstream_output`` (the output
        that feeds this input) rather than from the input itself, because
        ``handle_output`` stores the file under the producing step's key
        and output name.
        """
        return read_csv(self._get_path(context.upstream_output))

    def handle_output(self, context, obj):
        """Write ``obj`` to CSV and yield an ``AssetMaterialization`` for it.

        ``obj`` must provide ``to_csv`` (e.g. a pandas DataFrame).
        """
        file_path = self._get_path(context)
        # ``to_csv`` does not create missing parent directories.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        obj.to_csv(file_path)

        yield AssetMaterialization(
            asset_key=AssetKey(file_path),
            description="Persisted result to storage.",
        )
# end_marker_0 | # end_marker_0 | ||||
# start_marker_1 | # start_marker_1 | ||||
class PandasCsvIOManagerWithAsset(IOManager):
    """IO manager that persists outputs as CSV and reports asset metadata.

    Every output is written to ``my_base_dir/<step_key>/<output name>``.
    The yielded ``AssetMaterialization`` carries the row count and the mean
    of ``some_column`` as metadata.
    """

    def _get_path(self, output_context):
        """Return the CSV path for the output described by ``output_context``."""
        return os.path.join(
            "my_base_dir", output_context.step_key, output_context.name
        )

    def load_input(self, context):
        """Read back the object stored for the upstream output.

        The path is derived from ``context.upstream_output`` (the output
        that feeds this input) rather than from the input itself, because
        ``handle_output`` stores the file under the producing step's key
        and output name.
        """
        return read_csv(self._get_path(context.upstream_output))

    def handle_output(self, context, obj):
        """Write ``obj`` to CSV and yield an ``AssetMaterialization`` with metadata.

        ``obj`` must provide ``to_csv``, ``shape`` and a ``"some_column"``
        column (e.g. a pandas DataFrame).
        """
        file_path = self._get_path(context)
        # ``to_csv`` does not create missing parent directories.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        obj.to_csv(file_path)

        yield AssetMaterialization(
            asset_key=AssetKey(file_path),
            description="Persisted result to storage.",
            metadata={
                "number of rows": obj.shape[0],
                "some_column mean": obj["some_column"].mean(),
            },
        )
# end_marker_1 | # end_marker_1 | ||||
# start_asset_def | # start_asset_def | ||||
class PandasCsvIOManagerWithOutputAsset(IOManager):
    """IO manager that declares its asset key via ``get_output_asset_key``.

    Every output is written to ``my_base_dir/<step_key>/<output name>``.
    ``handle_output`` yields only metadata entries; the asset key for the
    output is supplied separately by ``get_output_asset_key``.
    """

    def _get_path(self, output_context):
        """Return the CSV path for the output described by ``output_context``."""
        return os.path.join(
            "my_base_dir", output_context.step_key, output_context.name
        )

    def load_input(self, context):
        """Read back the object stored for the upstream output.

        The path is derived from ``context.upstream_output`` (the output
        that feeds this input) rather than from the input itself, because
        ``handle_output`` stores the file under the producing step's key
        and output name.
        """
        return read_csv(self._get_path(context.upstream_output))

    def handle_output(self, context, obj):
        """Write ``obj`` to CSV and yield metadata entries describing it.

        ``obj`` must provide ``to_csv``, ``shape`` and a ``"some_column"``
        column (e.g. a pandas DataFrame).
        """
        file_path = self._get_path(context)
        # ``to_csv`` does not create missing parent directories.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        obj.to_csv(file_path)

        yield EventMetadataEntry.int(obj.shape[0], label="number of rows")
        yield EventMetadataEntry.float(
            obj["some_column"].mean(), "some_column mean"
        )

    def get_output_asset_key(self, context):
        """Return the ``AssetKey`` for this output, built from its CSV path."""
        return AssetKey(self._get_path(context))
# end_asset_def | # end_asset_def | ||||
# start_partitioned_asset_def | # start_partitioned_asset_def | ||||
class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
    """IO manager that declares both an asset key and asset partitions.

    Every output is written to ``my_base_dir/<step_key>/<output name>``.
    The asset key comes from ``get_output_asset_key`` and the partitions
    from the ``"partitions"`` entry of the output's config.
    """

    def _get_path(self, output_context):
        """Return the CSV path for the output described by ``output_context``."""
        return os.path.join(
            "my_base_dir", output_context.step_key, output_context.name
        )

    def load_input(self, context):
        """Read back the object stored for the upstream output.

        The path is derived from ``context.upstream_output`` (the output
        that feeds this input) rather than from the input itself, because
        ``handle_output`` stores the file under the producing step's key
        and output name.
        """
        return read_csv(self._get_path(context.upstream_output))

    def handle_output(self, context, obj):
        """Write ``obj`` to CSV and yield metadata entries describing it.

        ``obj`` must provide ``to_csv``, ``shape`` and a ``"some_column"``
        column (e.g. a pandas DataFrame).
        """
        file_path = self._get_path(context)
        # ``to_csv`` does not create missing parent directories.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        obj.to_csv(file_path)

        yield EventMetadataEntry.int(obj.shape[0], label="number of rows")
        yield EventMetadataEntry.float(
            obj["some_column"].mean(), "some_column mean"
        )

    def get_output_asset_key(self, context):
        """Return the ``AssetKey`` for this output, built from its CSV path."""
        return AssetKey(self._get_path(context))

    def get_output_asset_partitions(self, context):
        """Return the partitions listed under ``"partitions"`` in the config, as a set."""
        return set(context.config["partitions"])
# end_partitioned_asset_def | # end_partitioned_asset_def |