Differential D5083 Diff 26580 python_modules/dagster/dagster_tests/core_tests/test_versioned_execution_plan.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/test_versioned_execution_plan.py
Show All 18 Lines | |||||
) | ) | ||||
from dagster.core.definitions import InputDefinition | from dagster.core.definitions import InputDefinition | ||||
from dagster.core.errors import DagsterInvariantViolationError | from dagster.core.errors import DagsterInvariantViolationError | ||||
from dagster.core.execution.api import create_execution_plan | from dagster.core.execution.api import create_execution_plan | ||||
from dagster.core.execution.plan.objects import StepOutputHandle | from dagster.core.execution.plan.objects import StepOutputHandle | ||||
from dagster.core.execution.resolve_versions import ( | from dagster.core.execution.resolve_versions import ( | ||||
join_and_hash, | join_and_hash, | ||||
resolve_config_version, | resolve_config_version, | ||||
resolve_memoized_execution_plan, | |||||
resolve_resource_versions, | resolve_resource_versions, | ||||
) | ) | ||||
from dagster.core.storage.asset_store import VersionedAssetStore | |||||
from dagster.core.storage.tags import MEMOIZED_RUN_TAG | from dagster.core.storage.tags import MEMOIZED_RUN_TAG | ||||
from dagster.seven import mock | |||||
class VersionedInMemoryAssetStore(VersionedAssetStore): | |||||
def __init__(self): | |||||
self.values = {} | |||||
def _get_keys(self, context): | |||||
return (context.step_key, context.output_name, context.version) | |||||
def set_asset(self, context, obj): | |||||
keys = self._get_keys(context) | |||||
self.values[keys] = obj | |||||
def get_asset(self, context): | |||||
keys = self._get_keys(context) | |||||
return self.values[keys] | |||||
def has_asset(self, context): | |||||
keys = self._get_keys(context) | |||||
return keys in self.values | |||||
def asset_store_factory(asset_store): | |||||
@resource | |||||
def _asset_store_resource(_): | |||||
return asset_store | |||||
return _asset_store_resource | |||||
def test_join_and_hash(): | def test_join_and_hash(): | ||||
assert join_and_hash("foo") == hashlib.sha1("foo".encode("utf-8")).hexdigest() | assert join_and_hash("foo") == hashlib.sha1("foo".encode("utf-8")).hexdigest() | ||||
assert join_and_hash("foo", None, "bar") == None | assert join_and_hash("foo", None, "bar") == None | ||||
assert join_and_hash("foo", "bar") == hashlib.sha1("barfoo".encode("utf-8")).hexdigest() | assert join_and_hash("foo", "bar") == hashlib.sha1("barfoo".encode("utf-8")).hexdigest() | ||||
Show All 22 Lines | def versioned_solid_no_input(_): | ||||
return 4 | return 4 | ||||
@solid(version="5") | @solid(version="5") | ||||
def versioned_solid_takes_input(_, intput): | def versioned_solid_takes_input(_, intput): | ||||
return 2 * intput | return 2 * intput | ||||
@pipeline | def versioned_pipeline_factory(asset_store=None): | ||||
@pipeline( | |||||
mode_defs=[ | |||||
ModeDefinition( | |||||
name="main", | |||||
resource_defs=( | |||||
{"asset_store": asset_store_factory(asset_store)} if asset_store else {} | |||||
), | |||||
) | |||||
] | |||||
) | |||||
def versioned_pipeline(): | def versioned_pipeline(): | ||||
versioned_solid_takes_input(versioned_solid_no_input()) | versioned_solid_takes_input(versioned_solid_no_input()) | ||||
return versioned_pipeline | |||||
@solid | @solid | ||||
def solid_takes_input(_, intput): | def solid_takes_input(_, intput): | ||||
return 2 * intput | return 2 * intput | ||||
@pipeline | def partially_versioned_pipeline_factory(asset_store=None): | ||||
@pipeline( | |||||
mode_defs=[ | |||||
ModeDefinition( | |||||
name="main", | |||||
resource_defs=( | |||||
{"asset_store": asset_store_factory(asset_store)} if asset_store else {} | |||||
), | |||||
) | |||||
] | |||||
) | |||||
def partially_versioned_pipeline(): | def partially_versioned_pipeline(): | ||||
solid_takes_input(versioned_solid_no_input()) | solid_takes_input(versioned_solid_no_input()) | ||||
return partially_versioned_pipeline | |||||
def versioned_pipeline_expected_step1_version(): | def versioned_pipeline_expected_step1_version(): | ||||
solid1_def_version = versioned_solid_no_input.version | solid1_def_version = versioned_solid_no_input.version | ||||
solid1_config_version = resolve_config_version(None) | solid1_config_version = resolve_config_version(None) | ||||
solid1_resources_version = join_and_hash() | solid1_resources_version = join_and_hash() | ||||
solid1_version = join_and_hash( | solid1_version = join_and_hash( | ||||
solid1_def_version, solid1_config_version, solid1_resources_version | solid1_def_version, solid1_config_version, solid1_resources_version | ||||
) | ) | ||||
Show All 19 Lines | |||||
def versioned_pipeline_expected_step2_output_version(): | def versioned_pipeline_expected_step2_output_version(): | ||||
step2_version = versioned_pipeline_expected_step2_version() | step2_version = versioned_pipeline_expected_step2_version() | ||||
return join_and_hash(step2_version + "result") | return join_and_hash(step2_version + "result") | ||||
def test_resolve_step_versions_no_external_dependencies(): | def test_resolve_step_versions_no_external_dependencies(): | ||||
versioned_pipeline = versioned_pipeline_factory() | |||||
speculative_execution_plan = create_execution_plan(versioned_pipeline) | speculative_execution_plan = create_execution_plan(versioned_pipeline) | ||||
versions = speculative_execution_plan.resolve_step_versions() | versions = speculative_execution_plan.resolve_step_versions() | ||||
assert ( | assert ( | ||||
versions["versioned_solid_no_input.compute"] == versioned_pipeline_expected_step1_version() | versions["versioned_solid_no_input.compute"] == versioned_pipeline_expected_step1_version() | ||||
) | ) | ||||
assert ( | assert ( | ||||
versions["versioned_solid_takes_input.compute"] | versions["versioned_solid_takes_input.compute"] | ||||
== versioned_pipeline_expected_step2_version() | == versioned_pipeline_expected_step2_version() | ||||
) | ) | ||||
def test_resolve_step_output_versions_no_external_dependencies(): | def test_resolve_step_output_versions_no_external_dependencies(): | ||||
versioned_pipeline = versioned_pipeline_factory() | |||||
speculative_execution_plan = create_execution_plan( | speculative_execution_plan = create_execution_plan( | ||||
versioned_pipeline, run_config={}, mode="default" | versioned_pipeline, run_config={}, mode="main" | ||||
) | ) | ||||
versions = speculative_execution_plan.resolve_step_output_versions() | versions = speculative_execution_plan.resolve_step_output_versions() | ||||
assert ( | assert ( | ||||
versions[StepOutputHandle("versioned_solid_no_input.compute", "result")] | versions[StepOutputHandle("versioned_solid_no_input.compute", "result")] | ||||
== versioned_pipeline_expected_step1_output_version() | == versioned_pipeline_expected_step1_output_version() | ||||
) | ) | ||||
assert ( | assert ( | ||||
Show All 15 Lines | |||||
@pipeline | @pipeline | ||||
def no_version_pipeline(): | def no_version_pipeline(): | ||||
basic_takes_input_solid(basic_solid()) | basic_takes_input_solid(basic_solid()) | ||||
def test_default_unmemoized_steps(): | def test_default_unmemoized_steps(): | ||||
speculative_execution_plan = create_execution_plan(no_version_pipeline) | speculative_execution_plan = create_execution_plan(no_version_pipeline) | ||||
instance = DagsterInstance.ephemeral() | |||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvariantViolationError, | DagsterInvariantViolationError, | ||||
match=( | match=( | ||||
"While creating a memoized pipeline run, no steps have versions. At least one step " | "While creating a memoized pipeline run, no steps have versions. At least one step " | ||||
"must have a version." | "must have a version." | ||||
), | ), | ||||
): | ): | ||||
instance.resolve_memoized_execution_plan(speculative_execution_plan) | resolve_memoized_execution_plan(speculative_execution_plan) | ||||
def test_resolve_memoized_execution_plan_no_stored_results(): | def test_resolve_memoized_execution_plan_no_stored_results(): | ||||
versioned_pipeline = versioned_pipeline_factory(VersionedInMemoryAssetStore()) | |||||
speculative_execution_plan = create_execution_plan(versioned_pipeline) | speculative_execution_plan = create_execution_plan(versioned_pipeline) | ||||
instance = DagsterInstance.ephemeral() | memoized_execution_plan = resolve_memoized_execution_plan(speculative_execution_plan) | ||||
instance.get_addresses_for_step_output_versions = mock.MagicMock(return_value={}) | |||||
memoized_execution_plan = instance.resolve_memoized_execution_plan(speculative_execution_plan) | |||||
assert set(memoized_execution_plan.step_keys_to_execute) == { | assert set(memoized_execution_plan.step_keys_to_execute) == { | ||||
"versioned_solid_no_input.compute", | "versioned_solid_no_input.compute", | ||||
"versioned_solid_takes_input.compute", | "versioned_solid_takes_input.compute", | ||||
} | } | ||||
def test_resolve_memoized_execution_plan_yes_stored_results(): | def test_resolve_memoized_execution_plan_yes_stored_results(): | ||||
asset_store = VersionedInMemoryAssetStore() | |||||
versioned_pipeline = versioned_pipeline_factory(asset_store) | |||||
speculative_execution_plan = create_execution_plan(versioned_pipeline) | speculative_execution_plan = create_execution_plan(versioned_pipeline) | ||||
step_output_handle = StepOutputHandle("versioned_solid_no_input.compute", "result") | step_output_handle = StepOutputHandle("versioned_solid_no_input.compute", "result") | ||||
step_output_version = speculative_execution_plan.resolve_step_output_versions()[ | |||||
step_output_handle | |||||
] | |||||
asset_store.values[ | |||||
(step_output_handle.step_key, step_output_handle.output_name, step_output_version) | |||||
] = 4 | |||||
instance = DagsterInstance.ephemeral() | memoized_execution_plan = resolve_memoized_execution_plan(speculative_execution_plan) | ||||
instance.get_addresses_for_step_output_versions = mock.MagicMock( | |||||
return_value={(versioned_pipeline.name, step_output_handle): "some_address"} | |||||
) | |||||
memoized_execution_plan = instance.resolve_memoized_execution_plan(speculative_execution_plan) | |||||
assert memoized_execution_plan.step_keys_to_execute == ["versioned_solid_takes_input.compute"] | assert memoized_execution_plan.step_keys_to_execute == ["versioned_solid_takes_input.compute"] | ||||
expected_handle = StepOutputHandle( | expected_handle = StepOutputHandle( | ||||
step_key="versioned_solid_no_input.compute", output_name="result" | step_key="versioned_solid_no_input.compute", output_name="result" | ||||
) | ) | ||||
assert ( | assert ( | ||||
memoized_execution_plan.step_dict["versioned_solid_takes_input.compute"] | memoized_execution_plan.step_dict["versioned_solid_takes_input.compute"] | ||||
.step_input_dict["intput"] | .step_input_dict["intput"] | ||||
.source.step_output_handle | .source.step_output_handle | ||||
== expected_handle | == expected_handle | ||||
) | ) | ||||
def test_resolve_memoized_execution_plan_partial_versioning(): | def test_resolve_memoized_execution_plan_partial_versioning(): | ||||
asset_store = VersionedInMemoryAssetStore() | |||||
partially_versioned_pipeline = partially_versioned_pipeline_factory(asset_store) | |||||
speculative_execution_plan = create_execution_plan(partially_versioned_pipeline) | speculative_execution_plan = create_execution_plan(partially_versioned_pipeline) | ||||
step_output_handle = StepOutputHandle("versioned_solid_no_input.compute", "result") | step_output_handle = StepOutputHandle("versioned_solid_no_input.compute", "result") | ||||
instance = DagsterInstance.ephemeral() | step_output_version = speculative_execution_plan.resolve_step_output_versions()[ | ||||
instance.get_addresses_for_step_output_versions = mock.MagicMock( | step_output_handle | ||||
return_value={(partially_versioned_pipeline.name, step_output_handle): "some_address"} | ] | ||||
) | asset_store.values[ | ||||
(step_output_handle.step_key, step_output_handle.output_name, step_output_version) | |||||
assert instance.resolve_memoized_execution_plan( | ] = 4 | ||||
speculative_execution_plan | |||||
).step_keys_to_execute == ["solid_takes_input.compute"] | |||||
def test_versioned_execution_plan_no_external_dependencies(): # TODO: flesh out this test once version storage has been implemented | assert resolve_memoized_execution_plan(speculative_execution_plan).step_keys_to_execute == [ | ||||
instance = DagsterInstance.ephemeral() | "solid_takes_input.compute" | ||||
pipeline_run = instance.create_run_for_pipeline( | ] | ||||
pipeline_def=versioned_pipeline, tags={MEMOIZED_RUN_TAG: "true"} | |||||
) | |||||
assert "versioned_solid_no_input.compute" in pipeline_run.step_keys_to_execute | |||||
assert "versioned_solid_takes_input.compute" in pipeline_run.step_keys_to_execute | |||||
assert len(pipeline_run.step_keys_to_execute) == 2 | |||||
def _get_ext_version(config_value): | def _get_ext_version(config_value): | ||||
return join_and_hash(str(config_value)) | return join_and_hash(str(config_value)) | ||||
@dagster_type_loader(String, loader_version="97", external_version_fn=_get_ext_version) | @dagster_type_loader(String, loader_version="97", external_version_fn=_get_ext_version) | ||||
def InputHydration(_, _hello): | def InputHydration(_, _hello): | ||||
▲ Show 20 Lines • Show All 258 Lines • Show Last 20 Lines |