Differential D6164 Diff 30381 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_jobs.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/fetch_jobs.py
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.job import JobType | from dagster.core.definitions.job import JobType | ||||
from dagster.core.scheduler.job import JobStatus | from dagster.core.scheduler.job import JobStatus | ||||
from .utils import capture_dauphin_error | from .utils import capture_error | ||||
@capture_dauphin_error | @capture_error | ||||
def get_unloadable_job_states_or_error(graphene_info, job_type=None): | def get_unloadable_job_states_or_error(graphene_info, job_type=None): | ||||
from ..schema.jobs import JobState, JobStates | |||||
check.opt_inst_param(job_type, "job_type", JobType) | check.opt_inst_param(job_type, "job_type", JobType) | ||||
job_states = graphene_info.context.instance.all_stored_job_state(job_type=job_type) | job_states = graphene_info.context.instance.all_stored_job_state(job_type=job_type) | ||||
external_jobs = [ | external_jobs = [ | ||||
job | job | ||||
for repository_location in graphene_info.context.repository_locations | for repository_location in graphene_info.context.repository_locations | ||||
for repository in repository_location.get_repositories().values() | for repository in repository_location.get_repositories().values() | ||||
for job in repository.get_external_schedules() + repository.get_external_sensors() | for job in repository.get_external_schedules() + repository.get_external_sensors() | ||||
] | ] | ||||
job_origin_ids = {job.get_external_origin_id() for job in external_jobs} | job_origin_ids = {job.get_external_origin_id() for job in external_jobs} | ||||
unloadable_states = [ | unloadable_states = [ | ||||
job_state | job_state | ||||
for job_state in job_states | for job_state in job_states | ||||
if job_state.job_origin_id not in job_origin_ids and job_state.status == JobStatus.RUNNING | if job_state.job_origin_id not in job_origin_ids and job_state.status == JobStatus.RUNNING | ||||
] | ] | ||||
return graphene_info.schema.type_named("JobStates")( | return JobStates(results=[JobState(job_state=job_state) for job_state in unloadable_states]) | ||||
results=[ | |||||
graphene_info.schema.type_named("JobState")(job_state=job_state) | |||||
for job_state in unloadable_states | |||||
] | |||||
) |