Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagstermill/dagstermill/engine.py
import nbformat | import nbformat | ||||
from nbconvert.preprocessors.execute import CellExecutionError | from nbconvert.preprocessors.execute import CellExecutionError | ||||
from papermill.engines import NBConvertEngine | |||||
from papermill.log import logger | from papermill.log import logger | ||||
from papermill.preprocess import PapermillExecutePreprocessor | |||||
try: | |||||
# papermill 1.x | |||||
from papermill.engines import NBConvertEngine | |||||
from papermill.preprocess import PapermillExecutePreprocessor | |||||
class DagstermillExecutePreprocessor(PapermillExecutePreprocessor): | class DagstermillExecutePreprocessor(PapermillExecutePreprocessor): | ||||
# We need to finalize dagster resources here (as opposed to, e.g., in the notebook_complete | # We need to finalize dagster resources here (as opposed to, e.g., in the notebook_complete | ||||
# method on the NotebookExecutionManager), because we need to be inside the scope of the | # method on the NotebookExecutionManager), because we need to be inside the scope of the | ||||
# nbconvert.preprocessors.ExecutePreprocessor.setup_preprocessor context manager, which tears | # nbconvert.preprocessors.ExecutePreprocessor.setup_preprocessor context manager, which tears | ||||
# the kernel down. Note that atexit doesn't seem to work at all in ipython, and hooking into | # the kernel down. Note that atexit doesn't seem to work at all in ipython, and hooking into | ||||
# the ipython post_execute event doesn't work in papermill. | # the ipython post_execute event doesn't work in papermill. | ||||
def papermill_process(self, nb_man, resources): | def papermill_process(self, nb_man, resources): | ||||
_, resources = super(DagstermillExecutePreprocessor, self).papermill_process( | _, resources = super(DagstermillExecutePreprocessor, self).papermill_process( | ||||
nb_man, resources | nb_man, resources | ||||
) | ) | ||||
new_cell = nbformat.v4.new_code_cell( | new_cell = nbformat.v4.new_code_cell( | ||||
source=("import dagstermill as __dm_dagstermill\n" "__dm_dagstermill._teardown()\n") | source=("import dagstermill as __dm_dagstermill\n" "__dm_dagstermill._teardown()\n") | ||||
) | ) | ||||
new_cell.metadata["tags"] = ["injected-teardown"] | new_cell.metadata["tags"] = ["injected-teardown"] | ||||
new_cell.metadata["papermill"] = {} | new_cell.metadata["papermill"] = {} | ||||
index = len(nb_man.nb.cells) | index = len(nb_man.nb.cells) | ||||
nb_man.nb.cells = nb_man.nb.cells + [new_cell] | nb_man.nb.cells = nb_man.nb.cells + [new_cell] | ||||
# Calqued from PapermillExecutePreprocessor.papermill_process | # Calqued from PapermillExecutePreprocessor.papermill_process | ||||
try: | try: | ||||
nb_man.cell_start(new_cell, index) | nb_man.cell_start(new_cell, index) | ||||
nb_man.nb.cells[index], _ = self.preprocess_cell(new_cell, None, index) | nb_man.nb.cells[index], _ = self.preprocess_cell(new_cell, None, index) | ||||
except CellExecutionError as ex: # pragma: nocover | except CellExecutionError as ex: # pragma: nocover | ||||
nb_man.cell_exception(nb_man.nb.cells[index], cell_index=index, exception=ex) | nb_man.cell_exception(nb_man.nb.cells[index], cell_index=index, exception=ex) | ||||
finally: | finally: | ||||
nb_man.cell_complete(nb_man.nb.cells[index], cell_index=index) | nb_man.cell_complete(nb_man.nb.cells[index], cell_index=index) | ||||
return nb_man.nb, resources | return nb_man.nb, resources | ||||
class DagstermillNBConvertEngine(NBConvertEngine): | class DagstermillNBConvertEngine(NBConvertEngine): | ||||
@classmethod | @classmethod | ||||
def execute_managed_notebook( | def execute_managed_notebook( | ||||
cls, | cls, | ||||
nb_man, | nb_man, | ||||
kernel_name, | kernel_name, | ||||
log_output=False, | log_output=False, | ||||
stdout_file=None, | stdout_file=None, | ||||
stderr_file=None, | stderr_file=None, | ||||
start_timeout=60, | start_timeout=60, | ||||
execution_timeout=None, | execution_timeout=None, | ||||
**kwargs, | **kwargs, | ||||
): | ): | ||||
# Nicely handle preprocessor arguments prioritizing values set by engine | # Nicely handle preprocessor arguments prioritizing values set by engine | ||||
preprocessor = DagstermillExecutePreprocessor( | preprocessor = DagstermillExecutePreprocessor( | ||||
timeout=execution_timeout if execution_timeout else kwargs.get("timeout"), | timeout=execution_timeout if execution_timeout else kwargs.get("timeout"), | ||||
startup_timeout=start_timeout, | startup_timeout=start_timeout, | ||||
kernel_name=kernel_name, | kernel_name=kernel_name, | ||||
log=logger, | log=logger, | ||||
) | ) | ||||
preprocessor.log_output = log_output # pylint:disable = attribute-defined-outside-init | preprocessor.log_output = log_output # pylint:disable = attribute-defined-outside-init | ||||
preprocessor.preprocess(nb_man, kwargs) | preprocessor.preprocess(nb_man, kwargs) | ||||
except ImportError: | |||||
# papermill 2.x | |||||
from papermill.clientwrap import PapermillNotebookClient | |||||
from papermill.engines import NBClientEngine | |||||
from papermill.utils import merge_kwargs, remove_args | |||||
dgibson: see _IS_PENDULUM_2 in dagster.seven for another way we could do this logic - personally I… | |||||
Done Inline Actionsthat's fine, i can do something like that -- but probably will want to rework _IS_PENDULUM_2 also (if we go down that route, should be using packaging.version.parse, etc. max: that's fine, i can do something like that -- but probably will want to rework _IS_PENDULUM_2… | |||||
class DagstermillNotebookClient(PapermillNotebookClient): | |||||
def papermill_execute_cells(self): | |||||
try: | |||||
for index, cell in enumerate(self.nb.cells): | |||||
try: | |||||
self.nb_man.cell_start(cell, index) | |||||
self.execute_cell(cell, index) | |||||
except CellExecutionError as ex: | |||||
self.nb_man.cell_exception(self.nb.cells[index], cell_index=index, exception=ex) | |||||
break | |||||
finally: | |||||
self.nb_man.cell_complete(self.nb.cells[index], cell_index=index) | |||||
finally: | |||||
new_cell = nbformat.v4.new_code_cell( | |||||
source=("import dagstermill as __dm_dagstermill\n" "__dm_dagstermill._teardown()\n") | |||||
) | |||||
new_cell.metadata["tags"] = ["injected-teardown"] | |||||
new_cell.metadata["papermill"] = { | |||||
"exception": None, | |||||
"start_time": None, | |||||
"end_time": None, | |||||
"duration": None, | |||||
"status": self.nb_man.PENDING, | |||||
} | |||||
index = len(self.nb_man.nb.cells) | |||||
self.nb_man.nb.cells = self.nb_man.nb.cells + [new_cell] | |||||
try: | |||||
self.nb_man.cell_start(new_cell, index) | |||||
self.execute_cell(new_cell, index) | |||||
except CellExecutionError as ex: | |||||
self.nb_man.cell_exception(self.nb.cells[index], cell_index=index, exception=ex) | |||||
finally: | |||||
self.nb_man.cell_complete(self.nb.cells[index], cell_index=index) | |||||
class DagstermillNBClientEngine(NBClientEngine): | |||||
@classmethod | |||||
def execute_managed_notebook( | |||||
cls, | |||||
nb_man, | |||||
kernel_name, | |||||
log_output=False, | |||||
stdout_file=None, | |||||
stderr_file=None, | |||||
start_timeout=60, | |||||
execution_timeout=None, | |||||
**kwargs, | |||||
): | |||||
# Exclude parameters that named differently downstream | |||||
safe_kwargs = remove_args(['timeout', 'startup_timeout', 'input_path'], **kwargs) | |||||
# Nicely handle preprocessor arguments prioritizing values set by engine | |||||
final_kwargs = merge_kwargs( | |||||
safe_kwargs, | |||||
timeout=execution_timeout if execution_timeout else kwargs.get('timeout'), | |||||
startup_timeout=start_timeout, | |||||
kernel_name=kernel_name, | |||||
log=logger, | |||||
log_output=log_output, | |||||
stdout_file=stdout_file, | |||||
stderr_file=stderr_file, | |||||
) | |||||
return DagstermillNotebookClient(nb_man, **final_kwargs).execute() | |||||
# Nicely handle preprocessor arguments prioritizing values set by engine | |||||
Not Done Inline Actionsrm? dgibson: rm? | |||||
Done Inline Actionsgood catch max: good catch |
see _IS_PENDULUM_2 in dagster.seven for another way we could do this logic - personally I prefer that to using an ImportError as a proxy for a version check, but not a hill I want to die on