Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagstermill/dagstermill/solids.py
Show All 11 Lines | from dagster import ( | ||||
EventMetadataEntry, | EventMetadataEntry, | ||||
InputDefinition, | InputDefinition, | ||||
Output, | Output, | ||||
OutputDefinition, | OutputDefinition, | ||||
SolidDefinition, | SolidDefinition, | ||||
check, | check, | ||||
seven, | seven, | ||||
) | ) | ||||
from dagster.core.definitions.events import Failure, RetryRequested | |||||
from dagster.core.definitions.reconstructable import ReconstructablePipeline | from dagster.core.definitions.reconstructable import ReconstructablePipeline | ||||
from dagster.core.definitions.utils import validate_tags | from dagster.core.definitions.utils import validate_tags | ||||
from dagster.core.execution.context.compute import SolidExecutionContext | from dagster.core.execution.context.compute import SolidExecutionContext | ||||
from dagster.core.execution.context.system import SystemComputeExecutionContext | from dagster.core.execution.context.system import SystemComputeExecutionContext | ||||
from dagster.core.storage.file_manager import FileHandle | from dagster.core.storage.file_manager import FileHandle | ||||
from dagster.serdes import pack_value | from dagster.serdes import pack_value | ||||
from dagster.utils import mkdir_p, safe_tempfile_path | from dagster.utils import mkdir_p, safe_tempfile_path | ||||
from dagster.utils.error import serializable_error_info_from_exc_info | from dagster.utils.error import serializable_error_info_from_exc_info | ||||
from papermill.engines import papermill_engines | from papermill.engines import papermill_engines | ||||
from papermill.exceptions import PapermillExecutionError | |||||
from papermill.iorw import load_notebook_node, write_ipynb | from papermill.iorw import load_notebook_node, write_ipynb | ||||
from papermill.parameterize import _find_first_tagged_cell_index | from papermill.parameterize import _find_first_tagged_cell_index | ||||
from .engine import DagstermillNBConvertEngine | from .engine import DagstermillNBConvertEngine | ||||
from .errors import DagstermillError | from .errors import DagstermillError | ||||
from .serialize import read_value, write_value | from .serialize import read_value, write_value | ||||
from .translator import RESERVED_INPUT_NAMES, DagsterTranslator | from .translator import RESERVED_INPUT_NAMES, DagsterTranslator | ||||
▲ Show 20 Lines • Show All 141 Lines • ▼ Show 20 Lines | def _t_fn(compute_context, inputs): | ||||
papermill_engines.register("dagstermill", DagstermillNBConvertEngine) | papermill_engines.register("dagstermill", DagstermillNBConvertEngine) | ||||
papermill.execute_notebook( | papermill.execute_notebook( | ||||
input_path=parameterized_notebook_path, | input_path=parameterized_notebook_path, | ||||
output_path=executed_notebook_path, | output_path=executed_notebook_path, | ||||
engine_name="dagstermill", | engine_name="dagstermill", | ||||
log_output=True, | log_output=True, | ||||
) | ) | ||||
except Exception: # pylint: disable=broad-except | except Exception as ex: # pylint: disable=broad-except | ||||
try: | try: | ||||
with open(executed_notebook_path, "rb") as fd: | with open(executed_notebook_path, "rb") as fd: | ||||
executed_notebook_file_handle = ( | executed_notebook_file_handle = ( | ||||
compute_context.resources.file_manager.write( | compute_context.resources.file_manager.write( | ||||
fd, mode="wb", ext="ipynb" | fd, mode="wb", ext="ipynb" | ||||
) | ) | ||||
) | ) | ||||
executed_notebook_materialization_path = ( | executed_notebook_materialization_path = ( | ||||
Show All 12 Lines | def _t_fn(compute_context, inputs): | ||||
description="Location of output notebook in file manager", | description="Location of output notebook in file manager", | ||||
metadata_entries=[ | metadata_entries=[ | ||||
EventMetadataEntry.fspath( | EventMetadataEntry.fspath( | ||||
executed_notebook_materialization_path, | executed_notebook_materialization_path, | ||||
label="executed_notebook_path", | label="executed_notebook_path", | ||||
) | ) | ||||
], | ], | ||||
) | ) | ||||
# pylint: disable=no-member | |||||
if isinstance(ex, PapermillExecutionError) and ( | |||||
ex.ename == "RetryRequested" or ex.ename == "Failure" | |||||
): | |||||
system_compute_context.log.warn( | |||||
f"Encountered raised {ex.ename} in notebook. Use dagstermill.yield_event " | |||||
"with RetryRequested or Failure to trigger their behavior." | |||||
) | |||||
raise | raise | ||||
system_compute_context.log.debug( | system_compute_context.log.debug( | ||||
"Notebook execution complete for {name} at {executed_notebook_path}.".format( | "Notebook execution complete for {name} at {executed_notebook_path}.".format( | ||||
name=name, | name=name, | ||||
executed_notebook_path=executed_notebook_path, | executed_notebook_path=executed_notebook_path, | ||||
) | ) | ||||
) | ) | ||||
Show All 36 Lines | def _t_fn(compute_context, inputs): | ||||
if output_name in data_dict: | if output_name in data_dict: | ||||
value = read_value(output_def.dagster_type, data_dict[output_name]) | value = read_value(output_def.dagster_type, data_dict[output_name]) | ||||
yield Output(value, output_name) | yield Output(value, output_name) | ||||
for key, value in output_nb.scraps.items(): | for key, value in output_nb.scraps.items(): | ||||
if key.startswith("event-"): | if key.startswith("event-"): | ||||
with open(value.data, "rb") as fd: | with open(value.data, "rb") as fd: | ||||
yield pickle.loads(fd.read()) | event = pickle.loads(fd.read()) | ||||
if isinstance(event, (Failure, RetryRequested)): | |||||
raise event | |||||
else: | |||||
yield event | |||||
return _t_fn | return _t_fn | ||||
def define_dagstermill_solid( | def define_dagstermill_solid( | ||||
name, | name, | ||||
notebook_path, | notebook_path, | ||||
input_defs=None, | input_defs=None, | ||||
▲ Show 20 Lines • Show All 80 Lines • Show Last 20 Lines |