Differential D6164 Diff 30503 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import PipelineSelector, RepositorySelector, ScheduleSelector | from dagster.core.host_representation import PipelineSelector, RepositorySelector, ScheduleSelector | ||||
from graphql.execution.base import ResolveInfo | from graphql.execution.base import ResolveInfo | ||||
from .utils import UserFacingGraphQLError, capture_dauphin_error | from .utils import UserFacingGraphQLError, capture_error | ||||
@capture_dauphin_error | @capture_error | ||||
def reconcile_scheduler_state(graphene_info, repository_selector): | def reconcile_scheduler_state(graphene_info, repository_selector): | ||||
from ..schema.schedules import GrapheneReconcileSchedulerStateSuccess | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(repository_selector, "repository_selector", RepositorySelector) | check.inst_param(repository_selector, "repository_selector", RepositorySelector) | ||||
location = graphene_info.context.get_repository_location(repository_selector.location_name) | location = graphene_info.context.get_repository_location(repository_selector.location_name) | ||||
repository = location.get_repository(repository_selector.repository_name) | repository = location.get_repository(repository_selector.repository_name) | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
instance.reconcile_scheduler_state(repository) | instance.reconcile_scheduler_state(repository) | ||||
return graphene_info.schema.type_named("ReconcileSchedulerStateSuccess")(message="Success") | return GrapheneReconcileSchedulerStateSuccess(message="Success") | ||||
@capture_dauphin_error | @capture_error | ||||
def start_schedule(graphene_info, schedule_selector): | def start_schedule(graphene_info, schedule_selector): | ||||
from ..schema.jobs import GrapheneJobState | |||||
from ..schema.schedules import GrapheneScheduleStateResult | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | ||||
location = graphene_info.context.get_repository_location(schedule_selector.location_name) | location = graphene_info.context.get_repository_location(schedule_selector.location_name) | ||||
repository = location.get_repository(schedule_selector.repository_name) | repository = location.get_repository(schedule_selector.repository_name) | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
schedule_state = instance.start_schedule_and_update_storage_state( | schedule_state = instance.start_schedule_and_update_storage_state( | ||||
repository.get_external_schedule(schedule_selector.schedule_name) | repository.get_external_schedule(schedule_selector.schedule_name) | ||||
) | ) | ||||
return graphene_info.schema.type_named("ScheduleStateResult")( | return GrapheneScheduleStateResult(GrapheneJobState(schedule_state)) | ||||
graphene_info.schema.type_named("JobState")(schedule_state) | |||||
) | |||||
@capture_dauphin_error | @capture_error | ||||
def stop_schedule(graphene_info, schedule_origin_id): | def stop_schedule(graphene_info, schedule_origin_id): | ||||
from ..schema.jobs import GrapheneJobState | |||||
from ..schema.schedules import GrapheneScheduleStateResult | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
schedule_state = instance.stop_schedule_and_update_storage_state(schedule_origin_id) | schedule_state = instance.stop_schedule_and_update_storage_state(schedule_origin_id) | ||||
return graphene_info.schema.type_named("ScheduleStateResult")( | return GrapheneScheduleStateResult(GrapheneJobState(schedule_state)) | ||||
graphene_info.schema.type_named("JobState")(schedule_state) | |||||
) | |||||
@capture_dauphin_error | @capture_error | ||||
def get_scheduler_or_error(graphene_info): | def get_scheduler_or_error(graphene_info): | ||||
from ..schema.errors import GrapheneSchedulerNotDefinedError | |||||
from ..schema.schedules import GrapheneScheduler | |||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
if not instance.scheduler: | if not instance.scheduler: | ||||
raise UserFacingGraphQLError(graphene_info.schema.type_named("SchedulerNotDefinedError")()) | raise UserFacingGraphQLError(GrapheneSchedulerNotDefinedError()) | ||||
return graphene_info.schema.type_named("Scheduler")( | return GrapheneScheduler(scheduler_class=instance.scheduler.__class__.__name__) | ||||
scheduler_class=instance.scheduler.__class__.__name__ | |||||
) | |||||
@capture_dauphin_error | @capture_error | ||||
def get_schedules_or_error(graphene_info, repository_selector): | def get_schedules_or_error(graphene_info, repository_selector): | ||||
from ..schema.schedules import GrapheneSchedule, GrapheneSchedules | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(repository_selector, "repository_selector", RepositorySelector) | check.inst_param(repository_selector, "repository_selector", RepositorySelector) | ||||
location = graphene_info.context.get_repository_location(repository_selector.location_name) | location = graphene_info.context.get_repository_location(repository_selector.location_name) | ||||
repository = location.get_repository(repository_selector.repository_name) | repository = location.get_repository(repository_selector.repository_name) | ||||
external_schedules = repository.get_external_schedules() | external_schedules = repository.get_external_schedules() | ||||
results = [ | results = [ | ||||
graphene_info.schema.type_named("Schedule")( | GrapheneSchedule(graphene_info, external_schedule=external_schedule) | ||||
graphene_info, external_schedule=external_schedule | |||||
) | |||||
for external_schedule in external_schedules | for external_schedule in external_schedules | ||||
] | ] | ||||
return graphene_info.schema.type_named("Schedules")(results=results) | return GrapheneSchedules(results=results) | ||||
def get_schedules_for_pipeline(graphene_info, pipeline_selector): | def get_schedules_for_pipeline(graphene_info, pipeline_selector): | ||||
from ..schema.schedules import GrapheneSchedule | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(pipeline_selector, "pipeline_selector", PipelineSelector) | check.inst_param(pipeline_selector, "pipeline_selector", PipelineSelector) | ||||
location = graphene_info.context.get_repository_location(pipeline_selector.location_name) | location = graphene_info.context.get_repository_location(pipeline_selector.location_name) | ||||
repository = location.get_repository(pipeline_selector.repository_name) | repository = location.get_repository(pipeline_selector.repository_name) | ||||
external_schedules = repository.get_external_schedules() | external_schedules = repository.get_external_schedules() | ||||
return [ | return [ | ||||
graphene_info.schema.type_named("Schedule")( | GrapheneSchedule(graphene_info, external_schedule=external_schedule) | ||||
graphene_info, external_schedule=external_schedule | |||||
) | |||||
for external_schedule in external_schedules | for external_schedule in external_schedules | ||||
if external_schedule.pipeline_name == pipeline_selector.pipeline_name | if external_schedule.pipeline_name == pipeline_selector.pipeline_name | ||||
] | ] | ||||
@capture_dauphin_error | @capture_error | ||||
def get_schedule_or_error(graphene_info, schedule_selector): | def get_schedule_or_error(graphene_info, schedule_selector): | ||||
from ..schema.errors import GrapheneScheduleNotFoundError | |||||
from ..schema.schedules import GrapheneSchedule | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | ||||
location = graphene_info.context.get_repository_location(schedule_selector.location_name) | location = graphene_info.context.get_repository_location(schedule_selector.location_name) | ||||
repository = location.get_repository(schedule_selector.repository_name) | repository = location.get_repository(schedule_selector.repository_name) | ||||
external_schedule = repository.get_external_schedule(schedule_selector.schedule_name) | external_schedule = repository.get_external_schedule(schedule_selector.schedule_name) | ||||
if not external_schedule: | if not external_schedule: | ||||
raise UserFacingGraphQLError( | raise UserFacingGraphQLError( | ||||
graphene_info.schema.type_named("ScheduleNotFoundError")( | GrapheneScheduleNotFoundError(schedule_name=schedule_selector.schedule_name) | ||||
schedule_name=schedule_selector.schedule_name | |||||
) | |||||
) | ) | ||||
return graphene_info.schema.type_named("Schedule")( | return GrapheneSchedule(graphene_info, external_schedule=external_schedule) | ||||
graphene_info, external_schedule=external_schedule | |||||
) |