Differential D6164 Diff 30546 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import PipelineSelector, RepositorySelector, ScheduleSelector | from dagster.core.host_representation import PipelineSelector, RepositorySelector, ScheduleSelector | ||||
from dagster.core.scheduler.job import JobStatus | from dagster.core.scheduler.job import JobStatus | ||||
from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | ||||
from graphql.execution.base import ResolveInfo | from graphql.execution.base import ResolveInfo | ||||
from .utils import UserFacingGraphQLError, capture_dauphin_error | from .utils import UserFacingGraphQLError, capture_error | ||||
@capture_dauphin_error | @capture_error | ||||
def reconcile_scheduler_state(graphene_info, repository_selector): | def reconcile_scheduler_state(graphene_info, repository_selector): | ||||
from ..schema.schedules import GrapheneReconcileSchedulerStateSuccess | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(repository_selector, "repository_selector", RepositorySelector) | check.inst_param(repository_selector, "repository_selector", RepositorySelector) | ||||
location = graphene_info.context.get_repository_location(repository_selector.location_name) | location = graphene_info.context.get_repository_location(repository_selector.location_name) | ||||
repository = location.get_repository(repository_selector.repository_name) | repository = location.get_repository(repository_selector.repository_name) | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
instance.reconcile_scheduler_state(repository) | instance.reconcile_scheduler_state(repository) | ||||
return graphene_info.schema.type_named("ReconcileSchedulerStateSuccess")(message="Success") | return GrapheneReconcileSchedulerStateSuccess(message="Success") | ||||
@capture_dauphin_error | @capture_error | ||||
def start_schedule(graphene_info, schedule_selector): | def start_schedule(graphene_info, schedule_selector): | ||||
from ..schema.jobs import GrapheneJobState | |||||
from ..schema.schedules import GrapheneScheduleStateResult | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | ||||
location = graphene_info.context.get_repository_location(schedule_selector.location_name) | location = graphene_info.context.get_repository_location(schedule_selector.location_name) | ||||
repository = location.get_repository(schedule_selector.repository_name) | repository = location.get_repository(schedule_selector.repository_name) | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
schedule_state = instance.start_schedule_and_update_storage_state( | schedule_state = instance.start_schedule_and_update_storage_state( | ||||
repository.get_external_schedule(schedule_selector.schedule_name) | repository.get_external_schedule(schedule_selector.schedule_name) | ||||
) | ) | ||||
return graphene_info.schema.type_named("ScheduleStateResult")( | return GrapheneScheduleStateResult(GrapheneJobState(schedule_state)) | ||||
graphene_info.schema.type_named("JobState")(schedule_state) | |||||
) | |||||
@capture_dauphin_error | @capture_error | ||||
def stop_schedule(graphene_info, schedule_origin_id): | def stop_schedule(graphene_info, schedule_origin_id): | ||||
from ..schema.jobs import GrapheneJobState | |||||
from ..schema.schedules import GrapheneScheduleStateResult | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
schedule_state = instance.stop_schedule_and_update_storage_state(schedule_origin_id) | schedule_state = instance.stop_schedule_and_update_storage_state(schedule_origin_id) | ||||
return graphene_info.schema.type_named("ScheduleStateResult")( | return GrapheneScheduleStateResult(GrapheneJobState(schedule_state)) | ||||
graphene_info.schema.type_named("JobState")(schedule_state) | |||||
) | |||||
@capture_dauphin_error | @capture_error | ||||
def get_scheduler_or_error(graphene_info): | def get_scheduler_or_error(graphene_info): | ||||
from ..schema.errors import GrapheneSchedulerNotDefinedError | |||||
from ..schema.schedules import GrapheneScheduler | |||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
if not instance.scheduler: | if not instance.scheduler: | ||||
raise UserFacingGraphQLError(graphene_info.schema.type_named("SchedulerNotDefinedError")()) | raise UserFacingGraphQLError(GrapheneSchedulerNotDefinedError()) | ||||
return graphene_info.schema.type_named("Scheduler")( | return GrapheneScheduler(scheduler_class=instance.scheduler.__class__.__name__) | ||||
scheduler_class=instance.scheduler.__class__.__name__ | |||||
) | |||||
@capture_dauphin_error | @capture_error | ||||
def get_schedules_or_error(graphene_info, repository_selector): | def get_schedules_or_error(graphene_info, repository_selector): | ||||
from ..schema.schedules import GrapheneSchedule, GrapheneSchedules | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(repository_selector, "repository_selector", RepositorySelector) | check.inst_param(repository_selector, "repository_selector", RepositorySelector) | ||||
location = graphene_info.context.get_repository_location(repository_selector.location_name) | location = graphene_info.context.get_repository_location(repository_selector.location_name) | ||||
repository = location.get_repository(repository_selector.repository_name) | repository = location.get_repository(repository_selector.repository_name) | ||||
external_schedules = repository.get_external_schedules() | external_schedules = repository.get_external_schedules() | ||||
results = [ | results = [ | ||||
graphene_info.schema.type_named("Schedule")( | GrapheneSchedule(graphene_info, external_schedule=external_schedule) | ||||
graphene_info, external_schedule=external_schedule | |||||
) | |||||
for external_schedule in external_schedules | for external_schedule in external_schedules | ||||
] | ] | ||||
return graphene_info.schema.type_named("Schedules")(results=results) | return GrapheneSchedules(results=results) | ||||
def get_schedules_for_pipeline(graphene_info, pipeline_selector): | def get_schedules_for_pipeline(graphene_info, pipeline_selector): | ||||
from ..schema.schedules import GrapheneSchedule | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(pipeline_selector, "pipeline_selector", PipelineSelector) | check.inst_param(pipeline_selector, "pipeline_selector", PipelineSelector) | ||||
location = graphene_info.context.get_repository_location(pipeline_selector.location_name) | location = graphene_info.context.get_repository_location(pipeline_selector.location_name) | ||||
repository = location.get_repository(pipeline_selector.repository_name) | repository = location.get_repository(pipeline_selector.repository_name) | ||||
external_schedules = repository.get_external_schedules() | external_schedules = repository.get_external_schedules() | ||||
return [ | return [ | ||||
graphene_info.schema.type_named("Schedule")( | GrapheneSchedule(graphene_info, external_schedule=external_schedule) | ||||
graphene_info, external_schedule=external_schedule | |||||
) | |||||
for external_schedule in external_schedules | for external_schedule in external_schedules | ||||
if external_schedule.pipeline_name == pipeline_selector.pipeline_name | if external_schedule.pipeline_name == pipeline_selector.pipeline_name | ||||
] | ] | ||||
@capture_dauphin_error | @capture_error | ||||
def get_schedule_or_error(graphene_info, schedule_selector): | def get_schedule_or_error(graphene_info, schedule_selector): | ||||
from ..schema.errors import GrapheneScheduleNotFoundError | |||||
from ..schema.schedules import GrapheneSchedule | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector) | ||||
location = graphene_info.context.get_repository_location(schedule_selector.location_name) | location = graphene_info.context.get_repository_location(schedule_selector.location_name) | ||||
repository = location.get_repository(schedule_selector.repository_name) | repository = location.get_repository(schedule_selector.repository_name) | ||||
external_schedule = repository.get_external_schedule(schedule_selector.schedule_name) | external_schedule = repository.get_external_schedule(schedule_selector.schedule_name) | ||||
if not external_schedule: | if not external_schedule: | ||||
raise UserFacingGraphQLError( | raise UserFacingGraphQLError( | ||||
graphene_info.schema.type_named("ScheduleNotFoundError")( | GrapheneScheduleNotFoundError(schedule_name=schedule_selector.schedule_name) | ||||
schedule_name=schedule_selector.schedule_name | |||||
) | |||||
) | ) | ||||
return graphene_info.schema.type_named("Schedule")( | return GrapheneSchedule(graphene_info, external_schedule=external_schedule) | ||||
graphene_info, external_schedule=external_schedule | |||||
) | |||||
def get_schedule_next_tick(graphene_info, schedule_state): | def get_schedule_next_tick(graphene_info, schedule_state): | ||||
from ..schema.jobs import GrapheneFutureJobTick | |||||
if schedule_state.status != JobStatus.RUNNING: | if schedule_state.status != JobStatus.RUNNING: | ||||
return None | return None | ||||
repository_origin = schedule_state.origin.external_repository_origin | repository_origin = schedule_state.origin.external_repository_origin | ||||
if not graphene_info.context.has_repository_location( | if not graphene_info.context.has_repository_location( | ||||
repository_origin.repository_location_origin.location_name | repository_origin.repository_location_origin.location_name | ||||
): | ): | ||||
return None | return None | ||||
repository_location = graphene_info.context.get_repository_location( | repository_location = graphene_info.context.get_repository_location( | ||||
repository_origin.repository_location_origin.location_name | repository_origin.repository_location_origin.location_name | ||||
) | ) | ||||
if not repository_location.has_repository(repository_origin.repository_name): | if not repository_location.has_repository(repository_origin.repository_name): | ||||
return None | return None | ||||
repository = repository_location.get_repository(repository_origin.repository_name) | repository = repository_location.get_repository(repository_origin.repository_name) | ||||
external_schedule = repository.get_external_job(schedule_state.name) | external_schedule = repository.get_external_job(schedule_state.name) | ||||
time_iter = external_schedule.execution_time_iterator( | time_iter = external_schedule.execution_time_iterator( | ||||
get_timestamp_from_utc_datetime(get_current_datetime_in_utc()) | get_timestamp_from_utc_datetime(get_current_datetime_in_utc()) | ||||
) | ) | ||||
next_timestamp = next(time_iter).timestamp() | next_timestamp = next(time_iter).timestamp() | ||||
return graphene_info.schema.type_named("FutureJobTick")(schedule_state, next_timestamp) | return GrapheneFutureJobTick(schedule_state, next_timestamp) |