Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/context/system.py
Show First 20 Lines • Show All 291 Lines • ▼ Show 20 Lines | def for_asset_store(self, step_output_handle, asset_store_handle): | ||||
and self.pipeline_run.step_keys_to_execute | and self.pipeline_run.step_keys_to_execute | ||||
# this step is not being executed | # this step is not being executed | ||||
and step_output_handle.step_key not in self.pipeline_run.step_keys_to_execute | and step_output_handle.step_key not in self.pipeline_run.step_keys_to_execute | ||||
): | ): | ||||
source_run_id = self.pipeline_run.parent_run_id | source_run_id = self.pipeline_run.parent_run_id | ||||
else: | else: | ||||
source_run_id = self.pipeline_run.run_id | source_run_id = self.pipeline_run.run_id | ||||
step_output_versions = self.execution_plan.resolve_step_output_versions() | |||||
version = ( | |||||
step_output_versions[step_output_handle] | |||||
if step_output_handle in step_output_versions | |||||
else None | |||||
) | |||||
yuhan: nit: can be `version = step_output_versions.get(step_output_handle, None)` instead | |||||
return AssetStoreContext( | return AssetStoreContext( | ||||
step_key=step_output_handle.step_key, | step_key=step_output_handle.step_key, | ||||
output_name=step_output_handle.output_name, | output_name=step_output_handle.output_name, | ||||
asset_metadata=asset_store_handle.asset_metadata, | asset_metadata=asset_store_handle.asset_metadata, | ||||
pipeline_name=self.pipeline_def.name, | pipeline_name=self.pipeline_def.name, | ||||
solid_def=self.solid_def, | solid_def=self.solid_def, | ||||
source_run_id=source_run_id, | source_run_id=source_run_id, | ||||
version=version, | |||||
) | ) | ||||
def get_asset_store(self, asset_store_key): | def get_asset_store(self, asset_store_key): | ||||
from dagster.core.storage.asset_store import AssetStore | from dagster.core.storage.asset_store import AssetStore | ||||
# get AssetStore from resources using asset_store_key | # get AssetStore from resources using asset_store_key | ||||
asset_store = getattr(self.resources, asset_store_key) | asset_store = getattr(self.resources, asset_store_key) | ||||
return check.inst(asset_store, AssetStore) | return check.inst(asset_store, AssetStore) | ||||
▲ Show 20 Lines • Show All 146 Lines • Show Last 20 Lines |
nit: can be version = step_output_versions.get(step_output_handle, None) instead