Changeset View
Standalone View
python_modules/dagster/dagster/core/instance/__init__.py
Show First 20 Lines • Show All 522 Lines • ▼ Show 20 Lines | class DagsterInstance: | ||||
def get_run_step_stats(self, run_id, step_keys=None): | def get_run_step_stats(self, run_id, step_keys=None): | ||||
return self._event_storage.get_step_stats_for_run(run_id, step_keys) | return self._event_storage.get_step_stats_for_run(run_id, step_keys) | ||||
def get_run_tags(self): | def get_run_tags(self): | ||||
return self._run_storage.get_run_tags() | return self._run_storage.get_run_tags() | ||||
def get_run_group(self, run_id): | def get_run_group(self, run_id): | ||||
return self._run_storage.get_run_group(run_id) | return self._run_storage.get_run_group(run_id) | ||||
def resolve_memoized_execution_plan(self, execution_plan): | |||||
""" | |||||
Returns: | |||||
ExecutionPlan: Execution plan configured to only run unmemoized steps. | |||||
""" | |||||
pipeline_def = execution_plan.pipeline.get_definition() | |||||
pipeline_name = pipeline_def.name | |||||
step_output_versions = execution_plan.resolve_step_output_versions() | |||||
if all(version is None for version in step_output_versions.values()): | |||||
raise DagsterInvariantViolationError( | |||||
"While creating a memoized pipeline run, no steps have versions. At least one step " | |||||
"must have a version." | |||||
) | |||||
step_output_addresses = self.get_addresses_for_step_output_versions( | |||||
{ | |||||
(pipeline_name, step_output_handle): version | |||||
for step_output_handle, version in step_output_versions.items() | |||||
if version | |||||
} | |||||
) | |||||
step_keys_to_execute = list( | |||||
{ | |||||
step_output_handle.step_key | |||||
for step_output_handle in step_output_versions.keys() | |||||
if (pipeline_name, step_output_handle) not in step_output_addresses | |||||
} | |||||
) | |||||
return execution_plan.build_subset_plan(step_keys_to_execute) | |||||
def create_run_for_pipeline( | def create_run_for_pipeline( | ||||
yuhan: what's this for? | |||||
Done Inline Actionsthis is for line 570, necessary to determine if we're using intermediate storage or not. cdecarolis: this is for line 570, necessary to determine if we're using intermediate storage or not. | |||||
Not Done Inline Actionsnit: execution_plan.pipeline_def yuhan: nit: `execution_plan.pipeline_def` | |||||
Not Done Inline Actionsshould this be on DagsterInstance ? maybe better as a free method that takes an instance and an execution plan and lives somewhere else. This file is gnarly enough alangenfeld: should this be on `DagsterInstance` ? maybe better as a free method that takes an instance and… | |||||
Not Done Inline Actionsmove this out alangenfeld: move this out | |||||
self, | self, | ||||
pipeline_def, | pipeline_def, | ||||
execution_plan=None, | execution_plan=None, | ||||
run_id=None, | run_id=None, | ||||
run_config=None, | run_config=None, | ||||
mode=None, | mode=None, | ||||
solids_to_execute=None, | solids_to_execute=None, | ||||
step_keys_to_execute=None, | step_keys_to_execute=None, | ||||
status=None, | status=None, | ||||
tags=None, | tags=None, | ||||
root_run_id=None, | root_run_id=None, | ||||
Not Done Inline Actionsis asset_store_key or "config" guaranteed to be in the resource? if not, lets do environment_config.resources.get(resource_name, {}).get("config") to be safer yuhan: is `asset_store_key` or `"config"` guaranteed to be in the resource? if not, lets do… | |||||
parent_run_id=None, | parent_run_id=None, | ||||
solid_selection=None, | solid_selection=None, | ||||
): | ): | ||||
from dagster.core.execution.api import create_execution_plan | from dagster.core.execution.api import create_execution_plan | ||||
from dagster.core.execution.plan.plan import ExecutionPlan | from dagster.core.execution.plan.plan import ExecutionPlan | ||||
Not Done Inline Actionsthis piece seems to be initing resource outside the resource_initialization_manager and im not confident about the run_id="" workaround either. yuhan: this piece seems to be initing resource outside the `resource_initialization_manager` and im… | |||||
Done Inline ActionsYea the run_id workaround in particular makes me pretty uncomfortable as well. I don't think there's currently a context for this use case. cdecarolis: Yea the run_id workaround in particular makes me pretty uncomfortable as well. I don't think… | |||||
Not Done Inline Actionswoof ya i think if we want to do this we need a dedicated api for spinning up access to resources outside of an execution context - needs to be a context manager since the resource api supports that worth taking a step back and thinking for a minute how this flow should actually work. What is the error experience for the user going to be if the asset store resource fails to init or if the invocation on the asset store throws in a deployed world with separated user code when/where does this resolution happen alangenfeld: woof ya i think if we want to do this we need a dedicated api for spinning up access to… | |||||
Not Done Inline ActionsWe should be able to do this within the execution context, right? After the context is built but before iterating through the plan? prha: We should be able to do this within the execution context, right? After the context is built… | |||||
Not Done Inline Actions
ya that is one path, my worry about that direction is how we will build the almost certainly required feature of being able to preview the memoized plan before hitting execute. At the end of the day i think the core problem boils down to that - we need to spin up a context to do the resolution, so we either make a dedicated context spin up api or we require booting the execution context for the preview case alangenfeld: > After the context is built but before iterating through the plan
ya that is one path, my… | |||||
Not Done Inline Actionsclear comment block, tag with issue alangenfeld: clear comment block, tag with issue | |||||
from dagster.core.snap import snapshot_from_execution_plan | from dagster.core.snap import snapshot_from_execution_plan | ||||
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) | check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) | ||||
check.opt_inst_param(execution_plan, "execution_plan", ExecutionPlan) | check.opt_inst_param(execution_plan, "execution_plan", ExecutionPlan) | ||||
# note that solids_to_execute is required to execute the solid subset, which is the | # note that solids_to_execute is required to execute the solid subset, which is the | ||||
# frozenset version of the previous solid_subset. | # frozenset version of the previous solid_subset. | ||||
# solid_selection is not required and will not be converted to solids_to_execute here. | # solid_selection is not required and will not be converted to solids_to_execute here. | ||||
Show All 22 Lines | ): | ||||
full_execution_plan = execution_plan or create_execution_plan( | full_execution_plan = execution_plan or create_execution_plan( | ||||
pipeline_def, run_config=run_config, mode=mode, | pipeline_def, run_config=run_config, mode=mode, | ||||
) | ) | ||||
check.invariant( | check.invariant( | ||||
len(full_execution_plan.step_keys_to_execute) == len(full_execution_plan.steps) | len(full_execution_plan.step_keys_to_execute) == len(full_execution_plan.steps) | ||||
) | ) | ||||
if is_memoized_run(tags): | if is_memoized_run(tags): | ||||
from dagster.core.execution.resolve_versions import resolve_memoized_execution_plan | |||||
if step_keys_to_execute: | if step_keys_to_execute: | ||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
"step_keys_to_execute parameter cannot be used in conjunction with memoized " | "step_keys_to_execute parameter cannot be used in conjunction with memoized " | ||||
"pipeline runs." | "pipeline runs." | ||||
) | ) | ||||
subsetted_execution_plan = self.resolve_memoized_execution_plan( | subsetted_execution_plan = resolve_memoized_execution_plan( | ||||
full_execution_plan | full_execution_plan | ||||
) # TODO: tighter integration with existing step_keys_to_execute functionality | ) # TODO: tighter integration with existing step_keys_to_execute functionality | ||||
step_keys_to_execute = subsetted_execution_plan.step_keys_to_execute | step_keys_to_execute = subsetted_execution_plan.step_keys_to_execute | ||||
else: | else: | ||||
subsetted_execution_plan = ( | subsetted_execution_plan = ( | ||||
full_execution_plan.build_subset_plan(step_keys_to_execute) | full_execution_plan.build_subset_plan(step_keys_to_execute) | ||||
if step_keys_to_execute | if step_keys_to_execute | ||||
else full_execution_plan | else full_execution_plan | ||||
▲ Show 20 Lines • Show All 681 Lines • Show Last 20 Lines |
what's this for?