Differential D5083 Diff 26096 python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoized_dev_loop.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/execution_tests/test_memoized_dev_loop.py
from dagster import execute_pipeline, seven | from dagster import execute_pipeline, seven | ||||
from dagster.core.execution.api import create_execution_plan | from dagster.core.execution.api import create_execution_plan | ||||
from dagster.core.instance import DagsterInstance, InstanceType | from dagster.core.instance import DagsterInstance, InstanceType | ||||
from dagster.core.launcher import DefaultRunLauncher | from dagster.core.launcher import DefaultRunLauncher | ||||
from dagster.core.run_coordinator import DefaultRunCoordinator | from dagster.core.run_coordinator import DefaultRunCoordinator | ||||
from dagster.core.storage.event_log import ConsolidatedSqliteEventLogStorage | from dagster.core.storage.event_log import ConsolidatedSqliteEventLogStorage | ||||
from dagster.core.storage.local_compute_log_manager import LocalComputeLogManager | from dagster.core.storage.local_compute_log_manager import LocalComputeLogManager | ||||
from dagster.core.storage.root import LocalArtifactStorage | from dagster.core.storage.root import LocalArtifactStorage | ||||
from dagster.core.storage.runs import SqliteRunStorage | from dagster.core.storage.runs import SqliteRunStorage | ||||
from .memoized_dev_loop_pipeline import basic_pipeline | from .memoized_dev_loop_pipeline import asset_pipeline, basic_pipeline | ||||
def get_step_keys_to_execute(instance, pipeline, run_config, mode): | def get_step_keys_to_execute(instance, pipeline, run_config, mode): | ||||
memoized_execution_plan = instance.resolve_memoized_execution_plan( | memoized_execution_plan = instance.resolve_memoized_execution_plan( | ||||
create_execution_plan(pipeline, run_config=run_config, mode=mode) | create_execution_plan(pipeline, run_config=run_config, mode=mode) | ||||
) | ) | ||||
return memoized_execution_plan.step_keys_to_execute | return memoized_execution_plan.step_keys_to_execute | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | with seven.TemporaryDirectory() as temp_dir: | ||||
run_config=run_config, | run_config=run_config, | ||||
mode="only_mode", | mode="only_mode", | ||||
tags={"dagster/is_memoized_run": "true"}, | tags={"dagster/is_memoized_run": "true"}, | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
assert result3.success | assert result3.success | ||||
assert not get_step_keys_to_execute(instance, basic_pipeline, run_config, "only_mode") | assert not get_step_keys_to_execute(instance, basic_pipeline, run_config, "only_mode") | ||||
def test_dev_loop_asset_store(): | |||||
with seven.TemporaryDirectory() as temp_dir: | |||||
run_store = SqliteRunStorage.from_local(temp_dir) | |||||
event_store = ConsolidatedSqliteEventLogStorage(temp_dir) | |||||
compute_log_manager = LocalComputeLogManager(temp_dir) | |||||
instance = DagsterInstance( | |||||
instance_type=InstanceType.PERSISTENT, | |||||
local_artifact_storage=LocalArtifactStorage(temp_dir), | |||||
run_storage=run_store, | |||||
event_storage=event_store, | |||||
compute_log_manager=compute_log_manager, | |||||
run_launcher=DefaultRunLauncher(), | |||||
run_coordinator=DefaultRunCoordinator(), | |||||
) | |||||
run_config = { | |||||
"solids": { | |||||
"create_string_1_asset": {"config": {"input_str": "apple"}}, | |||||
"take_string_1_asset": {"config": {"input_str": "apple"}}, | |||||
}, | |||||
"resources": {"asset_store": {"config": {"base_dir": temp_dir}}}, | |||||
} | |||||
result = execute_pipeline( | |||||
asset_pipeline, | |||||
run_config=run_config, | |||||
mode="only_mode", | |||||
tags={"dagster/is_memoized_run": "true"}, | |||||
instance=instance, | |||||
) | |||||
assert result.success | |||||
assert not get_step_keys_to_execute(instance, asset_pipeline, run_config, "only_mode") | |||||
run_config["solids"]["take_string_1_asset"]["config"]["input_str"] = "banana" | |||||
assert get_step_keys_to_execute(instance, asset_pipeline, run_config, "only_mode") == [ | |||||
"take_string_1_asset.compute" | |||||
] | |||||
result = execute_pipeline( | |||||
asset_pipeline, | |||||
run_config=run_config, | |||||
mode="only_mode", | |||||
tags={"dagster/is_memoized_run": "true"}, | |||||
instance=instance, | |||||
) | |||||
assert result.success | |||||
assert not get_step_keys_to_execute(instance, asset_pipeline, run_config, "only_mode") |