Differential D8684 Diff 40807 examples/docs_snippets/docs_snippets/concepts/io_management/metadata.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/concepts/io_management/metadata.py
from dagster import IOManager, ModeDefinition, OutputDefinition, io_manager, pipeline, solid | from dagster import ( | ||||
IOManager, | |||||
ModeDefinition, | |||||
OutputDefinition, | |||||
io_manager, | |||||
pipeline, | |||||
solid, | |||||
) | |||||
def connect():
    """Placeholder for opening a connection; does nothing and returns None."""
    return None
def write_dataframe_to_table(**_kwargs):
    """Placeholder writer: accepts any keyword arguments and ignores them.

    Within this file it is called with ``name``, ``schema`` and ``dataframe``
    keywords. Returns None.
    """
    return None
def read_dataframe_from_table(**_kwargs):
    """Placeholder reader: accepts any keyword arguments and ignores them.

    Within this file it is called with ``name`` and ``schema`` keywords.
    Returns None.
    """
    return None
# solids_start_marker
# The output's metadata names the schema and table for this solid's result;
# MyIOManager reads the same "schema" and "table" keys when handling it.
@solid(
    output_defs=[
        OutputDefinition(
            metadata={"schema": "some_schema", "table": "some_table"}
        )
    ]
)
def solid1():
    """Return a Pandas DataFrame"""
# The output's metadata names the schema and table for this solid's result;
# MyIOManager reads the same "schema" and "table" keys when handling it.
# The single input is accepted but not used by this stub body.
@solid(
    output_defs=[
        OutputDefinition(
            metadata={"schema": "other_schema", "table": "other_table"}
        )
    ]
)
def solid2(_input_dataframe):
    """Return a Pandas DataFrame"""
# solids_end_marker
# io_manager_start_marker
class MyIOManager(IOManager):
    """IO manager that routes values by the "schema"/"table" output metadata."""

    def handle_output(self, context, obj):
        """Pass ``obj`` to ``write_dataframe_to_table`` for this output.

        The target table and schema are read from ``context.metadata``; a
        missing "table" or "schema" key raises ``KeyError``.
        """
        table_name = context.metadata["table"]
        schema = context.metadata["schema"]
        write_dataframe_to_table(
            name=table_name, schema=schema, dataframe=obj
        )

    def load_input(self, context):
        """Return what ``read_dataframe_from_table`` gives for the upstream output.

        The table and schema come from the metadata of the upstream output
        (``context.upstream_output.metadata``), not from the input itself.
        """
        table_name = context.upstream_output.metadata["table"]
        schema = context.upstream_output.metadata["schema"]
        return read_dataframe_from_table(name=table_name, schema=schema)
# Resource function wrapped by @io_manager: returns a fresh MyIOManager.
# The single argument it receives is ignored.
@io_manager
def my_io_manager(_):
    return MyIOManager()
# io_manager_end_marker
# Registers my_io_manager under the "io_manager" resource key of the
# pipeline's only mode, then feeds solid1's output into solid2.
@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"io_manager": my_io_manager})]
)
def my_pipeline():
    solid2(solid1())