Differential D6164 Diff 30503 python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py
import graphene | |||||
import pendulum | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ExternalSchedule | from dagster.core.host_representation import ExternalSchedule | ||||
from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | ||||
from dagster_graphql import dauphin | |||||
from dagster_graphql.schema.errors import ( | |||||
DauphinPythonError, | |||||
DauphinRepositoryNotFoundError, | |||||
DauphinScheduleNotFoundError, | |||||
) | |||||
class DauphinScheduleOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "ScheduleOrError" | |||||
types = ("Schedule", DauphinScheduleNotFoundError, DauphinPythonError) | |||||
class DauphinSchedules(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "Schedules" | |||||
results = dauphin.non_null_list("Schedule") | |||||
class DauphinSchedulesOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "SchedulesOrError" | |||||
types = (DauphinSchedules, DauphinRepositoryNotFoundError, DauphinPythonError) | |||||
from ..errors import ( | |||||
GraphenePythonError, | |||||
GrapheneRepositoryNotFoundError, | |||||
GrapheneScheduleNotFoundError, | |||||
) | |||||
from ..jobs import GrapheneFutureJobTick, GrapheneFutureJobTicks, GrapheneJobState | |||||
from ..util import non_null_list | |||||
class GrapheneSchedule(graphene.ObjectType): | |||||
id = graphene.NonNull(graphene.ID) | |||||
name = graphene.NonNull(graphene.String) | |||||
cron_schedule = graphene.NonNull(graphene.String) | |||||
pipeline_name = graphene.NonNull(graphene.String) | |||||
solid_selection = graphene.List(graphene.String) | |||||
mode = graphene.NonNull(graphene.String) | |||||
execution_timezone = graphene.Field(graphene.String) | |||||
scheduleState = graphene.NonNull(GrapheneJobState) | |||||
partition_set = graphene.Field("dagster_graphql.schema.partition_sets.GraphenePartitionSet") | |||||
futureTicks = graphene.NonNull( | |||||
GrapheneFutureJobTicks, cursor=graphene.Float(), limit=graphene.Int() | |||||
) | |||||
futureTick = graphene.NonNull( | |||||
GrapheneFutureJobTick, tick_timestamp=graphene.NonNull(graphene.Int) | |||||
) | |||||
class DauphinSchedule(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "Schedule" | name = "Schedule" | ||||
id = dauphin.NonNull(dauphin.ID) | def __init__(self, graphene_info, external_schedule): | ||||
name = dauphin.NonNull(dauphin.String) | self._external_schedule = check.inst_param( | ||||
cron_schedule = dauphin.NonNull(dauphin.String) | external_schedule, "external_schedule", ExternalSchedule | ||||
pipeline_name = dauphin.NonNull(dauphin.String) | ) | ||||
solid_selection = dauphin.List(dauphin.String) | self._schedule_state = graphene_info.context.instance.get_job_state( | ||||
mode = dauphin.NonNull(dauphin.String) | self._external_schedule.get_external_origin_id() | ||||
execution_timezone = dauphin.Field(dauphin.String) | ) | ||||
scheduleState = dauphin.NonNull("JobState") | |||||
partition_set = dauphin.Field("PartitionSet") | if not self._schedule_state: | ||||
# Also include a ScheduleState for a stopped schedule that may not | |||||
# have a stored database row yet | |||||
self._schedule_state = self._external_schedule.get_default_job_state( | |||||
graphene_info.context.instance | |||||
) | |||||
futureTicks = dauphin.NonNull("FutureJobTicks", cursor=dauphin.Float(), limit=dauphin.Int()) | super().__init__( | ||||
futureTick = dauphin.NonNull("FutureJobTick", tick_timestamp=dauphin.NonNull(dauphin.Int)) | name=external_schedule.name, | ||||
cron_schedule=external_schedule.cron_schedule, | |||||
pipeline_name=external_schedule.pipeline_name, | |||||
solid_selection=external_schedule.solid_selection, | |||||
mode=external_schedule.mode, | |||||
scheduleState=GrapheneJobState(self._schedule_state), | |||||
execution_timezone=( | |||||
self._external_schedule.execution_timezone | |||||
if self._external_schedule.execution_timezone | |||||
else pendulum.now().timezone.name | |||||
), | |||||
) | |||||
def resolve_id(self, _): | def resolve_id(self, _): | ||||
return "%s:%s" % (self.name, self.pipeline_name) | return "%s:%s" % (self.name, self.pipeline_name) | ||||
def resolve_partition_set(self, graphene_info): | def resolve_partition_set(self, graphene_info): | ||||
from ..partition_sets import GraphenePartitionSet | |||||
if self._external_schedule.partition_set_name is None: | if self._external_schedule.partition_set_name is None: | ||||
return None | return None | ||||
repository = graphene_info.context.get_repository_location( | repository = graphene_info.context.get_repository_location( | ||||
self._external_schedule.handle.location_name | self._external_schedule.handle.location_name | ||||
).get_repository(self._external_schedule.handle.repository_name) | ).get_repository(self._external_schedule.handle.repository_name) | ||||
external_partition_set = repository.get_external_partition_set( | external_partition_set = repository.get_external_partition_set( | ||||
self._external_schedule.partition_set_name | self._external_schedule.partition_set_name | ||||
) | ) | ||||
return graphene_info.schema.type_named("PartitionSet")( | return GraphenePartitionSet( | ||||
external_repository_handle=repository.handle, | external_repository_handle=repository.handle, | ||||
external_partition_set=external_partition_set, | external_partition_set=external_partition_set, | ||||
) | ) | ||||
def resolve_futureTicks(self, graphene_info, **kwargs): | def resolve_futureTicks(self, _graphene_info, **kwargs): | ||||
cursor = kwargs.get( | cursor = kwargs.get( | ||||
"cursor", get_timestamp_from_utc_datetime(get_current_datetime_in_utc()) | "cursor", get_timestamp_from_utc_datetime(get_current_datetime_in_utc()) | ||||
) | ) | ||||
limit = kwargs.get("limit", 10) | limit = kwargs.get("limit", 10) | ||||
tick_times = [] | tick_times = [] | ||||
time_iter = self._external_schedule.execution_time_iterator(cursor) | time_iter = self._external_schedule.execution_time_iterator(cursor) | ||||
for _ in range(limit): | for _ in range(limit): | ||||
tick_times.append(next(time_iter).timestamp()) | tick_times.append(next(time_iter).timestamp()) | ||||
future_ticks = [ | future_ticks = [ | ||||
graphene_info.schema.type_named("FutureJobTick")(self._schedule_state, tick_time) | GrapheneFutureJobTick(self._schedule_state, tick_time) for tick_time in tick_times | ||||
for tick_time in tick_times | |||||
] | ] | ||||
return graphene_info.schema.type_named("FutureJobTicks")( | return GrapheneFutureJobTicks(results=future_ticks, cursor=tick_times[-1] + 1) | ||||
results=future_ticks, cursor=tick_times[-1] + 1 | |||||
) | |||||
def resolve_futureTick(self, graphene_info, tick_timestamp): | def resolve_futureTick(self, _graphene_info, tick_timestamp): | ||||
return graphene_info.schema.type_named("FutureJobTick")( | return GrapheneFutureJobTick(self._schedule_state, float(tick_timestamp)) | ||||
self._schedule_state, float(tick_timestamp) | |||||
) | |||||
def __init__(self, graphene_info, external_schedule): | |||||
self._external_schedule = check.inst_param( | |||||
external_schedule, "external_schedule", ExternalSchedule | |||||
) | |||||
self._schedule_state = graphene_info.context.instance.get_job_state( | |||||
self._external_schedule.get_external_origin_id() | |||||
) | |||||
if not self._schedule_state: | class GrapheneScheduleOrError(graphene.Union): | ||||
# Also include a ScheduleState for a stopped schedule that may not | class Meta: | ||||
# have a stored database row yet | types = (GrapheneSchedule, GrapheneScheduleNotFoundError, GraphenePythonError) | ||||
self._schedule_state = self._external_schedule.get_default_job_state( | name = "ScheduleOrError" | ||||
graphene_info.context.instance | |||||
) | |||||
super(DauphinSchedule, self).__init__( | |||||
name=external_schedule.name, | class GrapheneSchedules(graphene.ObjectType): | ||||
cron_schedule=external_schedule.cron_schedule, | results = non_null_list(GrapheneSchedule) | ||||
pipeline_name=external_schedule.pipeline_name, | |||||
solid_selection=external_schedule.solid_selection, | class Meta: | ||||
mode=external_schedule.mode, | name = "Schedules" | ||||
scheduleState=graphene_info.schema.type_named("JobState")(self._schedule_state), | |||||
execution_timezone=( | |||||
self._external_schedule.execution_timezone | class GrapheneSchedulesOrError(graphene.Union): | ||||
if self._external_schedule.execution_timezone | class Meta: | ||||
else pendulum.now().timezone.name | types = (GrapheneSchedules, GrapheneRepositoryNotFoundError, GraphenePythonError) | ||||
), | name = "SchedulesOrError" | ||||
) |