Differential D6164 Diff 30381 python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py
import graphene | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ExternalPartitionSet, RepositoryHandle | from dagster.core.host_representation import ExternalPartitionSet, RepositoryHandle | ||||
from dagster.core.storage.pipeline_run import PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRunsFilter as DagsterPipelineRunsFilter | ||||
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG | from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG | ||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster_graphql import dauphin | |||||
from dagster_graphql.implementation.fetch_partition_sets import ( | from dagster_graphql.implementation.fetch_partition_sets import ( | ||||
get_partition_by_name, | get_partition_by_name, | ||||
get_partition_config, | get_partition_config, | ||||
get_partition_set_partition_statuses, | get_partition_set_partition_statuses, | ||||
get_partition_tags, | get_partition_tags, | ||||
get_partitions, | get_partitions, | ||||
) | ) | ||||
from dagster_graphql.implementation.fetch_runs import get_runs | from dagster_graphql.implementation.fetch_runs import get_runs | ||||
from dagster_graphql.schema.errors import ( | |||||
DauphinPartitionSetNotFoundError, | from .errors import PartitionSetNotFoundError, PipelineNotFoundError, PythonError | ||||
DauphinPipelineNotFoundError, | from .inputs import PipelineRunsFilter | ||||
DauphinPythonError, | from .pipelines.pipeline import PipelineRun | ||||
) | from .pipelines.status import PipelineRunStatus | ||||
from .tags import PipelineTag | |||||
from .util import non_null_list | |||||
class PartitionTags(graphene.ObjectType): | |||||
results = non_null_list(PipelineTag) | |||||
class PartitionRunConfig(graphene.ObjectType): | |||||
yaml = graphene.NonNull(graphene.String) | |||||
class PartitionRunConfigOrError(graphene.Union): | |||||
class Meta: | |||||
types = (PartitionRunConfig, PythonError) | |||||
class PartitionStatus(graphene.ObjectType): | |||||
id = graphene.NonNull(graphene.String) | |||||
partitionName = graphene.NonNull(graphene.String) | |||||
runStatus = graphene.Field(PipelineRunStatus) | |||||
class PartitionStatuses(graphene.ObjectType): | |||||
results = non_null_list(PartitionStatus) | |||||
class DauphinPartition(dauphin.ObjectType): | class PartitionStatusesOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "Partition" | types = (PartitionStatuses, PythonError) | ||||
name = dauphin.NonNull(dauphin.String) | |||||
partition_set_name = dauphin.NonNull(dauphin.String) | class PartitionTagsOrError(graphene.Union): | ||||
solid_selection = dauphin.List(dauphin.NonNull(dauphin.String)) | class Meta: | ||||
mode = dauphin.NonNull(dauphin.String) | types = (PartitionTags, PythonError) | ||||
runConfigOrError = dauphin.NonNull("PartitionRunConfigOrError") | |||||
tagsOrError = dauphin.NonNull("PartitionTagsOrError") | |||||
runs = dauphin.Field( | class Partition(graphene.ObjectType): | ||||
dauphin.non_null_list("PipelineRun"), | name = graphene.NonNull(graphene.String) | ||||
filter=dauphin.Argument("PipelineRunsFilter"), | partition_set_name = graphene.NonNull(graphene.String) | ||||
cursor=dauphin.String(), | solid_selection = graphene.List(graphene.NonNull(graphene.String)) | ||||
limit=dauphin.Int(), | mode = graphene.NonNull(graphene.String) | ||||
runConfigOrError = graphene.NonNull(PartitionRunConfigOrError) | |||||
tagsOrError = graphene.NonNull(PartitionTagsOrError) | |||||
runs = graphene.Field( | |||||
non_null_list(PipelineRun), | |||||
filter=graphene.Argument(PipelineRunsFilter), | |||||
cursor=graphene.String(), | |||||
limit=graphene.Int(), | |||||
) | ) | ||||
status = dauphin.Field("PipelineRunStatus") | status = graphene.Field(PipelineRunStatus) | ||||
def __init__(self, external_repository_handle, external_partition_set, partition_name): | def __init__(self, external_repository_handle, external_partition_set, partition_name): | ||||
self._external_repository_handle = check.inst_param( | self._external_repository_handle = check.inst_param( | ||||
external_repository_handle, "external_respository_handle", RepositoryHandle | external_repository_handle, "external_respository_handle", RepositoryHandle | ||||
) | ) | ||||
self._external_partition_set = check.inst_param( | self._external_partition_set = check.inst_param( | ||||
external_partition_set, "external_partition_set", ExternalPartitionSet | external_partition_set, "external_partition_set", ExternalPartitionSet | ||||
) | ) | ||||
self._partition_name = check.str_param(partition_name, "partition_name") | self._partition_name = check.str_param(partition_name, "partition_name") | ||||
super(DauphinPartition, self).__init__( | super(Partition, self).__init__( | ||||
name=partition_name, | name=partition_name, | ||||
partition_set_name=external_partition_set.name, | partition_set_name=external_partition_set.name, | ||||
solid_selection=external_partition_set.solid_selection, | solid_selection=external_partition_set.solid_selection, | ||||
mode=external_partition_set.mode, | mode=external_partition_set.mode, | ||||
) | ) | ||||
def resolve_runConfigOrError(self, graphene_info): | def resolve_runConfigOrError(self, graphene_info): | ||||
return get_partition_config( | return get_partition_config( | ||||
Show All 14 Lines | class Partition(graphene.ObjectType): | ||||
def resolve_runs(self, graphene_info, **kwargs): | def resolve_runs(self, graphene_info, **kwargs): | ||||
filters = kwargs.get("filter") | filters = kwargs.get("filter") | ||||
partition_tags = { | partition_tags = { | ||||
PARTITION_SET_TAG: self._external_partition_set.name, | PARTITION_SET_TAG: self._external_partition_set.name, | ||||
PARTITION_NAME_TAG: self._partition_name, | PARTITION_NAME_TAG: self._partition_name, | ||||
} | } | ||||
if filters is not None: | if filters is not None: | ||||
filters = filters.to_selector() | filters = filters.to_selector() | ||||
runs_filter = PipelineRunsFilter( | runs_filter = DagsterPipelineRunsFilter( | ||||
run_ids=filters.run_ids, | run_ids=filters.run_ids, | ||||
pipeline_name=filters.pipeline_name, | pipeline_name=filters.pipeline_name, | ||||
statuses=filters.statuses, | statuses=filters.statuses, | ||||
tags=merge_dicts(filters.tags, partition_tags), | tags=merge_dicts(filters.tags, partition_tags), | ||||
) | ) | ||||
else: | else: | ||||
runs_filter = PipelineRunsFilter(tags=partition_tags) | runs_filter = DagsterPipelineRunsFilter(tags=partition_tags) | ||||
return get_runs( | return get_runs( | ||||
graphene_info, runs_filter, cursor=kwargs.get("cursor"), limit=kwargs.get("limit") | graphene_info, runs_filter, cursor=kwargs.get("cursor"), limit=kwargs.get("limit") | ||||
) | ) | ||||
class DauphinPartitions(dauphin.ObjectType): | class Partitions(graphene.ObjectType): | ||||
class Meta: | results = non_null_list(Partition) | ||||
name = "Partitions" | |||||
results = dauphin.non_null_list("Partition") | |||||
class DauphinPartitionSet(dauphin.ObjectType): | class PartitionsOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "PartitionSet" | types = (Partitions, PythonError) | ||||
id = dauphin.NonNull(dauphin.ID) | class PartitionSet(graphene.ObjectType): | ||||
name = dauphin.NonNull(dauphin.String) | id = graphene.NonNull(graphene.ID) | ||||
pipeline_name = dauphin.NonNull(dauphin.String) | name = graphene.NonNull(graphene.String) | ||||
solid_selection = dauphin.List(dauphin.NonNull(dauphin.String)) | pipeline_name = graphene.NonNull(graphene.String) | ||||
mode = dauphin.NonNull(dauphin.String) | solid_selection = graphene.List(graphene.NonNull(graphene.String)) | ||||
partitionsOrError = dauphin.Field( | mode = graphene.NonNull(graphene.String) | ||||
dauphin.NonNull("PartitionsOrError"), | partitionsOrError = graphene.Field( | ||||
cursor=dauphin.String(), | graphene.NonNull(PartitionsOrError), | ||||
limit=dauphin.Int(), | cursor=graphene.String(), | ||||
reverse=dauphin.Boolean(), | limit=graphene.Int(), | ||||
reverse=graphene.Boolean(), | |||||
) | ) | ||||
partition = dauphin.Field("Partition", partition_name=dauphin.NonNull(dauphin.String)) | partition = graphene.Field(Partition, partition_name=graphene.NonNull(graphene.String)) | ||||
partitionStatusesOrError = dauphin.NonNull("PartitionStatusesOrError") | partitionStatusesOrError = graphene.NonNull(PartitionStatusesOrError) | ||||
def __init__(self, external_repository_handle, external_partition_set): | def __init__(self, external_repository_handle, external_partition_set): | ||||
self._external_repository_handle = check.inst_param( | self._external_repository_handle = check.inst_param( | ||||
external_repository_handle, "external_respository_handle", RepositoryHandle | external_repository_handle, "external_respository_handle", RepositoryHandle | ||||
) | ) | ||||
self._external_partition_set = check.inst_param( | self._external_partition_set = check.inst_param( | ||||
external_partition_set, "external_partition_set", ExternalPartitionSet | external_partition_set, "external_partition_set", ExternalPartitionSet | ||||
) | ) | ||||
super(DauphinPartitionSet, self).__init__( | super(PartitionSet, self).__init__( | ||||
name=external_partition_set.name, | name=external_partition_set.name, | ||||
pipeline_name=external_partition_set.pipeline_name, | pipeline_name=external_partition_set.pipeline_name, | ||||
solid_selection=external_partition_set.solid_selection, | solid_selection=external_partition_set.solid_selection, | ||||
mode=external_partition_set.mode, | mode=external_partition_set.mode, | ||||
) | ) | ||||
def resolve_id(self, _graphene_info): | def resolve_id(self, _graphene_info): | ||||
return self._external_partition_set.get_external_origin_id() | return self._external_partition_set.get_external_origin_id() | ||||
Show All 17 Lines | def resolve_partition(self, graphene_info, partition_name): | ||||
) | ) | ||||
def resolve_partitionStatusesOrError(self, graphene_info): | def resolve_partitionStatusesOrError(self, graphene_info): | ||||
return get_partition_set_partition_statuses( | return get_partition_set_partition_statuses( | ||||
graphene_info, self._external_repository_handle, self._external_partition_set.name | graphene_info, self._external_repository_handle, self._external_partition_set.name | ||||
) | ) | ||||
class DapuphinPartitionSetOrError(dauphin.Union): | class PartitionSetOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "PartitionSetOrError" | types = (PartitionSet, PartitionSetNotFoundError, PythonError) | ||||
types = ("PartitionSet", DauphinPartitionSetNotFoundError, DauphinPythonError) | |||||
class DauphinPartitionSets(dauphin.ObjectType): | class PartitionSets(graphene.ObjectType): | ||||
class Meta: | results = non_null_list(PartitionSet) | ||||
name = "PartitionSets" | |||||
results = dauphin.non_null_list("PartitionSet") | |||||
class DauphinPartitionTags(dauphin.ObjectType): | class PartitionSetsOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "PartitionTags" | types = (PartitionSets, PipelineNotFoundError, PythonError) | ||||
results = dauphin.non_null_list("PipelineTag") | |||||
types = [ | |||||
Partition, | |||||
PartitionRunConfig, | |||||
PartitionRunConfigOrError, | |||||
Partitions, | |||||
PartitionSet, | |||||
PartitionSetOrError, | |||||
PartitionSets, | |||||
PartitionSetsOrError, | |||||
PartitionsOrError, | |||||
PartitionStatus, | |||||
PartitionStatuses, | |||||
PartitionStatusesOrError, | |||||
PartitionTags, | |||||
PartitionTagsOrError, | |||||
] | |||||
class DauphinPartitionRunConfig(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "PartitionRunConfig" | |||||
yaml = dauphin.NonNull(dauphin.String) | |||||
class DauphinPartitionSetsOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "PartitionSetsOrError" | |||||
types = (DauphinPartitionSets, DauphinPipelineNotFoundError, DauphinPythonError) | |||||
class DauphinPartitionsOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "PartitionsOrError" | |||||
types = (DauphinPartitions, DauphinPythonError) | |||||
class DauphinPartitionTagsOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "PartitionTagsOrError" | |||||
types = (DauphinPartitionTags, DauphinPythonError) | |||||
class DauphinPartitionRunConfigOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "PartitionRunConfigOrError" | |||||
types = (DauphinPartitionRunConfig, DauphinPythonError) | |||||
class DauphinPartitionStatus(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "PartitionStatus" | |||||
id = dauphin.NonNull(dauphin.String) | |||||
partitionName = dauphin.NonNull(dauphin.String) | |||||
runStatus = dauphin.Field("PipelineRunStatus") | |||||
class DauphinPartitionStatuses(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "PartitionStatuses" | |||||
results = dauphin.non_null_list("PartitionStatus") | |||||
class DauphinPartitionStatusesOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "PartitionStatusesOrError" | |||||
types = (DauphinPartitionStatuses, DauphinPythonError) |