Differential D5083 Diff 26699 python_modules/dagster/dagster_tests/cli_tests/command_tests/test_memoized_development_cli.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/cli_tests/command_tests/test_memoized_development_cli.py
import os | import os | ||||
import sys | import sys | ||||
from io import BytesIO | from io import BytesIO | ||||
import yaml | import yaml | ||||
from dagster import execute_pipeline, seven | from dagster import execute_pipeline, seven | ||||
from dagster.cli.pipeline import execute_list_versions_command | from dagster.cli.pipeline import execute_list_versions_command | ||||
from dagster.core.instance import DagsterInstance, InstanceType | from dagster.core.instance import DagsterInstance, InstanceType | ||||
from dagster.core.launcher import DefaultRunLauncher | from dagster.core.launcher import DefaultRunLauncher | ||||
from dagster.core.run_coordinator import DefaultRunCoordinator | from dagster.core.run_coordinator import DefaultRunCoordinator | ||||
from dagster.core.storage.event_log import ConsolidatedSqliteEventLogStorage | from dagster.core.storage.event_log import ConsolidatedSqliteEventLogStorage | ||||
from dagster.core.storage.local_compute_log_manager import LocalComputeLogManager | from dagster.core.storage.local_compute_log_manager import LocalComputeLogManager | ||||
from dagster.core.storage.root import LocalArtifactStorage | from dagster.core.storage.root import LocalArtifactStorage | ||||
from dagster.core.storage.runs import SqliteRunStorage | from dagster.core.storage.runs import SqliteRunStorage | ||||
from dagster.utils import file_relative_path | from dagster.utils import file_relative_path | ||||
from ...core_tests.execution_tests.memoized_dev_loop_pipeline import basic_pipeline | from ...core_tests.execution_tests.memoized_dev_loop_pipeline import asset_pipeline | ||||
class Capturing(list): | class Capturing(list): | ||||
def __enter__(self): | def __enter__(self): | ||||
self._stdout = sys.stdout # pylint: disable=W0201 | self._stdout = sys.stdout # pylint: disable=W0201 | ||||
self._stringio = BytesIO() # pylint: disable=W0201 | self._stringio = BytesIO() # pylint: disable=W0201 | ||||
sys.stdout = self._stringio | sys.stdout = self._stringio | ||||
return self | return self | ||||
Show All 15 Lines | with seven.TemporaryDirectory() as temp_dir: | ||||
run_storage=run_store, | run_storage=run_store, | ||||
event_storage=event_store, | event_storage=event_store, | ||||
compute_log_manager=compute_log_manager, | compute_log_manager=compute_log_manager, | ||||
run_coordinator=DefaultRunCoordinator(), | run_coordinator=DefaultRunCoordinator(), | ||||
run_launcher=DefaultRunLauncher(), | run_launcher=DefaultRunLauncher(), | ||||
) | ) | ||||
run_config = { | run_config = { | ||||
"solids": { | "solids": { | ||||
"create_string_1": {"config": {"input_str": "apple", "base_dir": temp_dir}}, | "create_string_1_asset": {"config": {"input_str": "apple"}}, | ||||
"create_string_2": {"config": {"input_str": "apple", "base_dir": temp_dir}}, | "take_string_1_asset": {"config": {"input_str": "apple"}}, | ||||
"take_string_1": {"config": {"input_str": "apple", "base_dir": temp_dir}}, | |||||
"take_string_2": {"config": {"input_str": "apple", "base_dir": temp_dir}}, | |||||
"take_string_two_inputs": {"config": {"input_str": "apple", "base_dir": temp_dir}}, | |||||
}, | }, | ||||
"intermediate_storage": {"filesystem": {"config": {"base_dir": temp_dir}}}, | "resources": {"asset_store": {"config": {"base_dir": temp_dir}}}, | ||||
} | } | ||||
# write run config to temp file | # write run config to temp file | ||||
# file is temp because intermediate storage directory is temporary | # file is temp because intermediate storage directory is temporary | ||||
with open(os.path.join(temp_dir, "pipeline_config.yaml"), "w") as f: | with open(os.path.join(temp_dir, "pipeline_config.yaml"), "w") as f: | ||||
f.write(yaml.dump(run_config)) | f.write(yaml.dump(run_config)) | ||||
kwargs = { | kwargs = { | ||||
"config": (os.path.join(temp_dir, "pipeline_config.yaml"),), | "config": (os.path.join(temp_dir, "pipeline_config.yaml"),), | ||||
"pipeline": "basic_pipeline", | "pipeline": "asset_pipeline", | ||||
"python_file": file_relative_path( | "python_file": file_relative_path( | ||||
__file__, "../../core_tests/execution_tests/memoized_dev_loop_pipeline.py" | __file__, "../../core_tests/execution_tests/memoized_dev_loop_pipeline.py" | ||||
), | ), | ||||
"tags": '{"dagster/is_memoized_run": "true"}', | "tags": '{"dagster/is_memoized_run": "true"}', | ||||
} | } | ||||
with Capturing() as output: | with Capturing() as output: | ||||
execute_list_versions_command(kwargs=kwargs, instance=instance) | execute_list_versions_command(kwargs=kwargs, instance=instance) | ||||
assert output | assert output | ||||
# execute the pipeline once so that addresses have been populated. | # execute the pipeline once so that addresses have been populated. | ||||
result = execute_pipeline( | result = execute_pipeline( | ||||
basic_pipeline, | asset_pipeline, | ||||
run_config=run_config, | run_config=run_config, | ||||
mode="only_mode", | mode="only_mode", | ||||
tags={"dagster/is_memoized_run": "true"}, | tags={"dagster/is_memoized_run": "true"}, | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
assert result.success | assert result.success | ||||
with Capturing() as output: | with Capturing() as output: | ||||
execute_list_versions_command(kwargs=kwargs, instance=instance) | execute_list_versions_command(kwargs=kwargs, instance=instance) | ||||
assert output | assert output |