Differential D6164 Diff 30381 python_modules/dagster-graphql/dagster_graphql/schema/schedules/__init__.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/schedules/__init__.py
import graphene | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ExternalSchedule, ScheduleSelector | from dagster.core.host_representation import ExternalSchedule | ||||
from dagster.core.host_representation.selector import RepositorySelector | from dagster.core.host_representation import ScheduleSelector as DagsterScheduleSelector | ||||
from dagster.core.host_representation.selector import ( | |||||
RepositorySelector as DagsterRepositorySelector, | |||||
) | |||||
from dagster.core.scheduler.job import JobTickStatsSnapshot, JobTickStatus | from dagster.core.scheduler.job import JobTickStatsSnapshot, JobTickStatus | ||||
from dagster.core.storage.pipeline_run import PipelineRunsFilter | |||||
from dagster_graphql import dauphin | from ...implementation.fetch_schedules import ( | ||||
from dagster_graphql.implementation.fetch_schedules import ( | |||||
reconcile_scheduler_state, | reconcile_scheduler_state, | ||||
start_schedule, | start_schedule, | ||||
stop_schedule, | stop_schedule, | ||||
) | ) | ||||
from dagster_graphql.schema.errors import ( | from ..errors import ( | ||||
DauphinPythonError, | PythonError, | ||||
DauphinRepositoryNotFoundError, | RepositoryNotFoundError, | ||||
DauphinScheduleNotFoundError, | ScheduleNotFoundError, | ||||
DauphinSchedulerNotDefinedError, | SchedulerNotDefinedError, | ||||
) | ) | ||||
from ..inputs import RepositorySelector, ScheduleSelector | |||||
from .schedules import ( | from ..jobs import JobState | ||||
DauphinSchedule, | from .schedules import Schedule, ScheduleOrError, Schedules, SchedulesOrError | ||||
DauphinScheduleOrError, | |||||
DauphinSchedules, | |||||
DauphinSchedulesOrError, | |||||
) | |||||
class DauphinScheduleStatus(dauphin.Enum): | class ScheduleStatus(graphene.Enum): | ||||
class Meta: | |||||
name = "ScheduleStatus" | |||||
RUNNING = "RUNNING" | RUNNING = "RUNNING" | ||||
STOPPED = "STOPPED" | STOPPED = "STOPPED" | ||||
ENDED = "ENDED" | ENDED = "ENDED" | ||||
class DauphinSchedulerOrError(dauphin.Union): | class Scheduler(graphene.ObjectType): | ||||
class Meta: | scheduler_class = graphene.String() | ||||
name = "SchedulerOrError" | |||||
types = ("Scheduler", DauphinSchedulerNotDefinedError, "PythonError") | |||||
DauphinJobTickStatus = dauphin.Enum.from_enum(JobTickStatus) | |||||
class DauphinScheduleTick(dauphin.ObjectType): | class SchedulerOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "ScheduleTick" | types = (Scheduler, SchedulerNotDefinedError, PythonError) | ||||
tick_id = dauphin.NonNull(dauphin.String) | |||||
status = dauphin.NonNull("JobTickStatus") | |||||
timestamp = dauphin.NonNull(dauphin.Float) | |||||
tick_specific_data = dauphin.Field("ScheduleTickSpecificData") | |||||
class DauphinScheduleTickSuccessData(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "ScheduleTickSuccessData" | |||||
run = dauphin.Field("PipelineRun") | |||||
class DauphinScheduleTickFailureData(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "ScheduleTickFailureData" | |||||
error = dauphin.NonNull("PythonError") | |||||
class DauphinScheduleTickSpecificData(dauphin.Union): | JobTickStatus = graphene.Enum.from_enum(JobTickStatus) | ||||
class Meta: | |||||
name = "ScheduleTickSpecificData" | |||||
types = ( | |||||
DauphinScheduleTickSuccessData, | |||||
DauphinScheduleTickFailureData, | |||||
) | |||||
class DauphinScheduleTickStatsSnapshot(dauphin.ObjectType): | class ScheduleTickStatsSnapshot(graphene.ObjectType): | ||||
class Meta: | ticks_started = graphene.NonNull(graphene.Int) | ||||
name = "ScheduleTickStatsSnapshot" | ticks_succeeded = graphene.NonNull(graphene.Int) | ||||
ticks_skipped = graphene.NonNull(graphene.Int) | |||||
ticks_started = dauphin.NonNull(dauphin.Int) | ticks_failed = graphene.NonNull(graphene.Int) | ||||
ticks_succeeded = dauphin.NonNull(dauphin.Int) | |||||
ticks_skipped = dauphin.NonNull(dauphin.Int) | |||||
ticks_failed = dauphin.NonNull(dauphin.Int) | |||||
def __init__(self, stats): | def __init__(self, stats): | ||||
super(DauphinScheduleTickStatsSnapshot, self).__init__( | super().__init__( | ||||
ticks_started=stats.ticks_started, | ticks_started=stats.ticks_started, | ||||
ticks_succeeded=stats.ticks_succeeded, | ticks_succeeded=stats.ticks_succeeded, | ||||
ticks_skipped=stats.ticks_skipped, | ticks_skipped=stats.ticks_skipped, | ||||
ticks_failed=stats.ticks_failed, | ticks_failed=stats.ticks_failed, | ||||
) | ) | ||||
self._stats = check.inst_param(stats, "stats", JobTickStatsSnapshot) | self._stats = check.inst_param(stats, "stats", JobTickStatsSnapshot) | ||||
class DauphinScheduler(dauphin.ObjectType): | class ReconcileSchedulerStateSuccess(graphene.ObjectType): | ||||
class Meta: | message = graphene.NonNull(graphene.String) | ||||
name = "Scheduler" | |||||
scheduler_class = dauphin.String() | |||||
class DauphinReconcileSchedulerStateSuccess(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "ReconcileSchedulerStateSuccess" | |||||
message = dauphin.NonNull(dauphin.String) | |||||
class DauphinReconcilScheduleStateMutationResult(dauphin.Union): | class ReconcileSchedulerStateMutationResult(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "ReconcileSchedulerStateMutationResult" | types = (PythonError, ReconcileSchedulerStateSuccess) | ||||
types = (DauphinPythonError, DauphinReconcileSchedulerStateSuccess) | |||||
class DauphinReconcileSchedulerStateMutation(dauphin.Mutation): | class ReconcileSchedulerStateMutation(graphene.Mutation): | ||||
class Meta: | |||||
name = "ReconcileSchedulerStateMutation" | |||||
class Arguments: | class Arguments: | ||||
repository_selector = dauphin.NonNull("RepositorySelector") | repository_selector = graphene.NonNull(RepositorySelector) | ||||
Output = dauphin.NonNull("ReconcileSchedulerStateMutationResult") | Output = graphene.NonNull(ReconcileSchedulerStateMutationResult) | ||||
def mutate(self, graphene_info, repository_selector): | def mutate(self, graphene_info, repository_selector): | ||||
return reconcile_scheduler_state( | return reconcile_scheduler_state( | ||||
graphene_info, RepositorySelector.from_graphql_input(repository_selector) | graphene_info, DagsterRepositorySelector.from_graphql_input(repository_selector) | ||||
) | ) | ||||
class DauphinScheduleStateResult(dauphin.ObjectType): | class ScheduleStateResult(graphene.ObjectType): | ||||
class Meta: | scheduleState = graphene.NonNull(JobState) | ||||
name = "ScheduleStateResult" | |||||
scheduleState = dauphin.NonNull("JobState") | |||||
class ScheduleMutationResult(graphene.Union): | |||||
class DauphinScheduleMutationResult(dauphin.Union): | |||||
class Meta: | class Meta: | ||||
name = "ScheduleMutationResult" | types = (PythonError, ScheduleStateResult) | ||||
types = (DauphinPythonError, DauphinScheduleStateResult) | |||||
class DauphinStartScheduleMutation(dauphin.Mutation): | class StartScheduleMutation(graphene.Mutation): | ||||
class Meta: | |||||
name = "StartScheduleMutation" | |||||
class Arguments: | class Arguments: | ||||
schedule_selector = dauphin.NonNull("ScheduleSelector") | schedule_selector = graphene.NonNull(ScheduleSelector) | ||||
Output = dauphin.NonNull("ScheduleMutationResult") | Output = graphene.NonNull(ScheduleMutationResult) | ||||
def mutate(self, graphene_info, schedule_selector): | def mutate(self, graphene_info, schedule_selector): | ||||
return start_schedule(graphene_info, ScheduleSelector.from_graphql_input(schedule_selector)) | return start_schedule( | ||||
graphene_info, DagsterScheduleSelector.from_graphql_input(schedule_selector) | |||||
) | |||||
class DauphinStopRunningScheduleMutation(dauphin.Mutation): | |||||
class Meta: | |||||
name = "StopRunningScheduleMutation" | |||||
class StopRunningScheduleMutation(graphene.Mutation): | |||||
class Arguments: | class Arguments: | ||||
schedule_origin_id = dauphin.NonNull(dauphin.String) | schedule_origin_id = graphene.NonNull(graphene.String) | ||||
Output = dauphin.NonNull("ScheduleMutationResult") | Output = graphene.NonNull(ScheduleMutationResult) | ||||
def mutate(self, graphene_info, schedule_origin_id): | def mutate(self, graphene_info, schedule_origin_id): | ||||
return stop_schedule(graphene_info, schedule_origin_id) | return stop_schedule(graphene_info, schedule_origin_id) |