Differential D6164 Diff 30546 python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py
python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py
 from dagster import check
-from dagster.core.definitions import IPipeline
-from dagster.core.definitions.schedule import ScheduleExecutionContext
-from dagster.core.errors import (
-    DagsterInvalidConfigError,
-    ScheduleExecutionError,
-    user_code_error_boundary,
-)
-from dagster.core.events import DagsterEventType
-from dagster.core.execution.retries import Retries
-from dagster.core.host_representation import ExternalExecutionPlan
 from dagster.core.storage.compute_log_manager import ComputeIOType
-from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
-from dagster.core.system_config.objects import ExecutionConfig
+from dagster.core.storage.pipeline_run import PipelineRunStatus
 from dagster.serdes import serialize_dagster_namedtuple
 from dagster.utils.error import serializable_error_info_from_exc_info
-from dagster_graphql.implementation.fetch_runs import is_config_valid
-from dagster_graphql.schema.pipelines import DauphinPipeline
-from dagster_graphql.schema.runs import (
-    from_compute_log_file,
-    from_dagster_event_record,
-    from_event_record,
-)
 from graphql.execution.base import ResolveInfo
 from rx import Observable

 from ..external import (
     ExternalPipeline,
     ensure_valid_config,
     ensure_valid_step_keys,
     get_external_pipeline_or_raise,
 )
-from ..fetch_runs import is_config_valid
 from ..pipeline_run_storage import PipelineRunObservableSubscribe
 from ..resume_retry import get_retry_steps_from_execution_plan
-from ..utils import ExecutionParams, UserFacingGraphQLError, capture_dauphin_error
+from ..utils import ExecutionParams, UserFacingGraphQLError, capture_error
 from .backfill import create_and_launch_partition_backfill
 from .launch_execution import launch_pipeline_execution, launch_pipeline_reexecution


 def _force_mark_as_canceled(graphene_info, run_id):
+    from ...schema.pipelines.pipeline import GraphenePipelineRun
+    from ...schema.roots.mutation import GrapheneTerminatePipelineExecutionSuccess
     instance = graphene_info.context.instance
     reloaded_run = instance.get_run_by_id(run_id)
     if not reloaded_run.is_finished:
         message = (
             "This pipeline was forcibly marked as canceled from outside the execution context. The "
             "computational resources created by the run may not have been fully cleaned up."
         )
         instance.report_run_canceled(reloaded_run, message=message)
         reloaded_run = instance.get_run_by_id(run_id)
-    return graphene_info.schema.type_named("TerminatePipelineExecutionSuccess")(
-        graphene_info.schema.type_named("PipelineRun")(reloaded_run)
-    )
+    return GrapheneTerminatePipelineExecutionSuccess(GraphenePipelineRun(reloaded_run))


-@capture_dauphin_error
+@capture_error
 def terminate_pipeline_execution(graphene_info, run_id, terminate_policy):
-    from dagster_graphql.schema.roots import DauphinTerminatePipelinePolicy
+    from ...schema.errors import GraphenePipelineRunNotFoundError
+    from ...schema.pipelines.pipeline import GraphenePipelineRun
+    from ...schema.roots.mutation import (
+        GrapheneTerminatePipelineExecutionFailure,
+        GrapheneTerminatePipelineExecutionSuccess,
+        GrapheneTerminatePipelinePolicy,
+    )
     check.inst_param(graphene_info, "graphene_info", ResolveInfo)
     check.str_param(run_id, "run_id")
     instance = graphene_info.context.instance
     run = instance.get_run_by_id(run_id)
     force_mark_as_canceled = (
-        terminate_policy == DauphinTerminatePipelinePolicy.MARK_AS_CANCELED_IMMEDIATELY
+        terminate_policy == GrapheneTerminatePipelinePolicy.MARK_AS_CANCELED_IMMEDIATELY
     )
     if not run:
-        return graphene_info.schema.type_named("PipelineRunNotFoundError")(run_id)
-    dauphin_run = graphene_info.schema.type_named("PipelineRun")(run)
+        return GraphenePipelineRunNotFoundError(run_id)
+    pipeline_run = GraphenePipelineRun(run)
     valid_status = not run.is_finished and (
         force_mark_as_canceled
-        or (run.status == PipelineRunStatus.STARTED or run.status == PipelineRunStatus.QUEUED)
+        or (
+            run.status == PipelineRunStatus.STARTED
+            or run.status == PipelineRunStatus.QUEUED
+        )
     )
     if not valid_status:
-        return graphene_info.schema.type_named("TerminatePipelineExecutionFailure")(
-            run=dauphin_run,
+        return GrapheneTerminatePipelineExecutionFailure(
+            run=pipeline_run,
             message="Run {run_id} could not be terminated due to having status {status}.".format(
                 run_id=run.run_id, status=run.status.value
             ),
         )
     if (
         graphene_info.context.instance.run_coordinator
         and graphene_info.context.instance.run_coordinator.can_cancel_run(run_id)
         and graphene_info.context.instance.run_coordinator.cancel_run(run_id)
     ):
         return (
             _force_mark_as_canceled(graphene_info, run_id)
             if force_mark_as_canceled
-            else graphene_info.schema.type_named("TerminatePipelineExecutionSuccess")(dauphin_run)
+            else GrapheneTerminatePipelineExecutionSuccess(pipeline_run)
         )
     return (
         _force_mark_as_canceled(graphene_info, run_id)
         if force_mark_as_canceled
-        else graphene_info.schema.type_named("TerminatePipelineExecutionFailure")(
-            run=dauphin_run, message="Unable to terminate run {run_id}".format(run_id=run.run_id)
+        else GrapheneTerminatePipelineExecutionFailure(
+            run=pipeline_run, message="Unable to terminate run {run_id}".format(run_id=run.run_id)
         )
     )


-@capture_dauphin_error
+@capture_error
 def delete_pipeline_run(graphene_info, run_id):
+    from ...schema.errors import GraphenePipelineRunNotFoundError
+    from ...schema.roots.mutation import GrapheneDeletePipelineRunSuccess
     instance = graphene_info.context.instance
     if not instance.has_run(run_id):
-        return graphene_info.schema.type_named("PipelineRunNotFoundError")(run_id)
+        return GraphenePipelineRunNotFoundError(run_id)
     instance.delete_run(run_id)
-    return graphene_info.schema.type_named("DeletePipelineRunSuccess")(run_id)
+    return GrapheneDeletePipelineRunSuccess(run_id)


 def get_pipeline_run_observable(graphene_info, run_id, after=None):
+    from ...schema.pipelines.pipeline import GraphenePipelineRun
+    from ...schema.pipelines.subscription import (
+        GraphenePipelineRunLogsSubscriptionFailure,
+        GraphenePipelineRunLogsSubscriptionSuccess,
+    )
+    from ..events import from_event_record
     check.inst_param(graphene_info, "graphene_info", ResolveInfo)
     check.str_param(run_id, "run_id")
     check.opt_int_param(after, "after")
     instance = graphene_info.context.instance
     run = instance.get_run_by_id(run_id)
     if not run:

         def _get_error_observable(observer):
             observer.on_next(
-                graphene_info.schema.type_named("PipelineRunLogsSubscriptionFailure")(
+                GraphenePipelineRunLogsSubscriptionFailure(
                     missingRunId=run_id, message="Could not load run with id {}".format(run_id)
                 )
             )

         return Observable.create(_get_error_observable)  # pylint: disable=E1101

     # pylint: disable=E1101
     return Observable.create(
         PipelineRunObservableSubscribe(instance, run_id, after_cursor=after)
     ).map(
-        lambda events: graphene_info.schema.type_named("PipelineRunLogsSubscriptionSuccess")(
-            run=graphene_info.schema.type_named("PipelineRun")(run),
+        lambda events: GraphenePipelineRunLogsSubscriptionSuccess(
+            run=GraphenePipelineRun(run),
             messages=[from_event_record(event, run.pipeline_name) for event in events],
         )
     )


 def get_compute_log_observable(graphene_info, run_id, step_key, io_type, cursor=None):
+    from ...schema.logs.compute_logs import from_compute_log_file
     check.inst_param(graphene_info, "graphene_info", ResolveInfo)
     check.str_param(run_id, "run_id")
     check.str_param(step_key, "step_key")
     check.inst_param(io_type, "io_type", ComputeIOType)
     check.opt_str_param(cursor, "cursor")
     return graphene_info.context.instance.compute_log_manager.observable(
         run_id, step_key, io_type, cursor
     ).map(lambda update: from_compute_log_file(graphene_info, update))
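The change applied throughout this file is the same in every function: the dauphin-style dynamic lookup graphene_info.schema.type_named("SomeType")(...) is replaced by constructing the imported Graphene class directly, with the import deferred into the function body (presumably to avoid circular imports between the schema and implementation modules). Below is a minimal, self-contained sketch of the before/after shape; the GrapheneRunNotFound class and _Schema registry are toy stand-ins for illustration, not the dagster_graphql API.

# Toy stand-ins only; not the dagster_graphql API.
class GrapheneRunNotFound:
    def __init__(self, run_id):
        self.run_id = run_id


class _Schema:
    # Old style (dauphin): resolve the GraphQL type by string name through a registry.
    _types = {"RunNotFound": GrapheneRunNotFound}

    def type_named(self, name):
        return self._types[name]


schema = _Schema()

# Before: indirect, string-keyed lookup, then construction.
old_style = schema.type_named("RunNotFound")("run_123")

# After (this diff): import the Graphene class and construct it directly;
# in the real module the import sits inside the resolver function body.
new_style = GrapheneRunNotFound("run_123")

assert type(old_style) is type(new_style)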