Differential D5083 Diff 26697 python_modules/dagster/dagster_tests/core_tests/execution_tests/memoized_dev_loop_pipeline.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/execution_tests/memoized_dev_loop_pipeline.py
import os | from dagster import ( | ||||
Field, | |||||
from dagster import Field, InputDefinition, ModeDefinition, Output, String, pipeline, solid | InputDefinition, | ||||
from dagster.core.storage.system_storage import fs_intermediate_storage | ModeDefinition, | ||||
OutputDefinition, | |||||
String, | |||||
@solid( | pipeline, | ||||
version="create_string_version", | repository, | ||||
config_schema={"input_str": Field(String), "base_dir": Field(String)}, | solid, | ||||
) | |||||
def create_string_1(context): | |||||
yield Output( | |||||
context.solid_config["input_str"], | |||||
address=os.path.join( | |||||
context.solid_config["base_dir"], "intermediates/create_string_1.compute/result" | |||||
), | |||||
) | ) | ||||
from dagster.core.storage.asset_store import versioned_filesystem_asset_store | |||||
@solid( | @solid( | ||||
version="create_string_version_2", | version="create_string_version", | ||||
config_schema={"input_str": Field(String), "base_dir": Field(String)}, | config_schema={"input_str": Field(String)}, | ||||
) | output_defs=[ | ||||
def create_string_2(context): | OutputDefinition(name="created_string", asset_store_key="asset_store", asset_metadata={}) | ||||
yield Output( | ], | ||||
context.solid_config["input_str"], | |||||
address=os.path.join( | |||||
context.solid_config["base_dir"], "intermediates/create_string_2.compute/result" | |||||
), | |||||
) | ) | ||||
def create_string_1_asset(context): | |||||
return context.solid_config["input_str"] | |||||
@solid( | @solid( | ||||
input_defs=[InputDefinition("_string_input", String)], | input_defs=[InputDefinition("_string_input", String)], | ||||
version="take_string_version", | version="take_string_version", | ||||
config_schema={"input_str": Field(String), "base_dir": Field(String)}, | config_schema={"input_str": Field(String)}, | ||||
) | output_defs=[ | ||||
def take_string_1(context, _string_input): | OutputDefinition(name="taken_string", asset_store_key="asset_store", asset_metadata={}) | ||||
yield Output( | |||||
context.solid_config["input_str"], | |||||
address=os.path.join( | |||||
context.solid_config["base_dir"], "intermediates/take_string_1.compute/result" | |||||
), | |||||
) | |||||
@solid( | |||||
input_defs=[InputDefinition("_string_input", String)], | |||||
version="take_string_version_2", | |||||
config_schema={"input_str": Field(String), "base_dir": Field(String)}, | |||||
) | |||||
def take_string_2(context, _string_input): | |||||
yield Output( | |||||
context.solid_config["input_str"], | |||||
address=os.path.join( | |||||
context.solid_config["base_dir"], "intermediates/take_string_2.compute/result" | |||||
), | |||||
) | |||||
@solid( | |||||
input_defs=[ | |||||
InputDefinition("_string_input_1", String), | |||||
InputDefinition("_string_input_2", String), | |||||
], | ], | ||||
version="take_string_two_inputs_version", | |||||
config_schema={"input_str": Field(String), "base_dir": Field(String)}, | |||||
) | |||||
def take_string_two_inputs(context, _string_input_1, _string_input_2): | |||||
yield Output( | |||||
context.solid_config["input_str"], | |||||
address=os.path.join( | |||||
context.solid_config["base_dir"], "intermediates/take_string_two_inputs.compute/result" | |||||
), | |||||
) | ) | ||||
def take_string_1_asset(context, _string_input): | |||||
return context.solid_config["input_str"] + _string_input | |||||
@pipeline( | @pipeline( | ||||
mode_defs=[ModeDefinition("only_mode", intermediate_storage_defs=[fs_intermediate_storage])] | mode_defs=[ | ||||
) | ModeDefinition("only_mode", resource_defs={"asset_store": versioned_filesystem_asset_store}) | ||||
def basic_pipeline(): | ] | ||||
take_string_two_inputs( | |||||
_string_input_1=take_string_1(create_string_1()), | |||||
_string_input_2=take_string_2(create_string_2()), | |||||
) | ) | ||||
def asset_pipeline(): | |||||
return take_string_1_asset(create_string_1_asset()) | |||||
@repository | |||||
def memoized_dev_repo(): | |||||
return [asset_pipeline] |