Changeset View
Standalone View
python_modules/dagster/dagster/core/instance/__init__.py
Show First 20 Lines • Show All 513 Lines • ▼ Show 20 Lines | def get_run_step_stats(self, run_id, step_keys=None): | ||||
return self._event_storage.get_step_stats_for_run(run_id, step_keys) | return self._event_storage.get_step_stats_for_run(run_id, step_keys) | ||||
def get_run_tags(self): | def get_run_tags(self): | ||||
return self._run_storage.get_run_tags() | return self._run_storage.get_run_tags() | ||||
def get_run_group(self, run_id): | def get_run_group(self, run_id): | ||||
return self._run_storage.get_run_group(run_id) | return self._run_storage.get_run_group(run_id) | ||||
def resolve_memoized_execution_plan(self, execution_plan): | def resolve_memoized_execution_plan(self, execution_plan): | ||||
alangenfeld: should this be on `DagsterInstance` ? maybe better as a free method that takes an instance and… | |||||
Not Done Inline Actionsmove this out alangenfeld: move this out | |||||
""" | """ | ||||
Returns: | Returns: | ||||
ExecutionPlan: Execution plan configured to only run unmemoized steps. | ExecutionPlan: Execution plan configured to only run unmemoized steps. | ||||
""" | """ | ||||
from dagster.core.execution.context.init import InitResourceContext | |||||
Not Done Inline Actionswhat's this for? yuhan: what's this for? | |||||
Done Inline Actionsthis is for line 570, necessary to determine if we're using intermediate storage or not. cdecarolis: this is for line 570, necessary to determine if we're using intermediate storage or not. | |||||
pipeline_def = execution_plan.pipeline.get_definition() | pipeline_def = execution_plan.pipeline.get_definition() | ||||
pipeline_name = pipeline_def.name | pipeline_name = pipeline_def.name | ||||
step_output_versions = execution_plan.resolve_step_output_versions() | step_output_versions = execution_plan.resolve_step_output_versions() | ||||
if all(version is None for version in step_output_versions.values()): | if all(version is None for version in step_output_versions.values()): | ||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
"While creating a memoized pipeline run, no steps have versions. At least one step " | "While creating a memoized pipeline run, no steps have versions. At least one step " | ||||
"must have a version." | "must have a version." | ||||
) | ) | ||||
step_output_addresses = self.get_addresses_for_step_output_versions( | step_output_addresses = self.get_addresses_for_step_output_versions( | ||||
{ | { | ||||
(pipeline_name, step_output_handle): version | (pipeline_name, step_output_handle): version | ||||
for step_output_handle, version in step_output_versions.items() | for step_output_handle, version in step_output_versions.items() | ||||
if version | if version | ||||
} | } | ||||
) | ) | ||||
step_keys_to_execute = list( | environment_config = execution_plan.environment_config | ||||
{ | mode_def = execution_plan.mode_definition | ||||
Not Done Inline Actionsnit: execution_plan.pipeline_def yuhan: nit: `execution_plan.pipeline_def` | |||||
step_output_handle.step_key | |||||
for step_output_handle in step_output_versions.keys() | step_keys_to_execute = [] | ||||
if (pipeline_name, step_output_handle) not in step_output_addresses | |||||
} | for step_output_handle, version in step_output_versions.items(): | ||||
) | asset_store_key = execution_plan.get_asset_store_key(step_output_handle) | ||||
# If asset_store_key is not None, then we are using asset store for intermediate storage | |||||
# needs. | |||||
if asset_store_key: | |||||
resource_config = environment_config.resources[asset_store_key]["config"] | |||||
resource_def = mode_def.resource_defs[asset_store_key] | |||||
resource_context = InitResourceContext( | |||||
Not Done Inline Actionsis asset_store_key or "config" guaranteed to be in the resource? if not, lets do environment_config.resources.get(resource_name, {}).get("config") to be safer yuhan: is `asset_store_key` or `"config"` guaranteed to be in the resource? if not, lets do… | |||||
resource_config, pipeline_def, resource_def, "" | |||||
) | |||||
asset_store = resource_def.resource_fn(resource_context) | |||||
if not asset_store.has_asset_with_version(step_output_handle, {}, version): | |||||
step_keys_to_execute.append(step_output_handle.step_key) | |||||
Not Done Inline Actionsthis piece seems to be initing resource outside the resource_initialization_manager and im not confident about the run_id="" workaround either. yuhan: this piece seems to be initing resource outside the `resource_initialization_manager` and im… | |||||
Done Inline ActionsYea the run_id workaround in particular makes me pretty uncomfortable as well. I don't think there's currently a context for this use case. cdecarolis: Yea the run_id workaround in particular makes me pretty uncomfortable as well. I don't think… | |||||
Not Done Inline Actionswoof ya i think if we want to do this we need a dedicated api for spinning up access to resources outside of an execution context - needs to be a context manager since the resource api supports that worth taking a step back and thinking for a minute how this flow should actually work. What is the error experience for the user going to be if the asset store resource fails to init or if the invocation on the asset store throws in a deployed world with separated user code when/where does this resolution happen alangenfeld: woof ya i think if we want to do this we need a dedicated api for spinning up access to… | |||||
Not Done Inline ActionsWe should be able to do this within the execution context, right? After the context is built but before iterating through the plan? prha: We should be able to do this within the execution context, right? After the context is built… | |||||
Not Done Inline Actions
ya that is one path, my worry about that direction is how we will build the almost certainly required feature of being able to preview the memoized plan before hitting execute. At the end of the day i think the core problem boils down to that - we need to spin up a context to do the resolution, so we either make a dedicated context spin up api or we require booting the execution context for the preview case alangenfeld: > After the context is built but before iterating through the plan
ya that is one path, my… | |||||
Not Done Inline Actionsclear comment block, tag with issue alangenfeld: clear comment block, tag with issue | |||||
# Otherwise, we are using intermediate storage, and should check step_output_addresses | |||||
else: | |||||
if (pipeline_name, step_output_handle) not in step_output_addresses: | |||||
step_keys_to_execute.append(step_output_handle.step_key) | |||||
return execution_plan.build_memoized_plan(step_keys_to_execute, step_output_addresses) | return execution_plan.build_memoized_plan(step_keys_to_execute, step_output_addresses) | ||||
def create_run_for_pipeline( | def create_run_for_pipeline( | ||||
self, | self, | ||||
pipeline_def, | pipeline_def, | ||||
execution_plan=None, | execution_plan=None, | ||||
run_id=None, | run_id=None, | ||||
▲ Show 20 Lines • Show All 707 Lines • Show Last 20 Lines |
should this be on DagsterInstance ? maybe better as a free method that takes an instance and an execution plan and lives somewhere else. This file is gnarly enough