Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/memoization.py
Show First 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def copy_required_intermediates_for_execution(pipeline_context, execution_plan): | ||||
parent_run_logs = pipeline_context.instance.all_logs(parent_run_id) | parent_run_logs = pipeline_context.instance.all_logs(parent_run_id) | ||||
output_handles_for_current_run = output_handles_from_execution_plan(execution_plan) | output_handles_for_current_run = output_handles_from_execution_plan(execution_plan) | ||||
output_handles_from_previous_run = output_handles_from_event_logs(parent_run_logs) | output_handles_from_previous_run = output_handles_from_event_logs(parent_run_logs) | ||||
output_handles_to_copy = output_handles_for_current_run.intersection( | output_handles_to_copy = output_handles_for_current_run.intersection( | ||||
output_handles_from_previous_run | output_handles_from_previous_run | ||||
) | ) | ||||
output_handles_to_copy_by_step = defaultdict(list) | output_handles_to_copy_by_step = defaultdict(list) | ||||
for handle in output_handles_to_copy: | for handle in output_handles_to_copy: | ||||
output_handles_to_copy_by_step[handle.step_key].append(handle) | output_handles_to_copy_by_step[handle.step_key].append(handle) | ||||
intermediate_storage = pipeline_context.intermediate_storage | intermediate_storage = pipeline_context.intermediate_storage | ||||
yuhan: TODO | |||||
for step in execution_plan.topological_steps(): | for step in execution_plan.topological_steps(): | ||||
step_context = pipeline_context.for_step(step) | step_context = pipeline_context.for_step(step) | ||||
for handle in output_handles_to_copy_by_step.get(step.key, []): | for handle in output_handles_to_copy_by_step.get(step.key, []): | ||||
if intermediate_storage.has_intermediate(pipeline_context, handle): | if intermediate_storage.has_intermediate(pipeline_context, handle): | ||||
continue | continue | ||||
operation = intermediate_storage.copy_intermediate_from_run( | operation = intermediate_storage.copy_intermediate_from_run( | ||||
pipeline_context, parent_run_id, handle | pipeline_context, parent_run_id, handle | ||||
) | ) | ||||
yield DagsterEvent.object_store_operation( | yield DagsterEvent.object_store_operation( | ||||
step_context, | step_context, | ||||
ObjectStoreOperation.serializable(operation, value_name=handle.output_name), | ObjectStoreOperation.serializable(operation, value_name=handle.output_name), | ||||
) | ) | ||||
def is_intermediate_storage_write_event(record): | def is_intermediate_storage_write_event(record): | ||||
check.inst_param(record, "record", EventRecord) | check.inst_param(record, "record", EventRecord) | ||||
if not record.is_dagster_event: | if not record.is_dagster_event: | ||||
return False | return False | ||||
write_ops = ( | write_ops = ( | ||||
ObjectStoreOperationType.SET_OBJECT.value, | ObjectStoreOperationType.SET_OBJECT.value, | ||||
ObjectStoreOperationType.SET_EXTERNAL_OBJECT.value, | |||||
ObjectStoreOperationType.CP_OBJECT.value, | ObjectStoreOperationType.CP_OBJECT.value, | ||||
) | ) | ||||
return ( | return ( | ||||
record.dagster_event.event_type_value == DagsterEventType.OBJECT_STORE_OPERATION.value | record.dagster_event.event_type_value == DagsterEventType.OBJECT_STORE_OPERATION.value | ||||
and record.dagster_event.event_specific_data.op in write_ops | and record.dagster_event.event_specific_data.op in write_ops | ||||
) | ) | ||||
Show All 33 Lines |
TODO