Differential D8684 Diff 40813 examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_materialization_io_managers.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_materialization_io_managers.py
import pandas as pd | import pandas as pd | ||||
from dagster import ModeDefinition, execute_pipeline, io_manager, pipeline, solid | from dagster import ( | ||||
ModeDefinition, | |||||
execute_pipeline, | |||||
io_manager, | |||||
pipeline, | |||||
solid, | |||||
) | |||||
from docs_snippets.concepts.assets.materialization_io_managers import ( | from docs_snippets.concepts.assets.materialization_io_managers import ( | ||||
PandasCsvIOManager, | PandasCsvIOManager, | ||||
PandasCsvIOManagerWithAsset, | PandasCsvIOManagerWithAsset, | ||||
PandasCsvIOManagerWithOutputAsset, | PandasCsvIOManagerWithOutputAsset, | ||||
PandasCsvIOManagerWithOutputAssetPartitions, | PandasCsvIOManagerWithOutputAssetPartitions, | ||||
) | ) | ||||
class DummyClass(pd.DataFrame): | class DummyClass(pd.DataFrame): | ||||
def to_csv(self, _path): # pylint: disable=arguments-differ | def to_csv(self, _path): # pylint: disable=arguments-differ | ||||
return | return | ||||
def _generate_pipeline_for_io_manager(manager, config_schema=None): | def _generate_pipeline_for_io_manager(manager, config_schema=None): | ||||
@io_manager(output_config_schema=config_schema or {}) | @io_manager(output_config_schema=config_schema or {}) | ||||
def custom_io_manager(_): | def custom_io_manager(_): | ||||
return manager | return manager | ||||
@solid | @solid | ||||
def dummy_solid(): | def dummy_solid(): | ||||
return DummyClass.from_dict({"some_column": [2]}) | return DummyClass.from_dict({"some_column": [2]}) | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": custom_io_manager})]) | @pipeline( | ||||
mode_defs=[ | |||||
ModeDefinition(resource_defs={"io_manager": custom_io_manager}) | |||||
] | |||||
) | |||||
def dummy_pipeline(): | def dummy_pipeline(): | ||||
dummy_solid() | dummy_solid() | ||||
return dummy_pipeline | return dummy_pipeline | ||||
def test_pipelines_compile_and_execute(): | def test_pipelines_compile_and_execute(): | ||||
managers = [ | managers = [ | ||||
Show All 10 Lines | |||||
def test_partition_pipelines_compile_and_execute(): | def test_partition_pipelines_compile_and_execute(): | ||||
managers = [ | managers = [ | ||||
PandasCsvIOManagerWithOutputAssetPartitions(), | PandasCsvIOManagerWithOutputAssetPartitions(), | ||||
] | ] | ||||
for manager in managers: | for manager in managers: | ||||
dummy_pipeline = _generate_pipeline_for_io_manager( | dummy_pipeline = _generate_pipeline_for_io_manager( | ||||
manager, config_schema={"partitions": str} | manager, config_schema={"partitions": str} | ||||
) | ) | ||||
config = {"solids": {"dummy_solid": {"outputs": {"result": {"partitions": "2020-01-01"}}}}} | config = { | ||||
"solids": { | |||||
"dummy_solid": { | |||||
"outputs": {"result": {"partitions": "2020-01-01"}} | |||||
} | |||||
} | |||||
} | |||||
result = execute_pipeline(dummy_pipeline, run_config=config) | result = execute_pipeline(dummy_pipeline, run_config=config) | ||||
assert result | assert result | ||||
assert result.success | assert result.success |