Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/resolve_versions.py
Show First 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | for step in execution_plan.get_all_steps_in_topo_order(): | ||||
check_valid_version(solid_def_version) | check_valid_version(solid_def_version) | ||||
solid_config_version = resolve_config_version(resolved_run_config.solids[solid_name].config) | solid_config_version = resolve_config_version(resolved_run_config.solids[solid_name].config) | ||||
resource_versions_for_solid = [] | resource_versions_for_solid = [] | ||||
for resource_key in solid_def.required_resource_keys: | for resource_key in solid_def.required_resource_keys: | ||||
if resource_key not in resource_versions: | if resource_key not in resource_versions: | ||||
resource_config = resolved_run_config.resources[resource_key].config | |||||
resource_config_version = resolve_config_version(resource_config) | |||||
resource_def = resource_defs[resource_key] | resource_def = resource_defs[resource_key] | ||||
resource_def_version = None | resource_def_version = None | ||||
if resource_def.version is not None: | if resource_def.version is not None: | ||||
resource_def_version = resource_def.version | resource_def_version = resource_def.version | ||||
else: | else: | ||||
resource_def_version = pipeline_def.version_strategy.get_resource_version( | resource_def_version = pipeline_def.version_strategy.get_resource_version( | ||||
resource_def | resource_def | ||||
) | ) | ||||
if resource_def_version is None: | |||||
raise DagsterInvariantViolationError( | if resource_def_version is not None: | ||||
f"While using memoization, version for resource '{resource_key}' was None. Please " | |||||
"either provide a versioning strategy for your job, or provide a version using the " | |||||
"resource decorator." | |||||
) | |||||
check_valid_version(resource_def_version) | check_valid_version(resource_def_version) | ||||
resource_config = resolved_run_config.resources[resource_key].config | |||||
resource_config_version = resolve_config_version(resource_config) | |||||
resource_versions[resource_key] = join_and_hash( | resource_versions[resource_key] = join_and_hash( | ||||
resource_config_version, resource_def_version | resource_config_version, resource_def_version | ||||
) | ) | ||||
else: | |||||
resource_versions[resource_key] = join_and_hash(resource_config) | |||||
if resource_versions[resource_key] is not None: | |||||
resource_versions_for_solid.append(resource_versions[resource_key]) | resource_versions_for_solid.append(resource_versions[resource_key]) | ||||
solid_resources_version = join_and_hash(*resource_versions_for_solid) | solid_resources_version = join_and_hash(*resource_versions_for_solid) | ||||
solid_version = join_and_hash( | solid_version = join_and_hash( | ||||
solid_def_version, solid_config_version, solid_resources_version | solid_def_version, solid_config_version, solid_resources_version | ||||
) | ) | ||||
from_versions = input_versions + [solid_version] | from_versions = input_versions + [solid_version] | ||||
step_version = join_and_hash(*from_versions) | step_version = join_and_hash(*from_versions) | ||||
Show All 14 Lines |