Differential D6164 Diff 30546 python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/partition_sets.py
import graphene | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ExternalPartitionSet, RepositoryHandle | from dagster.core.host_representation import ExternalPartitionSet, RepositoryHandle | ||||
from dagster.core.storage.pipeline_run import PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRunsFilter | ||||
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG | from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG | ||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster_graphql import dauphin | |||||
from dagster_graphql.implementation.fetch_partition_sets import ( | from dagster_graphql.implementation.fetch_partition_sets import ( | ||||
get_partition_by_name, | get_partition_by_name, | ||||
get_partition_config, | get_partition_config, | ||||
get_partition_set_partition_statuses, | get_partition_set_partition_statuses, | ||||
get_partition_tags, | get_partition_tags, | ||||
get_partitions, | get_partitions, | ||||
) | ) | ||||
from dagster_graphql.implementation.fetch_runs import get_runs | from dagster_graphql.implementation.fetch_runs import get_runs | ||||
from dagster_graphql.schema.errors import ( | |||||
DauphinPartitionSetNotFoundError, | |||||
DauphinPipelineNotFoundError, | |||||
DauphinPythonError, | |||||
) | |||||
from .errors import ( | |||||
GraphenePartitionSetNotFoundError, | |||||
GraphenePipelineNotFoundError, | |||||
GraphenePythonError, | |||||
) | |||||
from .inputs import GraphenePipelineRunsFilter | |||||
from .pipelines.pipeline import GraphenePipelineRun | |||||
from .pipelines.status import GraphenePipelineRunStatus | |||||
from .tags import GraphenePipelineTag | |||||
from .util import non_null_list | |||||
class GraphenePartitionTags(graphene.ObjectType): | |||||
results = non_null_list(GraphenePipelineTag) | |||||
class DauphinPartition(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "Partition" | name = "PartitionTags" | ||||
class GraphenePartitionRunConfig(graphene.ObjectType): | |||||
yaml = graphene.NonNull(graphene.String) | |||||
class Meta: | |||||
name = "PartitionRunConfig" | |||||
class GraphenePartitionRunConfigOrError(graphene.Union): | |||||
class Meta: | |||||
types = (GraphenePartitionRunConfig, GraphenePythonError) | |||||
name = "PartitionRunConfigOrError" | |||||
class GraphenePartitionStatus(graphene.ObjectType): | |||||
id = graphene.NonNull(graphene.String) | |||||
partitionName = graphene.NonNull(graphene.String) | |||||
runStatus = graphene.Field(GraphenePipelineRunStatus) | |||||
class Meta: | |||||
name = "PartitionStatus" | |||||
name = dauphin.NonNull(dauphin.String) | class GraphenePartitionStatuses(graphene.ObjectType): | ||||
partition_set_name = dauphin.NonNull(dauphin.String) | results = non_null_list(GraphenePartitionStatus) | ||||
solid_selection = dauphin.List(dauphin.NonNull(dauphin.String)) | |||||
mode = dauphin.NonNull(dauphin.String) | class Meta: | ||||
runConfigOrError = dauphin.NonNull("PartitionRunConfigOrError") | name = "PartitionStatuses" | ||||
tagsOrError = dauphin.NonNull("PartitionTagsOrError") | |||||
runs = dauphin.Field( | |||||
dauphin.non_null_list("PipelineRun"), | class GraphenePartitionStatusesOrError(graphene.Union): | ||||
filter=dauphin.Argument("PipelineRunsFilter"), | class Meta: | ||||
cursor=dauphin.String(), | types = (GraphenePartitionStatuses, GraphenePythonError) | ||||
limit=dauphin.Int(), | name = "PartitionStatusesOrError" | ||||
class GraphenePartitionTagsOrError(graphene.Union): | |||||
class Meta: | |||||
types = (GraphenePartitionTags, GraphenePythonError) | |||||
name = "PartitionTagsOrError" | |||||
class GraphenePartition(graphene.ObjectType): | |||||
name = graphene.NonNull(graphene.String) | |||||
partition_set_name = graphene.NonNull(graphene.String) | |||||
solid_selection = graphene.List(graphene.NonNull(graphene.String)) | |||||
mode = graphene.NonNull(graphene.String) | |||||
runConfigOrError = graphene.NonNull(GraphenePartitionRunConfigOrError) | |||||
tagsOrError = graphene.NonNull(GraphenePartitionTagsOrError) | |||||
runs = graphene.Field( | |||||
non_null_list(GraphenePipelineRun), | |||||
filter=graphene.Argument(GraphenePipelineRunsFilter), | |||||
cursor=graphene.String(), | |||||
limit=graphene.Int(), | |||||
) | ) | ||||
status = dauphin.Field("PipelineRunStatus") | status = graphene.Field(GraphenePipelineRunStatus) | ||||
class Meta: | |||||
name = "Partition" | |||||
def __init__(self, external_repository_handle, external_partition_set, partition_name): | def __init__(self, external_repository_handle, external_partition_set, partition_name): | ||||
self._external_repository_handle = check.inst_param( | self._external_repository_handle = check.inst_param( | ||||
external_repository_handle, "external_respository_handle", RepositoryHandle | external_repository_handle, "external_respository_handle", RepositoryHandle | ||||
) | ) | ||||
self._external_partition_set = check.inst_param( | self._external_partition_set = check.inst_param( | ||||
external_partition_set, "external_partition_set", ExternalPartitionSet | external_partition_set, "external_partition_set", ExternalPartitionSet | ||||
) | ) | ||||
self._partition_name = check.str_param(partition_name, "partition_name") | self._partition_name = check.str_param(partition_name, "partition_name") | ||||
super(DauphinPartition, self).__init__( | super().__init__( | ||||
name=partition_name, | name=partition_name, | ||||
partition_set_name=external_partition_set.name, | partition_set_name=external_partition_set.name, | ||||
solid_selection=external_partition_set.solid_selection, | solid_selection=external_partition_set.solid_selection, | ||||
mode=external_partition_set.mode, | mode=external_partition_set.mode, | ||||
) | ) | ||||
def resolve_runConfigOrError(self, graphene_info): | def resolve_runConfigOrError(self, graphene_info): | ||||
return get_partition_config( | return get_partition_config( | ||||
Show All 28 Lines | def resolve_runs(self, graphene_info, **kwargs): | ||||
else: | else: | ||||
runs_filter = PipelineRunsFilter(tags=partition_tags) | runs_filter = PipelineRunsFilter(tags=partition_tags) | ||||
return get_runs( | return get_runs( | ||||
graphene_info, runs_filter, cursor=kwargs.get("cursor"), limit=kwargs.get("limit") | graphene_info, runs_filter, cursor=kwargs.get("cursor"), limit=kwargs.get("limit") | ||||
) | ) | ||||
class DauphinPartitions(dauphin.ObjectType): | class GraphenePartitions(graphene.ObjectType): | ||||
results = non_null_list(GraphenePartition) | |||||
class Meta: | class Meta: | ||||
name = "Partitions" | name = "Partitions" | ||||
results = dauphin.non_null_list("Partition") | |||||
class DauphinPartitionSet(dauphin.ObjectType): | class GraphenePartitionsOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "PartitionSet" | types = (GraphenePartitions, GraphenePythonError) | ||||
name = "PartitionsOrError" | |||||
id = dauphin.NonNull(dauphin.ID) | |||||
name = dauphin.NonNull(dauphin.String) | class GraphenePartitionSet(graphene.ObjectType): | ||||
pipeline_name = dauphin.NonNull(dauphin.String) | id = graphene.NonNull(graphene.ID) | ||||
solid_selection = dauphin.List(dauphin.NonNull(dauphin.String)) | name = graphene.NonNull(graphene.String) | ||||
mode = dauphin.NonNull(dauphin.String) | pipeline_name = graphene.NonNull(graphene.String) | ||||
partitionsOrError = dauphin.Field( | solid_selection = graphene.List(graphene.NonNull(graphene.String)) | ||||
dauphin.NonNull("PartitionsOrError"), | mode = graphene.NonNull(graphene.String) | ||||
cursor=dauphin.String(), | partitionsOrError = graphene.Field( | ||||
limit=dauphin.Int(), | graphene.NonNull(GraphenePartitionsOrError), | ||||
reverse=dauphin.Boolean(), | cursor=graphene.String(), | ||||
limit=graphene.Int(), | |||||
reverse=graphene.Boolean(), | |||||
) | ) | ||||
partition = dauphin.Field("Partition", partition_name=dauphin.NonNull(dauphin.String)) | partition = graphene.Field(GraphenePartition, partition_name=graphene.NonNull(graphene.String)) | ||||
partitionStatusesOrError = dauphin.NonNull("PartitionStatusesOrError") | partitionStatusesOrError = graphene.NonNull(GraphenePartitionStatusesOrError) | ||||
class Meta: | |||||
name = "PartitionSet" | |||||
def __init__(self, external_repository_handle, external_partition_set): | def __init__(self, external_repository_handle, external_partition_set): | ||||
self._external_repository_handle = check.inst_param( | self._external_repository_handle = check.inst_param( | ||||
external_repository_handle, "external_respository_handle", RepositoryHandle | external_repository_handle, "external_respository_handle", RepositoryHandle | ||||
) | ) | ||||
self._external_partition_set = check.inst_param( | self._external_partition_set = check.inst_param( | ||||
external_partition_set, "external_partition_set", ExternalPartitionSet | external_partition_set, "external_partition_set", ExternalPartitionSet | ||||
) | ) | ||||
super(DauphinPartitionSet, self).__init__( | super().__init__( | ||||
name=external_partition_set.name, | name=external_partition_set.name, | ||||
pipeline_name=external_partition_set.pipeline_name, | pipeline_name=external_partition_set.pipeline_name, | ||||
solid_selection=external_partition_set.solid_selection, | solid_selection=external_partition_set.solid_selection, | ||||
mode=external_partition_set.mode, | mode=external_partition_set.mode, | ||||
) | ) | ||||
def resolve_id(self, _graphene_info): | def resolve_id(self, _graphene_info): | ||||
return self._external_partition_set.get_external_origin_id() | return self._external_partition_set.get_external_origin_id() | ||||
Show All 17 Lines | def resolve_partition(self, graphene_info, partition_name): | ||||
) | ) | ||||
def resolve_partitionStatusesOrError(self, graphene_info): | def resolve_partitionStatusesOrError(self, graphene_info): | ||||
return get_partition_set_partition_statuses( | return get_partition_set_partition_statuses( | ||||
graphene_info, self._external_repository_handle, self._external_partition_set.name | graphene_info, self._external_repository_handle, self._external_partition_set.name | ||||
) | ) | ||||
class DapuphinPartitionSetOrError(dauphin.Union): | class GraphenePartitionSetOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
types = (GraphenePartitionSet, GraphenePartitionSetNotFoundError, GraphenePythonError) | |||||
name = "PartitionSetOrError" | name = "PartitionSetOrError" | ||||
types = ("PartitionSet", DauphinPartitionSetNotFoundError, DauphinPythonError) | |||||
class DauphinPartitionSets(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "PartitionSets" | |||||
results = dauphin.non_null_list("PartitionSet") | |||||
class DauphinPartitionTags(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "PartitionTags" | |||||
results = dauphin.non_null_list("PipelineTag") | |||||
class GraphenePartitionSets(graphene.ObjectType): | |||||
results = non_null_list(GraphenePartitionSet) | |||||
class DauphinPartitionRunConfig(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "PartitionRunConfig" | name = "PartitionSets" | ||||
yaml = dauphin.NonNull(dauphin.String) | |||||
class DauphinPartitionSetsOrError(dauphin.Union): | class GraphenePartitionSetsOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
types = (GraphenePartitionSets, GraphenePipelineNotFoundError, GraphenePythonError) | |||||
name = "PartitionSetsOrError" | name = "PartitionSetsOrError" | ||||
types = (DauphinPartitionSets, DauphinPipelineNotFoundError, DauphinPythonError) | |||||
class DauphinPartitionsOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "PartitionsOrError" | |||||
types = (DauphinPartitions, DauphinPythonError) | |||||
class DauphinPartitionTagsOrError(dauphin.Union): | types = [ | ||||
class Meta: | GraphenePartition, | ||||
name = "PartitionTagsOrError" | GraphenePartitionRunConfig, | ||||
types = (DauphinPartitionTags, DauphinPythonError) | GraphenePartitionRunConfigOrError, | ||||
GraphenePartitions, | |||||
GraphenePartitionSet, | |||||
class DauphinPartitionRunConfigOrError(dauphin.Union): | GraphenePartitionSetOrError, | ||||
class Meta: | GraphenePartitionSets, | ||||
name = "PartitionRunConfigOrError" | GraphenePartitionSetsOrError, | ||||
types = (DauphinPartitionRunConfig, DauphinPythonError) | GraphenePartitionsOrError, | ||||
GraphenePartitionStatus, | |||||
GraphenePartitionStatuses, | |||||
class DauphinPartitionStatus(dauphin.ObjectType): | GraphenePartitionStatusesOrError, | ||||
class Meta: | GraphenePartitionTags, | ||||
name = "PartitionStatus" | GraphenePartitionTagsOrError, | ||||
] | |||||
id = dauphin.NonNull(dauphin.String) | |||||
partitionName = dauphin.NonNull(dauphin.String) | |||||
runStatus = dauphin.Field("PipelineRunStatus") | |||||
class DauphinPartitionStatuses(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "PartitionStatuses" | |||||
results = dauphin.non_null_list("PartitionStatus") | |||||
class DauphinPartitionStatusesOrError(dauphin.Union): | |||||
class Meta: | |||||
name = "PartitionStatusesOrError" | |||||
types = (DauphinPartitionStatuses, DauphinPythonError) |