Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/jobs.py
import graphene
import pendulum
import yaml

from dagster import check
from dagster.core.definitions.sensor import RunRequest
from dagster.core.host_representation import (
    ExternalScheduleExecutionData,
    ExternalScheduleExecutionErrorData,
)
from dagster.core.scheduler.job import (
    JobState,
    JobStatus,
    JobTick,
    JobTickStatus,
    JobType,
    ScheduleJobData,
    SensorJobData,
)
from dagster.core.storage.pipeline_run import PipelineRunsFilter
from dagster.core.storage.tags import TagType, get_tag_type

from ..implementation.fetch_schedules import get_schedule_next_tick
from ..implementation.fetch_sensors import get_sensor_next_tick
from .errors import GraphenePythonError
from .repository_origin import GrapheneRepositoryOrigin
from .tags import GraphenePipelineTag
from .util import non_null_list
# NOTE(review): no need for this to go through the dauphin wrapper now that the
# type registry is gone -- a plain graphene.Enum is sufficient.
class GrapheneJobType(graphene.Enum):
    """GraphQL enum distinguishing schedule-backed from sensor-backed jobs."""

    SCHEDULE = "SCHEDULE"
    SENSOR = "SENSOR"

    class Meta:
        # Exposed in the schema as "JobType" (no "Graphene" prefix).
        name = "JobType"
class GrapheneJobStatus(graphene.Enum):
    """GraphQL enum for whether a job (schedule/sensor) is turned on or off."""

    RUNNING = "RUNNING"
    STOPPED = "STOPPED"

    class Meta:
        name = "JobStatus"
class GrapheneJobTickStatus(graphene.Enum):
    """GraphQL enum for the outcome of a single job tick evaluation."""

    STARTED = "STARTED"
    SKIPPED = "SKIPPED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"

    class Meta:
        name = "JobTickStatus"
class GrapheneSensorJobData(graphene.ObjectType):
    """Sensor-specific job state: the last tick time and last run key observed."""

    lastTickTimestamp = graphene.Float()
    lastRunKey = graphene.String()

    class Meta:
        name = "SensorJobData"

    def __init__(self, job_specific_data):
        check.inst_param(job_specific_data, "job_specific_data", SensorJobData)
        super().__init__(
            lastTickTimestamp=job_specific_data.last_tick_timestamp,
            lastRunKey=job_specific_data.last_run_key,
        )
class GrapheneScheduleJobData(graphene.ObjectType):
    """Schedule-specific job state: the cron string and when the schedule started."""

    cronSchedule = graphene.NonNull(graphene.String)
    startTimestamp = graphene.Float()

    class Meta:
        name = "ScheduleJobData"

    def __init__(self, job_specific_data):
        check.inst_param(job_specific_data, "job_specific_data", ScheduleJobData)
        super().__init__(
            cronSchedule=job_specific_data.cron_schedule,
            startTimestamp=job_specific_data.start_timestamp,
        )
class GrapheneJobSpecificData(graphene.Union):
    """Union over per-job-type payloads (sensor data or schedule data)."""

    class Meta:
        types = (GrapheneSensorJobData, GrapheneScheduleJobData)
        name = "JobSpecificData"
class GrapheneJobTick(graphene.ObjectType):
    """A single evaluation ("tick") of a job, with its status and resulting runs."""

    id = graphene.NonNull(graphene.ID)
    status = graphene.NonNull(GrapheneJobTickStatus)
    timestamp = graphene.NonNull(graphene.Float)
    runIds = non_null_list(graphene.String)
    error = graphene.Field(GraphenePythonError)
    skipReason = graphene.String()
    # Lazy string reference avoids a circular import with the pipeline module.
    runs = non_null_list("dagster_graphql.schema.pipelines.pipeline.GraphenePipelineRun")

    class Meta:
        name = "JobTick"

    def __init__(self, _, job_tick):
        self._job_tick = check.inst_param(job_tick, "job_tick", JobTick)
        super().__init__(
            status=job_tick.status,
            timestamp=job_tick.timestamp,
            runIds=job_tick.run_ids,
            error=job_tick.error,
            skipReason=job_tick.skip_reason,
        )

    def resolve_id(self, _):
        # Composite id: ticks are unique per (job origin, timestamp).
        return "%s:%s" % (self._job_tick.job_origin_id, self._job_tick.timestamp)

    def resolve_runs(self, graphene_info):
        # Imported locally to break the circular dependency at module load time.
        from .pipelines.pipeline import GraphenePipelineRun

        instance = graphene_info.context.instance
        # Runs may have been deleted since the tick fired; only return ones
        # that still exist in the instance.
        return [
            GraphenePipelineRun(instance.get_run_by_id(run_id))
            for run_id in self._job_tick.run_ids
            if instance.has_run(run_id)
        ]
class DauphinFutureJobTick(dauphin.ObjectType): | class GrapheneFutureJobTick(graphene.ObjectType): | ||||
class Meta(object): | timestamp = graphene.NonNull(graphene.Float) | ||||
name = "FutureJobTick" | evaluationResult = graphene.Field(lambda: GrapheneTickEvaluation) | ||||
timestamp = dauphin.NonNull(dauphin.Float) | class Meta: | ||||
evaluationResult = dauphin.Field("TickEvaluation") | name = "FutureJobTick" | ||||
def __init__(self, job_state, timestamp): | def __init__(self, job_state, timestamp): | ||||
self._job_state = check.inst_param(job_state, "job_state", JobState) | self._job_state = check.inst_param(job_state, "job_state", JobState) | ||||
self._timestamp = timestamp | self._timestamp = timestamp | ||||
super(DauphinFutureJobTick, self).__init__( | super().__init__(timestamp=check.float_param(timestamp, "timestamp"),) | ||||
timestamp=check.float_param(timestamp, "timestamp"), | |||||
) | |||||
def resolve_evaluationResult(self, graphene_info): | def resolve_evaluationResult(self, graphene_info): | ||||
if self._job_state.status != JobStatus.RUNNING: | if self._job_state.status != JobStatus.RUNNING: | ||||
return None | return None | ||||
if self._job_state.job_type != JobType.SCHEDULE: | if self._job_state.job_type != JobType.SCHEDULE: | ||||
return None | return None | ||||
Show All 18 Lines | def resolve_evaluationResult(self, graphene_info): | ||||
next_tick_datetime = next(external_schedule.execution_time_iterator(self._timestamp)) | next_tick_datetime = next(external_schedule.execution_time_iterator(self._timestamp)) | ||||
schedule_time = pendulum.instance(next_tick_datetime).in_tz(timezone_str) | schedule_time = pendulum.instance(next_tick_datetime).in_tz(timezone_str) | ||||
schedule_data = repository_location.get_external_schedule_execution_data( | schedule_data = repository_location.get_external_schedule_execution_data( | ||||
instance=graphene_info.context.instance, | instance=graphene_info.context.instance, | ||||
repository_handle=repository.handle, | repository_handle=repository.handle, | ||||
schedule_name=external_schedule.name, | schedule_name=external_schedule.name, | ||||
scheduled_execution_time=schedule_time, | scheduled_execution_time=schedule_time, | ||||
) | ) | ||||
return graphene_info.schema.type_named("TickEvaluation")(schedule_data) | return GrapheneTickEvaluation(schedule_data) | ||||
class GrapheneTickEvaluation(graphene.ObjectType):
    """The result of evaluating a (future) schedule tick: run requests, a skip
    reason, or an error, depending on what the user-space evaluation produced."""

    # Lambda defers resolution: GrapheneRunRequest is defined later in this module.
    runRequests = graphene.List(lambda: GrapheneRunRequest)
    skipReason = graphene.String()
    error = graphene.Field(GraphenePythonError)

    class Meta:
        name = "TickEvaluation"

    def __init__(self, schedule_data):
        check.inst_param(
            schedule_data,
            "schedule_data",
            (ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData),
        )
        # Exactly one of error / (skip_reason, run_requests) is populated,
        # depending on which data type came back from the evaluation.
        error = (
            schedule_data.error
            if isinstance(schedule_data, ExternalScheduleExecutionErrorData)
            else None
        )
        skip_reason = (
            schedule_data.skip_message
            if isinstance(schedule_data, ExternalScheduleExecutionData)
            else None
        )
        self._run_requests = (
            schedule_data.run_requests
            if isinstance(schedule_data, ExternalScheduleExecutionData)
            else None
        )
        super().__init__(skipReason=skip_reason, error=error)

    def resolve_runRequests(self, _graphene_info):
        # Preserve None vs [] distinction for the GraphQL nullability contract.
        if not self._run_requests:
            return self._run_requests
        return [GrapheneRunRequest(run_request) for run_request in self._run_requests]
class GrapheneRunRequest(graphene.ObjectType):
    """A requested run produced by a tick evaluation: its key, tags, and config."""

    runKey = graphene.String()
    tags = non_null_list(GraphenePipelineTag)
    runConfigYaml = graphene.NonNull(graphene.String)

    class Meta:
        name = "RunRequest"

    def __init__(self, run_request):
        super().__init__(runKey=run_request.run_key)
        self._run_request = check.inst_param(run_request, "run_request", RunRequest)

    def resolve_tags(self, _graphene_info):
        # Hidden (system-internal) tags are filtered out of the public API.
        return [
            GraphenePipelineTag(key=key, value=value)
            for key, value in self._run_request.tags.items()
            if get_tag_type(key) != TagType.HIDDEN
        ]

    def resolve_runConfigYaml(self, _graphene_info):
        return yaml.dump(self._run_request.run_config, default_flow_style=False, allow_unicode=True)
class GrapheneFutureJobTicks(graphene.ObjectType):
    """A page of upcoming (not-yet-fired) job ticks plus a pagination cursor."""

    results = non_null_list(GrapheneFutureJobTick)
    cursor = graphene.NonNull(graphene.Float)

    class Meta:
        name = "FutureJobTicks"
class GrapheneJobState(graphene.ObjectType):
    """The stored state of a schedule or sensor job: identity, on/off status,
    per-type data, and resolvers for its runs and tick history."""

    id = graphene.NonNull(graphene.ID)
    name = graphene.NonNull(graphene.String)
    jobType = graphene.NonNull(GrapheneJobType)
    status = graphene.NonNull(GrapheneJobStatus)
    repositoryOrigin = graphene.NonNull(GrapheneRepositoryOrigin)
    jobSpecificData = graphene.Field(GrapheneJobSpecificData)
    # Lazy string reference avoids a circular import with the pipeline module.
    runs = graphene.Field(
        non_null_list("dagster_graphql.schema.pipelines.pipeline.GraphenePipelineRun"),
        limit=graphene.Int(),
    )
    runsCount = graphene.NonNull(graphene.Int)
    ticks = graphene.Field(
        non_null_list(GrapheneJobTick),
        dayRange=graphene.Int(),
        dayOffset=graphene.Int(),
        limit=graphene.Int(),
    )
    nextTick = graphene.Field(GrapheneFutureJobTick)
    runningCount = graphene.NonNull(graphene.Int)  # remove with cron scheduler

    class Meta:
        name = "JobState"

    def __init__(self, job_state):
        self._job_state = check.inst_param(job_state, "job_state", JobState)
        super().__init__(
            id=job_state.job_origin_id,
            name=job_state.name,
            jobType=job_state.job_type,
            status=job_state.status,
        )

    def resolve_repositoryOrigin(self, _graphene_info):
        origin = self._job_state.origin.external_repository_origin
        return GrapheneRepositoryOrigin(origin)

    def resolve_jobSpecificData(self, _graphene_info):
        # Wrap the raw job data in the graphene type matching the job kind.
        if not self._job_state.job_specific_data:
            return None
        if self._job_state.job_type == JobType.SENSOR:
            return GrapheneSensorJobData(self._job_state.job_specific_data)
        if self._job_state.job_type == JobType.SCHEDULE:
            return GrapheneScheduleJobData(self._job_state.job_specific_data)
        return None

    def resolve_runs(self, graphene_info, **kwargs):
        # Imported locally to break the circular dependency at module load time.
        from .pipelines.pipeline import GraphenePipelineRun

        if self._job_state.job_type == JobType.SENSOR:
            filters = PipelineRunsFilter.for_sensor(self._job_state)
        else:
            filters = PipelineRunsFilter.for_schedule(self._job_state)
        return [
            GraphenePipelineRun(r)
            for r in graphene_info.context.instance.get_runs(
                filters=filters, limit=kwargs.get("limit"),
            )
        ]

    def resolve_runsCount(self, graphene_info):
        if self._job_state.job_type == JobType.SENSOR:
            filters = PipelineRunsFilter.for_sensor(self._job_state)
        else:
            filters = PipelineRunsFilter.for_schedule(self._job_state)
        return graphene_info.context.instance.get_runs_count(filters=filters)

    def resolve_ticks(self, graphene_info, dayRange=None, dayOffset=None, limit=None):
        # Window the tick query: `dayOffset` shifts the end of the window back
        # from now; `dayRange` bounds how far back from that end to look.
        before = pendulum.now("UTC").subtract(days=dayOffset).timestamp() if dayOffset else None
        after = (
            pendulum.now("UTC").subtract(days=dayRange + (dayOffset or 0)).timestamp()
            if dayRange
            else None
        )
        return [
            GrapheneJobTick(graphene_info, tick)
            for tick in graphene_info.context.instance.get_job_ticks(
                self._job_state.job_origin_id, before=before, after=after, limit=limit
            )
        ]

    def resolve_nextTick(self, graphene_info):
        # sensor
        if self._job_state.job_type == JobType.SENSOR:
            return get_sensor_next_tick(graphene_info, self._job_state)
        else:
            return get_schedule_next_tick(graphene_info, self._job_state)

    def resolve_runningCount(self, graphene_info):
        # Sensors are singletons (0 or 1); schedules may have multiple cron entries.
        if self._job_state.job_type == JobType.SENSOR:
            return 1 if self._job_state.status == JobStatus.RUNNING else 0
        else:
            return graphene_info.context.instance.running_schedule_count(
                self._job_state.job_origin_id
            )
class GrapheneJobStates(graphene.ObjectType):
    """A non-paginated list of job states."""

    results = non_null_list(GrapheneJobState)

    class Meta:
        name = "JobStates"
class GrapheneJobStateOrError(graphene.Union):
    """Either a single job state or the Python error raised while fetching it."""

    class Meta:
        types = (GrapheneJobState, GraphenePythonError)
        name = "JobStateOrError"
class GrapheneJobStatesOrError(graphene.Union):
    """Either a list of job states or the Python error raised while fetching them."""

    class Meta:
        types = (GrapheneJobStates, GraphenePythonError)
        name = "JobStatesOrError"
# Exported graphene types for schema registration, sorted alphabetically.
types = [
    GrapheneFutureJobTick,
    GrapheneFutureJobTicks,
    GrapheneJobSpecificData,
    GrapheneJobState,
    GrapheneJobStateOrError,
    GrapheneJobStates,
    GrapheneJobStatesOrError,
    GrapheneJobTick,
    GrapheneScheduleJobData,
    GrapheneSensorJobData,
]
# NOTE(review): no need for these enums to go through the dauphin wrapper
# without the registry -- plain graphene.Enum subclasses replace
# dauphin.Enum.from_enum(JobType/JobStatus/JobTickStatus).