Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/resolve_versions.py
import hashlib | import hashlib | ||||
import logging | |||||
Lint: Unused Import: Unused import logging | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterInvariantViolationError | |||||
from dagster.core.execution.context.init import InitResourceContext | |||||
from dagster.core.execution.plan.inputs import ( | from dagster.core.execution.plan.inputs import ( | ||||
FromConfig, | FromConfig, | ||||
FromDefaultValue, | FromDefaultValue, | ||||
FromMultipleSources, | FromMultipleSources, | ||||
FromStepOutput, | FromStepOutput, | ||||
) | ) | ||||
from dagster.core.execution.plan.objects import StepOutputHandle | from dagster.core.execution.plan.objects import StepOutputHandle | ||||
from dagster.core.storage.asset_store import mem_asset_store | |||||
Lint: Unused Import Unused mem_asset_store imported from dagster.core.storage.asset_store Lint: Unused Import: Unused mem_asset_store imported from dagster.core.storage.asset_store | |||||
from dagster.utils.backcompat import experimental | |||||
def resolve_step_input_versions(step, step_versions): | def resolve_step_input_versions(step, step_versions): | ||||
"""Computes and returns the versions for each input defined for a given step. | """Computes and returns the versions for each input defined for a given step. | ||||
If an input is constructed from outputs of other steps, the input version is computed by | If an input is constructed from outputs of other steps, the input version is computed by | ||||
sorting, concatenating, and hashing the versions of each output it is constructed from. | sorting, concatenating, and hashing the versions of each output it is constructed from. | ||||
▲ Show 20 Lines • Show All 180 Lines • ▼ Show 20 Lines | |||||
def resolve_step_output_versions_helper(execution_plan): | def resolve_step_output_versions_helper(execution_plan): | ||||
step_versions = execution_plan.resolve_step_versions() | step_versions = execution_plan.resolve_step_versions() | ||||
return { | return { | ||||
StepOutputHandle(step.key, output_name): join_and_hash(output_name, step_versions[step.key]) | StepOutputHandle(step.key, output_name): join_and_hash(output_name, step_versions[step.key]) | ||||
for step in execution_plan.steps | for step in execution_plan.steps | ||||
for output_name in step.step_output_dict.keys() | for output_name in step.step_output_dict.keys() | ||||
} | } | ||||
@experimental | |||||
def resolve_memoized_execution_plan(execution_plan): | |||||
""" | |||||
Returns: | |||||
ExecutionPlan: Execution plan configured to only run unmemoized steps. | |||||
""" | |||||
# pylint: disable=comparison-with-callable | |||||
pipeline_def = execution_plan.pipeline.get_definition() | |||||
pipeline_name = pipeline_def.name | |||||
Lint: Unused Variable Unused variable 'pipeline_name' Lint: Unused Variable: Unused variable 'pipeline_name' | |||||
Not Done Inline ActionsArguably, we don't need to tie this to versions at all anymore? I.e. the memoization can just be based on has_asset, and it's up to the asset store to implement has_asset using versions? sandyryza: Arguably, we don't need to tie this to versions at all anymore? I.e. the memoization can just… | |||||
step_output_versions = execution_plan.resolve_step_output_versions() | |||||
if all(version is None for version in step_output_versions.values()): | |||||
raise DagsterInvariantViolationError( | |||||
"While creating a memoized pipeline run, no steps have versions. At least one step " | |||||
"must have a version." | |||||
) | |||||
environment_config = execution_plan.environment_config | |||||
pipeline_def = execution_plan.pipeline.get_definition() | |||||
mode_def = pipeline_def.get_mode_definition(environment_config.mode) | |||||
step_keys_to_execute = [] | |||||
for step_output_handle, version in step_output_versions.items(): | |||||
Lint: Unused Variable Unused variable 'version' Lint: Unused Variable: Unused variable 'version' | |||||
asset_store_key = execution_plan.get_asset_store_key(step_output_handle) | |||||
# TODO: https://github.com/dagster-io/dagster/issues/3302 | |||||
# The following code block is HIGHLY experimental. It initializes an asset store outside of | |||||
# the resource initialization context, and will ignore any exit hooks defined for the asset | |||||
# store. | |||||
resource_config = ( | |||||
environment_config.resources[asset_store_key]["config"] | |||||
if "config" in environment_config.resources[asset_store_key] | |||||
else {} | |||||
) | |||||
resource_def = mode_def.resource_defs[asset_store_key] | |||||
resource_context = InitResourceContext(resource_config, pipeline_def, resource_def, "") | |||||
asset_store = resource_def.resource_fn(resource_context) | |||||
asset_store_handle = execution_plan.get_asset_store_handle(step_output_handle) | |||||
context = execution_plan.construct_asset_store_context( | |||||
step_output_handle, asset_store_handle | |||||
) | |||||
Not Done Inline ActionsNot all asset stores have a has_asset method, right? Can we raise a helpful error if one is missing? sandyryza: Not all asset stores have a has_asset method, right? Can we raise a helpful error if one is… | |||||
if not asset_store.has_asset(context): | |||||
step_keys_to_execute.append(step_output_handle.step_key) | |||||
return execution_plan.build_subset_plan(step_keys_to_execute) |
Unused import logging