Differential D5083 Diff 25786 python_modules/dagster/dagster_tests/core_tests/execution_tests/memoized_dev_loop_pipeline.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/execution_tests/memoized_dev_loop_pipeline.py
import os | import os | ||||
from dagster import Field, InputDefinition, ModeDefinition, Output, String, pipeline, solid | from dagster import ( | ||||
Field, | |||||
InputDefinition, | |||||
ModeDefinition, | |||||
Output, | |||||
OutputDefinition, | |||||
String, | |||||
pipeline, | |||||
repository, | |||||
solid, | |||||
) | |||||
from dagster.core.storage.asset_store import versioned_filesystem_asset_store | |||||
from dagster.core.storage.system_storage import fs_intermediate_storage | from dagster.core.storage.system_storage import fs_intermediate_storage | ||||
@solid( | @solid( | ||||
version="create_string_version", | version="create_string_version", | ||||
config_schema={"input_str": Field(String), "base_dir": Field(String)}, | config_schema={"input_str": Field(String), "base_dir": Field(String)}, | ||||
) | ) | ||||
def create_string_1(context): | def create_string_1(context): | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | |||||
@pipeline( | @pipeline( | ||||
mode_defs=[ModeDefinition("only_mode", intermediate_storage_defs=[fs_intermediate_storage])] | mode_defs=[ModeDefinition("only_mode", intermediate_storage_defs=[fs_intermediate_storage])] | ||||
) | ) | ||||
def basic_pipeline(): | def basic_pipeline(): | ||||
take_string_two_inputs( | take_string_two_inputs( | ||||
_string_input_1=take_string_1(create_string_1()), | _string_input_1=take_string_1(create_string_1()), | ||||
_string_input_2=take_string_2(create_string_2()), | _string_input_2=take_string_2(create_string_2()), | ||||
) | ) | ||||
@solid( | |||||
version="create_string_version", | |||||
config_schema={"input_str": Field(String)}, | |||||
output_defs=[ | |||||
OutputDefinition(name="created_string", asset_store_key="asset_store", asset_metadata={}) | |||||
], | |||||
) | |||||
def create_string_1_asset(context): | |||||
return context.solid_config["input_str"] | |||||
@solid( | |||||
input_defs=[InputDefinition("_string_input", String)], | |||||
version="take_string_version", | |||||
config_schema={"input_str": Field(String)}, | |||||
output_defs=[ | |||||
OutputDefinition(name="taken_string", asset_store_key="asset_store", asset_metadata={}) | |||||
], | |||||
) | |||||
def take_string_1_asset(context, _string_input): | |||||
return context.solid_config["input_str"] + _string_input | |||||
@pipeline( | |||||
mode_defs=[ | |||||
ModeDefinition("only_mode", resource_defs={"asset_store": versioned_filesystem_asset_store}) | |||||
] | |||||
) | |||||
def asset_pipeline(): | |||||
return take_string_1_asset(create_string_1_asset()) | |||||
@repository | |||||
def memoized_dev_repo(): | |||||
return [basic_pipeline, asset_pipeline] |