Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/plan/plan.py
from collections import OrderedDict, namedtuple | from collections import OrderedDict, namedtuple | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import ( | from dagster.core.definitions import ( | ||||
GraphDefinition, | GraphDefinition, | ||||
IPipeline, | IPipeline, | ||||
InputDefinition, | InputDefinition, | ||||
Solid, | Solid, | ||||
SolidDefinition, | SolidDefinition, | ||||
SolidHandle, | SolidHandle, | ||||
SolidOutputHandle, | SolidOutputHandle, | ||||
) | ) | ||||
from dagster.core.definitions.dependency import DependencyStructure | from dagster.core.definitions.dependency import DependencyStructure | ||||
from dagster.core.errors import DagsterExecutionStepNotFoundError, DagsterInvariantViolationError | from dagster.core.errors import DagsterExecutionStepNotFoundError, DagsterInvariantViolationError | ||||
from dagster.core.execution.context.system import AssetStoreContext | from dagster.core.execution.context.system import AssetStoreContext | ||||
from dagster.core.execution.resolve_versions import ( | |||||
resolve_step_output_versions_helper, | |||||
resolve_step_versions_helper, | |||||
) | |||||
from dagster.core.system_config.objects import EnvironmentConfig | from dagster.core.system_config.objects import EnvironmentConfig | ||||
from dagster.core.types.dagster_type import DagsterTypeKind | from dagster.core.types.dagster_type import DagsterTypeKind | ||||
from dagster.core.utils import toposort | from dagster.core.utils import toposort | ||||
from .compute import create_compute_step | from .compute import create_compute_step | ||||
from .inputs import FromConfig, FromDefaultValue, FromMultipleSources, FromStepOutput, StepInput | from .inputs import FromConfig, FromDefaultValue, FromMultipleSources, FromStepOutput, StepInput | ||||
from .objects import ExecutionStep, StepOutputHandle | from .objects import ExecutionStep, StepOutputHandle | ||||
▲ Show 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | def build(self): | ||||
step_keys_to_execute = self.step_keys_to_execute or [ | step_keys_to_execute = self.step_keys_to_execute or [ | ||||
step.key for step in self._steps.values() | step.key for step in self._steps.values() | ||||
] | ] | ||||
check_asset_store_intermediate_storage(self.mode_definition, self.environment_config) | check_asset_store_intermediate_storage(self.mode_definition, self.environment_config) | ||||
return ExecutionPlan( | return ExecutionPlan( | ||||
self.pipeline, step_dict, deps, self.storage_is_persistent(), step_keys_to_execute, | self.pipeline, | ||||
step_dict, | |||||
deps, | |||||
self.storage_is_persistent(), | |||||
step_keys_to_execute, | |||||
self.environment_config, | |||||
) | ) | ||||
def storage_is_persistent(self): | def storage_is_persistent(self): | ||||
return self.mode_definition.get_intermediate_storage_def( | return self.mode_definition.get_intermediate_storage_def( | ||||
self.environment_config.intermediate_storage.intermediate_storage_name | self.environment_config.intermediate_storage.intermediate_storage_name | ||||
).is_persistent | ).is_persistent | ||||
def _build_from_sorted_solids( | def _build_from_sorted_solids( | ||||
▲ Show 20 Lines • Show All 152 Lines • ▼ Show 20 Lines | raise DagsterInvariantViolationError( | ||||
).format( | ).format( | ||||
pipeline_name=plan_builder.pipeline_name, solid_name=solid.name, input_name=input_name | pipeline_name=plan_builder.pipeline_name, solid_name=solid.name, input_name=input_name | ||||
) | ) | ||||
) | ) | ||||
class ExecutionPlan( | class ExecutionPlan( | ||||
namedtuple( | namedtuple( | ||||
"_ExecutionPlan", "pipeline step_dict deps steps artifacts_persisted step_keys_to_execute", | "_ExecutionPlan", | ||||
"pipeline step_dict deps steps artifacts_persisted step_keys_to_execute environment_config", | |||||
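sandyryza: @alangenfeld - do you have thoughts about adding environment config and mode definition to the ExecutionPlan? If that's something we want to avoid, another option could be for resolve_step_output_versions to still accept these arguments? | |||||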
[Not Done] [Inline Actions] alangenfeld: The mode_def itself is already held by the `pipeline`; it might make sense to stash away mode_name / mode_key on EnvironmentConfig (it's already passed in to the static build method) and have that be the only thing passed and held here. | |||||
) | ) | ||||
): | ): | ||||
def __new__( | def __new__( | ||||
cls, pipeline, step_dict, deps, artifacts_persisted, step_keys_to_execute, | cls, | ||||
pipeline, | |||||
step_dict, | |||||
deps, | |||||
artifacts_persisted, | |||||
step_keys_to_execute, | |||||
environment_config, | |||||
): | ): | ||||
missing_steps = [step_key for step_key in step_keys_to_execute if step_key not in step_dict] | missing_steps = [step_key for step_key in step_keys_to_execute if step_key not in step_dict] | ||||
if missing_steps: | if missing_steps: | ||||
raise DagsterExecutionStepNotFoundError( | raise DagsterExecutionStepNotFoundError( | ||||
"Execution plan does not contain step{plural}: {steps}".format( | "Execution plan does not contain step{plural}: {steps}".format( | ||||
plural="s" if len(missing_steps) > 1 else "", steps=", ".join(missing_steps) | plural="s" if len(missing_steps) > 1 else "", steps=", ".join(missing_steps) | ||||
), | ), | ||||
step_keys=missing_steps, | step_keys=missing_steps, | ||||
) | ) | ||||
return super(ExecutionPlan, cls).__new__( | return super(ExecutionPlan, cls).__new__( | ||||
cls, | cls, | ||||
pipeline=check.inst_param(pipeline, "pipeline", IPipeline), | pipeline=check.inst_param(pipeline, "pipeline", IPipeline), | ||||
step_dict=check.dict_param( | step_dict=check.dict_param( | ||||
step_dict, "step_dict", key_type=str, value_type=ExecutionStep | step_dict, "step_dict", key_type=str, value_type=ExecutionStep | ||||
), | ), | ||||
deps=check.dict_param(deps, "deps", key_type=str, value_type=set), | deps=check.dict_param(deps, "deps", key_type=str, value_type=set), | ||||
steps=list(step_dict.values()), | steps=list(step_dict.values()), | ||||
artifacts_persisted=check.bool_param(artifacts_persisted, "artifacts_persisted"), | artifacts_persisted=check.bool_param(artifacts_persisted, "artifacts_persisted"), | ||||
step_keys_to_execute=check.list_param( | step_keys_to_execute=check.list_param( | ||||
step_keys_to_execute, "step_keys_to_execute", of_type=str | step_keys_to_execute, "step_keys_to_execute", of_type=str | ||||
), | ), | ||||
environment_config=check.inst_param( | |||||
environment_config, "environment_config", EnvironmentConfig | |||||
), | |||||
) | ) | ||||
@property | @property | ||||
def pipeline_def(self): | def pipeline_def(self): | ||||
return self.pipeline.get_definition() | return self.pipeline.get_definition() | ||||
def get_all_hook_defs(self): | def get_all_hook_defs(self): | ||||
hook_defs = set() | hook_defs = set() | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | ): | ||||
def build_subset_plan(self, step_keys_to_execute): | def build_subset_plan(self, step_keys_to_execute): | ||||
check.list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) | check.list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) | ||||
return ExecutionPlan( | return ExecutionPlan( | ||||
self.pipeline, | self.pipeline, | ||||
self.step_dict, | self.step_dict, | ||||
self.deps, | self.deps, | ||||
self.artifacts_persisted, | self.artifacts_persisted, | ||||
step_keys_to_execute, | step_keys_to_execute, | ||||
self.environment_config, | |||||
) | ) | ||||
def construct_asset_store_context(self, step_output_handle, asset_store_handle): | def construct_asset_store_context(self, step_output_handle, asset_store_handle): | ||||
from dagster.core.storage.asset_store import AssetStoreHandle | from dagster.core.storage.asset_store import AssetStoreHandle | ||||
check.inst_param(step_output_handle, "step_output_handle", StepOutputHandle) | check.inst_param(step_output_handle, "step_output_handle", StepOutputHandle) | ||||
[Not Done] [Inline Actions] sandyryza: This isn't an arg, is it? | |||||
check.inst_param(asset_store_handle, "asset_store_handle", AssetStoreHandle) | check.inst_param(asset_store_handle, "asset_store_handle", AssetStoreHandle) | ||||
step = self.get_step_by_key(step_output_handle.step_key) | step = self.get_step_by_key(step_output_handle.step_key) | ||||
solid_def = self.pipeline_def.solid_def_named(step.solid_handle.name) | solid_def = self.pipeline_def.solid_def_named(step.solid_handle.name) | ||||
return AssetStoreContext( | return AssetStoreContext( | ||||
step_key=step_output_handle.step_key, | step_key=step_output_handle.step_key, | ||||
output_name=step_output_handle.output_name, | output_name=step_output_handle.output_name, | ||||
asset_metadata=asset_store_handle.asset_metadata, | asset_metadata=asset_store_handle.asset_metadata, | ||||
pipeline_name=self.pipeline_def.name, | pipeline_name=self.pipeline_def.name, | ||||
solid_def=solid_def, | solid_def=solid_def, | ||||
source_run_id=None, | source_run_id=None, | ||||
) | ) | ||||
def resolve_step_versions(self): | |||||
return resolve_step_versions_helper(self) | |||||
def resolve_step_output_versions(self): | |||||
return resolve_step_output_versions_helper(self) | |||||
def start( | def start( | ||||
self, retries, sort_key_fn=None, | self, retries, sort_key_fn=None, | ||||
): | ): | ||||
from .active import ActiveExecution | from .active import ActiveExecution | ||||
return ActiveExecution(self, retries, sort_key_fn) | return ActiveExecution(self, retries, sort_key_fn) | ||||
def step_key_for_single_step_plans(self): | def step_key_for_single_step_plans(self): | ||||
[Not Done] [Inline Actions] sandyryza: Prefer `check.invariant` to `assert`. | |||||
# Temporary hack to isolate single-step plans, which are often the representation of | # Temporary hack to isolate single-step plans, which are often the representation of | ||||
# sub-plans in a multiprocessing execution environment. We want to attribute pipeline | # sub-plans in a multiprocessing execution environment. We want to attribute pipeline | ||||
# events (like resource initialization) that are associated with the execution of these | # events (like resource initialization) that are associated with the execution of these | ||||
# single step sub-plans. Most likely will be removed with the refactor detailed in | # single step sub-plans. Most likely will be removed with the refactor detailed in | ||||
# https://github.com/dagster-io/dagster/issues/2239 | # https://github.com/dagster-io/dagster/issues/2239 | ||||
return self.step_keys_to_execute[0] if len(self.step_keys_to_execute) == 1 else None | return self.step_keys_to_execute[0] if len(self.step_keys_to_execute) == 1 else None | ||||
@staticmethod | @staticmethod | ||||
def build(pipeline, environment_config, mode=None, step_keys_to_execute=None): | def build(pipeline, environment_config, mode=None, step_keys_to_execute=None): | ||||
"""Here we build a new ExecutionPlan from a pipeline definition and the environment config. | """Here we build a new ExecutionPlan from a pipeline definition and the environment config. | ||||
To do this, we iterate through the pipeline's solids in topological order, and hand off the | To do this, we iterate through the pipeline's solids in topological order, and hand off the | ||||
execution steps for each solid to a companion _PlanBuilder object. | execution steps for each solid to a companion _PlanBuilder object. | ||||
[Not Done] [Inline Actions] sandyryza: plan.py is a fairly huge file - even with resolve_step_output_versions as a method on ExecutionPlan, I think it's still helpful organizationally to keep all the details of its implementation in a separate file. | |||||
Once we've processed the entire pipeline, we invoke _PlanBuilder.build() to construct the | Once we've processed the entire pipeline, we invoke _PlanBuilder.build() to construct the | ||||
ExecutionPlan object. | ExecutionPlan object. | ||||
""" | """ | ||||
check.inst_param(pipeline, "pipeline", IPipeline) | check.inst_param(pipeline, "pipeline", IPipeline) | ||||
check.inst_param(environment_config, "environment_config", EnvironmentConfig) | check.inst_param(environment_config, "environment_config", EnvironmentConfig) | ||||
check.opt_str_param(mode, "mode") | check.opt_str_param(mode, "mode") | ||||
check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) | check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) | ||||
Show All 33 Lines |
@alangenfeld - do you have thoughts about adding environment config and mode definition to the ExecutionPlan?
If that's something we want to avoid, another option could be for resolve_step_output_versions to still accept these arguments?