Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/plan/execute_step.py
import inspect | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import ( | from dagster.core.definitions import ( | ||||
AssetMaterialization, | AssetMaterialization, | ||||
ExpectationResult, | ExpectationResult, | ||||
Failure, | Failure, | ||||
Materialization, | Materialization, | ||||
Output, | Output, | ||||
RetryRequested, | RetryRequested, | ||||
TypeCheck, | TypeCheck, | ||||
) | ) | ||||
from dagster.core.definitions.address import Address | |||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterExecutionStepExecutionError, | DagsterExecutionStepExecutionError, | ||||
DagsterInvariantViolationError, | DagsterInvariantViolationError, | ||||
DagsterStepOutputNotFoundError, | DagsterStepOutputNotFoundError, | ||||
DagsterTypeCheckDidNotPass, | DagsterTypeCheckDidNotPass, | ||||
DagsterTypeCheckError, | DagsterTypeCheckError, | ||||
DagsterTypeLoadingError, | DagsterTypeLoadingError, | ||||
DagsterTypeMaterializationError, | DagsterTypeMaterializationError, | ||||
▲ Show 20 Lines • Show All 297 Lines • ▼ Show 20 Lines | def _create_step_events_for_output(step_context, output): | ||||
step = step_context.step | step = step_context.step | ||||
step_output = step.step_output_named(output.output_name) | step_output = step.step_output_named(output.output_name) | ||||
for output_event in _type_checked_step_output_event_sequence(step_context, output): | for output_event in _type_checked_step_output_event_sequence(step_context, output): | ||||
yield output_event | yield output_event | ||||
step_output_handle = StepOutputHandle.from_step(step=step, output_name=output.output_name) | step_output_handle = StepOutputHandle.from_step(step=step, output_name=output.output_name) | ||||
for evt in _set_intermediates(step_context, step_output, step_output_handle, output): | for evt in _create_output_materializations(step_context, output.output_name, output.value): | ||||
yield evt | yield evt | ||||
for evt in _create_output_materializations(step_context, output.output_name, output.value): | for evt in _set_intermediates(step_context, step_output, step_output_handle, output): | ||||
yield evt | yield evt | ||||
def _set_intermediates(step_context, step_output, step_output_handle, output): | def _set_intermediates(step_context, step_output, step_output_handle, output): | ||||
res = step_context.intermediate_storage.set_intermediate( | res = step_context.intermediate_storage.set_intermediate( | ||||
context=step_context, | context=step_context, | ||||
dagster_type=step_output.dagster_type, | dagster_type=step_output.dagster_type, | ||||
step_output_handle=step_output_handle, | step_output_handle=step_output_handle, | ||||
value=output.value, | value=output.value, | ||||
address=output.address, | |||||
) | ) | ||||
if isinstance(res, ObjectStoreOperation): | if isinstance(res, ObjectStoreOperation): | ||||
# handle event generated by object_store | |||||
yield DagsterEvent.object_store_operation( | yield DagsterEvent.object_store_operation( | ||||
step_context, ObjectStoreOperation.serializable(res, value_name=output.output_name) | step_context, ObjectStoreOperation.serializable(res, value_name=output.output_name) | ||||
) | ) | ||||
elif inspect.isgenerator(res): | |||||
# when external write method (e.g. materializer, config_value) is provided via Output, | |||||
# we will handle external materializing events here | |||||
for evt in res: | |||||
if isinstance(evt, ObjectStoreOperation): | |||||
yield DagsterEvent.object_store_operation( | |||||
step_context, | |||||
ObjectStoreOperation.serializable(evt, value_name=output.output_name), | |||||
) | |||||
if isinstance(evt, AssetMaterialization): | |||||
yield DagsterEvent.step_materialization(step_context, evt) | |||||
def _create_output_materializations(step_context, output_name, value): | def _create_output_materializations(step_context, output_name, value): | ||||
step = step_context.step | step = step_context.step | ||||
current_handle = step.solid_handle | current_handle = step.solid_handle | ||||
# check for output mappings at every point up the composition hierarchy | # check for output mappings at every point up the composition hierarchy | ||||
while current_handle: | while current_handle: | ||||
solid_config = step_context.environment_config.solids.get(current_handle.to_string()) | solid_config = step_context.environment_config.solids.get(current_handle.to_string()) | ||||
current_handle = current_handle.parent | current_handle = current_handle.parent | ||||
if solid_config is None: | if solid_config is None: | ||||
continue | continue | ||||
for output_spec in solid_config.outputs: | for output_spec in solid_config.outputs: | ||||
check.invariant(len(output_spec) == 1) | check.invariant(len(output_spec) == 1) | ||||
config_output_name, output_spec = list(output_spec.items())[0] | config_output_name, output_spec = list(output_spec.items())[0] | ||||
if config_output_name == output_name: | if config_output_name == output_name: | ||||
step_output = step.step_output_named(output_name) | step_output = step.step_output_named(output_name) | ||||
address = Address(config_value=output_spec) | |||||
step_output_handle = StepOutputHandle(step_context.step.key, step_output.name) | |||||
with user_code_error_boundary( | with user_code_error_boundary( | ||||
DagsterTypeMaterializationError, | DagsterTypeMaterializationError, | ||||
msg_fn=lambda: """Error occurred during output materialization: | msg_fn=lambda: """Error occurred during output materialization: | ||||
output name: "{output_name}" | output name: "{output_name}" | ||||
step key: "{key}" | step key: "{key}" | ||||
solid invocation: "{solid}" | solid invocation: "{solid}" | ||||
solid definition: "{solid_def}" | solid definition: "{solid_def}" | ||||
""".format( | """.format( | ||||
output_name=output_name, | output_name=output_name, | ||||
key=step_context.step.key, | key=step_context.step.key, | ||||
solid_def=step_context.solid_def.name, | solid_def=step_context.solid_def.name, | ||||
solid=step_context.solid.name, | solid=step_context.solid.name, | ||||
), | ), | ||||
): | ): | ||||
materializations = step_output.dagster_type.materializer.materialize_runtime_values( | res = step_context.intermediate_storage.set_intermediate( | ||||
step_context, output_spec, value | context=step_context, | ||||
dagster_type=step_output.dagster_type, | |||||
step_output_handle=step_output_handle, | |||||
value=value, | |||||
address=address, | |||||
) | ) | ||||
for materialization in materializations: | for event in res: | ||||
if not isinstance(materialization, (AssetMaterialization, Materialization)): | if isinstance(event, ObjectStoreOperation): | ||||
yield DagsterEvent.object_store_operation( | |||||
step_context, | |||||
ObjectStoreOperation.serializable(event, value_name=step_output.name), | |||||
) | |||||
continue | |||||
if not isinstance(event, (AssetMaterialization, Materialization)): | |||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
( | ( | ||||
"materialize_runtime_values on type {type_name} has returned " | "materialize_runtime_values on type {type_name} has returned " | ||||
"value {value} of type {python_type}. You must return an " | "value {value} of type {python_type}. You must return an " | ||||
"AssetMaterialization." | "AssetMaterialization." | ||||
).format( | ).format( | ||||
type_name=step_output.dagster_type.name, | type_name=step_output.dagster_type.name, | ||||
value=repr(materialization), | value=repr(event), | ||||
python_type=type(materialization).__name__, | python_type=type(event).__name__, | ||||
) | ) | ||||
) | ) | ||||
yield DagsterEvent.step_materialization(step_context, materialization) | yield DagsterEvent.step_materialization(step_context, event) | ||||
def _user_event_sequence_for_step_compute_fn(step_context, evaluated_inputs): | def _user_event_sequence_for_step_compute_fn(step_context, evaluated_inputs): | ||||
check.inst_param(step_context, "step_context", SystemStepExecutionContext) | check.inst_param(step_context, "step_context", SystemStepExecutionContext) | ||||
check.dict_param(evaluated_inputs, "evaluated_inputs", key_type=str) | check.dict_param(evaluated_inputs, "evaluated_inputs", key_type=str) | ||||
with user_code_error_boundary( | with user_code_error_boundary( | ||||
DagsterExecutionStepExecutionError, | DagsterExecutionStepExecutionError, | ||||
▲ Show 20 Lines • Show All 93 Lines • Show Last 20 Lines |